Tuesday, October 4, 2016

Spark Misc Part 3



http://spark.apache.org/docs/latest/configuration.html
Increase heap size using the --driver-memory option or spark.driver.memory in the Spark configuration.

spark.driver.memory
  Default: 1g
  Amount of memory to use for the driver process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
  Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
spark.executor.memory
  Default: 1g
  Amount of memory to use per executor process (e.g. 2g, 8g).
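A minimal sketch of where these settings go in code (the app name and the 4g value are made up; as the note above says, driver memory itself should go through --driver-memory, not SparkConf):

import org.apache.spark.{SparkConf, SparkContext}

// spark.executor.memory can be set through SparkConf.
// spark.driver.memory should instead be passed as --driver-memory on
// spark-submit (or via the default properties file), because in client
// mode the driver JVM has already started by the time this code runs.
val conf = new SparkConf()
  .setAppName("memory-config-demo")      // hypothetical app name
  .set("spark.executor.memory", "4g")    // value is arbitrary
val sc = new SparkContext(conf)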
http://m.blog.csdn.net/article/details?id=52492854
16/09/10 09:41:27 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
Fixed by setting the JVM options -Xms256m -Xmx1024m.

16/09/10 10:00:04 INFO SharedState: Warehouse path is 'file:G:\IMFBigDataSpark2016\IMFJavaWorkspace_Spark200\Spark200Demo/spark-warehouse'.
Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse
 at org.apache.hadoop.fs.Path.initialize(Path.java:206)

.config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse")
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html
RDDs can be cached using the cache operation. They can also be persisted using the persist operation.

The difference between cache and persist operations is purely syntactic. cache is a synonym of persist or persist(MEMORY_ONLY), i.e. cache is merely persist with the default storage level MEMORY_ONLY.
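A quick sketch of the equivalence (sc is an existing SparkContext; the input path is made up):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")   // hypothetical input

// Equivalent: cache() is just persist() with the default MEMORY_ONLY level.
val cached    = lines.map(_.length).cache()
val persisted = lines.map(_.length).persist(StorageLevel.MEMORY_ONLY)

// persist() also accepts other storage levels, e.g.:
// lines.persist(StorageLevel.MEMORY_AND_DISK)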
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:
  • Anonymous function syntax, which can be used for short pieces of code.
  • Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
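For the first option, an inline anonymous function looks like this (myRdd is assumed to be an RDD[String], as above):

// Anonymous function syntax, fine for short pieces of code.
myRdd.map(s => s.trim).filter(s => s.nonEmpty)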
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).
In a similar way, accessing fields of the outer object will reference the whole object:
class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

http://stackoverflow.com/questions/28845172/passing-functions-to-spark-what-is-the-risk-of-referencing-the-whole-object



https://blog.knoldus.com/2015/10/21/demystifying-asynchronous-actions-in-spark/
So for the above question the answer is simple: Apache Spark also provides async actions for concurrent execution of jobs. A few of the asynchronous actions Spark provides are as follows:
collectAsync() -> Returns a future for retrieving all elements of this RDD.
countAsync() -> Returns a future for counting the number of elements in the RDD.
foreachAsync(scala.Function1<T,scala.runtime.BoxedUnit> f) -> Applies a function f to all elements of this RDD.
foreachPartitionAsync(scala.Function1<scala.collection.Iterator,scala.runtime.BoxedUnit> f) -> Applies a function f to each partition of this RDD.
takeAsync(int num) -> Returns a future for retrieving the first num elements of the RDD.
import scala.concurrent.ExecutionContext.Implicits.global  // needed to map over the returned futures

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collectAsync().map { items => items.foreach(x => println("Items in the list: " + x)) }

val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
rddCount.countAsync().map { x => println("Number of items in the list: " + x) }
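In a short demo program the driver can exit before these futures finish; one hedged way to block on them is plain scala.concurrent.Await (FutureAction extends Future; the timeout here is arbitrary):

import scala.concurrent.Await
import scala.concurrent.duration._

// Block until the async jobs complete (or the timeout expires).
val items = Await.result(rdd.collectAsync(), 1.minute)
val count = Await.result(rddCount.countAsync(), 1.minute)
println("Items: " + items.mkString(", ") + ", count: " + count)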

By default the Spark scheduler runs jobs in FIFO (first in, first out) fashion: the first job gets priority, then the second, and so on. If the first job does not use the whole cluster, the second job can also run in parallel, but if the first job is too big the second job may wait a long time even though it would take very little time to execute. As a solution Spark provides a fair scheduler, under which jobs execute in "round robin" fashion.
To configure the scheduler, set the following configuration:
val conf = new SparkConf().setAppName("spark_auth")
.setMaster("local[*]").set("spark.scheduler.mode", "FAIR")
After configuring FAIR scheduling you can see both jobs running concurrently and sharing the resources of the Spark cluster.
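A sketch of two jobs submitted concurrently under the FAIR scheduler, following the snippets above (the data and app name are made up):

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.ExecutionContext.Implicits.global

val conf = new SparkConf()
  .setAppName("spark_auth")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// Two async actions -> two jobs the FAIR scheduler can interleave.
val f1 = sc.parallelize(1 to 1000000, 4).map(_ * 2).countAsync()
val f2 = sc.parallelize(1 to 1000000, 4).filter(_ % 3 == 0).countAsync()
f1.foreach(c => println("job 1 count: " + c))
f2.foreach(c => println("job 2 count: " + c))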
http://stackoverflow.com/questions/37421746/how-to-make-two-spark-rdd-run-parallel
I have found similar behavior. Running RDDs either serially or in parallel doesn't make any difference by itself; what matters is the number of executors and executor cores you set in your spark-submit.
Let's say we have 2 RDDs as you mentioned above, and each RDD takes 1 hr with 1 executor and 1 core. With 1 executor and 1 core (Spark config) we cannot increase the performance, even if Spark runs both RDDs in parallel, unless we increase the executors and cores.
So, running two RDDs in parallel is not by itself going to increase the performance.

High Performance Spark: Best practices for scaling and optimizing Apache Spark
SparkContext vs SparkSession

Prior to Spark 2.0.0, the three main connection objects were SparkContext, SQLContext, and HiveContext. The SparkContext object was the connection to a Spark execution environment and created RDDs and other objects, SQLContext worked with Spark SQL in the background of SparkContext, and HiveContext interacted with the Hive stores.

Spark 2.0.0 introduced Datasets/DataFrames as the main distributed data abstraction interface and the SparkSession object as the entry point to a Spark execution environment.

The underlying structures are still RDDs. Also, to interact with RDDs, we still need a SparkContext object, and we can get one from the SparkSession object.

val sparkSession = SparkSession.builder.master(master_path).appName("application name").config("optional.config.key", "optional config value").getOrCreate()

Get the SparkContext object from the SparkSession for things such as accumulators, distributing cache files, and working with RDDs.

sc.getConf.toDebugString
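A tiny sketch of pulling the SparkContext back out of the SparkSession built above and using it (the accumulator name is made up):

// The SparkContext is still there underneath the SparkSession.
val sc = sparkSession.sparkContext

val counter = sc.longAccumulator("records")   // hypothetical accumulator
println(sc.getConf.toDebugString)             // dump the resolved configuration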

PairRDDFunctions, OrderedRDDFunctions and GroupedRDDFunctions
NewHadoopRDD, ShuffledRDD, JavaRDD

Wide vs. Narrow Dependencies
map, filter, mapPartitions, flatMap, and coalesce have narrow dependencies: the set of parent partitions can be determined regardless of the values of the data in the partitions, so the transformation qualifies as narrow.

Transformations with wide dependencies include sort, reduceByKey, groupByKey, join, and anything that calls repartition. In these transformations the data is partitioned according to its value.

Join is a bit more complicated, since it can have wide or narrow dependencies depending on how the two parent RDDs are partitioned.
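A hedged sketch contrasting the two (the data is made up; which calls shuffle follows from the lists above):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 4)

// Narrow: each output partition depends on a known parent partition,
// independent of the values in the data.
val mapped   = pairs.map { case (k, v) => (k, v * 10) }
val filtered = mapped.filter { case (_, v) => v > 10 }

// Wide: the data is repartitioned according to its value (the key), so an
// output partition may depend on every parent partition -> shuffle.
val reduced = filtered.reduceByKey(_ + _)
val sorted  = reduced.sortByKey()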

A Spark application consists of a driver process, which is where the high-level Spark logic is written, and a series of executor processes that can be scattered across the nodes of a cluster. The Spark program itself runs in the driver node and parts are sent to the executors.

There are two kinds of resource allocation: static allocation and dynamic allocation. With static allocation, each application is allotted a finite maximum of resources on the cluster and reserves them for the duration of the application (as long as the SparkContext is still running). Within the static allocation category, there are many kinds of resource allocation available, depending on the cluster.

In dynamic allocation, executors are added and removed from a Spark application as needed, based on a set of heuristics for estimated resource requirement.
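A sketch of the usual dynamic-allocation settings (the property names are standard Spark configuration keys; the numbers are arbitrary, and the external shuffle service has to be available on the cluster):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")        // required by dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")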

A Spark application corresponds to a set of Spark jobs defined by one Spark Context in the driver program. A Spark application begins when a Spark Context is started. When the Spark Context is started, each worker node starts an executor (its own Java Virtual Machine, JVM).

What happens when we start a SparkContext? First, the driver program pings the cluster manager. The cluster manager launches a number of Spark executors (JVMs) on the worker nodes of the cluster. One node can have multiple Spark executors, but an executor cannot span multiple nodes.

By default, Spark queues jobs on a first in, first out basis. However, Spark does offer a fair scheduler, which assigns tasks to concurrent jobs in round-robin fashion, i.e. parceling out a few tasks for each job until the jobs are all complete. The fair scheduler ensures that jobs get a more even share of cluster resources.

With each action, the Spark scheduler builds an execution graph and launches a Spark job. Each job consists of stages, which are steps in the transformation of the data needed to materialize the final RDD. Each stage consists of a collection of tasks that represent each parallel computation and are performed on the executors.

Each job may contain several stages which correspond to each wide transformation. Each stage is composed of one or many tasks which correspond to a parallelizable unit of computation done in each stage. There is one task for each partition in the resulting RDD of that stage.
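One way to see this breakdown is RDD.toDebugString, which prints the lineage with its shuffle boundaries (a small made-up pipeline; the input path is a placeholder):

val words  = sc.textFile("data.txt").flatMap(_.split("\\s+"))   // narrow
val counts = words.map((_, 1)).reduceByKey(_ + _)               // wide -> stage boundary

// The indentation in the output marks shuffle (stage) boundaries.
println(counts.toDebugString)

// Each action launches one job; this collect runs as two stages.
counts.collect()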

DAGScheduler
Directed Acyclic Graph (a DAG)

Each Spark job corresponds to one action
a job is defined by calling an action
wide transformations define the breakdown of jobs into stages.

Each stage corresponds to a ShuffleDependency created by a wide transformation in the Spark program. At a high level, one stage can be thought of as the set of computations (tasks) that can each be computed on one executor without communication with other executors or with the driver

Because the stage boundaries require communication with the driver, the stages associated with one job generally have to be executed in sequence rather than in parallel.

A stage consists of tasks
