http://spark.apache.org/docs/latest/configuration.html
http://m.blog.csdn.net/article/details?id=52492854
.config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse")
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark
http://stackoverflow.com/questions/28845172/passing-functions-to-spark-what-is-the-risk-of-referencing-the-whole-object
https://blog.knoldus.com/2015/10/21/demystifying-asynchronous-actions-in-spark/
High Performance Spark: Best practices for scaling and optimizing Apache Spark
SparkContext vs SparkSession
Prior to Spark 2.0.0, the three main connection objects were SparkContext, SQLContext, and HiveContext. The SparkContext object was the connection to a Spark execution environment and created RDDs and other low-level objects; SQLContext worked with Spark SQL on top of a SparkContext; and HiveContext interacted with Hive stores.
Spark 2.0.0 introduced Datasets/DataFrames as the main distributed data abstraction interface and the SparkSession object as the entry point to a Spark execution environment.
The underlying structures are still RDDs. Also, to interact with RDDs, we still need a SparkContext object, and we can get one from the SparkSession object.
val sparkSession = SparkSession.builder.master(master_path).appName("application name")
  .config("configuration.key", "value").getOrCreate()  // config(key, value) pairs are optional
Get the SparkContext object from the SparkSession for things such as accumulators, distributing cache files, and working with RDDs.
sc.getConf.toDebugString
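A minimal sketch of that (the master URL and app name below are placeholder values):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")             // assumption: running locally for illustration
  .appName("sparksession-demo")   // hypothetical app name
  .getOrCreate()
val sc = spark.sparkContext        // the underlying SparkContext
println(sc.getConf.toDebugString)  // dump the effective configuration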
PairRDDFunctions, OrderedRDDFunctions and GroupedRDDFunctions
NewHadoopRDD, ShuffledRDD, JavaRDD
Wide vs. Narrow Dependencies
Transformations with narrow dependencies include map, filter, mapPartitions, flatMap, and coalesce.
If the set of parent partitions can be determined regardless of the values of the data in the partitions, the transformation qualifies as narrow.
Transformations with wide dependencies include sort, reduceByKey, groupByKey, join, and anything that calls for repartition.
In these cases, the data is partitioned according to its value.
Join is a bit more complicated, since it can have wide or narrow dependencies depending on how the two parent RDDs are partitioned.
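A small sketch of the distinction, assuming an existing SparkContext sc (the pair RDDs and their values are made up for illustration): map keeps a narrow dependency, reduceByKey shuffles by key value, and a join whose parents already share a partitioner avoids an extra shuffle.

import org.apache.spark.HashPartitioner

val pairs  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val mapped = pairs.map { case (k, v) => (k, v * 2) }   // narrow: each child partition depends on one parent partition
val summed = pairs.reduceByKey(_ + _)                  // wide: data is repartitioned by key value (shuffle)

val part   = new HashPartitioner(4)
val left   = pairs.partitionBy(part)
val right  = sc.parallelize(Seq(("a", "x"), ("b", "y"))).partitionBy(part)
val joined = left.join(right)                          // co-partitioned parents: the join adds no extra shuffle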
A Spark application consists of a driver process, which is where the high-level Spark logic is written, and a series of executor processes that can be scattered across the nodes of a cluster. The Spark program itself runs in the driver node and parts are sent to the executors.
There are two ways of allocating resources: static allocation and dynamic allocation. With static allocation, each application is allotted a finite maximum of resources on the cluster and reserves them for the duration of the application (as long as the SparkContext is still running). Within the static allocation category, there are many kinds of resource allocation available, depending on the cluster.
In dynamic allocation, executors are added and removed from a Spark application as needed, based on a set of heuristics for estimated resource requirement.
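A hedged sketch of what enabling dynamic allocation can look like (the exact prerequisites, such as the external shuffle service, depend on the cluster manager, and the min/max values below are arbitrary):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")          // commonly required alongside dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")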
A Spark application corresponds to a set of Spark jobs defined by one Spark Context in the driver program. A Spark application begins when a Spark Context is started. When the Spark Context is started, each worker node starts an executor (its own Java Virtual Machine, JVM).
What happens when we start a SparkContext: first, the driver program pings the cluster manager; the cluster manager then launches a number of Spark executors (JVMs) on the worker nodes of the cluster. One node can have multiple Spark executors, but an executor cannot span multiple nodes.
By default, Spark queues jobs on a first-in, first-out basis. However, Spark does offer a fair scheduler, which assigns tasks to concurrent jobs in round-robin fashion, i.e. parceling out a few tasks for each job until the jobs are all complete. The fair scheduler ensures that jobs get a more even share of cluster resources.
With each action, the Spark scheduler builds an execution graph and launches a Spark job. Each job consists of stages, which are steps in the transformation of the data needed to materialize the final RDD. Each stage consists of a collection of tasks that represent each parallel computation and are performed on the executors.
Each job may contain several stages which correspond to each wide transformation. Each stage is composed of one or many tasks which correspond to a parallelizable unit of computation done in each stage. There is one task for each partition in the resulting RDD of that stage.
DAGScheduler
Directed Acyclic Graph (a DAG)
Each Spark job corresponds to one action
a job is defined by calling an action
wide transformations define the breakdown of jobs into stages.
Each stage corresponds to a ShuffleDependency created by a wide transformation in the Spark program. At a high level, one stage can be thought of as the set of computations (tasks) that can each be computed on one executor without communication with other executors or with the driver.
Because the stage boundaries require communication with the driver, the stages associated with one job generally have to be executed in sequence rather than in parallel.
A stage consists of tasks
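One way to see these boundaries, assuming an existing SparkContext sc: toDebugString prints the RDD lineage, and the ShuffledRDD introduced by a wide transformation marks where a new stage starts. The data below is made up for illustration.

val words  = sc.parallelize(Seq("a b", "b c", "a a"), 2)
val counts = words.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)  // one wide transformation
println(counts.toDebugString)  // the ShuffledRDD line marks the stage boundary
counts.collect()               // one action => one job, two stages in this case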
If the driver heap is too small, SparkContext fails to initialize: increase heap size using the --driver-memory option or spark.driver.memory in the Spark configuration.
spark.driver.memory | default 1g | Amount of memory to use for the driver process, i.e. where SparkContext is initialized (e.g. 1g, 2g). Note: in client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, set it through the --driver-memory command line option or in your default properties file.
spark.executor.memory | default 1g | Amount of memory to use per executor process (e.g. 2g, 8g).
16/09/10 09:41:27 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
Fixed by setting the JVM options -Xms256m -Xmx1024m.
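A hedged sketch of the workarounds for a Spark 2.x application launched locally from an IDE: since the driver JVM is already running in client/local mode, spark.driver.memory set in code has no effect, so pass -Xms/-Xmx as VM options (or use --driver-memory with spark-submit); executor memory, by contrast, can be set in code. The app name and values below are illustrative.

import org.apache.spark.sql.SparkSession

// VM options for the run configuration (not Scala code): -Xms256m -Xmx1024m
val spark = SparkSession.builder
  .master("local[*]")
  .appName("memory-demo")                   // hypothetical app name
  .config("spark.executor.memory", "2g")    // per-executor heap, relevant when running on a cluster
  .getOrCreate()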
16/09/10 10:00:04 INFO SharedState: Warehouse path is 'file:G:\IMFBigDataSpark2016\IMFJavaWorkspace_Spark200\Spark200Demo/spark-warehouse'.
Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse
at org.apache.hadoop.fs.Path.initialize(Path.java:206)
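This matches the .config("spark.sql.warehouse.dir", ...) line quoted near the top of these notes: on Windows the warehouse path needs to be a proper file:/// URI rather than file:G:/. A minimal sketch (the app name is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")
  .appName("warehouse-demo")   // hypothetical app name
  .config("spark.sql.warehouse.dir",
    "file:///G:/IMFBigDataSpark2016/IMFJavaWorkspace_Spark200/Spark200Demo/spark-warehouse")
  .getOrCreate()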
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html
The difference between cache and persist operations is purely syntactic: cache is a synonym for persist or persist(MEMORY_ONLY), i.e. cache is merely persist with the default storage level MEMORY_ONLY.
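For example (a minimal sketch, assuming an existing SparkContext sc; "data.txt" is just a placeholder path):

import org.apache.spark.storage.StorageLevel

val a = sc.textFile("data.txt")
a.cache()                               // same as a.persist()
val b = sc.textFile("data.txt")
b.persist(StorageLevel.MEMORY_ONLY)     // explicit form of the default storage level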
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:
- Anonymous function syntax, which can be used for short pieces of code.
- Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).
In a similar way, accessing fields of the outer object will reference the whole object:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
http://stackoverflow.com/questions/28845172/passing-functions-to-spark-what-is-the-risk-of-referencing-the-whole-object
https://blog.knoldus.com/2015/10/21/demystifying-asynchronous-actions-in-spark/
So the answer to the above question is simple: Apache Spark also provides asynchronous actions for concurrent execution of jobs. A few of the asynchronous actions Spark provides are as follows:
collectAsync() -> Returns a future for retrieving all elements of this RDD.
countAsync() -> Returns a future for counting the number of elements in the RDD.
foreachAsync(scala.Function1<T,scala.runtime.BoxedUnit> f) -> Applies a function f to all elements of this RDD.
foreachPartitionAsync(scala.Function1<scala.collection.Iterator,scala.runtime.BoxedUnit> f) -> Applies a function f to each partition of this RDD.
takeAsync(int num) -> Returns a future for retrieving the first num elements of the RDD.
import scala.concurrent.ExecutionContext.Implicits.global  // needed for .map on the returned futures

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collectAsync().map { items =>
  items.foreach(item => println("Items in the list:" + item))
}

val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
rddCount.countAsync().map { count =>
  println("Number of items in the list: " + count)
}
By default the Spark scheduler runs Spark jobs in FIFO (first in, first out) fashion: priority is given to the first job, then the second, and so on. If the first job does not need the whole cluster, the second job can run in parallel with it; but if the first job is very large, the second job may wait a long time even if it would take very little time to execute. As a solution Spark provides the fair scheduler, in which jobs execute in round-robin fashion.
To configure the fair scheduler we need to set the following configuration:
val conf = new SparkConf()
  .setAppName("spark_auth")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")
After configuring FAIR scheduling you can see that both jobs run concurrently and share the resources of the Spark cluster.
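A small sketch of submitting two jobs concurrently, assuming an existing SparkContext sc with FAIR mode configured as above (whether they truly overlap still depends on the executors and cores available):

import scala.concurrent.Await
import scala.concurrent.duration._

val big   = sc.parallelize(1 to 1000000, 8)
val small = sc.parallelize(1 to 1000, 2)
val f1 = big.countAsync()     // job 1, submitted without blocking
val f2 = small.countAsync()   // job 2, submitted without waiting for job 1
println(Await.result(f1, 10.minutes) + Await.result(f2, 10.minutes))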
http://stackoverflow.com/questions/37421746/how-to-make-two-spark-rdd-run-parallel
I have found similar behavior. Running the RDDs serially or in parallel makes no difference beyond what the number of executors and executor cores set in your spark-submit allows.
Let's say we have the two RDDs mentioned above, and each takes 1 hour with 1 executor and 1 core. With 1 executor and 1 core (the Spark config) we cannot increase performance even if Spark runs both RDDs in parallel, unless we increase the executors and cores.
So, running two RDDs in parallel is not by itself going to increase performance.