Saturday, January 9, 2016

使用Spark+Cassandra打造高性能数据分析平台



http://www.csdn.net/article/2014-10-24/2822278-how-to-bulida-spark-and-cassandra-based-high-performance-data-pipeline
大家比较肯定的几款NoSQL数据库分别是HBase、MongoDB和Cassandra。
1.1 高可靠性
Cassandra采用gossip作为集群中结点的通信协议,该协议整个集群中的节点都处于同等地位,没有主从之分,这就使得任一节点的退出都不会导致整个集群失效。
Cassandra和HBase都是借鉴了Google BigTable的思想来构建自己的系统,但Cassandra另一重要的创新就是将原本存在于文件共享架构的p2p(peer to peer)引入了NoSQL。
P2P的一大特点就是去中心化,集群中的所有节点享有同等地位,这极大避免了单个节点退出而使整个集群不能工作的可能。
与之形成对比的是HBase采用了Master/Slave的方式,这就存在单点失效的可能。
1.2 高可扩性
随着时间的推移,集群中原有的规模不足以存储新增加的数据,此时进行系统扩容。Cassandra级联可扩,非常容易实现添加新的节点到已有集群,操作简单。
1.3 最终一致性
分布式存储系统都要面临CAP定律问题,任何一个分布式存储系统不可能同时满足一致性(consistency),可用性(availability)和分区容错性(partition tolerance)。
Cassandra是优先保证AP,即可用性和分区容错性。
Cassandra为写操作和读操作提供了不同级别的一致性选择,用户可以根据具体的应用场景来选择不同的一致性级别。
1.4 高效写操作
写入操作非常高效,这对于实时数据非常大的应用场景,Cassandra的这一特性无疑极具优势。
数据读取方面则要视情况而定:


  • 如果是单个读取即指定了键值,会很快的返回查询结果。
  • 如果是范围查询,由于查询的目标可能存储在多个节点上,这就需要对多个节点进行查询,所以返回速度会很慢
  • 读取全表数据,非常低效。
1.5 结构化存储
Cassandra是一个面向列的数据库,对那些从RDBMS方面转过来的开发人员来说,其学习曲线相对平缓。
Cassandra同时提供了较为友好CQL语言,与SQL语句相似度很高。
1.6 维护简单
从系统维护的角度来说,由于Cassandra的对等系统架构,使其维护操作简单易行。如添加节点,删除节点,甚至于添加新的数据中心,操作步骤都非常的简单明了。
2.1 单表查询
2.1.1 单表主键查询
在建立个人信息数据库的时候,以个人身份证id为主键,查询的时候也只以身份证为关键字进行查询,则表可以设计成为:
create table person (
 userid text primary key,
 fname text,
 lname text,
 age int,
 gender int);
Primary key中的第一个列名是作为Partition key。也就是说根据针对partition key的hash结果决定将记录存储在哪一个partition中,如果不湊巧的情况下单一主键导致所有的hash结果全部落在同一分区,则会导致该分区数据被撑满。
解决这一问题的办法是通过组合分区键(compsoite key)来使得数据尽可能的均匀分布到各个节点上。
举例来说,可能将(userid,fname)设置为复合主键。那么相应的表创建语句可以写成
create table person (
userid text,
fname text,
lname text,
gender int,
age int,
primary key((userid,fname),lname);
) with clustering order by (lname desc);
稍微解释一下primary key((userid, fname),lname)的含义:
  • 其中(userid,fname)称为组合分区键(composite partition key)
  • lname是聚集列(clustering column)
  • ((userid,fname),lname)一起称为复合主键(composite primary key)
2.1.2 单表非主键查询
如果要查询表person中具有相同的first name的人员,那么就必须针对fname创建相应的索引,否则查询速度会非常缓慢。
Create index on person(fname);
Cassandra目前只能对表中的某一列建立索引,不允许对多列建立联合索引。
2.2 多表关联查询
Cassandra并不支持关联查询,也不支持分组和聚合操作。
那是不是就说明Cassandra只是看上去很美其实根本无法解决实际问题呢?答案显然是No,只要你不坚持用RDBMS的思路来解决问题就是了。
比如我们有两张表,一张表(Departmentt)记录了公司部门信息,另一张表(employee)记录了公司员工信息。显然每一个员工必定有归属的部门,如果想知道每一个部门拥有的所有员工。如果是用RDBMS的话,SQL语句可以写成:
select * from employee e , department d where e.depId = d.depId;
要用Cassandra来达到同样的效果,就必须在employee表和department表之外,再创建一张额外的表(dept_empl)来记录每一个部门拥有的员工信息。
Create table dept_empl (
deptId text,????
看到这里想必你已经明白了,在Cassandra中通过数据冗余来实现高效的查询效果。将关联查询转换为单一的表操作。
2.3 分组和聚合
在RDBMS中常见的group by和max、min在Cassandra中是不存在的。
如果想将所有人员信息按照姓进行分组操作的话,那该如何创建数据模型呢?
Create table fname_person (
fname text,
userId text,
primary key(fname, userId)
);
Cassandra不支持子查询,下图展示了一个在MySQL中的子查询例子: 
要用Cassandra来实现,必须通过添加额外的表来存储冗余信息。 
Create table office_empl (
officeCode text,
country text,
lastname text,
firstname,
primary key(officeCode,country));
create index on office_empl(country);
总的来说,在建立Cassandra数据模型的时候,要求对数据的读取需求进可能的清晰,然后利用反范式的设计方式来实现快速的读取,原则就是以空间来换取时间。

数据分区

存储在Cassandra中的数据一般都会比较多,记录数在千万级别或上亿级别是常见的事。如何将这些表中的内容快速加载到本地内存就是一个非常现实的问题。
解决这一挑战的思路从大的方面来说是比较简单的,那就是将整张表中的内容分成不同的区域,然后分区加载,不同的分区可以在不同的线程或进程中加载,利用并行化来减少整体加载时间。
顺着这一思路出发,要问的问题就是Cassandra中的数据如何才能分成不同的区域。
不同于MySQL,在Cassandra中是不存在Sequence Id这样的类型的,也就是说无法简单的使用seqId来指定查询或加载的数据范围。
既然没有SequenceID,在Cassandra中是否就没有办法了呢?答案显然是否定的,如果只是仅仅支持串行读取,Cassandra早就会被扔进垃圾桶了。
数据分区在Cassandra中至少可以通过两种途径实现 ,一是通过token range,另一个是slice range。这里主要讲解利用token range来实现目的。
1. Token Range 
Cassandra将要存储的记录存储在不同的区域中,判断某一记录具体存储在哪个区域的依据是partition key的Hash值。 
在Cassandra 1.2之前,组成Cassandra集群的所有节点(Node),都需要手动指定该节点的Hash值范围也就是Token Range。
手工计算Token Range显然是很繁琐,同时也不怎么容易维护,在Cassandra 1.2之后,引进了虚拟节点(vnode)的概念,主要目的是减少不必要的人工指定,同时也将token range的划分变得更为细粒度。比如原先手工指定token range,只能达到10000这样一个精度,而有了vnode之后,默认安装是每一个物理节点上有256个虚拟节点,这样子的话每一个range的范围就是10000/256,这样变的更为精细。
有关token range的信息存储在cassandra的system命名空间(keyspace)下的local和peers两张表中。其中local表示本节点的token range情况,而peers表示集群中其它节点的token range情况。这两张表中的tokens字段就存储有详细的信息。如果集群中只由一台机器组成,那么peers中的就会什么内容都没有。
简单实验,列出本节点的token range:
use system;
desc table local;
select tokens from local;
2. Thrift接口 
Token Range告诉我们Cassandra的记录是分片存储的,也就意味着可以分片读取。现在的问题转换成为如何知道每一个Token Range的起止范围。
Cassandra支持的Thrift接口中describe_ring就是用来获取token range的具体起止范围的。我们常用的nodetool工具使用的就是thrift接口,nodetool 中有一个describering指令使用的就是describe_ring原语。
可以做一个简单的实验,利用nodetool来查看某个keyspace的token range具体情况。
nodetool -hcassandra_server_addr describering keyspacename
注意将cassandra_server和keyspacename换成实际的内容。

Spark-Cassandra-Connector

在第一节中讲解了Cassandra中Token Range信息的存储位置,以及可以使用哪些API来获取token range信息。
接下来就分析spark-cassandra-connector是如何以cassandra为数据源将数据加载进内存的。
以简单的查询语句为例,假设用户要从demo这个keyspace的tableX表中加载所有数据,用CQL来表述就是:
select * from demo.tableX
上述的查询使用spark-cassandra-connector来表述就是:
sc.cassandraTable(“demo”,”tableX”)
尽管上述语句没有触发Spark Job的提交,也就是说并不会将数据直正的从Cassandra的tableX表中加载进来,但spark-cassandra-connector还是需要进行一些数据库的操作。要解决的主要问题就是schema相关。
cassandraTable(“demo”,”tableX”)只是说要从tableX中加载数据,并没有告诉connector有哪些字段,每个字段的类型是什么。这些信息对后面使用诸如get[String](“fieldX”)来说却是非常关键的。
为了获取字段类型信息的元数据,需要读取system.schema_columns表,利用如下语句可以得到schema_columns表结构的详细信息:
desc table system.schema_columns
如果在conf/log4j.properties中将日志级别设置为DEBUG,然后再执行sc.cassandraTable语句就可以看到具体的CQL查询语句是什么。

数据分区

存储在Cassandra中的数据一般都会比较多,记录数在千万级别或上亿级别是常见的事。如何将这些表中的内容快速加载到本地内存就是一个非常现实的问题。
解决这一挑战的思路从大的方面来说是比较简单的,那就是将整张表中的内容分成不同的区域,然后分区加载,不同的分区可以在不同的线程或进程中加载,利用并行化来减少整体加载时间。
顺着这一思路出发,要问的问题就是Cassandra中的数据如何才能分成不同的区域。
不同于MySQL,在Cassandra中是不存在Sequence Id这样的类型的,也就是说无法简单的使用seqId来指定查询或加载的数据范围。
既然没有SequenceID,在Cassandra中是否就没有办法了呢?答案显然是否定的,如果只是仅仅支持串行读取,Cassandra早就会被扔进垃圾桶了。
数据分区在Cassandra中至少可以通过两种途径实现 ,一是通过token range,另一个是slice range。这里主要讲解利用token range来实现目的。
1. Token Range 
Cassandra将要存储的记录存储在不同的区域中,判断某一记录具体存储在哪个区域的依据是partition key的Hash值。 
在Cassandra 1.2之前,组成Cassandra集群的所有节点(Node),都需要手动指定该节点的Hash值范围也就是Token Range。
手工计算Token Range显然是很繁琐,同时也不怎么容易维护,在Cassandra 1.2之后,引进了虚拟节点(vnode)的概念,主要目的是减少不必要的人工指定,同时也将token range的划分变得更为细粒度。比如原先手工指定token range,只能达到10000这样一个精度,而有了vnode之后,默认安装是每一个物理节点上有256个虚拟节点,这样子的话每一个range的范围就是10000/256,这样变的更为精细。
有关token range的信息存储在cassandra的system命名空间(keyspace)下的local和peers两张表中。其中local表示本节点的token range情况,而peers表示集群中其它节点的token range情况。这两张表中的tokens字段就存储有详细的信息。如果集群中只由一台机器组成,那么peers中的就会什么内容都没有。
简单实验,列出本节点的token range:
use system;
desc table local;
select tokens from local;
2. Thrift接口 
Token Range告诉我们Cassandra的记录是分片存储的,也就意味着可以分片读取。现在的问题转换成为如何知道每一个Token Range的起止范围。
Cassandra支持的Thrift接口中describe_ring就是用来获取token range的具体起止范围的。我们常用的nodetool工具使用的就是thrift接口,nodetool 中有一个describering指令使用的就是describe_ring原语。
可以做一个简单的实验,利用nodetool来查看某个keyspace的token range具体情况。
nodetool -hcassandra_server_addr describering keyspacename
注意将cassandra_server和keyspacename换成实际的内容。

Spark-Cassandra-Connector

在第一节中讲解了Cassandra中Token Range信息的存储位置,以及可以使用哪些API来获取token range信息。
接下来就分析spark-cassandra-connector是如何以cassandra为数据源将数据加载进内存的。
以简单的查询语句为例,假设用户要从demo这个keyspace的tableX表中加载所有数据,用CQL来表述就是:
select * from demo.tableX
上述的查询使用spark-cassandra-connector来表述就是:
sc.cassandraTable(“demo”,”tableX”)
尽管上述语句没有触发Spark Job的提交,也就是说并不会将数据直正的从Cassandra的tableX表中加载进来,但spark-cassandra-connector还是需要进行一些数据库的操作。要解决的主要问题就是schema相关。
cassandraTable(“demo”,”tableX”)只是说要从tableX中加载数据,并没有告诉connector有哪些字段,每个字段的类型是什么。这些信息对后面使用诸如get[String](“fieldX”)来说却是非常关键的。
为了获取字段类型信息的元数据,需要读取system.schema_columns表,利用如下语句可以得到schema_columns表结构的详细信息:
desc table system.schema_columns
如果在conf/log4j.properties中将日志级别设置为DEBUG,然后再执行sc.cassandraTable语句就可以看到具体的CQL查询语句是什么。
1. CassandraRDDPartitioner 
Spark-cassandra-connector添加了一种新的RDD实现,即CassandraRDD。我们知道对于一个Spark RDD来说,非常关键的就是确定getPartitions和compute函数。
getPartitions函数会调用CassandraRDDPartitioner来获取分区数目:
override def getPartitions: Array[Partition] = {
    verify // let's fail fast
    val tf = TokenFactory.forCassandraPartitioner(cassandraPartitionerClassName)
    val partitions = new CassandraRDDPartitioner(connector, tableDef, splitSize)(tf).partitions(where)
    logDebug(s"Created total ${partitions.size} partitions for $keyspaceName.$tableName.")
    logTrace("Partitions: \n" + partitions.mkString("\n"))
    partitions
  }
CassandraRDDPartitioner中的partitions的处理逻辑大致如下: 
  1. 首先确定token range,使用describe_ring
  2. 然后根据Cassandra中使用的Partitioner来确定某一个token range中可能的记录条数,这么做的原因就是为进一步控制加载的数据,提高并发度。否则并发度就永远是256了,比如有一个物理节点,其中有256个vnodes,也就是256个token分区。如果每个分区中大致的记录数是20000,而每次加载最大只允许1000的话,整个数据就可以分成256x2=512个分区。
  3. 对describeRing返回的token range进一步拆分的话,需要使用splitter,splitter的构建需要根据keyspace中使用了何种Partitioner来决定,Cassandra中默认的Partitioner是Murmur3Partitioner,Murmur3Hash算法可以让Hash值更为均匀的分布到不同节点。
  4. splitter中会利用到配置项spark.cassandra.input.split.size和spark.cassandra.page.row.size,分别表示一个线程最多读取多少记录,另一个表示每次读取多少行。
partitions的源码详见CasssandraRDDParitioner.scala
compute函数就利用确定的token的起止范围来加载内容,这里在理解的时候需要引起注意的就是flatMap是惰性执行的,也就是说只有在真正需要值的时候才会被执行,延迟触发。
数据真正的加载是发生在fetchTokenRange函数,这时使用到的就是Cassandra Java Driver了,平淡无奇。
2. fetchTokenRange 
fetcchTokenRange函数使用Cassandra Java Driver提供的API接口来读取数据,利用Java API读取数据一般遵循以下步骤:
val cluster = ClusterBuilder.addContactPoint(“xx.xx.xx.xx”).build
val session = cluster.connect
val stmt = new SimpleStatement(queryCQL)
session.execute(session)
session.close
cluster.close
addContactPoint的参数是cassandra server的ip地址,在后面真正执行cql语句的时候,如果集群有多个节点构成,那么不同的cql就会在不同的节点上执行,自动实现了负载均衡。可以在addContactPoint的参数中设定多个节点的地址,这样可以防止某一节点挂掉,无法获取集群信息的情况发生。
session是线程安全的,在不同的线程使用同一个session是没有问题的,建议针对一个keySpace只使用一个session。
3. RDD中使用Session 
在Spark RDD中是无法使用SparkContext的,否则会形成RDD嵌套的现象,因为利用SparkContext很容易构造出RDD,如果在RDD的函数中如map中调用SparkContext创建一个新的RDD,则形成深度嵌套进而导致Spark Job有嵌套。
但在实际的情况下,我们需要根据RDD中的值再去对数据库进行操作,那么有什么办法来打开数据库连接呢?
解决的办法就是直接使用Cassandra Java Driver而不再使用spark-cassandra-connector的高级封装,因为不能像这样子来使用cassandraRDD。
sc.cassandraRDD(“ks”,”tableX”)
.map(x=>sc.cassandraRDD(“ks”,”tableX”).where(filter))
如果是直接使用Cassandra Java Driver,为了避免每个RDD中的iterator都需要打开一个session,那么可以使用foreachPartition函数来进行操作,减少打开的session数。
val  rdd1 = sc.cassandraTable(“keyspace”,”tableX”)
 rdd1.foreachPartition( lst => {
  val cluster = ClusterBuilder.addContactPoint(“xx.xx.xx.xx”).build
  val session = cluster.connect
  while ( iter.hasNext ) {
    val  elem = iter.next
   //do something by using session and elem
  }
  session.close
  cluster.close
 })
其实最好的办法是在外面建立一个session,然后在不同的partition中使用同一个session,但这种方法不行的原因是在执行的时候会需要”Task not Serializable”的错误,于是只有在foreachPartition函数内部新建session。

数据备份

尽管Cassandra号称可以做到宕机时间为零,但为了谨慎起见,还是需要对数据进行备份。
Cassandra提供了几种备份的方法
  1. 将数据导出成为json格式
  2. 利用copy将数据导出为csv格式
  3. 直接复制sstable文件
导出成为json或csv格式,当表中的记录非常多的时候,这显然不是一个好的选择。于是就只剩下备份sstable文件了。
问题是将sstable存储到哪里呢?放到HDFS当然没有问题,那有没有可能对放到HDFS上的sstable直接进行读取呢,在没有经过任务修改的情况下,这是不行的。
试想一下,sstable的文件会被拆分为多个块而存储到HDFS中,这样会破坏记录的完整性,HDFS在存储的时候并不知道某一block中包含有完成的记录信息。
为了做到记录信息不会被拆分到多个block中,需要根据sstable的格式自行提取信息,并将其存储到HDFS上。这样存储之后的文件就可以被并行访问。
Cassandra中提供了工具sstablesplit来将大的sstable分割成为小的文件。
DataStax的DSE企业版中提供了和Hadoop及Spark的紧密结合,其一个很大的基础就是先将sstable的内容存储到CFS中,大体的思路与刚才提及的应该差不多。
对sstable存储结构的分析是一个研究的热门,可以参考如下的链接。
之所以要研究备份策略是想将对数据的分析部分与业务部分相分离开,避免由于后台的数据分析导致Cassandra集群响应变得缓慢而致前台业务不可用,即将OLTP和OLAP的数据源分离开。
通过近乎实时的数据备份,后台OLAP就可以使用Spark来对数据进行分析和处理。

高级查询 Cassandra+Solr

与传统的RDBMS相比,Cassandra所能提供的查询功能实在是弱的可以,如果想到实现非常复杂的查询功能的,需要将Cassandra和Solr进行结合。
DSE企业版提供了该功能,如果想手工搭建的话,可以参考下面的链接:
  1. http://www.slideshare.net/planetcassandra/an-introduction-to-distributed-search-with-cassandra-and-solr 
  2. https://github.com/Stratio/stratio-cassandra开源方面的尝试 Cassandra和Lucene的结合

共享SparkContext

SparkContext可以被多个线程使用,这意味着同个Spark Application中的Job可以同时提交到Spark Cluster中,减少了整体的等待时间。
在同一个线程中, Spark只能逐个提交Job,当Job在执行的时候,Driver Application中的提交线程是处于等待状态的。如果Job A没有执行完,Job B就无法提交到集群,就更不要提分配资源真正执行了。
那么如何来减少等待时间呢,比如在读取Cassandra数据的过程中,需要从两个不同的表中读取数据,一种办法就是先读取完成表A与读取表B,总的耗时是两者之和。
如果利用共享SparkContext的技术,在不同的线程中去读取,则耗时只是两者之间的最大值。
在Scala中有多种不同的方式来实现多线程,现仅以Future为例来说明问题:
val ll  = (1 to 3 toList).map(x=>sc.makeRDD(1 to 100000 toList, 3))
val futures = ll.map ( x => Future {
  x.count()
 })
val fl = Future.sequencce(futures)
Await.result(fl,3600 seconds)
  1. 简要说明一下代码逻辑
  2. 创建三个不同的RDD
  3. 在不同的线程(Future)中通过count函数来提交Job
  4. 使用Await来等待Future执行结束

http://stackoverflow.com/questions/18168379/cassandra-choosing-a-partition-key
Indexing in the documentation you wrote up refers to secondary indexes. In cassandra there is adifference between the primary and secondary indexes. For a secondary index it would indeed be bad to have very unique values, however for the components in a primary key this depends on what component we are focusing on. In the primary key we have these components:
PRIMARY KEY(partitioning key, clustering key_1 ... clustering key_n)
The partitioning key is used to distribute data across different nodes, and if you want your nodes to be balanced (i.e. well distributed data across each node) then you want your partitioning key to be as random as possible. That is why the example you have uses UUIDs.
The clustering key is used for ordering so that querying columns with a particular clustering key can be more efficient. That is where you want your values to not be unique and where there would be a performance hit if unique rows were frequent.

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts