Wednesday, December 23, 2015

Spark Misc



Encoders
http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset-in-spark-1-6
  1. Using generic encoders.
    There are two generic encoders available for now, kryo and javaSerialization, where the latter is explicitly described as:
    extremely inefficient and should only be used as the last resort.
    Assuming the following class
    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    you can use these encoders by adding an implicit encoder:
    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    which can be used together as follows:
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    It stores objects as a binary column, so when converted to a DataFrame you get the following schema:
    root
     |-- value: binary (nullable = true)
    It is also possible to encode tuples using the kryo encoder for a specific field:
    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    Please note that we don't depend on implicit encoders here but pass the encoder explicitly, so this most likely won't work with the toDS method.
  2. Using implicit conversions:
    Provide implicit conversions between a representation which can be encoded and the custom class, for example:
    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
logging
https://github.com/phatak-dev/spark2.0-examples/blob/master/src/main/scala/com/madhukaraphatak/examples/sparktwo/TimeWindowExample.scala
sparkSession.sparkContext.setLogLevel("ERROR")


http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
1) Add import org.apache.log4j.{Level, Logger} to the import section.
2) Add the following lines after creating the Spark context object, i.e. after val sc = new SparkContext(conf):
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
Spark Session
http://blog.madhukaraphatak.com/introduction-to-spark-two-part-1/
https://github.com/phatak-dev/spark2.0-examples/tree/master/src/main/scala/com/madhukaraphatak/examples/sparktwo
In earlier versions of Spark, SparkContext was the entry point for Spark. As RDD was the main API, it was created and manipulated using the context's APIs. For every other API we needed a different context: for streaming, StreamingContext; for SQL, SQLContext; and for Hive, HiveContext. But as the Dataset and DataFrame APIs are becoming the new standard, Spark needs an entry point built for them. So in Spark 2.0, we have a new entry point for the Dataset and DataFrame APIs called SparkSession.
SparkSession is essentially a combination of SQLContext, HiveContext and the future StreamingContext. All the APIs available on those contexts are available on SparkSession as well. SparkSession internally has a SparkContext for the actual computation.
SparkSession follows the builder factory design pattern. Below is the code to create a SparkSession.
val sparkSession = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()
The above is similar to creating a SparkContext with a local master and creating an SQLContext wrapping it. If you need Hive support, you can use the code below to create a SparkSession with Hive support enabled.
val sparkSession = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .enableHiveSupport()
      .getOrCreate()
enableHiveSupport on the builder enables Hive support, which is similar to HiveContext.
val df = sparkSession.read.option("header","true").
    csv("src/main/resources/sales.csv")
http://blog.madhukaraphatak.com/introduction-to-spark-two-part-2/
Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each dataset also has an untyped view called a DataFrame, which is a Dataset of Row.

"RDD represents an immutable, partitioned collection of elements that can be operated on in parallel."
The major difference is that a Dataset is a collection of domain-specific objects, whereas an RDD is a collection of any objects. The domain-object part of the definition signifies the schema of the Dataset. So the Dataset API is always strongly typed and optimized using the schema, whereas the RDD is not.
A DataFrame is a special Dataset where there are no compile-time checks for the schema. This makes Dataset the new single abstraction replacing RDD from earlier versions of Spark.
The as[String] part of the code assigns the needed schema to the dataset.
import sparkSession.implicits._
val data = sparkSession.read.text("src/main/resources/data.txt").as[String]
Here data will be of type Dataset[String]. Remember to import sparkSession.implicits._ for all the schema conversion magic.
val words = data.flatMap(value => value.split("\\s+"))
val groupedWords = words.groupByKey(_.toLowerCase)
Once we have grouped, we can count each word using the count method. It's similar to reduceByKey of RDD.
val counts = groupedWords.count()
counts.show()

A Dataset can be converted back to an RDD using the rdd method:
val dsToRDD = ds.rdd

Dataset

Converting an RDD to a DataFrame is a little bit of work, as we need to specify the schema. Here we show how to convert an RDD[String] to a DataFrame and then back to a Dataset[String].
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
val rDDToDataSet = rddToDF.as[String]
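A shorter alternative (a sketch, assuming sparkSession.implicits._ is in scope and rdd is an RDD[String]) is to let the implicits build the Dataset directly:
import sparkSession.implicits._
// toDS/toDF are added to RDD[String] by the implicits; no explicit Row or schema is needed
val rddToDS = rdd.toDS()
val rddToDS2 = rdd.toDF("value").as[String]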
https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")
// get all settings
val configMap:Map[String, String] = spark.conf.getAll()

val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
  .builder()
  .appName("SparkSessionZipsExample")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
http://vishnuviswanath.com/spark_session.html
SparkSession is the new entry point from Spark 2.0. Prior to 2.0, we had only SparkContext and SQLContext, and we would also create a StreamingContext (if using streaming). It looks like SparkSession is part of Spark's plan to unify the APIs from Spark 2.0.

The getOrCreate method of the SparkSession builder does the following:
  1. Create a SparkConf
  2. Get a SparkContext (using SparkContext.getOrCreate(sparkConf))
  3. Get a SparkSession (using SQLContext.getOrCreate(sparkContext).sparkSession)
Once the SparkSession is created, it can be used to read data from various sources, for example:
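A minimal sketch (the file paths here are placeholders):
val jsonDF = sparkSession.read.json("path/to/data.json")
val csvDF = sparkSession.read.option("header", "true").csv("path/to/data.csv")
val parquetDF = sparkSession.read.parquet("path/to/data.parquet")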




http://blog.antlypls.com/blog/2016/01/30/processing-json-data-with-sparksql/




val df = sqlContext.read.json("s3a://some-bucket/some-file.json")

Please note Spark expects each line to be a separate JSON object, so it will fail if you try to load a pretty-printed JSON file.
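For example, the expected line-delimited form looks like this (the second record is a made-up illustration):
{"action":"create","timestamp":"2016-01-07T00:01:17Z"}
{"action":"delete","timestamp":"2016-01-07T00:02:00Z"}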
You can also read JSON data from an RDD[String] object, like:




// construct RDD[String]
val events = sc.parallelize(
  """{"action":"create","timestamp":"2016-01-07T00:01:17Z"}""" :: Nil)

// read it
val df = sqlContext.read.json(events)

The latter option is also useful for reading JSON messages from Kafka with Spark Streaming.
If you are just playing around with DataFrames you can use the show method to print a DataFrame to the console: df.show
Simply running sqlContext.read.json(events) will not load data, since DataFrames are evaluated lazily. But it will trigger schema inference: Spark will go over the RDD to determine a schema that fits the data: df.printSchema
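For the single-record events RDD above, the inferred schema looks roughly like this (reconstructed output; both fields are inferred as strings):
root
 |-- action: string (nullable = true)
 |-- timestamp: string (nullable = true)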

It is possible to provide the schema explicitly to avoid that extra scan:
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

val schema = (new StructType).add("action", StringType).add("timestamp", TimestampType)
val df = sqlContext.read.schema(schema).json(events)
df.show
Look at the second row in the result set: as you can see, there is no conversion from string to integer. But there is one more big problem: if you try to set a type for which the parser doesn't have a conversion, it won't simply discard the value and set that field to null; instead it will consider the entire row incorrect and set all fields to null. The good news is that you can read all values as strings.

If you can't be sure about the quality of your data, the best option is to explicitly provide a schema forcing StringType for all untrusted fields to avoid the extra RDD scan, and then cast those columns to the desired type:
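A minimal sketch of that approach (using the events RDD above and the standard Column.cast method):
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// read every untrusted field as a string first
val rawSchema = (new StructType).add("action", StringType).add("timestamp", StringType)
val raw = sqlContext.read.schema(rawSchema).json(events)

// then cast; a value that cannot be parsed becomes null in that column only,
// instead of nulling out the whole row
val typed = raw.withColumn("timestamp", col("timestamp").cast(TimestampType))
typed.printSchema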
https://issues.apache.org/jira/browse/SPARK-12744
The semantics of reading a JSON integer as a timestamp (explicitly defined by the schema) have been changed: the integer value is treated as a number of seconds instead of milliseconds.
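A small illustration of the changed semantics (the sample value is made up; behavior as described in the JIRA):
import org.apache.spark.sql.types.{StructType, TimestampType}

val tsSchema = (new StructType).add("ts", TimestampType)
val tsDF = sqlContext.read.schema(tsSchema)
  .json(sc.parallelize("""{"ts": 1452124877}""" :: Nil))
// 1452124877 is now interpreted as seconds since the epoch (early January 2016),
// not as milliseconds
tsDF.show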
https://github.com/apache/spark/blob/master/core/src/test/java/org/apache/spark/JavaAPISuite.java
https://github.com/apache/spark/blob/master/mllib/src/test/java/org/apache/spark/ml/feature/JavaBucketizerSuite.java
StructType schema = new StructType(new StructField[]{
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});

http://stackoverflow.com/questions/37509932/write-spark-dataframe-as-csv-with-partitions
Spark 2.0.0+:
The built-in csv format supports partitioning out of the box, so you should be able to simply use:
df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)
without including any additional packages.
http://stackoverflow.com/questions/37168716/how-many-partitions-does-spark-create-when-a-file-is-loaded-from-s3-bucket
How many partitions does Spark create when a file is loaded from S3 bucket?
If the file is loaded from HDFS, by default Spark creates one partition per block.

See the code of org.apache.hadoop.mapred.FileInputFormat.getSplits().
Block size depends on S3 file system implementation (see FileStatus.getBlockSize()). E.g. S3AFileStatus just set it equals to 0 (and then FileInputFormat.computeSplitSize() comes into play).
Also, you don't get splits if your InputFormat is not splittable :)
Spark will treat S3 as if it were a block-based filesystem, so partitioning rules for HDFS and S3 inputs are the same: by default you will get one partition per block. It is worth inspecting the number of created partitions yourself:
val inputRDD = sc.textFile("s3a://...")
println(inputRDD.partitions.length)
http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/
The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds; the familiar object-oriented programming style and compile-time type-safety of the RDD API but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.
When it comes to serializing data, the Dataset API has the concept of encoders which translate between JVM representations (objects) and Spark’s internal binary format. Spark has built-in encoders which are very advanced in that they generate byte code to interact with off-heap data and provide on-demand access to individual attributes without having to de-serialize an entire object. Spark does not yet provide an API for implementing custom encoders, but that is planned for a future release.
Additionally, the Dataset API is designed to work equally well with both Java and Scala. When working with Java objects, it is important that they are fully bean-compliant. In writing the examples to accompany this article, we ran into errors when trying to create a Dataset in Java from a list of Java objects that were not fully bean-compliant.

JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<JavaPerson> data = JavaData.sampleData();
Dataset<JavaPerson> dataset = sqlContext.createDataset(data, Encoders.bean(JavaPerson.class));
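A sketch of what "fully bean-compliant" means, written here in Scala with @BeanProperty for brevity (the class name is arbitrary): the essential requirements are a public no-arg constructor and a getter/setter pair for every field.
import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders

class PersonBean extends Serializable {
  @BeanProperty var name: String = _   // generates getName/setName
  @BeanProperty var age: Int = _       // generates getAge/setAge
}

val personBeanEncoder = Encoders.bean(classOf[PersonBean])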
http://stackoverflow.com/questions/34194019/convert-spark-dataframe-to-pojo-object
DataFrame is simply a type alias of Dataset[Row]. These operations are also referred to as "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets.
The conversion from Dataset[Row] to Dataset[Person] is very simple in Spark:
DataFrame result = sqlContext.sql("SELECT * FROM peoples WHERE name='test'");
At this point, Spark converts your data into DataFrame = Dataset[Row], a collection of generic Row objects, since it does not know the exact type.
// Create an Encoder for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> personDF = result.as(personEncoder);
personDF.show();
Now, Spark converts the Dataset[Row] into a Dataset[Person] of type-specific Scala/Java JVM objects, as dictated by the class Person.

https://github.com/lucidworks/spark-solr

via DataFrame

val options = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}"
)
val df = sqlContext.read.format("solr")
  .options(options)
  .load


  • Send objects from Spark (Streaming or DataFrames) into Solr (see the write sketch after this list).
  • Read the results from a Solr query as a Spark RDD or DataFrame.
  • Stream documents from Solr using /export handler (only works for exporting fields that have docValues enabled).
  • Read large result sets from Solr using cursors or with /export handler.
  • Data locality. If Spark workers and Solr processes are co-located on the same nodes, the partitions are placed on the nodes where the replicas are located.
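A minimal write-side sketch (assuming the same options map as above; check the spark-solr README for the exact supported write options):
// write a DataFrame to the configured Solr collection
df.write
  .format("solr")
  .options(options)
  .mode(org.apache.spark.sql.SaveMode.Overwrite)
  .save()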



https://github.com/databricks/spark-csv
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");
You can manually specify schema:
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(customSchema)
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");
You can save with compressed output:
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
    .save("newcars.csv");

Spark's ability to leverage lazy evaluation within memory computations makes it particularly unique.
Spark currently supports three kinds of cluster managers: the manager included in Spark, called the Standalone Cluster Manager, which requires Spark to be installed on each node of the cluster; Apache Mesos; and Hadoop YARN.

Resilient Distributed Datasets (RDDs) are a representation of lazily evaluated statically typed distributed collections.
Driver: master node

Spark represents large datasets as RDDs, immutable distributed collections of objects, which are stored in the executors (or slave nodes). The objects that comprise RDDs are called partitions.

The Spark cluster manager handles starting and distributing the Spark executors across a distributed system according to the configuration parameters set by the Spark application. The Spark execution engine itself distributes data across the executors for a computation

Spark evaluates RDDs lazily, computing RDD transformations only when the final RDD data needs to be computed

RDDs are immutable, so transforming an RDD returns a new RDD rather than the existing one.

An action is a Spark operation which returns something other than an RDD
Actions trigger the scheduler, which builds a directed acyclic graph (called the DAG), based on the dependencies between RDD transformations.

Lazy evaluation allows Spark to chain together operations that don’t require communication with the driver (called transformations with one-to-one dependencies) to avoid doing multiple passes through the data.

LAZY EVALUATION & FAULT TOLERANCE
Each partition of the data contains the dependency information needed to re-calculate the partition. The RDD itself contains all the dependency information needed to replicate each of its partitions. Thus, if a partition is lost, the RDD has enough information about its lineage to recompute it, and that computation can be parallelized to make recovery faster.
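A small sketch of inspecting that lineage (the transformations here are arbitrary examples):
val base = sc.parallelize(1 to 100)
val derived = base.map(_ * 2).filter(_ % 3 == 0)
// toDebugString prints the lineage (chain of dependencies) Spark would use
// to recompute lost partitions
println(derived.toDebugString)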

Spark’s biggest performance advantage over MapReduce is in use cases involving repeated computations
Spark offers three options for memory management: deserialized data in memory, serialized data in memory, and on disk.
The persist() function in the RDD class lets the user control how the RDD is stored.
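A sketch of those three options via persist (the RDD is arbitrary; only one storage level can be set per RDD, hence the commented alternatives):
import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")
lines.persist(StorageLevel.MEMORY_ONLY)         // in memory, deserialized
// lines.persist(StorageLevel.MEMORY_ONLY_SER)  // in memory, serialized
// lines.persist(StorageLevel.DISK_ONLY)        // on disk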

Spark defines an RDD interface with the properties that each type of RDD must implement. These properties include the RDD’s dependencies and information about data locality that are needed for the execution engine to compute that RDD. Since RDDs are statically typed and immutable, calling a transformation on one RDD will not modify the original RDD but rather return a new RDD object with a new definition of the RDD’s properties.

The Spark Context represents the connection to a Spark cluster and one running Spark application.
dependencies() Returns a sequence of dependency objects. The dependencies let the scheduler know how this RDD depends on other RDDs. There are two kinds of dependencies: Narrow Dependencies (NarrowDependency objects), which represent partitions that depend on one or a small subset of partitions in the parent, and Wide Dependencies (ShuffleDependency objects), which are used when a partition can only be computed by rearranging all the data in the parent.
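A sketch of inspecting those dependency objects (reduceByKey here is available through the implicit conversion to PairRDDFunctions mentioned below):
val words = sc.parallelize(Seq("a", "b", "a"))
val pairs = words.map(w => (w, 1))
println(pairs.dependencies)   // narrow: OneToOneDependency
val counts = pairs.reduceByKey(_ + _)
println(counts.dependencies)  // wide: ShuffleDependency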

PairRDDFunctions, OrderedRDDFunctions and GroupedRDDFunctions. The additional methods in these classes are made available by implicit conversion from the abstract RDD class, based on type information or when a transformation is applied to an RDD.

a semi-structured data type, called DataFrames and a typed version called Dataset


Spark is fault-tolerant, because each partition of the data contains the dependency information needed to re-calculate the partition. Distributed systems, based on mutable objects and strict evaluation paradigms, provide fault tolerance by logging updates or duplicating data across machines. In contrast, Spark does not need to maintain a log of updates to each RDD or log the actual intermediary steps, since the RDD itself contains all the dependency information needed to replicate each of its partitions. Thus, if a partition is lost, the RDD has enough information about its lineage to recompute it, and that computation can be parallelized to make recovery faster.

http://blog.csdn.net/Earl211/article/details/51597209
Spark SQL can execute SQL queries using basic SQL syntax or HiveQL. Spark SQL can also be used to read data from an existing Hive installation; the result is returned as a DataFrame.

DataFrames

If you have used R or Python's pandas library, DataFrames will feel very familiar. Intuitively, the data is stored in something like an Excel sheet. If the concept is unclear, look up R's DataFrame structure.

Datasets

Dataset is a new interface added in Spark 1.6 and is still experimental. It tries to provide the benefits of RDD-style data access while also taking advantage of the SQL engine's optimizations. A Dataset can be constructed from JVM objects and then manipulated using transformations.

Getting Started

Starting Point: SQLContext

(The code below is all in Python: I am more familiar with Python, and it is concise and easy to follow. Java may be more common in production, but at this stage the goal is to grasp the Spark SQL concepts and theory quickly and well.)
The entry point for Spark SQL is the SQLContext class, or one of its descendants. Before creating a basic SQLContext, we need to create a SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# sc is an existing SparkContext
Besides SQLContext, we can also use HiveContext, which provides a superset of the basic SQLContext functionality: queries with the HiveQL parser, Hive UDFs, and the ability to read data from Hive tables. The inconvenience is that HiveContext is a separate package with a number of extra dependencies; once those are sorted out, using HiveContext is not a problem.

DataFrame

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Create a DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# Count people by age (like SELECT count(*) ... GROUP BY age in SQL)
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1

Programmatically Specifying the Schema

When the schema cannot be defined ahead of time (for example, the structure of the records is encoded in a string, or a text dataset will be parsed and fields projected differently for different users), a DataFrame can be created in three steps:
* Create an RDD of tuples or lists from the original RDD (Rows in Java)
* Create the schema, represented by a StructType, matching the structure of those tuples or lists
* Apply the schema to the RDD via the createDataFrame method provided by SQLContext
For example:
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print(name)
Spark Streaming + Kafka hands-on tutorial
Kafka is a distributed publish-subscribe messaging system; simply put, it is a message queue whose advantage is that data is persisted to disk (Kafka itself is not the focus of this article, so we won't go into detail). Kafka has plenty of use cases, for example as a buffer queue between asynchronous systems, and in many scenarios we end up with a design like the following:
Write some data (for example, logs) to Kafka for durable storage, have another service consume the data from Kafka and perform business-level analysis, and then write the analysis results to HBase or HDFS.

Because this design is so common, big data stream processing frameworks such as Storm already support seamless integration with Kafka. Naturally, as a latecomer, Spark also provides native support for Kafka.
Log data flows continuously into Kafka, and we use a Spark Streaming program to consume it. Each log record is a string; we split the strings on spaces and compute, in real time, how many times each word occurs.
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaSparkDemoMain {
    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo")
        val scc = new StreamingContext(sparkConf, Duration(5000))
        scc.checkpoint(".") // checkpointing is required because updateStateByKey is used
        val topics = Set("kafka-spark-demo") // the Kafka topic(s) we want to consume
        val kafkaParam = Map(
                "metadata.broker.list" -> "localhost:9091" // Kafka broker list
            )

        val stream: InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)
        stream.map(_._2)           // take the message value
            .flatMap(_.split(" ")) // split each string on spaces
            .map(r => (r, 1))      // map each word to a (word, 1) pair
            .updateStateByKey[Int](updateFunc)  // update existing state with the current batch
            .print() // print the first 10 elements

        scc.start() // actually start the program
        scc.awaitTermination() // block and wait for termination
    }

    val updateFunc = (currentValues: Seq[Int], preValue: Option[Int]) => {
        val curr = currentValues.sum
        val pre = preValue.getOrElse(0)
        Some(curr + pre)
    }

    /**
     * Create a stream that pulls data from Kafka.
     * @param scc           the Spark Streaming context
     * @param kafkaParam    Kafka-related configuration
     * @param topics        the set of topics to consume
     * @return
     */
    def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = {
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](scc, kafkaParam, topics)
    }
}

The difference between DirectStream and the receiver-based stream

At a high level, the previous Kafka integration (the receiver approach) with a WAL works as follows:
  1. Kafka receivers running on Spark workers/executors continuously read data from Kafka, using Kafka's high-level consumer API.
  2. The received data is stored in memory on the Spark workers/executors and also written to the WAL. Only after the received data has been persisted to the log do the Kafka receivers update the Kafka offsets in ZooKeeper.
  3. The received data and the WAL storage location information are stored reliably; if a failure occurs, this information is used to recover from the error and continue processing the data.
This method guarantees that data received from Kafka is not lost. But under failure, some data may well be processed more than once! This happens when some received data has been reliably saved to the WAL, but the system fails before the Kafka offsets in ZooKeeper are updated. The result is an inconsistency: Spark Streaming knows the data has been received, but Kafka believes it has not, so when the system recovers Kafka will send that data again.
The inconsistency arises because the two systems cannot atomically record which data has already been received. To solve this, only one system should maintain the consistent view of what has been sent or received, and that system must have full control over recovery from failure. Based on these considerations, the community decided to store all consumed offset information only in Spark Streaming, and to use Kafka's low-level consumer API to recover data from any position.
To build this, the newly introduced Direct API takes a completely different approach from receivers and WALs. Instead of launching receivers that continuously read data from Kafka and write it to a WAL, it simply determines the offset range to read for each batch interval; when each batch's job runs, the data corresponding to those offsets is already available in Kafka. The offset information is also stored reliably (checkpointed), so it can be read directly when recovering from a failure.
Note that Spark Streaming may re-read and re-process those data ranges from Kafka after a failure. However, because of the exactly-once processing semantics, the final result of re-processing is the same as the result without any failure.
Therefore, the Direct API eliminates the need for WALs and receivers, and ensures that each Kafka record is received effectively once and efficiently. This allows Spark Streaming and Kafka to be integrated nicely. Overall, these features make the stream processing pipeline highly fault tolerant, efficient, and easy to use.
Spark Streaming getting-started example
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class NetworkWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String x) {
                        return Arrays.asList(x.split(" "));
                    }
                });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}
This example uses the netcat tool to simulate a socket and feed data into port 9999 on the local machine for the Spark program to consume. Start the netcat server on port 9999:
nc -lp 9999
Comparing Storm and Spark Streaming

Apache Storm

Storm is a distributed, reliable, fault-tolerant stream processing system. It delegates work to different types of components, each responsible for a simple, specific task. The input stream of a Storm cluster is managed by a component called a spout, which passes data to bolts; a bolt either saves the data to some kind of storage or passes it on to other bolts. You can think of a Storm cluster as a chain of bolts transforming the data emitted by spouts.

Spark Streaming

Spark is a distributed computing framework similar to MapReduce. Its core is the resilient distributed dataset (RDD); it provides a richer model than MapReduce and can quickly iterate over datasets in memory, supporting complex data mining and graph computation algorithms. Spark Streaming is a real-time computation framework built on top of Spark that extends Spark's ability to process large-scale streaming data.

Processing model and latency

Storm processes one incoming event at a time (although Storm Trident supports batching), while Spark Streaming processes all events within a window. From the processing model alone, Storm's latency is slightly better than Spark Streaming's. On this point Storm has a slight edge.

Fault tolerance

In Storm, every message is tracked from the moment it is created (handled by the ACK service), so Storm can guarantee a message will definitely be processed, and can also guarantee it is processed exactly once (achievable with Storm Trident's transaction mechanism).
In Spark Streaming, the RDD (resilient distributed dataset) data model is cleverly designed: every RDD is read-only and can only be created from an external data source or by transforming other RDDs. Each RDD also records how it was produced, so if an RDD is lost or not processed it can simply be recomputed from its lineage, which is simple and efficient. On this point Spark Streaming has a slight edge.

API ease of use

Storm has a Java API that is fairly simple and convenient to use, and the Storm Trident API is also available. Spark Streaming is implemented in Scala; although Java and Python APIs exist, writing a Spark program in Java takes considerably more code than in Scala, mainly because Scala supports functional programming and is more expressive than Java. This comes down to personal preference; I don't think the difference is significant.

Ecosystem

In this respect, I think Spark Streaming comes out ahead, because:
  1. Spark Streaming is built on Spark, and the Spark stack can do a great deal: batch processing, machine learning, graph computation, and so on. Every company presumably wants a single unified architecture that can handle many workloads.
  2. Spark Streaming can run on YARN and Mesos, whereas Storm can only run on Mesos (reportedly it can run on YARN with third-party components, but that is not native support).

Product maturity

Storm came out in 2011 and is roughly in its adolescence; Spark came out in 2013 and is still in its childhood, with many shortcomings, and its stability is not guaranteed. So if you have very strict requirements for data processing, Storm is the safer choice. (Personal opinion; I have only just started with big data.)

