Saturday, September 3, 2016

Spark Misc Part 2



TODO - low latency real time query
https://github.com/spark-jobserver/spark-jobserver

rest server
https://github.com/cloudera/livy
http://livy.io/overview.html
https://spark-summit.org/eu-2015/events/building-a-rest-job-server-for-interactive-spark-as-a-service/
Livy (currently an alpha release) is a service that enables easy interaction with an Apache Spark cluster over a REST interface. It enables easy submission of Spark jobs or snippets of Spark code, synchronous or asynchronous result retrieval, as well as SparkContext management, all via a simple REST interface or an RPC client library. Livy also simplifies the interaction between Spark and application servers, thus enabling the use of Spark for interactive web/mobile applications. Additional features include:
  • Have long running SparkContexts that can be used for multiple Spark jobs, by multiple clients
  • Share cached RDDs or Dataframes across multiple jobs and clients
  • Multiple SparkContexts can be managed simultaneously, and they run on the cluster (YARN/Mesos) instead of the Livy Server for good fault tolerance and concurrency
  • Jobs can be submitted as precompiled jars, snippets of code, or via Java/Scala client API

https://toree.apache.org/

https://zeppelin.apache.org/
http://stackoverflow.com/questions/36484362/notebook-as-production-rest-api
Perhaps you're looking for a Spark REST server solution, such as spark-jobserver or Livy.

Web app
http://stackoverflow.com/questions/29276381/using-apache-spark-as-a-backend-for-web-application
Another option would be to use Databricks Cloud, but it's not publicly available yet.

Apache Zeppelin provides a framework for interactively ingesting and visualizing data (via web application) using apache spark as the back end.
It looks like that project uses a single sparkContext for low latency query jobs.
Most (all?) docs on spark speak about running an app with spark-submit or using spark-shell for interactive usage (sorry but spark&scala shell are so disappointing...). They never speak about using spark in an interactive app, such as a web-app. It is possible (I tried), but there are indeed some issues to check, such as sharing the sparkContext as you mentioned, and also some issues about managing dependencies. You can check the two getting-started prototypes I made to use spark in a spring web-app. It is in java, but I would strongly recommend using scala. I did not work long enough with this to learn a lot. However I can say that it is possible, and it works well (tried on a 12-node cluster + app running on an edge node).
Just remember that the spark driver, i.e. where the code with rdd is running, should be physically on the same cluster as the spark nodes: there is a lot of communication between the driver and the workers.

http://julien.diener.website/?p=136
For my (professional) work, I chose the second solution: add the spark jar to the maven jetty plugin which is used during development, and I included the jetty-runner in the project, which I run with the spark jar added to the classpath (using the `--jar` option).

https://github.com/julien-diener/sparkwebapp
The main trouble comes from conflicts between the spark and hadoop dependencies, and maybe the spring ones. In the previous stand-alone app csvconverter, the dependencies were only used for compilation, but at run time another, global, jar is used: the $SPARK_HOME/lib/spark-assembly-1.1.1-hadoop2.4.0.jar provided by the spark installation.
The solution used here is to replace the maven dependencies included in the war by this spark-assembly jar. See this SO answer for more details.

Deployment
http://arturmkrtchyan.com/apache-spark-hidden-rest-api
https://issues.apache.org/jira/browse/SPARK-5388
https://gist.github.com/arturmkrtchyan/5d8559b2911ac951d34a
http://apache-spark-user-list.1001560.n3.nabble.com/Calling-spark-from-a-java-web-application-td20007.html
Let’s say you have a quartz-scheduler based application and you want to periodically submit applications to perform some batch processing.

The second would be to use an open source project called spark-jobserver, which tries to solve the problem by providing a REST API to submit applications. But this means you have to add an additional stateful service to your tech stack, maintain it and take care of its availability.
It turns out there is a third surprisingly easy option which is not documented. Spark has a hidden REST API which handles application submission, status checking and cancellation.
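
As a rough sketch of what a submission through that REST API might look like, the snippet below builds the HTTP request without sending it. The port 6066, the /v1/submissions/create path and the JSON field names follow the linked post and gist and should be treated as assumptions to verify against your Spark version; the host, jar path and class name are hypothetical placeholders.

```python
import json
import urllib.request


def build_submission(master_host, jar_path, main_class, app_args, spark_version):
    """Build (but do not send) a create-submission request for a standalone master."""
    payload = {
        "action": "CreateSubmissionRequest",
        "appResource": jar_path,
        "mainClass": main_class,
        "appArgs": list(app_args),
        "clientSparkVersion": spark_version,
        "environmentVariables": {"SPARK_ENV_LOADED": "1"},
        "sparkProperties": {
            "spark.app.name": main_class.rsplit(".", 1)[-1],
            "spark.jars": jar_path,
            "spark.submit.deployMode": "cluster",
            "spark.master": "spark://%s:6066" % master_host,
        },
    }
    return urllib.request.Request(
        url="http://%s:6066/v1/submissions/create" % master_host,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json;charset=UTF-8"},
        method="POST",
    )


# Hypothetical values, for illustration only.
request = build_submission(
    master_host="spark-master.example.com",
    jar_path="file:/path/to/my-job.jar",
    main_class="com.example.MyJob",
    app_args=["arg1"],
    spark_version="2.0.0",
)
body = json.loads(request.data.decode("utf-8"))
print(request.get_method(), request.full_url)
print(body["action"], body["mainClass"])
```

Sending it would be a matter of passing the request to urllib.request.urlopen. Since the API is undocumented, it may change between releases without notice.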

http://spark.apache.org/docs/latest/cluster-overview.html
Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.
  1. Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.
  2. Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).
  3. The driver program must listen for and accept incoming connections from its executors throughout its lifetime (e.g., see spark.driver.port in the network config section). As such, the driver program must be network addressable from the worker nodes.
  4. Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

Applications can be submitted to a cluster of any type using the spark-submit script.

Term: Meaning
Application: User program built on Spark. Consists of a driver program and executors on the cluster.
Application jar: A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries; however, these will be added at runtime.
Driver program: The process running the main() function of the application and creating the SparkContext.
Cluster manager: An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN).
Deploy mode: Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.
Worker node: Any node that can run application code in the cluster.
Executor: A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
Task: A unit of work that will be sent to one executor.
Job: A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs.
Stage: Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce).

http://spark.apache.org/docs/latest/submitting-applications.html
  • --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown).
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any
A common deployment strategy is to submit your application from a gateway machine that is physically co-located with your worker machines (e.g. Master node in a standalone EC2 cluster). In this setup, client mode is appropriate. In client mode, the driver is launched directly within the spark-submit process which acts as a client to the cluster. The input and output of the application is attached to the console. Thus, this mode is especially suitable for applications that involve the REPL (e.g. Spark shell).
Alternatively, if your application is submitted from a machine far from the worker machines (e.g. locally on your laptop), it is common to use cluster mode to minimize network latency between the driver and the executors.
  • local: - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, or shared via NFS, GlusterFS, etc.
Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. This can use up a significant amount of space over time and will need to be cleaned up. With YARN, cleanup is handled automatically, and with Spark standalone, automatic cleanup can be configured with the spark.worker.cleanup.appDataTtl property.
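
To make the option list above concrete, here is a small sketch that assembles a spark-submit command line from its parts. It only builds and prints the argument list; it does not run spark-submit. The class name, jar path and configuration value are hypothetical placeholders.

```python
import shlex


def spark_submit_command(main_class, master, deploy_mode, conf, app_jar, app_args):
    """Return the spark-submit argument list for the given settings."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    argv = ["spark-submit", "--class", main_class,
            "--master", master, "--deploy-mode", deploy_mode]
    for key, value in sorted(conf.items()):
        # Each property is passed as one key=value argument after --conf.
        argv += ["--conf", "%s=%s" % (key, value)]
    argv.append(app_jar)      # the application jar comes after all options
    argv.extend(app_args)     # everything after the jar goes to the main method
    return argv


argv = spark_submit_command(
    main_class="com.example.MyJob",        # hypothetical class
    master="spark://23.195.26.187:7077",   # master URL from the docs example
    deploy_mode="cluster",
    conf={"spark.executor.memory": "2g"},
    app_jar="hdfs:///apps/my-job.jar",     # hypothetical, must be visible cluster-wide
    app_args=["2016-09-03"],
)
print(" ".join(shlex.quote(arg) for arg in argv))
```

Building the command as a list rather than a single string avoids quoting problems when a --conf value contains spaces.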

The web server can also act as the Spark driver. So it would have a SparkContext instance and contain the code for working with RDDs.
The advantage of this is that the Spark executors are long-lived. You save time by not having to start/stop them all the time. You can cache RDDs between operations.
A disadvantage is that since the executors are running all the time, they take up memory that other processes in the cluster could possibly use. Another one is that you cannot have more than one instance of the web server, since you cannot have more than one SparkContext to the same Spark application.

https://aws.amazon.com/blogs/big-data/submitting-user-applications-with-spark-submit/
The spark-submit script is a convenient way to launch a Spark application on a YARN or Mesos cluster. However, due to the distributed nature of the cluster, the application has to be prepared as a single Java ARchive (JAR). This archive includes all classes from your project with all of its dependencies.

http://mkuthan.github.io/blog/2016/03/11/spark-application-assembly/
There are experimental configuration flags for Spark: spark.driver.userClassPathFirst and spark.executor.userClassPathFirst. In theory they give user-added jars precedence over Spark’s own jars when loading classes in the driver. But in practice it does not work, at least for me :–(.
In general you should avoid external dependencies at all cost when you develop an application deployed on a YARN cluster. Classloader hell is even bigger than in JEE containers like JBoss or WebLogic. Look for libraries with minimal transitive dependencies and narrow features. For example, if you need a cache, choose Caffeine over Guava.
http://stackoverflow.com/questions/30413488/apache-spark-application-deployment-best-practices
  1. Deploying my program as a jar, and running the various tasks with spark-submit - which seems to be the way recommended in the spark docs. Some thoughts about this strategy:
    • how do you start/stop tasks - just using simple bash scripts?
    • how is scheduling managed? - simply use cron?
    • any resilience? (e.g. Who schedules the jobs to run if the driver server dies?)
  2. Creating a separate webapp as the driver program.
    • creates a spark context programmatically to talk to the spark cluster
    • allowing users to kick off tasks through the http interface
    • using Quartz (for example) to manage scheduling
    • could use cluster with zookeeper election for resilience


local uses 1 thread.
local[N] uses N threads.
local[*] uses as many threads as there are cores.
local[N, M] and local[*, M] work like local[N] and local[*], with M as the maximum number of task failures allowed.
local-cluster[numSlaves, coresPerSlave, memoryPerSlave]
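
The local master URL forms listed above can be summarised with a small parser. This is a sketch in plain Python, not Spark's own parsing code; it assumes that M in local[N, M] is the maximum number of task failures, and the function name and the cores parameter are made up for the example.

```python
import os
import re

# Matches local, local[N], local[*], local[N, M] and local[*, M].
_LOCAL = re.compile(r"^local(?:\[(\*|\d+)(?:\s*,\s*(\d+))?\])?$")


def local_threads(master, cores=None):
    """Return (threads, max_failures) for a local master URL, else None.

    max_failures is None when the URL does not specify it.
    """
    match = _LOCAL.match(master)
    if match is None:
        return None
    threads, max_failures = match.groups()
    if threads is None:
        count = 1                                   # plain "local"
    elif threads == "*":
        count = cores if cores is not None else (os.cpu_count() or 1)
    else:
        count = int(threads)
    return count, (int(max_failures) if max_failures is not None else None)


for url in ["local", "local[4]", "local[*]", "local[2, 3]", "spark://host:7077"]:
    print(url, "->", local_threads(url, cores=8))
```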
http://stackoverflow.com/questions/38490941/spark-read-an-inputstream-instead-of-file

https://spark.apache.org/docs/latest/programming-guide.html
  • All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
  • The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
Spark can read a .gz file (a file created by the gzip command, without tar).
The file name has to end in .gz.
I have to confirm that the above solution DOESN'T work as long as the file name doesn't end with '.gz'... really annoying
http://stackoverflow.com/questions/16302385/gzip-support-in-spark
Support for gzip input files should work the same as it does in Hadoop. For example, sc.textFile("myFile.gz") should automatically decompress and read gzip-compressed files (textFile() is actually implemented using Hadoop's TextInputFormat, which supports gzip-compressed files).
I would note that if you call sc.textFile() on a gzipped file, Spark will give you an RDD with only 1 partition (as of 0.9.0). This is because gzipped files are not splittable. If you don't repartition the RDD somehow, any operations on that RDD will be limited to a single core
textFile works with txt.gz but not tar.gz. It adds extra characters in the result. I ended up cleaning it up myself manually

http://blog.madhukaraphatak.com/spark-rdd-fold/
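// Illustrative data: (name, salary) pairs; dummyEmployee is the zero value for fold
val employeeRDD = sparkContext.makeRDD(List(("jack", 1000.0), ("bob", 2000.0), ("carl", 7000.0)))
val dummyEmployee = ("dummy", 0.0)
val maxSalaryEmployee = employeeRDD.fold(dummyEmployee)((acc, employee) =>
  if (acc._2 < employee._2) employee else acc)
println("employee with maximum salary is " + maxSalaryEmployee)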

In this example, employees are grouped by department name. To find the maximum salary in each department, we can use the following code.
val deptEmployees = List(
      ("cs",("jack",1000.0)),
      ("cs",("bron",1200.0)),
      ("phy",("sam",2200.0)),
      ("phy",("ronaldo",500.0))
    )
  val employeeRDD = sparkContext.makeRDD(deptEmployees)

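  // Both argument lists must stay on one expression: a new line starting with "(" would be parsed as a separate statement
  val maxByDept = employeeRDD.foldByKey(("dummy", 0.0))((acc, element) =>
    if (acc._2 > element._2) acc else element)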
  
  println("maximum salaries in each dept" + maxByDept.collect().toList)
http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/
data = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )

sumCount = data.combineByKey(lambda value: (value, 1),
                             lambda x, value: (x[0] + value, x[1] + 1),
                             lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Python 3 no longer allows tuple unpacking in lambda parameters, so index into the pair instead
averageByKey = sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))

print(averageByKey.collectAsMap())
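
To see what the three functions passed to combineByKey do, here is a plain-Python model that needs no Spark installation. It is only a sketch of the semantics: the function combine_by_key and the way the data is split into two partitions are made up for the illustration.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Model combineByKey over a list of partitions of (key, value) pairs."""
    per_partition = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key in combiners:
                # Key already seen in this partition: fold the value in.
                combiners[key] = merge_value(combiners[key], value)
            else:
                # First value for this key in this partition.
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    merged = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in merged:
                # Same key from another partition: merge the two combiners.
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


# Same data as above, split arbitrarily into two partitions.
partitions = [[(0, 2.0), (0, 4.0), (1, 0.0)], [(1, 10.0), (1, 20.0)]]
sum_count = combine_by_key(
    partitions,
    lambda value: (value, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
average_by_key = {key: total / count for key, (total, count) in sum_count.items()}
print(average_by_key)  # {0: 3.0, 1: 10.0}
```

The first function runs once per key per partition, the second for every further value of that key within the partition, and the third only when results from different partitions meet.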

The higher-order reduceByKey method takes an associative binary operator as input and reduces values with the same key to a single value using the specified binary operator.

The reduceByKey method can be used for aggregating values by key.
val pairRdd = sc.parallelize(List(("a", 1), ("b",2), ("c",3), ("a", 11), ("b",22), ("a",111)))
val sumByKeyRdd = pairRdd.reduceByKey((x,y) => x+y)
val minByKeyRdd = pairRdd.reduceByKey((x,y) => if (x < y) x else y)

countByValue
The countByValue method returns a count of each unique element in the source RDD. It returns an instance of the Map class containing each unique element and its count as a key-value pair.
val rdd = sc.parallelize(List(1, 2, 3, 4, 1, 2, 3, 1, 2, 1))
val counts = rdd.countByValue

The higher-order reduce method aggregates the elements of the source RDD using an associative and commutative binary operator provided to it. It is similar to the fold method; however, it does not require a neutral zero value.

val numbersRdd = sc.parallelize(List(2, 5, 3, 1))
val sum = numbersRdd.reduce ((x, y) => x + y)
val product = numbersRdd.reduce((x, y) => x * y)


The higher-order fold method aggregates the elements in the source RDD using the specified neutral zero value and an associative binary operator. It first aggregates the elements in each RDD partition and then aggregates the results from each partition.
val numbersRdd = sc.parallelize(List(2, 5, 3, 1))
val sum = numbersRdd.fold(0) ((partialSum, x) => partialSum + x)
val product = numbersRdd.fold(1) ((partialProduct, x) => partialProduct * x)
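
The two-level aggregation described above explains why the zero value must be neutral. The following plain-Python model (a sketch, not Spark code; the split into two partitions is an assumption) applies the zero value once per partition and once more when merging the partial results.

```python
from functools import reduce


def fold(partitions, zero, op):
    """Model RDD.fold: fold each partition, then fold the partial results."""
    partials = [reduce(op, partition, zero) for partition in partitions]
    return reduce(op, partials, zero)


def add(a, b):
    return a + b


def multiply(a, b):
    return a * b


partitions = [[2, 5], [3, 1]]          # List(2, 5, 3, 1) in two partitions
print(fold(partitions, 0, add))        # 11: 0 is neutral for addition
print(fold(partitions, 1, multiply))   # 30: 1 is neutral for multiplication
print(fold(partitions, 1, add))        # 14: the non-neutral 1 is added three times
```

With a non-neutral zero value the result depends on the number of partitions, which is rarely what you want.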

https://www.quora.com/Apache-Spark-why-is-reduce-implemented-as-Spark-Action-and-reduceByKey-as-Spark-transformation

reduce aggregates all the elements into a single value, so its result cannot be an RDD. reduceByKey, defined on RDDs of pairs, aggregates the elements for each key, so its output is conceptually a Map<Key, Value>. That output may still be a large distributed collection and may be processed by further transformations, so keeping it as an RDD[(Key, Value)] is the better choice from a pipelining perspective.

The key differences between reduce() and reduceByKey() are
  1. reduce() returns a plain value to the driver, which does not add to the directed acyclic graph (DAG), so it is implemented as an action: once the value is returned, it is no longer an RDD, the basic dataset unit in Spark. reduceByKey(), however, returns an RDD, which is just another level/state in the DAG, and is therefore a transformation.
  2. reduce() is a function that operates on an RDD of objects while reduceByKey() is a function that operates on an RDD of key-value pairs. To put it more technically, reduce() function is a member of RDD[T] class while reduceByKey() is a member of the PairRDDFunctions[K, V] class.
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/
Spark RDD reduceByKey function merges the values for each key using an associative reduce function.

  • reduceByKey is a transformation operation in Spark hence it is lazily evaluated
  • It is a wide operation as it shuffles data from multiple partitions and creates another RDD
  • Before sending data across the partitions, it also merges the data locally using the same associative function for optimized data shuffling
  • It can only be used with RDDs whose elements are key-value pairs
  • It accepts a Commutative and Associative function as an argument
    • The parameter function should have two arguments of the same data type
    • The return type of the function also must be same as argument types
scala> val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),
     | ("a", 1), ("b", 1), ("b", 1),
     | ("b", 1), ("b", 1)), 3)
// Applying reduceByKey operation on x
scala> val y = x.reduceByKey((accum, n) => (accum + n))
scala> y.collect
res0: Array[(String, Int)] = Array((a,3), (b,5))
// Another way of applying associative function
scala> val y = x.reduceByKey(_ + _)
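
The local merge that happens before the shuffle can be modelled in plain Python. This is a sketch of the idea only, with no Spark involved; the function name and the particular split into three partitions are assumptions made for the example.

```python
def reduce_by_key(partitions, func):
    """Model reduceByKey: combine inside each partition, then merge across them.

    Returns the result and the number of records that crossed partitions.
    """
    locally_combined = []
    for partition in partitions:
        combined = {}
        for key, value in partition:
            combined[key] = func(combined[key], value) if key in combined else value
        locally_combined.append(combined)

    result = {}
    shuffled_records = 0
    for combined in locally_combined:
        for key, value in combined.items():
            shuffled_records += 1   # one record per key per partition is sent
            result[key] = func(result[key], value) if key in result else value
    return result, shuffled_records


pairs = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1)]
partitions = [pairs[0:3], pairs[3:6], pairs[6:8]]
result, shuffled_records = reduce_by_key(partitions, lambda accum, n: accum + n)
print(result)            # {'a': 3, 'b': 5}
print(shuffled_records)  # 5 records shuffled instead of the 8 input records
```

Because partial results are merged in an unspecified order, the function has to be associative and commutative for the answer to be well defined.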

implicits
object implicits extends SQLImplicits with Serializable {
  protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext
}
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-dataset.html#implicits
DatasetHolder case class offers three methods that do the conversions from Seq[T] or RDD[T] types to a Dataset[T]:
  • toDS(): Dataset[T]
  • toDF(): DataFrame
  • toDF(colNames: String*): DataFrame
Note
DataFrame is a mere type alias for Dataset[Row] since Spark 2.0.0.
DatasetHolder is used by SQLImplicits that is available to use after importing implicits object of SparkSession.
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-sparksession.html#implicits


The implicits object is a helper class with the Scala implicit methods to convert Scala objects to Datasets, DataFrames and Columns. It also defines Encoders for Scala’s "primitive" types (e.g. Int, Double, String) and their products and collections.

spark-csv
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
val escapeQuotes = getBool("escapeQuotes", true)
val quoteAll = getBool("quoteAll", false)
val delimiter = CSVTypeCast.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
val headerFlag = getBool("header")
val inferSchemaFlag = getBool("inferSchema")
val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
val quote = getChar("quote", '\"')
val escape = getChar("escape", '\\')
val comment = getChar("comment", '\u0000')

https://community.hortonworks.com/questions/49802/escaping-double-quotes-in-spark-dataframe.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-sparksession.html

Encoder is the fundamental concept in the serialization and deserialization (SerDe) framework in Spark SQL 2.0. Spark SQL uses the SerDe framework for IO to make it efficient time- and space-wise.


Encoder[T], is used to convert (encode and decode) any JVM object or primitive of type T (that could be your domain object) to and from Spark SQL’s InternalRow which is the internal binary row format representation (using Catalyst expressions and code generation).


Encoders know the schema of the records. This is how they offer significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).
scala> import org.apache.spark.sql.Encoders
scala> case class Person(id: Long, name: String)
scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string]

scala> personEncoder.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(name,StringType,true))

scala> personEncoder.clsTag
res1: scala.reflect.ClassTag[Person] = Person

You can create custom encoders using static methods of Encoders object. Note however that encoders for common Scala types and their product types are already available in implicits object.



The default encoders are already imported in spark-shell.


Encoders map columns (of your dataset) to fields (of your JVM object) by name. It is by Encoders that you can bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, memsql) and vice versa.


You can find methods to create encoders for Java’s object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or Byte array, that could be composed to create more advanced encoders for Java bean classes (using bean method).



You can also create encoders based on Kryo or Java serializers.
scala> Encoders.kryo[Person]

scala> Encoders.javaSerialization[Person]



You can create encoders for Scala’s tuples and case classes, Int, Long, Double, etc.

scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
scala> val one = spark.createDataset(Seq(1))

createDataset creates a LocalRelation logical query plan (for the input data collection) or LogicalRDD (for the input RDD[T]).

Tip
You’d be better off using Scala implicits and toDS method instead (that does this conversion automatically for you).
val spark: SparkSession = ...
import spark.implicits._

scala> val one = Seq(1).toDS
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show

catalog attribute is an interface to the current catalog (of databases, tables, functions, table columns, and temporary views).

scala> spark.catalog.listTables.show


table creates a DataFrame from records in the tableName table (if exists).
val df = spark.table("mytable")



You can have multiple SparkSessions in a single Spark application.



// Variant 1: filter operator accepts a Scala function
dataset.filter(n => n % 2 == 0).count

// Variant 2: filter operator accepts a Column-based SQL expression
dataset.filter('value % 2 === 0).count

// Variant 3: filter operator accepts a SQL query
dataset.filter("value % 2 = 0").count
 DataFrame - the flagship data abstraction of previous versions of Spark SQL - is currently a mere type alias for Dataset[Row]:
type DataFrame = Dataset[Row]
scala> spark.range(1).filter('id === 0).explain(true)

Only Datasets give you syntax and analysis checks at compile time (which was not possible using DataFrame, regular SQL queries or even RDDs).

Using Dataset objects turns DataFrames of Row instances into DataFrames of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access respective fields in a DataFrame and casting them to a type, all this is automatically handled by Datasets and checked by the Scala compiler.


The default storage level for Datasets is MEMORY_AND_DISK because recomputing the in-memory columnar representation of the underlying table is expensive. See Persisting Dataset (persist method) in this document.

Spark 2.0 introduced a new query model called Structured Streaming for continuous incremental execution of structured queries. That made it possible to consider Datasets as static and bounded as well as streaming and unbounded data sets, with a single unified API for different execution models.

Dataset is local if it was created from local collections using SparkSession.emptyDataset or SparkSession.createDataset methods and their derivatives like toDF. If so, the queries on the Dataset can be optimized and run locally, i.e. without using Spark executors.


withColumn method returns a new DataFrame with the new column col with colName name added.


scala> val df = Seq((1, "jeden"), (2, "dwa")).toDF("number", "polish")



scala> df.withColumn("polish", lit(1)).show



scala> val idCol = dataset("id")
http://stackoverflow.com/questions/30354483/convert-spark-row-to-typed-array-of-doubles
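SparkDataFrame.map { r =>
  val array = r.toSeq.toArray   // Row.toSeq returns Seq[Any], so this is an Array[Any]
  val doubleArray = array.map(_.toDouble)
} // Fails with "value toDouble is not a member of Any": the elements need an explicit cast or a typed getter such as r.getDouble(i)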

http://lxw1234.com/archives/2015/07/345.htm
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
This function splits an RDD into multiple RDDs according to the weights given in weights.
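
A plain-Python model of weighted splitting may help to see what the weights and the seed do. It is a sketch of the idea, not Spark's implementation: each element draws one random number and lands in the split whose cumulative weight range contains it. The function name is made up, and normalising the weights when they do not sum to 1 is an assumption.

```python
import random


def random_split(elements, weights, seed):
    """Split elements into len(weights) lists, with sizes roughly proportional to weights."""
    total = float(sum(weights))
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total      # cumulative, normalised upper bound
        bounds.append(running)

    rng = random.Random(seed)          # same seed gives the same split
    splits = [[] for _ in weights]
    last = len(bounds) - 1
    for element in elements:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding leftovers.
            if draw < upper or index == last:
                splits[index].append(element)
                break
    return splits


train, test = random_split(list(range(100)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

The sizes are only approximately proportional to the weights, since every element is assigned independently at random.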

Return an RDD with the elements from self that are not in other.
rdd1 = $sc.parallelize([["a", 1], ["a", 2], ["b", 3], ["c", 4]])
rdd2 = $sc.parallelize([["a", 2], ["c", 6]])
rdd1.subtract(rdd2).collect
# => [["a", 1], ["b", 3], ["c", 4]]
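
The result above keeps ["c", 4] even though other contains an element with key "c", because subtract compares whole elements rather than keys. A plain-Python model of that behaviour (a sketch only, with tuples standing in for the pairs):

```python
def subtract(left, right):
    """Return the elements of left that do not appear in right (whole-element comparison)."""
    excluded = set(right)
    return [element for element in left if element not in excluded]


rdd1 = [("a", 1), ("a", 2), ("b", 3), ("c", 4)]
rdd2 = [("a", 2), ("c", 6)]
print(subtract(rdd1, rdd2))  # [('a', 1), ('b', 3), ('c', 4)]
```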

http://www.agildata.com/apache-spark-2-0-api-improvements-rdd-dataframe-dataset-sql/
var ds: Dataset[String] = spark.createDataset(List("one","two","three"))

Converting an RDD to a Dataset

SparkSession provides a createDataset method for converting an RDD to a Dataset. This only works if you import spark.implicits._ (where spark is the name of the SparkSession variable).
import spark.implicits._
val rdd: RDD[Person] = ??? // assume this exists
val dataset: Dataset[Person] = spark.createDataset[Person](rdd)

A DataFrame (which is really a Dataset[Row]) can be converted to a Dataset of a specific class by performing a map() operation.
http://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark
df = df.withColumnRenamed("colName", "newColName").withColumnRenamed("colName2", "newColName2")


Spark-CSV
http://stackoverflow.com/questions/37778369/spark-csv-to-dataframe-skip-first-row
spark csv to dataframe skip first row

https://github.com/databricks/spark-csv/issues/327
Option two: Create your customized schema and specify the mode option as DROPMALFORMED, which will drop the first line since it contains fewer tokens than expected in the customSchema:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};

val customSchema = StructType(Array(StructField("id", IntegerType, true), 
                                    StructField("name", StringType, true),
                                    StructField("age", IntegerType, true)))

val df = sqlContext.read.format("com.databricks.spark.csv").
                         option("header", "true").
                         option("mode", "DROPMALFORMED").
                         schema(customSchema).load("test.txt")

df.show
val file = sc.textFile("pathToYourCsvFile")

val df = file.map(line => line.split(",")).
              filter(lines => lines.length == 3 && lines(0)!= "id").
              map(row => (row(0), row(1), row(2))).
              toDF("id", "name", "age")
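
The filtering idea in the textFile snippet above (keep only lines with the expected number of fields, and drop the header by its first field) can be tried out without Spark. This is a plain-Python sketch with made-up sample data; it uses the csv module, which also copes with quoted fields containing commas, something a bare split(",") does not.

```python
import csv
import io


def read_rows(text, expected_columns, header_first_field):
    """Parse CSV text, dropping malformed lines and the header line."""
    rows = []
    for fields in csv.reader(io.StringIO(text)):
        if len(fields) != expected_columns:
            continue                      # wrong number of tokens: treat as malformed
        if fields[0] == header_first_field:
            continue                      # header line
        rows.append(tuple(fields))
    return rows


# Hypothetical file content: a junk first line, a header, and one short line.
sample = "some preamble line\nid,name,age\n1,alice,30\n2,bob\n3,carol,41\n"
rows = read_rows(sample, expected_columns=3, header_first_field="id")
print(rows)  # [('1', 'alice', '30'), ('3', 'carol', '41')]
```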
https://github.com/databricks/spark-csv/issues/321
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('cars.csv')

  val ticket:StructType = StructType( Array(
    StructField("tkt_id", StringType, false),
    StructField("tkt_number", StringType, true),
    StructField("tkt_date_of_issue", StringType, true),
    StructField("tkt_pax_type", StringType, true),
    StructField("tkt_off_id", StringType, true),
    StructField("tkt_total_amount", StringType, true)))
val tkt_df = sqlContext.load("com.databricks.spark.csv", ticket, Map("path" -> ticket_dump_path, "header" -> "true")) 

customSchema = StructType([
  StructField("cl_from", LongType(), True),
  StructField("cl_to", StringType(), True),
  StructField("cl_sortkey", StringType(), True),
  StructField("cl_timestamp", TimestampType(), True),
  StructField("cl_sortkey_prefix", StringType(), True),
  StructField("cl_collation", StringType(), True),
  StructField("cl_type", StringType(), True),
  ])


categoryLinksTableStringsDF = (sqlContext.read.format('com.databricks.spark.csv')
 .options(header='false')
 .load(dbwikidir+name, schema = customSchemaStrings)
 .cache() )
http://www.nodalpoint.com/spark-data-frames-from-csv-files-handling-headers-column-types/

Use the repartition() method from Dataset. According to the Scaladoc, there is no option to set the number of partitions while reading.
public Dataset<Employee> GetDataFrameFromTextFile()
{     // The schema is encoded in a space-separated string
    String schemaString = "id firstname lastname accountNo";

    // Generate the schema based on the string of schema
    List<StructField> fields = new ArrayList<>();
    for (String fieldName : schemaString.split(" ")) {
        StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
        fields.add(field);
    }
    StructType schema = DataTypes.createStructType(fields);

    // The return type matches the bean class passed to Encoders.bean
    return  sparksession.read().schema(schema)
            .option("mode", "DROPMALFORMED")
            .option("sep", "|")
            .option("ignoreLeadingWhiteSpace", true)
            .option("ignoreTrailingWhiteSpace", true)
            .csv("D:\\HadoopDirectory\\Employee.txt").as(Encoders.bean(Employee.class));
}

Second : Do not infer schema with this library, select columns you want and then give types by cast().
import org.apache.spark.sql.types.IntegerType

val path = "path-to-file"

val df = sqlContext.read.format("csv").option("inferSchema", "false").load(path)
val prunedDf = df.select(df("field").cast(IntegerType).as("field"))

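Spark-shell:
import scala.util.Try

sc.parallelize(1 to 9, 1);
// The elements are Chars here, so toDouble yields the character code and never fails
sc.parallelize('a' to 'z').map(s => Try(s.toDouble)).collect()

// A String is a sequence of Chars, so this parallelizes the characters 'a', '1', '2', '3'
sc.parallelize("a123").map(s => Try(s.toDouble)).collect()

// Wrapping the String in a Seq parallelizes the whole string, and toDouble now fails
sc.parallelize(Seq("a123")).map(s => Try(s.toDouble)).collect()
res6: Array[scala.util.Try[Double]] = Array(Failure(java.lang.NumberFormatException: For input string: "a123"))

// Same idea with Either: Right holds the parsed value, Left keeps the unparsable input
sc.parallelize(Seq("a123")).map(s =>
   try {
     Right(s.toDouble)
   } catch {
     case _: NumberFormatException => Left(s)
   }
 ).collect()

val rdd = sc.parallelize((0 to 10).toList).map(i => Try(i / 0))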
http://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html
More or less, a monad is a generic container that enhances a simple type with additional properties. Scala offers at least three different types of monads that help us deal with exceptional situations:
  1. Option and its two subclasses, Some[T] and None. This monad acts like a list of zero or one elements, and we can use it when we are not interested in the details of the error situation we may encounter.
  2. Either and its two subclasses, Left[T] and Right[K]. This monad lets you return two different types of objects, T and K, respectively in the case of an exceptional behaviour of the process and in the case of a correct behaviour.
  3. Try and its two subclasses, Success[T] and Failure[T]. This monad is similar to Either. Instead of using a generic type T for the Left subclass, Failure always uses a type that is a subclass of Throwable. The Try type has been available in Scala since version 2.10.
Then, if your aim is to trace the exceptions raised during the processing of an RDD and to continue processing only the values that are not in error, the Try[T] monad suits your needs perfectly. This type comes with a useful apply factory method on the companion object that lets you build a Success or Failure object directly from the result of a computation.

// ...omissis...
val tokens = 
  lines.flatMap(_ split " ")
       .map (s => Try(s(10)))

If the computation produces a value, then an object of type Success[T] is built; otherwise a Failure object is built. The types are immutable. The Failure type gives access to its exception attribute, which contains the error raised during the computation.
So, your RDD[T] will become an RDD[Try[T]]. Using this trick, we can now use the same data structure to forward both data and exceptions.

val successes = 
  rdd.collect {
    // The method is applied only to elements of type Success.
    case Success(x) => x
  }
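
The RDD[T] to RDD[Try[T]] idea can be tried out without a cluster. What follows is a minimal plain-Python sketch, not Spark code: a list stands in for the RDD, and attempt is a hypothetical helper that plays the role of Scala's Try(...).

```python
from typing import Any, Callable, List, Tuple

# ("ok", value) plays the role of Success(value);
# ("err", exception) plays the role of Failure(exception).
Result = Tuple[str, Any]


def attempt(f: Callable[[Any], Any], x: Any) -> Result:
    """Hypothetical stand-in for Try(f(x)): never raises, always returns a Result."""
    try:
        return ("ok", f(x))
    except Exception as exc:
        return ("err", exc)


records: List[str] = ["1.5", "a123", "42"]

# Equivalent of rdd.map(s => Try(s.toDouble)): one Result per input record.
results = [attempt(float, r) for r in records]

# Equivalent of rdd.collect { case Success(x) => x }.
successes = [value for tag, value in results if tag == "ok"]

# The failures keep the exception object for later inspection.
failures = [value for tag, value in results if tag == "err"]
```

Because every record yields a result, one bad input no longer aborts the whole computation, and the failures remain available for logging.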

Advanced Analytics with Spark
Whenever an operation has two mutually exclusive outcomes, we can use Scala’s Either[L, R] type to represent the return type of the operation.
def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
  new Function[S, Either[T, (S, Exception)]] with Serializable {
    def apply(s: S): Either[T, (S, Exception)] = {
      try {
        Left(f(s))
      } catch {
        case e: Exception => Right((s, e))
      }
    }
  }
}
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:
  • Anonymous function syntax, which can be used for short pieces of code.
  • Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).
In a similar way, accessing fields of the outer object will reference the whole object:
class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Consider code that sums the elements of an RDD by incrementing a counter variable defined on the driver from inside rdd.foreach(x => counter += x), as in the programming guide's closure example. The behavior of such code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.

In local mode, in some circumstances the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.
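
The copy semantics described above can be imitated in plain Python. This is only a sketch under stated assumptions: pickle stands in for Spark's closure serialization, and run_on_executor is a hypothetical function playing the executor; no Spark API is involved.

```python
import pickle


def run_on_executor(serialized_closure: bytes, partition):
    """Hypothetical executor: works on its own deserialized copy of the closure state."""
    state = pickle.loads(serialized_closure)
    for x in partition:
        state["counter"] += x
    # The updated copy lives and dies on the executor side.
    return state["counter"]


# State captured by the closure on the driver.
driver_state = {"counter": 0}
partitions = [[1, 2, 3], [4, 5]]

# The closure is serialized once and shipped with every task.
shipped = pickle.dumps(driver_state)
executor_totals = [run_on_executor(shipped, p) for p in partitions]

# The driver's own counter was never touched.
driver_counter_after = driver_state["counter"]
```

Each simulated executor sees a counter that starts at zero and updates only its private copy, which is why the driver still reads zero afterwards.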

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.


http://stackoverflow.com/questions/29685330/how-to-set-and-get-static-variables-from-spark
Ok, there are basically 2 ways to take a value known to the master to the executors:
  1. Put the value inside a closure to be serialized to the executors to perform a task. This is the most common one and very simple/elegant.
  2. Create a broadcast variable with the data. This is good for immutable data of a big size, so you want to guarantee it is sent only once. Also good if the same data is used over and over.
No need to use static variables in either case. But, if you DO want to have static values available on your executor VMs, you need to do one of these:
  1. If the values are fixed or the configuration is available on the executor nodes (lives inside the jar, etc), then you can have a lazy val, guaranteeing initialization only once.
  2. You can call mapPartitions() with code that uses one of the 2 options above, then store the values on your static variable/object. mapPartitions is guaranteed to run only once for each partition (much better than once per line) and is good for this kind of thing (initializing DB connections, etc).
object MyStaticObject
{
  lazy val MyStaticValue = {
     // Call a database, read a file included in the Jar, do expensive initialization computation, etc
     4
  }
} 
Since each Executor corresponds to a JVM, once the classes are loaded MyStaticObject will be initialized. The lazy keyword guarantees that the MyStaticValue variable will only be initialized the first time it is actually requested, and hold its value ever since.
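
The once-per-JVM behaviour of the lazy val has a rough plain-Python analogue: a cached zero-argument function that is initialized on first use and reused afterwards within the same process. This is a sketch only; my_static_value, process_partition and init_calls are hypothetical names, and the process stands in for one executor JVM.

```python
import functools

# Records how many times the expensive initialization actually ran.
init_calls = []


@functools.lru_cache(maxsize=None)
def my_static_value() -> int:
    """Runs its body once per process; later calls return the cached result."""
    init_calls.append(1)  # stands in for a database call or reading a bundled file
    return 4


def process_partition(rows):
    # Every partition handled by this process shares the same initialized value.
    value = my_static_value()
    return [r + value for r in rows]


outputs = [process_partition(p) for p in ([1, 2], [3], [4, 5, 6])]
```

Three partitions are processed, but the initialization body runs a single time.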

Logging
https://www.mapr.com/blog/how-log-apache-spark
val log = LogManager.getRootLogger
val data = sc.parallelize(1 to 100000)

data.map { value => 
   log.info(value)
   value.toString
}
This will fail when running on Spark. Spark complains that the log object is not Serializable, so it cannot be sent over the network to the Spark workers.
This problem is actually easy to solve. Let’s create a class that does something to our data set while doing a lot of logging.
class Mapper(n: Int) extends Serializable{
 // @transient: not serialized with Mapper; lazy: created on first use in each worker
 @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")

 def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] =
   rdd.map{ i =>
     log.warn("mapping: " + i)
     (i + n).toString
   }
}
Mapper receives an RDD[Int] and returns an RDD[String], and it also logs which value is being mapped. Note how the log object has been marked as @transient, which allows the serialization system to ignore it. Now Mapper is serialized and sent to each worker, but the log object is resolved when it is needed in the worker, solving our problem.
http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala
import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass) 
Just before the place where I use LOG in distributed functional code, I copy logger reference to a local constant.
val LOG = this.LOG

   @transient lazy val log = Logger.getLogger(getClass.getName)    

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-dataframe-row.html
val row = Row(1, "hello")
scala> row.getAs[Int](0)
scala> row.getAs[String](1)

https://github.com/databricks/spark-csv/issues/171

http://stackoverflow.com/questions/5336540/is-there-a-way-to-let-arguments-be-var-instead-of-val
Q1: Mutating the input parameters is often seen as bad style and makes it harder to reason about code.

http://icemelon.daoapp.io/sparkxue-xi-bi-ji/
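  • Broadcast variables cache a read-only variable on each machine (not per task). They give every node efficient access to a large dataset. Spark uses efficient broadcast algorithms to reduce communication cost.
  • Spark automatically broadcasts the common data needed by tasks within a stage, so an explicit broadcast is only needed to share data across different stages. The automatic mechanism keeps the data serialized and deserializes it when it is needed, so an explicit broadcast can also help when tasks rely heavily on having the data in deserialized form.
  • After val broadcastVar = sc.broadcast(v), use broadcastVar.value rather than continuing to use v.
  • An accumulator is created with SparkContext.accumulator(v). Tasks can apply += to it but cannot read it. Only the driver can read it.
  • The built-in accumulator type is long; custom types can be implemented by extending the AccumulatorV2 abstract class.
  • Accumulators should only be updated inside actions, for two reasons. First, a transformation may be recomputed, which applies the update more than once. Second, Spark is lazily evaluated, so the code only really runs when an action is executed.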
Spark Accumulator
http://www.edureka.co/blog/spark-accumulators-explained

This guarantees that the accumulator blankLines is updated across every executor and the updates are relayed back to the driver.
We can implement other counters for network errors or zero sales value, etc. The full source code along with the implementation of the other counters can be found here.

  1. Computations inside transformations are evaluated lazily, so unless an action happens on an RDD the transformations are not executed. As a result, accumulators used inside functions like map() or filter() won't get updated unless some action happens on the RDD.
  2. Spark guarantees to update accumulators inside actions only once. So even if a task is restarted and the lineage is recomputed, the accumulators will be updated only once.
  3. Spark does not guarantee this for transformations. So if a task is restarted and the lineage is recomputed, there is a chance of undesirable side effects, because the accumulators may be updated more than once.
To be on the safe side, always use accumulators inside actions ONLY.
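
The double-counting risk in point 3 can be illustrated with a small plain-Python sketch. It is not Spark code: parse_lines is a hypothetical map-style function with an accumulator-like side effect, and calling it twice stands in for a recomputed partition (for example after a task restart).

```python
def parse_lines(lines, blank_lines):
    """Stand-in for a map() whose function updates an accumulator as a side effect."""
    out = []
    for line in lines:
        if line == "":
            blank_lines["count"] += 1
        out.append(line.upper())
    return out


partition = ["a", "", "b", ""]
blank_lines = {"count": 0}

# First evaluation of the transformation.
parse_lines(partition, blank_lines)
count_after_one_run = blank_lines["count"]

# The same partition is recomputed; the side effect is applied again.
parse_lines(partition, blank_lines)
count_after_recompute = blank_lines["count"]
```

The data contains two blank lines, yet the counter reads four after the recomputation, which is the kind of inflated value the advice above is meant to avoid.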

https://issues.apache.org/jira/browse/SPARK-15208
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala


http://imranrashid.com/posts/Spark-Accumulators/
  1. The type of data is not limited to a Long
  2. The user can define an arbitrary commutative and associative operation to merge values (instead of being limited to + on natural numbers).
val bucketCounts = sc.accumulator(new Array[Double](nBuckets))(VectorAccumulatorParam)
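
Why the merge operation has to be commutative and associative can be seen in a plain-Python sketch of a vector-valued accumulator. Everything here is hypothetical illustration (zero, merge, count_partition and the bucketing rule are assumptions, not the VectorAccumulatorParam from the post): each partition builds partial bucket counts, and the partials are merged in whatever order they arrive.

```python
from functools import reduce
from typing import List

N_BUCKETS = 4


def zero() -> List[float]:
    """Initial accumulator value: one counter per bucket."""
    return [0.0] * N_BUCKETS


def merge(acc: List[float], other: List[float]) -> List[float]:
    """Element-wise addition: commutative and associative."""
    return [a + b for a, b in zip(acc, other)]


def count_partition(values) -> List[float]:
    """Partial result computed independently on one partition."""
    counts = zero()
    for v in values:
        counts[v % N_BUCKETS] += 1.0
    return counts


partitions = [[0, 1, 5], [2, 2, 7], [3, 4]]
partials = [count_partition(p) for p in partitions]

# Task results can reach the driver in any order; the merged value must not depend on it.
merged_forward = reduce(merge, partials, zero())
merged_reverse = reduce(merge, reversed(partials), zero())
```

With element-wise addition both merge orders give the same vector; an order-sensitive operation would not.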

http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset-in-spark-1-6
Using generic encoders.
There are two generic encoders available for now, kryo and javaSerialization, where the latter one is explicitly described as:
extremely inefficient and should only be used as the last resort.
class Bar(i: Int) {
  override def toString = s"bar $i"
  def bar = i
}
you can use these encoders by adding implicit encoder:
object BarEncoders {
  implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
  org.apache.spark.sql.Encoders.kryo[Bar]
}
which can be used together as follows:

object Main {
  def main(args: Array[String]) {
    val sc = new SparkContext("local",  "test", new SparkConf())
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    import BarEncoders._

    val ds = Seq(new Bar(1)).toDS
    ds.show

    sc.stop()
  }
}

  1. It is also possible to encode tuples using kryo encoder for specific field:
    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    
    Please note that we don't depend on implicit encoders here but pass encoder explicitly so this most likely won't work with toDS method.
  2. Using implicit conversions:
    Provide implicit conversions between representation which can be encoded and custom class, for example:
    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }

Unfortunately, virtually nothing has been added to help with this. Searching for @since 2.0.0 in Encoders.scala or SQLImplicits.scala finds things mostly to do with primitive types (and some tweaking of case classes). So, first thing to say: there currently is no real good support for custom class encoders. With that out of the way, what follows is some tricks which do as good a job as we can ever hope to, given what we currently have at our disposal. As an upfront disclaimer: this won't work perfectly and I'll do my best to make all limitations clear and upfront.

What exactly is the problem

When you want to make a dataset, Spark "requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders" (taken from the docs on createDataset). An encoder will take the form Encoder[T] where T is the type you are encoding. The first suggestion is to add import spark.implicits._ (which gives you these implicit encoders) and the second suggestion is to explicitly pass in the implicit encoder using this set of encoder related functions.
There is no encoder available for regular classes, so
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
will give you the following implicit related compile time error:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases
However, if you wrap whatever type you just used to get the above error in some class that extends Product, the error confusingly gets delayed to runtime, so
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Compiles just fine, but fails at runtime with
java.lang.UnsupportedOperationException: No Encoder found for MyObj
The reason for this is that the encoders Spark creates with the implicits are actually only made at runtime (via Scala reflection). In this case, all Spark checks at compile time is that the outermost class extends Product (which all case classes do), and it only realizes at runtime that it still doesn't know what to do with MyObj (the same problem occurs if I tried to make a Dataset[(Int,MyObj)] - Spark waits until runtime to barf on MyObj). These are central problems that are in dire need of being fixed:
  • some classes that extend Product compile despite always crashing at runtime and
  • there is no way of passing in custom encoders for nested types (I have no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)).

Just use kryo

The solution everyone suggests is to use the kryo encoder.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
So, why not just make an implicit that does this automatically?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)
And now, it seems like I can do almost anything I want (the example below won't work in the spark-shell where spark.implicits._ is automatically imported)
class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Or almost. The problem is that using kryo leads to Spark just storing every row in the dataset as a flat binary object. For map, filter, foreach that is enough, but for operations like join, Spark really needs these to be separated into columns. Inspecting the schema for d2 or d3, you see there is just one binary column:
d2.printSchema
// root
//  |-- value: binary (nullable = true)

Partial solution for tuples

So, using the magic of implicits in Scala (more in 6.26.3 Overloading Resolution), I can make myself a series of implicits that will do as good a job as possible, at least for tuples, and will work well with existing implicits:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

Error handling
https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark/
val transformed = source
  .flatMap(e => Try{myCustomFunction(e)}.toOption)
  // other actions
Here the function myCustomFunction is executed within a Scala Try block, then converted into an Option. The code is put in the context of a flatMap, so the result is that all the elements that can be converted using the custom function will be present in the resulting RDD. Elements whose transformation function throws an exception will be automatically discarded. 
Why don’t we collect all exceptions, alongside the input data that caused them? If the exceptions are (as the word suggests) not the default case, they could all be collected by the driver and then printed out to the console for debugging.

// define an accumulable collection for exceptions
val accumulable = sc.accumulableCollection(mutable.HashSet[(Any, Throwable)]())
 
val transformed = source.flatMap(e => {
  val fe = Try{myCustomFunction(e)}
  val trial = fe match {
    case Failure(t) =>
      // push to an accumulable collection 
      // both the input data and the throwable
      accumulable += (e, t)
      fe
    case success => success
  }
  trial.toOption
})
 
// call at least one action on 'transformed' (eg. count)
transformed.count
// at the end of the process, print the exceptions
accumulable.value.foreach{case (i, e) => {
  println(s"--- Exception on input: ($i)")
  // using org.apache.commons.lang3.exception.ExceptionUtils
  println(ExceptionUtils.getStackTrace(e))
}}

http://stackoverflow.com/questions/30343588/spark-how-to-handle-error-case-in-rdd-map-method-correctly
I want to extract some fields from the text and convert them into CSV format like:
<value1>,<value5>,<valuek>,<valuen>
The following code is how I do this:
val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz")
val records = lines.map { line =>
    val mp = line.split("&")
                 .map(_.split("="))
                 .filter(_.length >= 2)
                 .map(t => (t(0), t(1))).toMap

    (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn"))
}
I would like to know what happens if some line of the input text is of the wrong format or invalid, so that the map() function cannot return a valid value. This should be very common in text processing; what is the best practice to deal with this problem?
In order to manage these errors you can use Scala's Try class within a flatMap operation. In code:
    val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz")
    val records = lines.flatMap (line =>
        Try{
          val mp = line.split("&")
              .map(_.split("="))
              .filter(_.length >= 2)
              .map(t => (t(0), t(1))).toMap

          (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn"))
      } match {
        case Success(map) => Seq(map)
        case _ => Seq()
    })
With this you have only the "good ones", but if you want both (the errors and the good ones) I would recommend using a map function that returns a Scala Either and then using a Spark filter. In code:
    val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz")
    val goodBadRecords = lines.map (line =>
        Try{
          val mp = line.split("&")
              .map(_.split("="))
              .filter(_.length >= 2)
              .map(t => (t(0), t(1))).toMap

          (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn"))
      } match {
        case Success(map) => Right(map)
        case Failure(e) => Left(e)
    })
    val records = goodBadRecords.filter(_.isRight)
    val errors = goodBadRecords.filter(_.isLeft)


  val out1 = in.map(a => Try(a.toInt))
  val results = out1.filter(_.isSuccess).map(_.get)

You can use a combination of Try and map/filter.
Try will wrap your computation into Success, if they behave as expected, or Failure, if an exception is thrown. Then you can filter what you want - in this case the successful computations, but you could also filter the error cases for logging purposes.
I recommend using filter/map
rdd.filter(r=>NumberUtils.isNumber(r)).map(r=> r.toInt)
or flatmap
exampleRDD.flatMap(r=> {if (NumberUtils.isNumber(r)) Some(r.toInt) else  None})
Otherwise you can catch exception in map function
myRDD.map(r => { try{
        r.toInt
    }catch {
        case runtime: RuntimeException => {
        -1
        }
    }
})
and then filter out the -1 values.
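
One thing to watch with the sentinel approach is that a legitimate -1 in the input cannot be told apart from a parse failure. The plain-Python sketch below compares it with the Option-style alternative; the two helper functions are hypothetical and only mirror the Scala snippets above.

```python
def to_int_or_sentinel(s: str) -> int:
    """Mirrors the try/catch version: failures become -1."""
    try:
        return int(s)
    except ValueError:
        return -1


def to_int_or_none(s: str):
    """Mirrors the Option version: failures become None."""
    try:
        return int(s)
    except ValueError:
        return None


raw = ["7", "-1", "oops", "3"]

# Filtering on the sentinel also drops the valid input "-1".
via_sentinel = [v for v in map(to_int_or_sentinel, raw) if v != -1]

# Filtering on None keeps every value that parsed successfully.
via_option = [v for v in map(to_int_or_none, raw) if v is not None]
```

If -1 can never occur in valid data the two approaches behave the same; otherwise the Option or Try style is the safer choice.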

How to deal with bad data
http://spark.apache.org/docs/latest/configuration.html
spark.task.maxFailures (default: 4): Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1.
http://discourse.snowplowanalytics.com/t/debugging-bad-rows-in-spark-and-zeppelin-tutorial/400
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dealing_with_bad_data.html

http://dataottam.com/2015/11/16/top-3-methods-of-skipping-big-datas-bad-data-using-hadoop/

http://conferences.oreilly.com/strata/big-data-conference-ca-2015/public/schedule/detail/38391
ReduceByKey vs. GroupByKey
ReduceByKey should be preferred over GroupByKey, as ReduceByKey automatically combines data before shuffling, therefore minimizing the amount of data transferred over the network compared to GroupByKey.
Execution in the Driver vs. Executor
Traditional Map-Reduce requires writing a controller main class, a map class, and a reduce class. Spark allows you to write one simple program for all those pieces, but that makes it less clear in the API where code is executed.

  • Understanding the Shuffle in Spark
    • Common cause of inefficiency.
  • Understanding when code runs on the driver vs. the workers.
    • Common cause of errors.
  • How to factor your code:
    • For reuse between batch & streaming.
    • For easy testing.

With ReduceByKey, data is combined so each partition outputs at most one value for each key to send over the network.

With GroupByKey, all the data is wastefully sent over the network and collected on the reduce workers.

Prefer ReduceByKey over GroupByKey
reduceByKey is more efficient.
In fact, groupByKey can cause out-of-disk problems.

Caveat: Not all problems that can be solved by groupByKey can be calculated with reduceByKey.
ReduceByKey requires combining all your values into another value with the exact same type.

reduceByKey, aggregateByKey, foldByKey, and combineByKey are preferred over groupByKey.
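
The difference in shuffled data can be made concrete with a plain-Python simulation. This is a sketch, not Spark: lists stand in for partitions, and the two hypothetical functions count how many records would cross the network under each strategy for a word-count style sum.

```python
from collections import Counter, defaultdict

partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("a", 1), ("b", 1)],
]


def shuffle_group_by_key(parts):
    """groupByKey style: every (key, value) record is shuffled, then summed."""
    shuffled = [kv for part in parts for kv in part]
    grouped = defaultdict(list)
    for k, v in shuffled:
        grouped[k].append(v)
    return {k: sum(vs) for k, vs in grouped.items()}, len(shuffled)


def shuffle_reduce_by_key(parts):
    """reduceByKey style: combine inside each partition first, then shuffle."""
    shuffled = []
    for part in parts:
        local = Counter()
        for k, v in part:
            local[k] += v
        # At most one record per key leaves each partition.
        shuffled.extend(local.items())
    totals = Counter()
    for k, v in shuffled:
        totals[k] += v
    return dict(totals), len(shuffled)


group_totals, group_shuffled = shuffle_group_by_key(partitions)
reduce_totals, reduce_shuffled = shuffle_reduce_by_key(partitions)
```

Both strategies produce the same totals, but the combined version shuffles four records instead of six; the gap grows with the number of repeated keys per partition.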

Join a Large Table with a Small Table
join_rdd = sqlContext.sql("""select *
  FROM people_in_the_us
  JOIN states
  ON people_in_the_us.state = states.name""")

print(join_rdd.toDebugString())
ShuffledHashJoin?
BroadcastHashJoin?

ShuffledHashJoin
All the data for the US will be shuffled into only 50 keys for each of the states.
Problems:
Uneven Sharding
Limited parallelism w/ 50 output partitions

BroadcastHashJoin
Solution: Broadcast the Small RDD to all worker nodes.
Parallelism of the large RDD is maintained (n output partitions), and shuffle is not even needed.

How to Configure BroadcastHashJoin
For Spark 1.2:
Set spark.sql.autoBroadcastJoinThreshold.
sqlContext.sql("ANALYZE TABLE state_info COMPUTE STATISTICS noscan")
Use .toDebugString() or EXPLAIN to double check.
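
The idea behind a broadcast hash join can be sketched in plain Python. This is an illustration under assumptions, not Spark's implementation: a dict stands in for the broadcast small table, lists stand in for partitions of the large table, and join_partition is a hypothetical helper.

```python
# Small table: fits in memory, so a copy is given to every worker.
states = {"CA": "California", "NY": "New York"}

# Large table: stays split across partitions and is never shuffled.
people_partitions = [
    [("alice", "CA"), ("bob", "NY")],
    [("carol", "CA"), ("dave", "TX")],
]


def join_partition(partition, broadcast_states):
    """Inner join done locally; rows of the large table never change partition."""
    return [
        (name, code, broadcast_states[code])
        for name, code in partition
        if code in broadcast_states
    ]


joined_partitions = [join_partition(p, states) for p in people_partitions]
```

The output has as many partitions as the large table, so its parallelism is preserved and no key-based shuffle is required.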

Join a Medium Table with a Huge Table
join_rdd = sqlContext.sql("""select *
  FROM people_in_california
  LEFT JOIN all_the_people_in_the_world
  ON people_in_california.id =
     all_the_people_in_the_world.id""")


Final output keys = keys of people_in_california, so this doesn’t need a huge Spark cluster, right?
Left Join - Shuffle Step
Shuffles everything before dropping keys
Not a Problem:
Even Sharding
Good Parallelism
The Size of the Spark Cluster to run this job is limited by the Large table rather than the Medium Sized Table.

What’s a Better Solution?
Filter the World RDD for only entries that match the CA ID
Benefits:
Less Data shuffled over the network and less shuffle space needed.
More transforms, but still faster.
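
The filter-then-join approach can be sketched in plain Python with made-up data. Nothing here is Spark API: dicts and lists stand in for the medium and huge tables, and the row counts show how much data would still need to be shuffled.

```python
# Medium table: id -> name.
california = {1: "alice", 5: "erin", 7: "carol"}

# Huge table, split across partitions: (id, country).
world_partitions = [
    [(1, "US"), (2, "FR"), (3, "JP")],
    [(7, "US"), (8, "BR"), (9, "IN")],
]
world_rows = sum(len(p) for p in world_partitions)

# Step 1: only the CA ids are sent to every partition of the world table.
ca_ids = set(california)

# Step 2: filter the world table where it lives; only matches go on to the shuffle.
filtered = [[row for row in p if row[0] in ca_ids] for p in world_partitions]
rows_to_shuffle = sum(len(p) for p in filtered)

# Step 3: left join the medium table with the much smaller filtered data.
matches = {pid: country for p in filtered for pid, country in p}
joined = {pid: (name, matches.get(pid)) for pid, name in california.items()}
```

Only two of the six world rows survive the filter, and the left join still keeps every California row, with None where no match exists.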

You should understand your data and its unique properties in order to best optimize your Spark Job.
In Practice: Detecting Shuffle Problems
Things to Look for:
Tasks that take much longer to run than others.
Speculative tasks that are launching.
Shards that have a lot more input or shuffle output than others.

Execution on the Driver vs. Workers
The main program is executed on the Spark Driver.
Transformations are executed on the Spark Workers.
Actions may transfer data from the Workers to the Driver.

What happens when calling collect()
collect() sends all the partitions to the single driver
collect() on a large RDD can trigger an OOM error

Don’t call collect() on a large RDD
Be cautious with all actions that may return unbounded output.
Option 1:  Choose actions that return a bounded output per partition, such as count() or take(N) .
Option 2:  Choose actions that output directly from the workers, such as saveAsTextFile().

Common Serialization Errors
Hadoop Writables: map to/from a serializable form.
Capturing a full non-serializable object: copy the required serializable parts locally.
Network connections: create the connection on the worker.

Serialization Error
myNonSerializable = …
output = (sparkContext
   .textFile("hdfs://…")
   .map(lambda l: myNonSerializable.value + l)
   .take(n))
print(output)  …
Spark will try to send myNonSerializable from the Driver to the Worker node by serializing it, and fail with an error.

RDDs within RDDs - not even once
Only the driver can perform operations on RDDs

map+get:
rdd.map{(key, value) => otherRdd.get(key)...}
can normally be replaced with a join:
rdd.join(otherRdd).map{}
map+map:
rdd.map{e => otherRdd.map{ … }}
is normally an attempt at a cartesian:
rdd.cartesian(otherRdd).map()

Writing a Large RDD to a Database
Option 1: DIY
Initialize the Database Connection on the Worker rather than the Driver
Network sockets are non-serializable
Use foreachPartition
Re-use the connection between elements

Option 2: DBOutputFormat
Database must speak JDBC
Extend DBWritable and save with saveAsHadoopDataset

Option 1 (DIY) example:
data.foreachPartition{records => {
  // Create the connection on the executor
  val connection = new HappyDatabase(...)
  records.foreach{record =>
    connection.//implementation specific
    }
  }
}

DBOutputFormat
case class CatRec(name: String, age: Int) extends DBWritable {
  override def write(s: PreparedStatement ) {
    s.setString(1, name); s.setInt(2, age)
}}
val tableName = "table"
val fields = Array("name", "age")
val job = new JobConf()
DBConfiguration.configureDB(job,  "com.mysql.jdbc.Driver", "..")
DBOutputFormat.setOutput(job, tableName, fields:_*)
records.saveAsHadoopDataset(job)

Reuse Code on Batch & Streaming

// Shared RDD-to-RDD function
def extractIp(
  logs: RDD[String]) = {
   logs.map(_.split(" ")(0))
}

// Streaming: logs is a DStream[String]
val ips = logs.transform(extractIp)

// Batch: logs is an RDD[String]
val ips = extractIp(logs)
Use transform on a DStream to reuse your RDD to RDD functions from your batch Spark jobs.
Use foreachRDD on a DStream to reuse your RDD output functions from your batch Spark jobs.
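
The same reuse idea can be shown in plain Python, where a list stands in for an RDD and a list of lists stands in for the micro-batches of a DStream. The extract_ip function and the sample log lines are hypothetical.

```python
def extract_ip(logs):
    """Collection-to-collection function shared by the batch and streaming paths."""
    return [line.split(" ")[0] for line in logs]


# Batch: apply the function once to the whole dataset.
batch_logs = ["10.0.0.1 GET /", "10.0.0.2 POST /login"]
batch_ips = extract_ip(batch_logs)

# Streaming: apply the very same function to each micro-batch,
# which is what transform does for each RDD of a DStream.
micro_batches = [["10.0.0.3 GET /a"], ["10.0.0.4 GET /b", "10.0.0.5 GET /c"]]
stream_ips = [extract_ip(batch) for batch in micro_batches]
```

Keeping the logic in a named function also means it can be unit tested on small in-memory inputs without starting a cluster.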

Testing Spark Programs
This one is kind of cheating, but the point is that if we take our anonymous functions and instead put them in an object we can just use standard Scala/Java testing.

trait SSC extends BeforeAndAfterAll { self: Suite =>

  @transient private var _sc: SparkContext = _
  def sc: SparkContext = _sc

  var conf = new SparkConf(false)

  override def beforeAll() {
    _sc = new SparkContext("local[4]", "test", conf)
    super.beforeAll()
  }

  override def afterAll() {
    LocalSparkContext.stop(_sc)
    _sc = null
    super.afterAll()
  }
}
Or just include http://spark-packages.org/package/holdenk/spark-testing-base

Sometimes we want to test that our Spark transformations make sense in addition to the functions we supply to them. For this we need to test with RDDs. Thankfully Spark’s local mode gives us a good way to get started. Looking at Spark’s own test cases can give us an idea of how to get started with writing these tests. To get started we need a Spark Context, so lets look at how Spark’s testing handles this.
https://trongkhoanguyenblog.wordpress.com/2015/03/28/understand-the-scheduler-component-in-spark-core/
http://stackoverflow.com/questions/24696777/what-is-the-relationship-between-workers-worker-instances-and-executors
Your first question depends on what you mean by 'instances'. A node is a machine, and there's not a good reason to run more than one worker per machine. So two worker nodes typically means two machines, each a Spark worker.
Workers hold many executors, for many applications. One application has executors on many workers.
The use of multiple worker instances is only relevant in standalone mode.

http://sonra.io/2015/06/03/multiple-spark-worker-instances-on-a-single-node-why-more-of-less-is-more-than-less/
If you are running Spark in standalone mode on memory rich nodes it can be beneficial to have multiple worker instances on the same node as a very large heap size has two disadvantages:
– Garbage collector pauses can hurt throughput of Spark jobs.
– Heap size of >32 GB can’t use CompressedOops. So a 35 GB heap can actually hold less than a 32 GB heap.
https://www.sigmoid.com/apache-spark-internals/
  • Job: A piece of code which reads some input from HDFS or local, performs some computation on the data and writes some output data.
  • Stages: Jobs are divided into stages. Stages are classified as map or reduce stages (it's easier to understand if you have worked on Hadoop and want to correlate). Stages are divided based on computational boundaries: not all computations (operators) can be executed in a single stage, so a job runs over many stages.
  • Tasks: Each stage has some tasks, one task per partition. One task is executed on one partition of data on one executor.
  • DAG: DAG stands for Directed Acyclic Graph; in the present context it's a DAG of operators.
  • Executor: The process responsible for executing a task.
  • Driver: The program/process responsible for running the Job over the Spark Engine
  • Master: The machine on which the Driver program runs
  • Slave: The machine on which the Executor program runs
  1. The first layer is the interpreter, Spark uses a Scala interpreter, with some modifications.
  2. As you enter your code in the Spark console (creating RDDs and applying operators), Spark creates an operator graph.
  3. When the user runs an action (like collect), the graph is submitted to a DAG scheduler. The DAG scheduler divides the operator graph into (map and reduce) stages.
  4. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph. For example, many map operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final result of a DAG scheduler is a set of stages.
  5. The stages are passed on to the Task Scheduler. The task scheduler launches tasks via cluster manager. (Spark Standalone/Yarn/Mesos). The task scheduler doesn’t know about dependencies among stages.
  6. The Worker executes the tasks. A new JVM is started per job. The worker knows only about the code that is passed to it.
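The pipelining in step 4 can be sketched in plain Python. This is only a toy model, not Spark's scheduler: the SHUFFLE_OPS set and the split_into_stages helper are made up for illustration, and the real DAG scheduler decides stage boundaries from RDD dependencies rather than from operator names.

```python
# Operators assumed (for this sketch only) to need a shuffle.
SHUFFLE_OPS = {"reduceByKey", "groupByKey", "sortByKey"}

def split_into_stages(operators):
    """Group a linear chain of operators into stages.

    Consecutive operators that need no shuffle are pipelined into one
    stage; every shuffle operator starts a new stage.
    """
    stages = [[]]
    for op in operators:
        if op in SHUFFLE_OPS and stages[-1]:
            stages.append([])
        stages[-1].append(op)
    # Drop the empty placeholder left behind by an empty input chain.
    return [stage for stage in stages if stage]

pipeline = ["textFile", "map", "filter", "reduceByKey", "map"]
stages = split_into_stages(pipeline)
print(stages)
```

Here the three operators before reduceByKey land in one stage, so they can run back to back on each partition without writing intermediate data.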
How Apache Spark works
Spark caches the data to be processed, which can make it up to 100 times faster than Hadoop MapReduce for in-memory workloads. Spark uses Akka for multithreading, managing executor state, and scheduling tasks.
It uses Jetty to share files (jars and other files), for HTTP broadcast, and to run the Spark web UI. Spark is highly configurable and can use components that already exist in the Hadoop ecosystem. This has allowed Spark to grow rapidly, and in a short time many organisations have started using it in production.
http://stackoverflow.com/questions/29995938/when-specifying-localn1-n2-n3-for-spark-master-what-are-the-three-parameters
The master specification is parsed in SparkContext.createTaskScheduler. (See the link for the implementation.) The possibilities with local are:
  • local uses 1 thread.
  • local[N] uses N threads.
  • local[*] uses as many threads as there are cores.
  • local[N, M] and local[*, M] are like above, but set the maximal task failures to M. This allows you to enable retries when running locally. (Normally local retries are disabled. Enabling them is useful for testing.)
  • local-cluster[numSlaves, coresPerSlave, memoryPerSlave] starts executors in separate processes as configured, but it does not require running workers and masters. It's a lightweight way to simulate a cluster in unit tests. (See also SPARK-595.)
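To make the formats above concrete, here is a small plain-Python parser for them. It is a sketch written for these notes, not Spark's createTaskScheduler: parse_local_master and the dictionary keys are invented names, and representing "retries disabled" as max_failures = 1 (a single attempt) is an assumption.

```python
import os
import re

def parse_local_master(master):
    """Describe a local master string as a dict, or return None if unrecognized."""
    if master == "local":
        return {"mode": "local", "threads": 1, "max_failures": 1}

    # local[N], local[*], local[N, M], local[*, M]
    m = re.fullmatch(r"local\[(\*|\d+)\s*(?:,\s*(\d+))?\]", master)
    if m:
        threads = (os.cpu_count() or 1) if m.group(1) == "*" else int(m.group(1))
        # Assumption: max_failures=1 means one attempt, i.e. no retries.
        max_failures = int(m.group(2)) if m.group(2) else 1
        return {"mode": "local", "threads": threads, "max_failures": max_failures}

    # local-cluster[numSlaves, coresPerSlave, memoryPerSlave]
    m = re.fullmatch(r"local-cluster\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]", master)
    if m:
        slaves, cores, memory = (int(g) for g in m.groups())
        return {"mode": "local-cluster", "slaves": slaves,
                "cores_per_slave": cores, "memory_per_slave": memory}

    return None

for spec in ["local", "local[4]", "local[2, 3]", "local-cluster[2, 1, 1024]"]:
    print(spec, "->", parse_local_master(spec))
```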
https://issues.apache.org/jira/browse/SPARK-595
It could be useful for users who run Spark on a single machine with a large amount of memory (say 512 gigabytes, for example).

This is really useful for running tests and simulating conditions that cannot be simulated in the local[n] mode, namely serialization issues and multi-JVM issues.
Databricks Spark Reference Applications

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
If you see this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

The above error can be triggered when you initialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
rdd.map(s -> notSerializable.doSomething(s)).collect();

This will trigger that error. Here are some ideas to fix this error:
  • Make the class Serializable.
  • Declare the instance only within the lambda function passed in map.
  • Make the NotSerializable object static and create it once per machine.
  • Call rdd.foreachPartition and create the NotSerializable object in there, like this:
rdd.foreachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();
  // ...Now process iter
});
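The difference between declaring the instance inside the lambda and creating it once per partition is mostly about how many objects get built. A plain-Python simulation (no Spark involved; ExpensiveClient and both process_* helpers are hypothetical names, and partitions are modelled as lists) makes that visible:

```python
class ExpensiveClient:
    """Hypothetical stand-in for a non-serializable helper, e.g. a connection."""
    instances_created = 0

    def __init__(self):
        ExpensiveClient.instances_created += 1

    def do_something(self, record):
        return record.upper()

def process_per_record(partitions):
    """Build the helper inside the per-record function: one instance per record."""
    return [[ExpensiveClient().do_something(r) for r in part] for part in partitions]

def process_per_partition(partitions):
    """Build the helper once per partition, where it is used, so it is never shipped."""
    results = []
    for part in partitions:
        client = ExpensiveClient()
        results.append([client.do_something(r) for r in part])
    return results

partitions = [["a", "b", "c"], ["d", "e"]]

ExpensiveClient.instances_created = 0
per_record = process_per_record(partitions)
per_record_instances = ExpensiveClient.instances_created

ExpensiveClient.instances_created = 0
per_partition = process_per_partition(partitions)
per_partition_instances = ExpensiveClient.instances_created

print(per_record_instances, per_partition_instances)
```

Both variants avoid sending the object from the driver; the per-partition one just pays the construction cost far less often.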

The best means of checking whether a task ran locally is to inspect a given stage in the Spark UI. The "Locality Level" column there displays which locality a given task ran with.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time - so if a single key has more key-value pairs than can fit in memory, an out of memory exception occurs. This will be more gracefully handled in a later release of Spark so the job can still proceed, but should still be avoided - when Spark needs to spill to disk, performance is severely impacted.
Here are more functions to prefer over groupByKey:

combineByKey can be used when you are combining elements but your return type differs from your input value type.
foldByKey merges the values for each key using an associative function and a neutral "zero value".
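A rough plain-Python model of how these two behave, assuming partitions are just lists of (key, value) pairs. The combine_by_key and fold_by_key functions below are simulations written for these notes, not the Spark API; only the three-callback shape mirrors combineByKey.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Simulate combineByKey over a list of partitions of (key, value) pairs."""
    # Phase 1: inside each partition, build one combiner per key.
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    # Phase 2: merge the per-partition combiners of each key.
    merged = {}
    for combiners in per_partition:
        for key, comb in combiners.items():
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return merged

def fold_by_key(partitions, zero, func):
    """foldByKey expressed through combine_by_key: start each key from zero."""
    return combine_by_key(partitions, lambda v: func(zero, v), func, func)

scores = [[("a", 1), ("b", 4)], [("a", 3), ("b", 6), ("a", 5)]]

# The combiner type (sum, count) differs from the input value type (int).
sum_count = combine_by_key(
    scores,
    create_combiner=lambda v: (v, 1),
    merge_value=lambda acc, v: (acc[0] + v, acc[1] + 1),
    merge_combiners=lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
averages = {key: total / count for key, (total, count) in sum_count.items()}
totals = fold_by_key(scores, 0, lambda a, b: a + b)
print(averages, totals)
```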

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()
With reduceByKey, pairs on the same machine with the same key are combined (by using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.
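The effect on shuffle volume can be simulated without Spark. In this sketch the two-partition layout of the word pairs is an assumption, and the helper names are made up; the point is only to count how many records would cross the shuffle in each case.

```python
def shuffle_records_group_by_key(partitions):
    """groupByKey: every (key, value) pair crosses the shuffle unchanged."""
    return [pair for part in partitions for pair in part]

def shuffle_records_reduce_by_key(partitions, func):
    """reduceByKey: combine values per key inside each partition before shuffling."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    return shuffled

def final_reduce(shuffled, func):
    """After the shuffle, reduce whatever arrived for each key."""
    result = {}
    for key, value in shuffled:
        result[key] = func(result[key], value) if key in result else value
    return result

def add(a, b):
    return a + b

partitions = [
    [("one", 1), ("two", 1), ("two", 1)],
    [("three", 1), ("three", 1), ("three", 1)],
]

grouped = shuffle_records_group_by_key(partitions)
reduced = shuffle_records_reduce_by_key(partitions, add)
counts_from_grouped = final_reduce(grouped, add)
counts_from_reduced = final_reduce(reduced, add)
print(len(grouped), len(reduced), counts_from_reduced)
```

Both paths give the same counts, but the reduceByKey path moves one record per key per partition instead of one record per input pair.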

When dealing with vast amounts of data, a common problem is that a small amount of the data is malformed or corrupt. Using a filter transformation, you can easily discard bad inputs, or use a map transformation if it's possible to fix the bad input. Or perhaps the best option is to use a flatMap function where you can try fixing the input but fall back to discarding the input if you can't.
import json

def try_correct_json(json_string):
  try:
    # First check if the json is okay.
    json.loads(json_string)
    return [json_string]
  except ValueError:
    try:
      # If not, try correcting it by adding an ending brace.
      try_to_correct_json = json_string + "}"
      json.loads(try_to_correct_json)
      return [try_to_correct_json]
    except ValueError:
      # The malformed json input can't be recovered, drop this input.
      return []
Now we can apply that function to fix our input and try again. This time we succeed in reading three inputs:
corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect()
# Returns [Row(value=1), Row(value=2), Row(value=3)]

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html
If your RDD is so large that all of its elements won't fit in memory on the driver machine, don't do this:

val values = myVeryLargeRDD.collect()
Collect will attempt to copy every single element in the RDD onto the single driver program, and then run out of memory and crash.

Instead, you can make sure the number of elements you return is capped by calling take or takeSample, or perhaps filtering or sampling your RDD.

Similarly, be cautious of these other actions as well unless you are sure your dataset size is small enough to fit in memory:

countByKey
countByValue
collectAsMap
If you really do need every one of these values of the RDD and the data is too big to fit into memory, you can write out the RDD to files or export the RDD to a database that is large enough to hold all the data.
By default, Maven does not include dependency jars when it builds a target. When running a Spark job, if the Spark worker machines don't contain the dependency jars, there will be an error that a class cannot be found.
The easiest way to work around this is to create a shaded or uber jar to package the dependencies in the jar as well.
It is possible to keep certain dependencies out of the uber jar by marking them as <scope>provided</scope>. Spark dependencies should be marked as provided since they are already on the Spark cluster. You may also exclude other jars that you have installed on your worker machines.
http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter and others), your transformation code (closure) is:
  1. serialized on the driver node,
  2. shipped to the appropriate nodes in the cluster,
  3. deserialized,
  4. and finally executed on the nodes
You can of course run this locally (as in your example), but all those phases (apart from shipping over network) still occur. [This lets you catch any bugs even before deploying to production]
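The four phases can be imitated in a single Python process. This is an analogy, not Spark's code path: Python's standard pickle module stands in for the serializer, a bytes copy stands in for the network, run_task is a made-up helper, and a TypeError from pickle plays the role of NotSerializableException.

```python
import functools
import operator
import pickle
import threading

def run_task(task, records):
    """Push one task through all four phases inside a single process."""
    payload = pickle.dumps(task)                 # 1. serialized on the driver
    shipped = bytes(payload)                     # 2. "shipped" (just copied here)
    remote_task = pickle.loads(shipped)          # 3. deserialized on the executor
    return [remote_task(r) for r in records]     # 4. executed on the records

# A task whose captured state (the factor 3) can be serialized.
triple = functools.partial(operator.mul, 3)
tripled = run_task(triple, [1, 2, 3])

# A task that captures a lock, which cannot be serialized.
bad_task = functools.partial(operator.contains, [threading.Lock()])
try:
    run_task(bad_task, [1])
    serialization_error = None
except TypeError as exc:
    serialization_error = str(exc)

print(tripled)
print(serialization_error)
```

The failure happens in phase 1, before anything is shipped, which is why running locally is enough to surface this class of bug.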
What happens in your second case is that you are calling a method, defined in class testing from inside the map function. Spark sees that and since methods cannot be serialized on their own, Spark tries to serialize the whole testing class, so that the code will still work when executed in another JVM. You have two possibilities:
Either you make class testing serializable, so the whole class can be serialized by Spark,
or you make someFunc a function instead of a method (functions are objects in Scala), so that Spark will be able to serialize it.
EDIT (2015-03-15): SPARK-5307 introduced SerializationDebugger and Spark 1.3.0 is the first version to use it. It adds serialization path to a NotSerializableException. When a NotSerializableException is encountered, the debugger visits the object graph to find the path towards the object that cannot be serialized, and constructs information to help user to find the object.

http://docs.spring.io/spring-hadoop/docs/current/reference/html/springandhadoop-spark.html
http://stackoverflow.com/questions/30053449/use-spring-together-with-spark
I'm developing a Spark Application and I'm used to Spring as a Dependency Injection Framework. Now I'm stuck with the problem, that the processing part uses the @Autowired functionality of Spring, but it is serialized and deserialized by Spark.
http://stackoverflow.com/questions/24637312/spark-driver-in-apache-spark
A Spark driver is the process that creates and owns an instance of SparkContext. It is your Spark application that launches the main method in which the instance of SparkContext is created. It is the cockpit of job and task execution (using DAGScheduler and Task Scheduler). It hosts the Web UI for the environment.
It splits a Spark application into tasks and schedules them to run on executors. A driver is where the task scheduler lives and spawns tasks across workers. A driver coordinates workers and overall execution of tasks.
