Friday, August 26, 2016

Couchbase



http://guide.couchdb.org/draft/conflicts.html
You now have a document in each of the databases that has different information. This situation is called a conflict. Conflicts occur in distributed systems. They are a natural state of your data. How does CouchDB’s replication system deal with conflicts?
When you replicate two databases in CouchDB and you have conflicting changes, CouchDB will detect this and will flag the affected document with the special attribute "_conflicts":true. Next, CouchDB determines which of the changes will be stored as the latest revision (remember, documents in CouchDB are versioned). The version that gets picked to be the latest revision is the winning revision. The losing revision gets stored as the previous revision.
CouchDB does not attempt to merge the conflicting revision. Your application dictates how the merging should be done. The choice of picking the winning revision is arbitrary. In the case of the phone number, there is no way for a computer to decide on the right revision. This is not specific to CouchDB; no other software can do this (ever had your phone’s sync-contacts tool ask you which contact from which source to take?).
Replication guarantees that conflicts are detected and that each instance of CouchDB makes the same choice regarding winners and losers, independent of all the other instances. There is no group decision made; instead, a deterministic algorithm determines the order of the conflicting revision. After replication, all instances taking part have the same data. The data set is said to be in a consistent state. If you ask any instance for a document, you will get the same answer regardless which one you ask.
Whether or not CouchDB picked the version that your application needs, you need to go and resolve the conflict, just as you need to resolve a conflict in a version control system like Subversion. Simply create a version that you want to be the latest by either picking the latest, or the previous, or both (by merging them) and save it as the now latest revision. Done. Replicate again and your resolution will populate over to all other instances of CouchDB. Your conflict resolving on one node could lead to further conflicts, all of which will need to be addressed, but eventually, you will end up with a conflict-free database on all nodes.
How Does CouchDB Decide Which Revision to Use?
CouchDB guarantees that each instance that sees the same conflict comes up with the same winning and losing revisions. It does so by running a deterministic algorithm to pick the winner. The application should not rely on the details of this algorithm and must always resolve conflicts. We’ll tell you how it works anyway.
Each revision includes a list of previous revisions. The revision with the longest revision history list becomes the winning revision. If they are the same, the _rev values are compared in ASCII sort order, and the highest wins. So, in our example, 2-de0ea16f8621cbac506d23a0fbbde08a beats 2-7c971bb974251ae8541b8fe045964219.
One advantage of this algorithm is that CouchDB nodes do not have to talk to each other to agree on winning revisions. We already learned that the network is prone to errors and avoiding it for conflict resolution makes CouchDB very robust.


http://developer.couchbase.com/documentation/server/current/introduction/intro.html
https://www.quora.com/What-is-the-difference-between-couchbase-and-mongodb
Couchbase (not to be confused with couchdb) and MongoDB are both document oriented databases.  They both have a document as their storage unit.

That is pretty much where the similarties stop.

Couchbase is a combination of couchdb + membase. It uses a strict HTTP protocol to query and interact with objects. Objects (documents) are stored in buckets.

To query documents in Couchbase, you define a view with the columns of the document you are interested in (called the map); and then optionally can define some aggregate functions over the data (the reduce step).

If you are storing customer data and want to query all customers that have not bought any goods for the past three months; you would first have to write a view (the map) that filters these customers; once this view is published - couchbase will optimize searches on this and you can use this view (map) as your source on which you execute queries.

You can create multiple views over your documents and these views are highly optimized by the system and are only reindexed when the underlying document has significant changes.

This makes couchbase ideal for those situations where you have infrequent changes to the _structure_ of your document; and know in advance what are the kinds of queries you will be executing. You can think of dashboards, realtime updates, etc.

It also offers excellent support for offline databases and built-in master-master replication; making it a good candidate for mobile and other occasionally connected devices.


MongoDB has an entirely different approach to the same problem.

It has a concept of SQL-like queries, and databases and collections.

In MongoDB documents live in a collection, and collections are part of a database.

Just like Couchbase, you can store any arbitrarily nested document; and just like Couchbase an automatic key is generated for you.

However, with MongoDB the way you retrieve documents is more like how you write SQL queries; there are operators for most boolean matches, and pattern matching and (with 3.0) full text search as well. You can also define indexes to help speed up your results.

In this respect, MongoDB is easier to get familiar with if you are already comfortable with traditional SQL.

MongoDB also provides the normal replication capabilities and it is capable of master-master replication (although such a configuration is not enabled by default).

MongoDB can most easily replace your traditional relational database needs; as it has the same concepts of keys/tables ("collections") and query parameters - along with the benefit of being schema-free.

Couchbase and MongoDB both provide commercial support for their databases - MongoDB's commercial offering is called MongoDB Enterprise and Couchbase has Enterprise Edition (EE).

One difference you'll immediately find between MongoDB and Couchbase is that MongoDB does not come with a default administration console/GUI - in fact a GUI and a complete hosted management service is offered as a pay option.

You can install any number of third party GUI to quickly browse your documents; but having one by default would have been nice.

Couchbase provides and excellent GUI with their free product.

http://www.slideshare.net/gabriele.lana/couchdb-vs-mongodb-2982288
> use checkout
switched to db checkout
> db.tickets.save({ "_id": 1, "day": 20100123, "checkout": 100 })
> db.tickets.save({ "_id": 2, "day": 20100123, "checkout": 42 })
> db.tickets.save({ "_id": 3, "day": 20100123, "checkout": 215 })
> db.tickets.save({ "_id": 4, "day": 20100123, "checkout": 73 })
> db.tickets.count()
4
> db.tickets.find()
{ "_id" : 1, "day" : 20100123, "checkout" : 100 }
...
> db.tickets.find({ "_id": 1 })

> var map = function() {
... emit(null, this.checkout)
... }
> var reduce = function(key, values) {
... var sum = 0
... for (var index in values) sum += values[index]
... return sum
... }
> sumOfCheckouts = db.tickets.mapReduce(map, reduce)
{
 "result" : "tmp.mr.mapreduce_1263717818_4",
 "timeMillis" : 8,
 "counts" : { "input" : 4, "emit" : 4, "output" : 1 },
 "ok" : 1
}
> db.getCollectionNames()
[
 "tickets",
 "tmp.mr.mapreduce_1263717818_4",
]
> db[sumOfCheckouts.result].find()
{ "_id" : null, "value" : 430 }
> db.tickets.mapReduce(map, reduce, { “out”: “sumOfCheckouts” })
> db.getCollectionNames()
[
 “sumOfCheckouts”,
 "tickets",
 "tmp.mr.mapreduce_1263717818_4"
]
> db.sumOfCheckouts.find()
{ "_id" : null, "value" : 430 }
> db.sumOfCheckouts.findOne().value
430
# GROUP AS MAP/REDUCE ALTERNATIVE
> db.tickets.group({
... "initial": { "sum": 0 },
... "reduce": function(ticket, checkouts) {
...... checkouts.sum += ticket.checkout
...... }
... })
[ { "sum" : 430 } ]

> db.tickets.update({ "_id": 1 }, {
... $set: { "products": {
...... "apple": { "quantity": 5, "price": 10 },
...... "kiwi": { "quantity": 2, "price": 25 }
...... }
... },
... $unset: { "checkout": 1 }
... })

> var map = function() {
... var checkout = 0
... for (var name in this.products) {
...... var product = this.products[name]
...... checkout += product.quantity * product.price
...... }
... emit(this.day, checkout)
}
> var reduce = function(key, values) {
... var sum = 0
... for (var index in values) sum += values[index]
... return sum
}

> db.tickets.mapReduce(map, reduce, { "out": "sumOfCheckouts" })
> db.sumOfCheckouts.find()

> var map = function() {
... var checkout = 0
... for (var name in this.products) {
...... var quantity = this.products[name]
...... var price = db.product.findOne({ "_id": name }).price
...... checkout += quantity * price
...... }
... emit(this.day, checkout)
}
> var reduce = function(key, values) {
... var sum = 0
... for (var index in values) sum += values[index]
... return sum
}

> db.tickets.mapReduce(map, reduce, { "out": "sumOfCheckouts" })

Count of unique elements
> var map = function() {
... var accumulator = {
...... "numberOfViews": 1,
...... "visitedPages": {},
...... "totalTime": 0
...... };
... accumulator["visitedPages"][this.page] = 1
... accumulator["totalTime"] += this.time
... emit(this.user, accumulator)
}
> var aUser = db.view.findOne({ "user": "001" })
> var emit = function(id, value) { print(tojson(value)) }
> map.call(aUser)


Learning Couchbase
Data manager
Any operation performed on the Couchbase database system gets stored in the memory, which acts as a caching layer. By default, every document gets stored in the memory for each read, insert, update, and so on, until the memory is full. It's a drop-in replacement for Memcache. However, in order to provide persistency of the record, there is a concept called disk queue. This will flush the record to the disk asynchronously, without impacting the client request.

The cluster manager is responsible for node administration and node monitoring within a cluster. Every node within a Couchbase cluster includes the cluster manager component, data storage, and data manager. It manages data storage and retrieval. It contains the memory cache layer, disk persistence mechanism, and query engine.

Couchbase clients use the cluster map provided by the cluster manager to find out which node holds the required data, and then communicate with the data manager on that node to perform database operations.

A bucket is an independent virtual container that groups documents logically in a Couchbase cluster, which is equivalent to a database namespace in RDBMS

In Couchbase, a bucket is the equivalence of a database, but there is no concept of tables in Couchbase. In Couchbase, all data or records, which are referred to as documents, are stored directly in a bucket. Basically, the lowest namespace for storing documents or data in Couchabase is a bucket.

Types of bucket
Memcached
Couchbase

One way is that the key marked for deletion is removed when there is a request for that particular key. This is called Lazy Deletion. The other way is if the keys are flagged as expired, then they will be removed by an automatic maintenance process that runs according to the maintenance intervals. By default, this takes place every 60 minutes.

Depending on the document ID, documents are distributed across the nodes in a cluster. Each bucket is divided into 1024 logical partitions which are called vBucket. Each partition is bound to a particular node in the cluster. This bindings of vBucket to server nodes is stored in a cluster map, which is a lookup structure. Each vBucket will have a subset of document IDs. This mechanism allows effective distribution and sharding of documents across the nodes in a cluster.

Whenever there is a failure of one of the nodes in the cluster, replica vBuckets are converted to an active vBucket in place of the vBuckets that failed because of the node failure. This process occurs instantaneously.

Views enable indexing and querying by looking inside JSON documents for a key, for ranges of keys, or to aggregate data.
Views are created using incremental MapReduce, which powers indexing

Views enable us to define materialized views on JSON documents and then query across the dataset.
A materialized view is a database object that contains the result of MapReduce.

Views are created for documents that are stored on the disk only. Hence, sometimes, there will be some documents missing in the views, which are mostly those documents that are in the RAM and that have not yet spilled on disks.

Types of views
DEVELOPMENT
PRODUCTION
Create design documents on a bucket.
Create views within that design document.
For manageability, all views are attached to a design document. So, whenever you want to write a view, you need to create one design document to attach the view into it. A design document can have multiple views. However, whenever any changes take place in one of the views' definitions in a single design document, all views that belong to that design document are rebuilt. This will increase the I/O and CPU usage across the Couchbase cluster. Hence, ensure that you group views in a design document that are not going to change its view's definition often.
In reduce function, when the rereduce parameter is true, it indicates that the reduce function is again called as part of re-reduce.

Wednesday, August 24, 2016

Database Misc



http://www.raychase.net/2932
读模型
1、主键读
最常见的读模型,说是主键,其实也包括其它索引键,或者联合主键。
常见实现:hash,时间复杂度可以接近O(1);B树或变种:时间复杂度接近O(log(n))。
关于B树和变种:
B树(B-树):本质上是二叉查找树的升级版,变成了平衡的N叉查找树,这个N的范围根据磁盘一次读取的块大小来调整,这样复杂度log n的底数就从2变成一个更大的数,减少了树的高度。除此以外,还有一些额外的优化,比如为了插入和删除的性能考虑,通常准备一些预留的空间,只要在当前块或者邻近块中找到空间写入,就避免了开销巨大的所有记录向后偏移的操作。
B树的阶:
  1. 一棵m阶的B树最多有m棵子树;
  2. 根节点至少有两棵子树;
  3. 每个非根分支节点至少有ceil(m/2)棵子树;
  4. 叶节点全部在同一层;
  5. 有x个孩子的非叶节点恰有x-1个递增排列的关键字。
读写模型整理笔记
图片来自此页面
B+树:和B树相比,改变的地方包括:
  1. 全部关键字信息都放在叶子节点;
  2. 所有叶子节点串成一个linked list以便搜索;
  3. 存放重复的搜索键。
具体的区别可以参见《Difference between B Tree and B+ Tree》,(下图出处)。
读写模型整理笔记 
B*树在B+树基础上做了进一步改进:
  1. 非叶子节点增加指向兄弟节点的指针(用以在节点满时,可以往兄弟节点放数据,减少节点创建的情况);
  2. 非叶子节点至少为2/3满的(关键字字数至少为最大值的2/3)。
2、指定页查询
指定页就意味着具备分页的概念,比如在DynamoDB的查询接口设计上,可以传入一个LastEvaluatedKey这样的对象,通过主键读的方式定位到本页读取的起始位置。但是,如果要随机指定页码号的查询,这种情况的复杂度在不同实现的情况下就有很大差异了,有的可以直接算出该页的位置,有的则需要从第一页开始一页页找下去。
常见实现:指定起始位置,条件查询的情况下返回数据子集。
3、范围查询
首先,数据可以根据某一属性排序,然后才存在范围查询的概念。比如用户的年龄在某个区间之内的查询。
常见实现:B树及其变种(这种情况下B+树比B树优越的地方就体现出来了,B+树可以直接扫描叶子节点的linked list即可)。
4、全数据扫描
这种访问模型通常意味着低速和高开销,一般多用作异步任务,比如报表系统,在低访问时段做定时的数据统计。通常非索引键查询本质上也是全数据扫描。
例子:数据库全表扫描,Hadoop上的数据集处理任务。
5、全文检索
常见实现:倒排索引。
6、前缀/后缀匹配
前缀匹配:Trie树;后缀匹配:后缀树。参见《Trie树和其它数据结构的比较》
7、条件查询
常见实现:全表扫描;R树;Space-filling Curve
写模型
1、异步更新
先返回,不关注更新的事务性,更新操作在后台完成,这种方式具备最快的结果返回速度。
2、队列/双端队列
这种情况适用于吞吐量比较大并且非常不稳定的情形,借助队列的缓冲作用;也有一种是需要处理写入次序的问题,借助优先级队列的有序性。
3、批量写
很多情况下是异步的数据处理,比如数据回填、批量数据导出等等。
4、根据查询结果更新
就是把查询和更新这两步过程合并,使之具备原子性。比如Java中的compareAndSet操作,比如数据库的update语句跟上where子句等等。
5、插入或更新
upsert,如同hash map中的put,不管之前该记录是否存在,存在就覆盖,不存在就插入。
6、更新到多个replication
几乎所有的产品化的存储系统都会考虑replication,对于数据可靠性的问题,软件层面上冗余多份数据是唯一的办法。


How to Design a library



http://www.raychase.net/2380
http://programmers.stackexchange.com/questions/196151/what-guidelines-should-i-follow-while-designing-a-library
  • Pin Map,明确你期望库主要用来做什么,但不要把它定得太死,用户要可以比较方便地做出改变。
  • Working Library,一个工作的库,如果它连这点都达不到,一定要注明。没有人希望浪费时间在一个无法工作的程序库上面。
  • Basic Readme,清晰地描述库是用来做什么的,测试的情况等等。
  • Interfaces,接口必须清晰地定义,这可以帮助库的使用者。
  • Special Functions,特殊的功能,一定要注明,包括在readme文档中注明,以及在注释中注明。
  • Busy Waits,如果有一些场景需要使用busy wait(我不知道怎么翻译),其过程中可能会出现异常,使用interrupt或者其它妥善的方法来处理。
  • Comments,你做的任何的改变都要注释清楚,明确描述接口和其每个参数,方法是做什么的,又返回什么;如果有某个中间方法被调用到,就要注明。
  • Consistency,一致性,所有东西,包括注释。相关的方法要放在一个简单的代码文件里面,小但是逻辑一致。
其中的高级部分又包括Detailed Readme,Directory Structure,Licensing和Version Control。

考虑库的目标用户。这听起来扯得有点泛,但实际上这是确切的问题,这是开源库还是你只是在小组内部使用的库,或者是公司内部使用的?用户的能力和需求是不一样的,要求当然不同。
要解决的核心问题。这是上文中Pin Map的一部分,不要重复发明轮子,那么每一个新库都有其存在的价值,这个问题既要通用又要具体,“通用”指的是库总有一个普适性,解决的实际问题对于不同的用户来说是不一样的;而“具体”是指库解决的问题对于程序员来说是非常清晰和直接的。例如设计一个库,根据某种规则把不同的数据类型(XML,BSON或者某种基于行的文本等等)都转换成JSON。
统一的编程风格。很多库都有自己精心设计的一套DSL,比如链式调用等等几种方式,当然,这也和使用的语言有关系。定义一种用起来舒服的编程风格对于程序库的推广是很有好处的。这也是一致性的一个体现。
内聚的调用入口。这和面向对象的“最少知识原则”有类似的地方,把那些不该暴露出去的库内部实现信息隐藏起来,在很多情况下,程序库不得不暴露和要求用户了解一些知识的时候,比如:
1
2
3
4
5
6
7
MappingConfig config = new MappingConfig();
config.put(MappingConfigConstants.ENCODE, "UTF-8");
FileBuilder fileBuilder = new StandardFileBuilder(mapping);
InputStream stream = fileBuilder.build().getInputStream();
DataTransformer<XMLNode> transformer = new XMLDataTransformer(...);
...
transformer.transform(stream);
这里引入了太多的概念,MappingConfig、FileBuilder、DataTransformer等等,整个过程大致是构造了一个数据源,还有一个数据转换器,然后这个数据转换器接受这个数据源来转换出最后结果的过程。那么:
这些象征着概念的接口和类最好以某种易于理解的形式组织起来,比如放在同一层比较浅的包里面,便于寻找;
也可以建立一个facade类,提供几种常用的组合,避免这些繁琐的对象构建和概念理解,例如:
1
XXFacade.buildCommonXMLTransformer();
向后兼容。当然,这一点也可以归纳到前文提到的一致性里面去。我曾经拿JDKHashTable举了一个例子,它的containsValue和contains方法其实是一样的,造成这种情况的一个原因就是为了保持向后兼容。
依赖管理。依赖管理很多情况下是一个脏活累活,但是却不得不考虑到。通常来说,任何一个库考虑自己的依赖库时都必须慎重,尤其是面对依赖的库需要升级的时候。如果依赖的库出了问题,自己设计的程序库也可能因此连累。
完善的测试用例。通常来说,程序库都配套有单元测试保证,无论是什么语言写的。
健全的文档组织。通常包括教程(tutorial)、开发者文档(developer guide)和接口API文档(API doc)。前者是帮助上手和建议使用的,中间的这个具备详尽的特性介绍,后者则是传统的API参考使用文档。


How to Design Workflow Sytem



https://docs.aws.amazon.com/amazonswf/latest/awsflowguide/swf-timeout-types.html
https://docs.aws.amazon.com/amazonswf/latest/awsflowguide/awsflow-basics-reliable-execution.html

Ensuring that Results are Not Lost

Maintaining Workflow History

An activity that performs a data-mining operation on a petabyte of data might take hours to complete, and an activity that directs a human worker to perform a complex task might take days, or even weeks to complete!
To accommodate scenarios such as these, AWS Flow Framework workflows and activities can take arbitrarily long to complete: up to a limit of one year for a workflow execution. Reliably executing long running processes requires a mechanism to durably store the workflow's execution history on an ongoing basis.
The AWS Flow Framework handles this by depending on Amazon SWF, which maintains a running history of each workflow instance. The workflow's history provides a complete and authoritative record of the workflow's progress, including all the workflow and activity tasks that have been scheduled and completed, and the information returned by completed or failed activities.
AWS Flow Framework applications usually don't need to interact with the workflow history directly, although they can access it if necessary. For most purposes, applications can simply let the framework interact with the workflow history behind the scenes. For a full discussion of workflow history, see Workflow History in the Amazon Simple Workflow Service Developer Guide.

Stateless Execution

The execution history allows workflow workers to be stateless. If you have multiple instances of a workflow or activity worker, any worker can perform any task. The worker receives all the state information that it needs to perform the task from Amazon SWF.
This approach makes workflows more reliable. For example, if an activity worker fails, you don't have to restart the workflow. Just restart the worker and it will start polling the task list and processing whatever tasks are on the list, regardless of when the failure occurred. You can make your overall workflow fault-tolerant by using two or more workflow and activity workers, perhaps on separate systems. Then, if one of the workers fails, the others will continue to handle scheduled tasks without any interruption in workflow progress.

http://www.raychase.net/3758

最近工作中一直和SWF(Amazon的Simple Work Flow)打交道,在一个基于SWF的工作流框架上面开发和修bug。SWF的activity超时时间是5分钟,在activity task开始执行以后,activity worker需要主动发送心跳请求告知service端:“我还活着,我还在干活”,如果出现超过5分钟(可以配置)没有心跳,SWF的service端就认为,你已经挂了,我需要把这个activity安排到别的activity worker上来执行了。借用AWS官网的一张图:
一种工作流心跳机制的设计
每台机器上有若干个activity task在被执行。可以看到,在activity任务启动起来以后,需要用不断的心跳来告知service端任务还在进行,activity worker还活着。这个“汇报”需要activity worker所在的host主动进行,这也是SWF的service端无状态(几年前写过一点东西介绍它)的基本要求之一。任务都是由worker端去pull的,这些行为也都是worker端主动触发的。
这个机制描述起来很简单,但是实际在相关设计实现的时候,有许多有趣和值得琢磨的地方。
在我手头的这个workflow里面,心跳机制是这样实现的:
  • 有两个queue,一个是main queue,是dequeue(双端队列);另一个是backup queue,普通队列。二者都是用来存放需要发送心跳的activity信息(heartbeatable对象)。
  • 每秒钟都尝试执行这样一个方法A:从main queue里面poll一个heartbeatable对象(如果queue为空就忽略本次执行),检查该心跳所代表的activity task是否还在工作,如果是,就发送一个心跳。发送成功以后,就把这个heartbeatable对象扔到backup queue里面去。这样,一秒一个,逐渐地,main queue的heartbeatable对象全部慢慢被转移到backup queue里去了。
  • 每隔两分钟(称为一个cycle)执行方法B:把backup queue里面所有的heartbeatable对象全部转移到main queue里去,于是就又可以继续执行上面一步的逐个心跳逻辑。
这个机制的基本好处是,所有activity task的心跳统一管理,通常情况下保证了心跳不会过快(默认配置下是一秒一个,或者不发送),同时保证了没有谁会被遗漏:
一种工作流心跳机制的设计
但是,这里又会浮现好多好多问题:
为什么要使用两个queue?
首先,有这样一个事实:方法A在执行的时候,理论上每秒钟会执行一次,但是这里并没有强制的保证,使得前一秒的A执行一定会在这一秒的A开始之前完成。换言之,它们的理论启动时间是按序的,但是实际启动时间和实际的心跳执行时间是不定的,需要处理并发的情形。而到底最多可能存在多少个执行A的线程并行,取决于用于此心跳功能的线程池的配置。因此,在执行和判断的过程中,需要对当前poll出来的heartbeatable对象加锁。
使用两个queue,这主要是为了记录在本次cycle里面,能够很容易判断某一个heartbeatable对象是否已经完成心跳行为。还没有完成心跳的,都在main queue里;完成了的,都放到backup queue里。
如果使用一个queue,那么也是有解决方案的:
  • 有一个公共计数器,每个cycle开始的时候,给计数器+1。
  • 每一个heartbeatable对象自身需要携带一个私有计数器,用以标识当前这轮cycle的心跳是否已经完成。
  • 每次完成的heartbeatable对象给自己的计数器+1以后扔到队尾;每次A取新的heartbeatable对象的时候从队首取。
  • 如果取到的对象自己的计数器已经等于公共计数器的数值,说明整个queue里面的对象心跳都已经完成了。
当然,这种方法的弊端在于,判断是否还需要发送心跳这件事情,不仅需要从queue里取对象,还要判断对象的计数器数值,明显比两个queue的解决方案复杂和开销大。因为两个queue的解决方案下,只需要尝试从main queue里面取对象就好,取不到了就说明本次cycle里没有需要发送心跳的对象了。看起来是多了一个queue,但是方案其实还是简单一些。
心跳的频率保持在多久为好?
显然不是越高越好,不只是成本,因为心跳也是需要消耗资源的,比如CPU资源;而且,心跳在service端也有throttling,当前activity worker发起太频繁的心跳,当前心跳可能被拒,还可能会让别的activity worker的正常心跳被拒了。
我们要解决的最核心问题是,正常情况下,必须保持上限5分钟内能发起一次成功心跳就好。
要这么说,尽量增大cycle,那我设计一个每隔5分钟就执行一次的定时器就好了。但是问题没那么简单,首先要考虑心跳的发起不一定成功。如果在接近5分钟的时候才去尝试发起心跳,一旦失败了,也没有时间重试了。因此,要trade-off。比如,配置cycle为120秒,这样的好处是,5分钟的超时时间内,可以覆盖1~2个完整的cycle。如果cycle配置为3分钟,那么5分钟无法严格保证一定覆盖有一个完整的cycle。
确定心跳频率的有两个重要参数,一个是方法A的执行频率,一个则是一个cycle的时间长度。例如,前者为1 per second,后者为2分钟,那么在理想情况下,一个cycle 120秒,可以处理120个activity task,换言之,极限是120个activity task在这台机器上一起执行。超过了这个数,就意味着在一个cycle内,无法完成所有的心跳发送任务。
当然,实际情况没有那么理想,考虑到暂时性的网络问题,线程、CPU资源的竞争等等,实际可以并行的activity task要比这个数低不少。
异常处理和重试
在上图中,步骤③有三个箭头,表示了心跳出现不同种情形的处理:
  • 有一些常规异常,比如表示资源不存在,或者任务已经cancel了,这种情况发生的时候,要把相应的activity task给cancel掉,同时,把自己这个heartbeatable对象永久移除出queue。
  • 重试情形1:throttling导致的异常,这种异常发生的时候,把当前heartbeatable对象再addFirst回main queue,因为这不是当前有什么不可解决的或者不明原因的问题造成的,只需要简单重试即可。
  • 重试情形2:其它未知原因的异常,这种情况当然需要重试(之前我们缺少这样的重试机制,导致下一次该activity task能够得到心跳的机会被推到了下一个cycle,这显然是不够合理的),但是,可以把heartbeatable对象放到queue尾部去重试(addLast),并且附上一个私有计数器,如果重试超过一定次数,就挪到下一个cycle(backup queue)去。这个放到queue尾部的办法,使得重试可以在当前cycle里进行,又可以使得这个重试能够尽量不影响其他heartbeatable对象的心跳及时发送。整个重试过程其实就是把当前失败对象再放回queue的过程,没有线程阻塞。
曾经遇到过一些这方面的问题,经过改进才有了上述的机制:
在CPU或者load达到一定程度的时候(比如这个时候有一个进程在call service,占用了大量的CPU资源),就很容易发生心跳无法及时进行的问题,比如有时候线程已经初始化了,但是会stuck若干时间,因为没有足够的资源去进行。等到某一时刻,资源被释放(比如这个call service 的进程结束),这个时候之前积攒的心跳任务会一下子爆发出来。不但这些心跳的顺序无法保证,而且严重的情况下会导致throttling。如果没有当前cycle内的重试机制,那么下一次该对象的心跳需要等到下一个cycle,很容易造成activity task的timeout。
下面再说一个和心跳异常有关的问题。
有这样一个例子,在这个工作流框架内,我们需要管理EMR资源,有一个activity把EMR cluster初始化完成,另一个activity把实际执行的steps提交上去。但是发现在实际运行时有如下的问题:EMR cluster已经初始化完成,但是steps迟迟没有办法提交上去,导致了这个cluster空闲太长时间,被框架内的monitor认为已经没有人使用了,需要回收,于是EMR cluster就被terminate了。但是这之后,steps才被提交上去,但是这时候cluster已经处于terminating状态了,自然这个step提交就失败了。而经过分析,造成这个EMR cluster非预期的termination,包括这样几个原因:
  • decision task timeout。在EMR cluster创建好之后,SWF会问decider下一步该干嘛,这时候如果因为CPU高负荷等各种原因,导致decision task timeout,SWF就会一直等在那里,而如果这个timeout的时间配得太长,这段超时就足以让上面的这个EMR cluster空闲过长时间导致被误回收了。
  • 判断EMR cluster空闲到一定时间就要回收的逻辑有问题。我们以前的实现是,每隔2分钟执行一次“EMR资源操作”,包括检查资源状态,进行资源操作,然后如果发现该EMR资源创建后经过了4次资源操作,依然没有step提交上去,就认为空闲时间过长,需要回收(2 x 4 = 8分钟)。但是问题在于,实际由于种种原因(和心跳的执行间隔实际时间不确定的原理一样),间隔执行EMR资源操作并不能严格保证每隔2分钟一次,有时一段时间都得不到执行,而有时候会迎来一次集中爆发,这个时候就可能实际EMR资源空闲了远远不到8分钟就被回收了。因此,这个逻辑最好是能够用绝对的“空闲时间”来判断,例如EMR资源创建时记录好时间,之后每次检查时都用当前时间去和创建时间比较,空闲超过8分钟再回收。
  • 由于之前提到过的心跳无法按时完成导致activity task timeout,于是这个EMR cluster创建的任务实际已经完成了,但是被当做超时给无视了。
最后,我想说的是。设计一个好的工作流框架,还是有很多困难的地方,需要尤其考虑周全的地方。即便是基于SWF这样现有的workflow来搭积木和叠加功能,也有很多不易和有趣的地方。
http://www.raychase.net/3998


Scalability
基本上随便设计什么基础设施,扩展性都是重要的考虑内容。作为workflow来讲,基本上工作节点的水平扩展是考量扩展性的最重要标志。既然工作节点可以水平扩展,那么这就意味着任务(task)必须是以pull的方式由工作节点主动去获取,而不是由pull的方式从调度节点来分配(曾经非常简单地比较过pull和push,但其实二者差异远不止文中内容之浅显)。任务的分配上,需要考虑这样的事情:如果有多个工作节点尝试来pull任务,该分配给谁?具体来说,比如这样的例子:如果每一个task节点允许同时执行5个任务,而现在可同时执行的总任务数只有5个,总共的task节点也有5个,最理想的状态应当是这5个被均匀分配到这5个节点去,但是采用简单的pull机制并不能保证这一点,有可能这5个任务全部跑到一台机器上去了,因为这并不超过一个节点可同时执行任务数量的上限。
另一方面,通常来讲,所有任务都应当是idempotent的,即可以重复提交执行,执行若干次和执行一次的结果是一样的。工作节点的任务执行可以在任意一步发生错误,随着节点数量的增加,这样的错误更多地成为一种常态,而不是“异常”。工作节点的健康状态需要由某种方式来维系和通知,最典型和廉价有效的方式就是“心跳”
功能性解耦
  • 资源管理和任务管理解耦。这一点我只在少数workflow里面见到。任务管理几乎是所有workflow都具备的,但是单独的资源管理则不是。举例来说,我可以写一个task去执行EMR上的任务,你也可以写一个task去EMR上执行,EMR的执行管理逻辑,可以以代码的方式被我们共用——但是这种架构下,你的task和我的task很难安全高效地共享同一个EMR资源,无论是资源的创建、销毁,状态的查询,还是throttling,都变得很麻烦。类似的例子还有,数据库的共享,打印机的共享,甚至另外一个工作流系统的共享。当有开销较大的资源,我们经常需要workflow层面被统一管理起来,管理一份或者几份资源,但是共享给数量众多的task。
  • 业务逻辑和调度逻辑解耦。这基本上在所有workflow里面都具备,调度逻辑是业务无关的,也是相对来说“死”的东西,管理工作流的状态,和每个task的成功失败。但是业务逻辑则是组成workflow和其中的task“活生生”的血肉。我还没有见过哪个workflow把业务代码和调度逻辑写到一起。
  • 状态查询和调度系统的解耦。一个完善的工作流系统,调度只能是其中核心的一个方面,如果没有一个好的状态查询系统,维护的工作量将是巨大的。而这二者,必须解耦开。举例来说,工作流和任务执行的状态,必然是持久化在某种存储介质中,比如关系数据库,比如NoSQL的数据库,比如磁盘日志文件等等。这个时候,调度系统可以说是这些信息写入存储系统的最主要来源,而这些信息的读取,则可能从调度系统读取,也可能从状态查询系统读取。这个存储的格式或者说schema,必须相对稳定。这个存储的一致性和可用性,将是整个系统一致性和可用性的核心组成部分。
  • 决策系统和执行系统解耦。决策系统用于决定某个任务是否满足条件并开始该执行,它是整个工作流系统的大脑;执行系统则是具体的一个个任务,它是整个工作流系统的骨肉。
  • 事件系统和监听系统解耦。涉及这个的工作流只占少数。很多工作流系统都有内部的事件系统,比如某个task分配给某个节点了,某个task执行失败了等等,但是这样事件的监听系统,却没有独立出来,导致后续针对特殊事件要执行特定逻辑变得困难。
同步与异步任务
事实上,当考虑到了独立的资源管理功能,异步和同步任务的划分就变得自然而然。
  • 有很多任务是需要在当前的工作节点上执行的。比如需要在工作节点上下载一个文件,然后经过处理以后写到数据库里去,这些任务消耗大量的内存和CPU,需要分配独立的专属的线程去完成,是同步任务。
  • 还有一些任务,工作节点并非实际的工作执行者,而是针对某一个资源系统的客户端,只负责提交任务到该系统内,并且负责管理和监控。比如打印任务,向打印机提交打印请求,然后只需要不断地向打印机查询任务的状态,以及根据需要作出删除任务和重新提交等操作即可。这些任务通常不需要长期占有线程,一个线程可以在一个周期内处理多个任务。它们是异步任务。另外,举一个特例,工作流的嵌套,即工作流调用子工作流,那么对于子工作流状态的查询这个行为来说,必然是异步任务。异步任务就涉及到事件的通知和监听机制,后文有提到。
分布式锁
在某些情况下,分布式锁变成一个必选项。比如前面提到的资源管理。有许多资源是要求操作是独占的,换言之,不支持两个操作并发调用,期间可能出现不可以预料的问题;另一方面,一个节点在对资源进行操作时,它需要和别的节点进行协作,从而两个工作节点的操作是有序和正确的,不至于发生冲突。
举个例子来说,工作节点A要查询当前EMR的状态,如果已经空闲10分钟,就要执行操作结束掉这个EMR资源;而工作节点B则查询该EMR的状态,如果没有被结束掉,就要往上面提交新的计算任务。这时候,如果没有分布式锁的协作,问题就来了,可能B节点先查询发现EMR状态还活着,就这这一瞬间,A节点结束了它,可是B不知道,接着提交了一个计算任务到这个已经结束了的(死了的)EMR资源上,于是这个提交的计算任务必然执行失败了。
有很多分布式锁的实现方式,简单的有强一致性的存储系统,当然也有更高效的实现,比如一些专门的分布式锁系统。
功能的可扩展性
之前讲到了性能架构上的可扩展性,在功能层面亦然。
  • 自定义任务。这是几乎所有工作流系统都会考虑的事情,这也是业务逻辑和调度逻辑解耦的必然。因为工作流系统设计的时候,必然没法预知所有的任务类型,用户是可以定义自己的执行逻辑的。
  • 自定义资源。有了资源管理,就有自定义资源的必要。
  • 自定义事件监听。事件管理通常在工作流系统中是很容易被忽视的内容,比如我希望在某一个task超时的时候发送一个特殊的消息通知我,这就需要给这个事件监听提供扩展的可能性。
  • 运行时的工作流任务执行条件。通常workflow都会有一个定义如何执行的文件(meta file),但是有一些执行的参数和条件,是在运行时才能够确定的,甚至依赖于上一步执行的结果,或者需要执行一些逻辑才能得到。
可用性和可靠性
大多数workflow,都采用了去中心节点的设计,保证不存在任何单点故障问题。所有的子系统都是。也保证在业务压力增加的情况下,标志着可用性的latency在预期范围之内。其它的内容不展开,介绍这方面的文章到处都是。
生命周期管理
这里既指workflow一次执行的生命周期管理,也指单个task的生命周期管理。
谈论这些必然涉及到这样几个问题:
  • workflow definition和workflow的分离,task definition和task execution的分离。其中definition定义执行的逻辑,而execution才真正和执行的环境、时间、参数等等相关。逻辑通常可以只有一份(但这也不一定,要看workflow是否支持多版本,后文有提到),但是execution随着重试的发生,会保存多份。
  • workflow重试时,参数变化的处理。有些参数的变化,是不会影响已完成任务的,但是有的参数则不是。
  • workflow重试时,对于已完成任务的处理。有的情况我们希望已完成任务也要重新执行,而又的情况我们则希望这些已完成任务被跳过。
  • task的重试次数,以及重试时back off的策略。比如第一次重试需要等5分钟,第二次重试需要等10分钟,最多重试2次。
  • 如何礼貌地结束工作节点上的任务执行。在很多情况下我们不得不中断并结束某个节点上的任务执行,比如这个工作节点需要重启,这并不能算作业务代码导致的任务执行失败,而更像是一种“resource termination”。这种情况下,任务通常需要被分配到另外活着的节点去,而这里有牵涉到这个reallocation的策略,前面已经提到过。
  • 任务的权重。或者叫做优先级,这项功能我只在少数workflow中看到。在考虑到资源分配时,某些更重要任务可具备更高优先级,而无关紧要的任务失败甚至可以不影响workflow的状态。
任务DAG的设计和表达
这是workflow执行的流程图,也是所有task之间依赖关系的表述。我见过多种表达方式的,有XML的,也有JSON的,还有一些不知名的自己定义的格式的。有些workflow的定义可以以一个图形化工具来协助完成这个流程图。这个DSL的设计,一定程度上决定了workflow的使用是不是能够易于理解。另外提一句,这里提到的这个可选的图形化工具,毕竟只是一个辅助,它不是workflow的核心(你可以说这个DSL是核心的一部分,但这个帮助完成的工具显然不是)——我见过一个团队,workflow整体设计得不怎么样,跑起来一堆问题,但是这个工具花了大量的时间精力去修缮,本末倒置。
另外,workflow的状态和执行情况,还有对其的归档和管理,也需要一个整合工具来协助。这方面几乎所有workflow都具备,通常都是网页工具,以及命令行工具。
输入输出的管理
这也是一个nice-to-have的东西,对于每一个task,都存在input和output,它们可以完全交给用户自己来实现,比如用户把它们存储到文件里面,或者写到数据库里面,而workflow根本不管,每个task内部自己去读取相应的用户文件即可。但是更好的方法是,对于一些常用和简单的input、output,是可以随着execution一起持久化到workflow和task的状态里面去的。这样也便于workflow的definition里面,放置一些根据前一步task执行结果来决策后续执行的表达式。
另外,还有一个稍微冷门的use case,就是input和output的管理。通常workflow是重复执行的,而每次执行的input和output的数据规模往往是很多人关心的内容。关于这部分,我还没有见到任何一个workflow提供这样的功能。许多用户自己写工具和脚本来获取这样的信息。
独立的metrics和日志系统
对于metrics,核心的内容也无非节点的健康状况、CPU、内存,task执行时间分布,失败率等等几项。有些情况下用户还希望自行扩展。
关于日志,则主要指的是归档和合并。归档,指的是历史日志不丢失,或者在一定时间内不丢失,过期日志可以被覆写,从而不引起磁盘容量的问题;而合并,指的是日志能被以更统一的视角进行查询和浏览,出了问题不至于到每台机器上去手动查找。缺少这个功能,有时候会很麻烦。在工作中我遇到过一个资源被异常终止的问题,为了找到那个终止资源的节点,我查阅了几十个节点的日志,痛苦不堪。
版本控制和平滑部署
把这两个放一起是因为,代码升级是不可避免且经常要发生的。为了保证平滑部署,显然通常情况下,节点上的代码不能同时更新,需要一部分一部分进行。比如,先终止50%的节点,部署代码后,激活并确保成功,再进行剩下那50%的节点。但是在这期间存在新老代码并存的问题,这通常会带来很多奇形怪状的问题。对于这种问题,我见过这样两个解决方式:
  • 一个是全部节点同时部署,这种情况下所有节点全部失活,有可能出现因为这个失活导致的task超时,甚至导致workflow执行失败。但是workflow的生命周期由单独的调度系统管理,因此除去超时外多数情况不受影响。
  • 还有一种是一部分一部分部署,最平滑,但是这种情况下需要管理多版本共存问题,也对代码质量提出了新的要求——向后兼容。
无论选用哪一种,这种方式实现起来相对简单,但是也有不少问题,比如这种情况下,外部资源怎么处理?例如在外部EMR资源上执行Spark任务,但是已经有老代码被放到EMR上去执行了,这时候工作节点更新,这些EMR上正在执行的任务怎样处理?是作废还是保留,如果保留的话这些执行可还是依仗着老代码的,其结果的后续处理是否会和刚部署的新代码产生冲突。再比如对于workflow有定义上的改变(比如DAG的改变),对于现有的execution,应当怎样处理,是更新还是保持原样(通常都是保持原样,因为更新带来的复杂问题非常多)。
http://www.raychase.net/244
观察者模式中,消息采用推和拉方式来传递的比较
更新自己。
观察者模式中,消息采用推和拉方式来传递的比较
现在要说的分歧在这里:
“推”的方式是指,Subject维护一份观察者的列表,每当有更新发生,Subject会把更新消息主动推送到各个Observer去。
“拉”的方式是指,各个Observer维护各自所关心的Subject列表,自行决定在合适的时间去Subject获取相应的更新数据。

“推”的好处包括:
1、高效。如果没有更新发生,不会有任何更新消息推送的动作,即每次消息推送都发生在确确实实的更新事件之后,都是有意义的。
2、实时。事件发生后的第一时间即可触发通知操作。
3、可以由Subject确立通知的时间,可以避开一些繁忙时间。
4、可以表达出不同事件发生的先后顺序。

“拉”的好处包括:
1、如果观察者众多,Subject来维护订阅者的列表,可能困难,或者臃肿,把订阅关系解脱到Observer去完成。
2、Observer可以不理会它不关心的变更事件,只需要去获取自己感兴趣的事件即可。
3、Observer可以自行决定获取更新事件的时间。
4、拉的形式可以让Subject更好地控制各个Observer每次查询更新的访问权限。
事实上“推”和“拉”可以比较的内容太多了,比如:
客户端通常是不稳定的,服务端是稳定的,如果消息由客户端主动发起去获取,它很容易找到服务端的地址,可以比较容易地做到权限控制(集中在服务端一处),服务端也可以比较容易地跟踪客户端的位置和状态,反之则不行;
互联网页面的访问就是一个最好的“拉”的模式的例子;
通常我们希望把压力分散到各个客户端上去,服务端只做最核心的事情,只提供内容,不管理分发列表;
……
还有一个idea是关于“推”和“拉”结合的形式,例如,服务端只负责通知某一些数据已经准备好,至于是否需要获取和什么时候客户端来获取这些数据,完全由客户端自行确定。

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts