In earlier versions of Spark, SparkContext was the entry point. Since the RDD was the main API, RDDs were created and manipulated through the context's APIs. For every other API we needed a different context: for streaming a StreamingContext, for SQL an SQLContext, and for Hive a HiveContext. But as the Dataset and DataFrame APIs become the new standard, Spark needs an entry point built for them. So Spark 2.0 introduces a new entry point for the Dataset and DataFrame APIs, called SparkSession.
SparkSession is essentially the combination of SQLContext, HiveContext and (in the future) StreamingContext. All the APIs available on those contexts are available on SparkSession as well. A SparkSession internally holds a SparkContext for the actual computation.
SparkSession follows the builder factory design pattern. The code below creates a SparkSession.
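A minimal sketch, using a local master and a placeholder application name:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("spark session example")
  .getOrCreate()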
The above is similar to creating a SparkContext with a local master and wrapping it in an SQLContext. If you need Hive support, you can use the code below to create a SparkSession with Hive enabled.
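A sketch of the same builder with Hive support enabled:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("spark session example")
  .enableHiveSupport()
  .getOrCreate()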
http://blog.madhukaraphatak.com/introduction-to-spark-two-part-2/ A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each dataset also has an untyped view called a DataFrame, which is a Dataset of Row.
“RDD represents an immutable, partitioned collection of elements that can be operated on in parallel.”
The major difference is that a Dataset is a collection of domain-specific objects, whereas an RDD is a collection of any objects. The domain-object part of the definition signifies the schema of the Dataset. So the Dataset API is always strongly typed and can be optimized using the schema, where the RDD API is not.
A DataFrame is a special Dataset where there are no compile-time checks on the schema. This makes Dataset the new single abstraction, replacing RDD from earlier versions of Spark.
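For example, reading a text file as a typed Dataset might look like the following sketch (the file path is a placeholder):

import sparkSession.implicits._

val data = sparkSession.read.text("src/main/resources/data.txt").as[String]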
The as[String] part of the code assigns the needed schema to the Dataset.
Here data will be of type Dataset[String]. Remember to import sparkSession.implicits._ for all the schema conversion magic.
After reading the data, we split each line into words, group them by the lowercased word, and count each group using the count method, which is similar to reduceByKey on an RDD:
val words = data.flatMap(value => value.split("\\s+"))
val groupedWords = words.groupByKey(_.toLowerCase)
val counts = groupedWords.count()
counts.show()
We can convert a Dataset back to an RDD using the rdd method:
val dsToRDD = ds.rdd
Converting an RDD to a DataFrame is a little more work, as we need to specify the schema. Here we show how to convert an RDD[String] to a DataFrame with a single string column.
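A sketch of that conversion (the sample RDD and column name are placeholders):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A hypothetical RDD[String]; in practice this would come from existing data
val stringRDD = sparkSession.sparkContext.parallelize(Seq("spark", "dataset"))

val rowRDD = stringRDD.map(value => Row(value))
val schema = StructType(Seq(StructField("value", StringType, nullable = true)))
val stringDF = sparkSession.createDataFrame(rowRDD, schema)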
http://vishnuviswanath.com/spark_session.html SparkSession is the new entry point from Spark 2.0. Prior to 2.0, we had only SparkContext and SQLContext, and we would also create a StreamingContext (if using streaming). SparkSession looks like part of Spark's plan to unify the APIs from 2.0 onward.
The getOrCreate method of the SparkSession builder does the following (a short usage sketch follows the list):
Create a SparkConf
Get a SparkContext (using SparkContext.getOrCreate(sparkConf))
Get a SparkSession (using SQLContext.getOrCreate(sparkContext).sparkSession)
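For example, a small sketch of how getOrCreate behaves when called twice (the names here are illustrative):

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
val sameSession  = SparkSession.builder().getOrCreate()   // reuses the session created above

println(sparkSession eq sameSession)    // true: the existing session is returned
println(sparkSession.sparkContext)      // the SparkContext obtained internally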
Once a SparkSession is created, it can be used to read data from various sources.
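For example, JSON can be read either from files or from an RDD of JSON strings. A sketch (the path and the sample record are placeholders; events here stands in for such an RDD):

// Option 1: read JSON files directly from a path
val eventsFromFiles = sparkSession.read.json("/path/to/events/")

// Option 2: read JSON from an RDD[String] of JSON documents
val events = sparkSession.sparkContext.parallelize(Seq(
  """{"action":"create","timestamp":"2016-01-07 00:01:17"}"""
))
val eventsDF = sparkSession.read.json(events)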
The latter option is also useful for reading JSON messages from Kafka with Spark Streaming.
If you are just playing around with DataFrames, you can use the show method to print the DataFrame to the console: df.show
Simply running sqlContext.read.json(events) will not load the data, since DataFrames are evaluated lazily. But it will trigger schema inference: Spark will go over the RDD to determine a schema that fits the data, which you can inspect with df.printSchema.
It is possible to provide the schema explicitly to avoid that extra scan:
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

val schema = (new StructType).add("action", StringType).add("timestamp", TimestampType)
val df = sqlContext.read.schema(schema).json(events)
df.show
Look at the 2nd row in the result set: as you can see, there is no conversion from string to integer. But there is one more big problem: if you try to set a type for which the parser has no conversion, it won't simply discard the value and set that field to null; instead it will treat the entire row as incorrect and set all of its fields to null. The good news is that you can read all values as strings.
If you can't be sure of the quality of your data, the best option is to explicitly provide a schema forcing StringType for all untrusted fields (avoiding the extra RDD scan), and then cast those columns to the desired type:
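A sketch continuing the example above (the column names follow the earlier schema; the cast target is an assumption about the data):

import org.apache.spark.sql.types.{StringType, StructType}

// Read everything as strings first, then cast the columns we trust to parse
val rawSchema = (new StructType).add("action", StringType).add("timestamp", StringType)
val rawDF = sqlContext.read.schema(rawSchema).json(events)
val typedDF = rawDF.withColumn("timestamp", rawDF("timestamp").cast("timestamp"))
typedDF.printSchema()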
See the code of org.apache.hadoop.mapred.FileInputFormat.getSplits().
Block size depends on the S3 file system implementation (see FileStatus.getBlockSize()). E.g. S3AFileStatus just sets it to 0, and then FileInputFormat.computeSplitSize() comes into play.
Also, you don't get splits if your InputFormat is not splittable :)
Spark will treat S3 as if it were a block-based filesystem, so the partitioning rules for HDFS and S3 inputs are the same: by default you will get one partition per block. It is worth inspecting the number of created partitions yourself:
val inputRDD = sc.textFile("s3a://...")
println(inputRDD.partitions.length)
The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds: the familiar object-oriented programming style and compile-time type-safety of the RDD API, but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.
When it comes to serializing data, the Dataset API has the concept of encoders which translate between JVM representations (objects) and Spark’s internal binary format. Spark has built-in encoders which are very advanced in that they generate byte code to interact with off-heap data and provide on-demand access to individual attributes without having to de-serialize an entire object. Spark does not yet provide an API for implementing custom encoders, but that is planned for a future release.
Additionally, the Dataset API is designed to work equally well with both Java and Scala. When working with Java objects, it is important that they are fully bean-compliant. In writing the examples to accompany this article, we ran into errors when trying to create a Dataset in Java from a list of Java objects that were not fully bean-compliant.
DataFrame is simply a type alias for Dataset[Row]. DataFrame operations are also referred to as “untyped transformations”, in contrast to the “typed transformations” that come with strongly typed Scala/Java Datasets.
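As a sketch in Scala (the Person case class and the sparkSession value are assumptions), the contrast looks like this:

case class Person(name: String, age: Long)

import sparkSession.implicits._
val people = Seq(Person("Ann", 30), Person("Bob", 25)).toDS()

// Untyped transformation on the DataFrame view: columns are referenced by name
val adultsDF = people.toDF().filter($"age" >= 18)

// Typed transformation on the Dataset: checked against Person at compile time
val adultsDS = people.filter(_.age >= 18)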
The conversion from Dataset[Row] to Dataset[Person] is very simple in Spark:
Dataset<Row> result = sqlContext.sql("SELECT * FROM peoples WHERE name='test'");
At this point, Spark converts your data into a DataFrame = Dataset[Row], a collection of generic Row objects, since it does not know the exact type.
// Create an Encoder for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> personDF = result.as(personEncoder);
personDF.show();
Now, Spark converts the Dataset[Row] into a Dataset[Person] of type-specific Scala/Java JVM objects, as dictated by the Person class.
Send objects from Spark (Streaming or DataFrames) into Solr.
Read the results from a Solr query as a Spark RDD or DataFrame.
Stream documents from Solr using /export handler (only works for exporting fields that have docValues enabled).
Read large result sets from Solr using cursors or with /export handler.
Data locality. If Spark workers and Solr processes are co-located on the same nodes, the partitions are placed on the nodes where the replicas are located.
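A rough sketch of the DataFrame read path listed above, assuming the spark-solr data source is on the classpath (the option names follow the spark-solr documentation as I recall them; the ZooKeeper address and collection name are placeholders):

val options = Map(
  "zkhost"     -> "localhost:9983",   // ZooKeeper connect string of the Solr cluster (placeholder)
  "collection" -> "myCollection",     // Solr collection to query (placeholder)
  "query"      -> "*:*"               // Solr query to run
)
val solrDF = sparkSession.read.format("solr").options(options).load()
solrDF.show()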
Spark's ability to leverage lazy evaluation within in-memory computations makes it particularly unique.
Spark currently supports three kinds of cluster managers: the manager included in Spark, called the Standalone Cluster Manager, which requires Spark to be installed on each node of the cluster; Apache Mesos; and Hadoop YARN.
Resilient Distributed Datasets (RDDs) are a representation of lazily evaluated statically typed distributed collections.
Driver: master node
Spark represents large datasets as RDDs, immutable distributed collections of objects, which are stored in the executors (or slave nodes). The objects that comprise RDDs are called partitions.
The Spark cluster manager handles starting and distributing the Spark executors across a distributed system according to the configuration parameters set by the Spark application. The Spark execution engine itself distributes data across the executors for a computation.
Spark evaluates RDDs lazily, computing RDD transformations only when the final RDD data needs to be computed.
RDDs are immutable, so transforming an RDD returns a new RDD rather than the existing one.
An action is a Spark operation which returns something other than an RDD.
Actions trigger the scheduler, which builds a directed acyclic graph (called the DAG), based on the dependencies between RDD transformations.
Lazy evaluation allows Spark to chain together operations that don’t require communication with the driver (called transformations with one-to-one dependencies) to avoid doing multiple passes through the data.
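A minimal sketch of lazy evaluation, assuming an existing SparkContext sc (as in the shell); nothing is computed until the action on the last line:

val numbers = sc.parallelize(1 to 1000000)   // no computation yet
val evens   = numbers.filter(_ % 2 == 0)     // transformation: just recorded in the lineage
val doubled = evens.map(_ * 2)               // chained one-to-one (narrow) dependency
println(doubled.count())                     // action: builds the DAG and runs a single pass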
LAZY EVALUATION & FAULT TOLERANCE
Spark is fault-tolerant because each partition of the data contains the dependency information needed to re-calculate that partition. Distributed systems based on mutable objects and strict evaluation paradigms provide fault tolerance by logging updates or duplicating data across machines. In contrast, Spark does not need to maintain a log of updates to each RDD or log the actual intermediary steps, since the RDD itself contains all the dependency information needed to replicate each of its partitions. Thus, if a partition is lost, the RDD has enough information about its lineage to recompute it, and that computation can be parallelized to make recovery faster.
Spark's biggest performance advantage over MapReduce is in use cases involving repeated computations.
Spark offers three options for memory management: in memory as deserialized data, in memory as serialized data, and on disk.
The persist() function in the RDD class lets the user control how the RDD is stored.
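A sketch of the three options through persist(), assuming an existing SparkContext sc (the input path is a placeholder; an RDD can only have one storage level, hence the commented alternatives):

import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("/path/to/input")
logs.persist(StorageLevel.MEMORY_ONLY)        // in memory, as deserialized objects
// logs.persist(StorageLevel.MEMORY_ONLY_SER) // in memory, as serialized bytes (more compact, more CPU)
// logs.persist(StorageLevel.DISK_ONLY)       // on disk
println(logs.count())                         // the first action materializes and caches the RDD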
Spark defines an RDD interface with the properties that each type of RDD must implement. These properties include the RDD’s dependencies and information about data locality that are needed for the execution engine to compute that RDD. Since RDDs are statically typed and immutable, calling a transformation on one RDD will not modify the original RDD but rather return a new RDD object with a new definition of the RDD’s properties.
The Spark Context represents the connection to a Spark cluster and one running Spark application.
dependencies() Returns a sequence of dependency objects. The dependencies let the scheduler know how this RDD depends on other RDDs. There are two kinds of dependencies: Narrow Dependencies (NarrowDependency objects), which represent partitions that depend on one or a small subset of partitions in the parent, and Wide Dependencies (ShuffleDependency objects), which are used when a partition can only be computed by rearranging all the data in the parent.
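A small sketch of the two kinds of dependencies, assuming an existing SparkContext sc:

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val mapped  = pairs.mapValues(_ + 1)       // narrow dependency: each partition depends on one parent partition
val reduced = mapped.reduceByKey(_ + _)    // wide dependency: a shuffle rearranges data across partitions
println(reduced.toDebugString)             // prints the lineage, including the shuffle boundary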
PairRDDFunctions, OrderedRDDFunctions and GroupedRDDFunctions: the additional methods in these classes are made available by implicit conversion from the abstract RDD class, based on type information or when a transformation is applied to an RDD.
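For example (a sketch, assuming an existing SparkContext sc), reduceByKey is only reachable through the implicit conversion to PairRDDFunctions, which requires an RDD of key/value pairs:

val words  = sc.parallelize(Seq("spark", "rdd", "spark"))
// RDD[String] has no reduceByKey; mapping to (key, value) pairs enables the
// implicit conversion to PairRDDFunctions, which provides it
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
counts.collect().foreach(println)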
a semi-structured data type called DataFrames, and a typed version called Datasets
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# sc is an existing SparkContext
sqlContext = SQLContext(sc)
# Read the text file
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)
# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")
# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")
# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
    print(name)
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class NetworkWordCount {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
      }
    });

    // Count each word in each batch
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print();

    jssc.start();             // Start the computation
    jssc.awaitTermination();  // Wait for the computation to terminate
  }
}