https://segment.com/blog/exactly-once-delivery/
Our database has to satisfy three very separate query patterns
RocksDB is a log-structured merge-tree (LSM) database: it constantly appends new keys to a write-ahead log on disk, while also storing the sorted keys in-memory as part of a memtable.
Writing keys is an extremely fast process. New items are journaled straight to disk in append-only fashion (for immediate persistence and failure recovery), and the data entries are sorted in-memory to provide a combination of fast search and batched writes.
Whenever enough entries have been written to the memtable, it is persisted to disk as an SSTable (sorted-string table). Since the strings have already been sorted in memory, they can be flushed directly to disk.
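As a rough illustration of that write path, here is a minimal sketch in Go, assuming the tecbot/gorocksdb binding (the post doesn't name a binding): every Put lands in the write-ahead log and the memtable, and RocksDB flushes full memtables to SSTables in the background.

```go
package main

import (
	"log"

	"github.com/tecbot/gorocksdb" // assumed binding; not named in the post
)

func main() {
	opts := gorocksdb.NewDefaultOptions()
	opts.SetCreateIfMissing(true)

	db, err := gorocksdb.OpenDb(opts, "/tmp/dedupe-example")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Each Put is appended to the write-ahead log and inserted into the
	// in-memory memtable; RocksDB flushes full memtables to SSTables in
	// the background.
	wo := gorocksdb.NewDefaultWriteOptions()
	defer wo.Destroy()

	if err := db.Put(wo, []byte("message-id-123"), []byte{}); err != nil {
		log.Fatal(err)
	}

	// A read consults the memtable first, then the on-disk SSTables.
	ro := gorocksdb.NewDefaultReadOptions()
	defer ro.Destroy()

	val, err := db.Get(ro, []byte("message-id-123"))
	if err != nil {
		log.Fatal(err)
	}
	defer val.Free()
	log.Printf("seen before: %v", val.Data() != nil)
}
```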
Deletion: size-bound, not time-bound
With our de-dupe process, we had to decide whether to bound the system by a strict ‘de-duplication window’ or by the total database size on disk.
To keep the system from falling over abruptly and de-dupe collection from breaking for all customers, we decided to limit by size rather than by a set time window. This allows us to set a max size for each RocksDB instance and handle sudden spikes or increases in load. The side-effect is that the de-duplication window can shrink to under 24 hours, at which point it will page our on-call engineer.
We periodically age out old keys from RocksDB to keep it from growing to an unbounded size. To do this, we keep a secondary index of the keys based upon sequence number, so that we can delete the oldest received keys first.
Rather than using RocksDB's TTL, which would require us to fix a TTL when opening the database, we instead delete objects ourselves using the sequence number for each inserted key.
Because the sequence number is stored as a secondary index, we can query for it quickly and ‘mark’ the oldest keys as deleted. Our deletion function walks that index when passed a sequence number.
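The original deletion snippet isn't reproduced in this excerpt. As a stand-in, here is a minimal sketch of what such a routine could look like, assuming the tecbot/gorocksdb binding and a hypothetical key layout in which the secondary index maps seq/<8-byte big-endian sequence number> to the message id stored as the primary key.

```go
package dedupe

import (
	"bytes"
	"encoding/binary"

	"github.com/tecbot/gorocksdb" // assumed binding; not named in the post
)

// seqKey builds a secondary-index key "seq/<8-byte big-endian number>", so
// that lexicographic iteration order matches numeric order and a forward
// scan visits the oldest received keys first.
func seqKey(seq uint64) []byte {
	k := make([]byte, 0, 12)
	k = append(k, "seq/"...)
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], seq)
	return append(k, buf[:]...)
}

// deleteUpTo is a sketch (not Segment's actual code) that deletes every
// secondary-index entry with a sequence number <= seq, along with the
// message-id key it points at.
func deleteUpTo(db *gorocksdb.DB, seq uint64) error {
	wo := gorocksdb.NewDefaultWriteOptions()
	defer wo.Destroy()
	ro := gorocksdb.NewDefaultReadOptions()
	defer ro.Destroy()

	wb := gorocksdb.NewWriteBatch()
	defer wb.Destroy()

	it := db.NewIterator(ro)
	defer it.Close()

	prefix := []byte("seq/")
	end := seqKey(seq)

	for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
		key := it.Key()
		val := it.Value()

		// Stop once we pass the cutoff sequence number.
		if bytes.Compare(key.Data(), end) > 0 {
			key.Free()
			val.Free()
			break
		}

		// 'Mark' both the secondary-index entry and the primary
		// message-id key as deleted; these become tombstones that
		// compaction later removes.
		wb.Delete(append([]byte(nil), key.Data()...))
		wb.Delete(append([]byte(nil), val.Data()...))

		key.Free()
		val.Free()
	}

	return db.Write(wo, wb)
}
```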
To continue ensuring write speed, RocksDB doesn’t immediately go back and delete a key (remember these SSTables are immutable!). Instead, RocksDB will append a ‘tombstone’ which then gets removed as part of the compaction process. Thus, we can age out quickly with sequential writes, and avoid thrashing our memory by removing old items.
Ensuring Correctness
The last remaining piece is how we ensure correctness of the data in various failure modes.
EBS-snapshots and attachments
To ensure that our RocksDB instances are not corrupted by a bad code push or underlying EBS outage, we take periodic snapshots of each of our hard drives. While EBS is already replicated under the hood, this step guards against the database becoming corrupted from some underlying mechanism.
If we need to cycle an instance, the consumer can be paused, and the associated EBS drive detached and then re-attached to the new instance. So long as we keep the partition ID the same, re-assigning the disk is a fairly painless process that still guarantees correctness.
In the case of a worker crash, we rely on RocksDB's built-in write-ahead-log to ensure that we don't lose messages. Offsets on the input topic are not committed unless we have a guarantee that RocksDB has persisted the message in its log.
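A minimal sketch of that ordering, again assuming gorocksdb, with the Kafka client hidden behind a hypothetical offsetCommitter interface since the actual consumer code isn't shown in this excerpt.

```go
package dedupe

import "github.com/tecbot/gorocksdb" // assumed binding

// offsetCommitter stands in for the Kafka consumer's offset-commit call;
// the concrete client isn't shown in the post, so it's left hypothetical.
type offsetCommitter interface {
	CommitOffset(offset int64) error
}

// processBatch sketches the ordering described above: persist the batch to
// RocksDB (and its write-ahead log) first, then commit the input offset.
func processBatch(db *gorocksdb.DB, committer offsetCommitter, keys [][]byte, lastOffset int64) error {
	wo := gorocksdb.NewDefaultWriteOptions()
	defer wo.Destroy()
	// Sync the WAL so a worker crash can't lose keys we're about to acknowledge.
	wo.SetSync(true)

	wb := gorocksdb.NewWriteBatch()
	defer wb.Destroy()
	for _, k := range keys {
		wb.Put(k, []byte{})
	}

	// 1) Persist to RocksDB's WAL and memtable.
	if err := db.Write(wo, wb); err != nil {
		return err
	}
	// 2) Only then commit the offset on the input topic.
	return committer.CommitOffset(lastOffset)
}
```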
Reading the output topic
You may notice that, up until this point, there is no ‘atomic’ step which allows us to ensure that we've delivered messages just once. Our worker could crash at any point: writing to RocksDB, publishing to the output topic, or acknowledging the input messages.
We need a ‘commit’ point that is atomic and covers the transaction across all of these separate systems. We need some “source of truth” for our data.
That’s where reading from the output topic comes in.
If the dedupe worker crashes for any reason or encounters an error from Kafka, when it restarts it will first consult the “source of truth” for whether an event was published: the output topic.
If a message was found in the output topic but not in RocksDB (or vice-versa), the dedupe worker will make the necessary repairs to keep RocksDB and the output topic in sync. In essence, we're using the output topic as both our write-ahead-log and our end source of truth, with RocksDB checkpointing and verifying it.
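A minimal sketch of that repair step, with the output-topic reader left as a hypothetical interface since the actual Kafka client code isn't shown here.

```go
package dedupe

import "github.com/tecbot/gorocksdb" // assumed binding

// outputTopicReader stands in for reading already-published message ids back
// from the output topic; the concrete Kafka client is not shown in the post.
type outputTopicReader interface {
	PublishedIDs() ([][]byte, error)
}

// repair sketches the startup check described above: the output topic is the
// source of truth, and any id found there but missing from RocksDB is
// re-inserted so the two stay in sync.
func repair(db *gorocksdb.DB, reader outputTopicReader) error {
	ro := gorocksdb.NewDefaultReadOptions()
	defer ro.Destroy()
	wo := gorocksdb.NewDefaultWriteOptions()
	defer wo.Destroy()

	ids, err := reader.PublishedIDs()
	if err != nil {
		return err
	}

	for _, id := range ids {
		val, err := db.Get(ro, id)
		if err != nil {
			return err
		}
		missing := val.Data() == nil
		val.Free()

		if missing {
			// The message made it to the output topic but not to RocksDB
			// (e.g. a crash between the two writes), so record it now.
			if err := db.Put(wo, id, []byte{}); err != nil {
				return err
			}
		}
	}
	return nil
}
```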
Previously we stored all of our keys in Memcached and used Memcached’s atomic CAS (check-and-set) operator to set keys if they didn’t exist. Memcached served as the commit point and ‘atomicity’ for publishing keys.
While this worked well enough, it required a large amount of memory to fit all of our keys. Furthermore, we had to decide between accepting the occasional Memcached failures, or doubling our spend with high-memory failover replicas.
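For context, the set-if-absent check looked roughly like the following; a minimal sketch assuming the bradfitz/gomemcache client (the post doesn't name the client), whose Add only succeeds when the key doesn't already exist.

```go
package main

import (
	"log"

	"github.com/bradfitz/gomemcache/memcache" // assumed client; not named in the post
)

func main() {
	mc := memcache.New("127.0.0.1:11211")

	// Add only succeeds if the key does not already exist, which is what
	// made Memcached work as an atomic "have we seen this message?" check.
	err := mc.Add(&memcache.Item{
		Key:        "message-id-123",
		Value:      []byte{},
		Expiration: 4 * 60 * 60, // TTL in seconds to age keys out
	})

	switch err {
	case nil:
		log.Println("first time we've seen this message; publish it")
	case memcache.ErrNotStored:
		log.Println("duplicate; drop it")
	default:
		log.Fatal(err)
	}
}
```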
Data stored on disk: keeping a full set of keys or full indexing in-memory was prohibitively expensive. By moving more of the data to disk, and leveraging various levels of files and indexes, we were able to cut the cost of our bookkeeping by a wide margin. We are able to push the failover to cold storage (EBS) rather than running additional hot failover instances.
Partitioning: of course, in order to narrow our search space and avoid loading too many indexes in memory, we need a guarantee that certain messages are routed to the right workers. Partitioning upstream in Kafka allows us to consistently route these messages so we can cache and query much more efficiently (see the sketch after this list).
Explicit age-out: with Memcached, we would set a TTL on each key to age them out, and then rely on the Memcached process to handle evictions. This caused us to exhaust our memory when facing large batches of data, and to spike the Memcached CPU when facing a large number of evictions. By having the client handle key deletion, we're able to fail gracefully by shortening our ‘window of deduplication’.
Kafka as the source of truth: to truly avoid duplication in the face of multiple commit points, we have to use a source of truth that's common to all of our downstream consumers. Using Kafka as that ‘source of truth’ has worked amazingly well. In the case of most failures (aside from Kafka failures), messages will either be written to Kafka, or they won't. And using Kafka ensures that published messages are delivered in-order, and replicated on-disk across multiple machines, without needing to keep much data in memory.
Batching reads and writes: by making batched I/O calls to Kafka and RocksDB, we’re able to get much better performance by leveraging sequential reads and writes. Instead of the random access we had before with Memcached, we’re able to achieve much better throughput by leaning into our disk performance, and keeping only the indexes in memory.
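To make the partitioning point above concrete, here is a minimal sketch of routing by a hash of the messageId; real Kafka producers ship their own partitioners, so this illustrates only the concept, not Segment's code.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor illustrates routing by messageId: hashing the id to a
// partition means every copy of the same message lands on the same dedupe
// worker, whose RocksDB instance then only holds its own slice of the keyspace.
func partitionFor(messageID string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(messageID))
	return h.Sum32() % numPartitions
}

func main() {
	// The same id always maps to the same partition, and therefore worker.
	fmt.Println(partitionFor("message-id-123", 32))
	fmt.Println(partitionFor("message-id-123", 32))
}
```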
Using Kafka and RocksDB as the primitives for streaming applications is becoming more and more the norm, and we're excited to keep building new distributed applications on top of them.
https://rawgit.com/google/leveldb/master/doc/index.html
http://leveldb.googlecode.com/svn/trunk/doc/impl.html
http://blog.csdn.net/sparkliang/article/details/7260267
Starting on January 21, 2012, I began working through the leveldb code: a single-machine KV storage system developed by two Google heavyweights, covering skip lists, the in-memory KV table, LRU cache management, table file storage, the operation log system, and more. I'll start by sweeping through the small supporting pieces around the edges.
Before starting, let's look at Leveldb's basic framework and its key components, as shown in Figure 1-1.
[Figure 1-1: Leveldb's basic framework and key components]
Leveldb is an operation-log-based file system and a classic implementation of the Log-Structured Merge Tree. LSM originates from the classic 1991 paper by Ousterhout and Rosenblum, "The Design and Implementation of a Log-Structured File System".
Because it uses an op log, random disk writes become append operations on the op log, which improves I/O efficiency; the most recent data is kept in the in-memory memtable.
When the op log file exceeds a size limit, a checkpoint is taken: Leveldb creates a new log file and memtable, and a background job exports the data of the immutable memtable to disk, forming a new SSTable file. SSTables are thus produced by continually exporting in-memory data and compacting it, and the SSTable files are organized as a hierarchy of levels: the first is Level 0, the next Level 1, and so on, with the level number increasing. This is where the name LevelDb comes from.
https://medium.com/smyte/rate-limiter-df3408325846
https://github.com/facebook/rocksdb/wiki/RocksDB-Basics#compaction-filter
Compaction Filter
Some applications may want to process keys at compaction time. For example, a database with inherent support for time-to-live (TTL) may remove expired keys. This can be done via an application-defined Compaction Filter. If the application wants to continuously delete data older than a specific time, it can use the compaction filter to drop records that have expired. The RocksDB Compaction Filter gives control to the application to modify the value of a key or to drop a key entirely as part of the compaction process. For example, an application can continuously run a data sanitizer as part of the compaction.
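The excerpt above describes the C++ API. As a rough Go illustration, assuming the tecbot/gorocksdb binding's CompactionFilter interface (the exact shape varies by binding and version), a TTL-style filter that drops expired records during compaction might look like this.

```go
package main

import (
	"encoding/binary"
	"time"

	"github.com/tecbot/gorocksdb" // assumed binding; the wiki excerpt describes the C++ API
)

// ttlFilter drops any record whose value begins with an 8-byte big-endian
// unix timestamp older than maxAge, mirroring the TTL example in the wiki
// excerpt. The value layout here is purely hypothetical.
type ttlFilter struct {
	maxAge time.Duration
}

func (f ttlFilter) Name() string { return "ttl-filter" }

func (f ttlFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
	if len(val) < 8 {
		return false, nil // keep anything we can't parse
	}
	written := time.Unix(int64(binary.BigEndian.Uint64(val[:8])), 0)
	// Returning true drops the key as part of compaction.
	return time.Since(written) > f.maxAge, nil
}

func main() {
	opts := gorocksdb.NewDefaultOptions()
	opts.SetCreateIfMissing(true)
	opts.SetCompactionFilter(ttlFilter{maxAge: 24 * time.Hour})

	db, err := gorocksdb.OpenDb(opts, "/tmp/ttl-example")
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```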
https://github.com/facebook/rocksdb/wiki/Merge-Operator
https://www.facebook.com/notes/facebook-engineering/under-the-hood-building-and-open-sourcing-rocksdb/10151822347683920
Secondly, we are starting to see servers with an increasing number of cores and with storage-IOPS reaching millions of requests per second. Lock contention and a high number of context switches in traditional database software prevent it from being able to saturate the storage-IOPS.
https://github.com/facebook/rocksdb/wiki
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics