Wednesday, December 2, 2015

Learning Leveldb



https://segment.com/blog/exactly-once-delivery/
Our database has to satisfy three very separate query patterns
RocksDB is a log-structured merge-tree (LSM) database, meaning that it constantly appends new keys to a write-ahead log on disk while also keeping the sorted keys in memory as part of a memtable.


Keys are sorted in-memory as part of a memtable

Writing keys is an extremely fast process. New items are journaled straight to disk in append-only fashion (for immediate persistence and failure recovery), and the data entries are sorted in-memory to provide a combination of fast search and batched writes. 
Whenever enough entries have been written to the memtable, it is persisted to disk as an SSTable (sorted-string table). Since the strings have already been sorted in memory, they can be flushed directly to disk. 
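The write path described above can be sketched as a toy in Python. This is only an illustration under stated assumptions, not how RocksDB is implemented: the class name `TinyLSM`, the `memtable_limit` threshold, and the JSON file layout are all hypothetical, and the memtable here is a plain dict that gets sorted at flush time rather than a structure that stays sorted.

```python
import json
import os


class TinyLSM:
    """Toy write path: append to a log, update a memtable, flush when full."""

    def __init__(self, directory, memtable_limit=3):
        self.directory = directory
        self.memtable_limit = memtable_limit  # hypothetical threshold, in entries
        self.memtable = {}
        self.sstables = []  # paths of flushed files, oldest first
        self.wal_path = os.path.join(directory, "wal.log")
        self.wal = open(self.wal_path, "a", encoding="utf-8")

    def put(self, key, value):
        # 1. Journal the write so it can be replayed after a crash.
        self.wal.write(json.dumps([key, value]) + "\n")
        self.wal.flush()
        os.fsync(self.wal.fileno())
        # 2. Apply it to the in-memory table.
        self.memtable[key] = value
        # 3. Flush once enough entries have accumulated.
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # Write the entries in key order to a new file that is never modified again.
        path = os.path.join(self.directory, f"sst-{len(self.sstables):04d}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(sorted(self.memtable.items()), f)
        self.sstables.append(path)
        self.memtable = {}
        # The flushed entries now live in the SSTable, so start a fresh log.
        self.wal.close()
        self.wal = open(self.wal_path, "w", encoding="utf-8")

    def close(self):
        self.wal.close()
```

Every `put` costs one sequential append plus an in-memory update; the only other disk work is the occasional flush, which writes a whole sorted file in one pass.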


The current memtable is flushed to disk as an SSTable at Level 0
Each SSTable is immutable–once it has been created, it is never changed–which is what makes writing new keys so fast. No files need to be updated when a key is written, so the write itself incurs no write amplification. Instead, the rewriting is deferred: multiple SSTables at the same ‘level’ are merged together into a new file during an out-of-band compaction phase. 


When individual SSTables at the same level are compacted, their keys are merged together, and then the new file is promoted to the next higher level.

After a compaction completes, the newly merged SSTables become the definitive set of database records, and the old SSTables are unlinked.
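As a rough illustration of the merge step, here is a sketch that combines several sorted runs into one, keeping the newest value when a key appears more than once. The function name `compact` and the list-of-pairs representation are assumptions made for the example; real compaction works on files and also handles deletions and level promotion.

```python
import heapq


def compact(sstables):
    """Merge sorted SSTables into one sorted run; the newest value for a key wins.

    `sstables` is a list of lists of (key, value) pairs. Each inner list is
    sorted by key with unique keys, and the outer list is ordered oldest first.
    """
    # Tag each entry with the negated age of its table, so that for equal keys
    # the entry from the newest table sorts first.
    tagged = [
        [(key, -age, value) for key, value in table]
        for age, table in enumerate(sstables)
    ]
    merged = []
    for key, _, value in heapq.merge(*tagged):
        if merged and merged[-1][0] == key:
            continue  # a newer entry for this key was already emitted
        merged.append((key, value))
    return merged
```

Because every input is already sorted, the merge is a single sequential pass over each file, which is why compaction can run in the background without random I/O.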
If we log onto a production instance, we can see this write-ahead-log being updated–as well as the individual SSTables being written, read, and merged. 


The log and the most recent SSTable dominate the I/O

If we look at the SSTable statistics from production, we can see that we have four total ‘levels’ of files, with larger and larger files found at each higher level.
RocksDB stores the indexes and bloom filters for each SSTable in the SSTable file itself, and these are loaded into memory. The filters and indexes are then queried to find a particular key, and then the full SSTable is loaded into memory on an LRU basis. 
In the vast majority of cases, we see new messages–which makes our dedupe system the textbook use case for bloom filters. 
Bloom filters will tell us whether a key is ‘possibly in the set’, or ‘definitely not in the set’. To do this, the bloom filter hashes each element it has seen with several hash functions and sets the bit each hash points to. If every bit that the hash functions select for a key is set, the filter will return that the message is ‘possibly in the set’; if any of them is unset, the key is ‘definitely not in the set’.
If the response is ‘possibly in the set’, then RocksDB can query the raw data from our SSTables to determine whether the item actually exists within the set. But in most cases, we can avoid querying any SSTables whatsoever, since the filter will return a ‘definitely not in the set’ response. 
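A minimal bloom filter along these lines might look like the sketch below. It is not RocksDB's filter format: the sizes, the use of salted SHA-256 in place of independent hash functions, and the names `BloomFilter` and `might_contain` are all choices made for the example.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, item):
        # Derive num_hashes bit positions by salting a single hash function.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item):
        # False means "definitely not in the set"; True means "possibly".
        return all(self.bits[pos] for pos in self._positions(item))
```

A `False` answer is always correct, so a lookup for a brand-new message ID can usually stop at the filter. A `True` answer still has to be confirmed against the stored data, because unrelated keys can happen to set the same bits.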
When we query RocksDB, we issue a MultiGet for all of the relevant messageIds we’d like to query. We issue these as part of a batch for performance, and to avoid many concurrent locking operations. It also allows us to batch the data coming from Kafka and generally avoid random writes in favor of sequential ones. 

Deletion: size-bound, not time-bound
With our de-dupe process, we had to decide whether to limit our system to a strict ‘de-duplication window’, or by the total database size on disk.
To keep the system from suddenly falling over and losing de-duplication for all customers, we decided to limit by size rather than by a set time window. This allows us to set a max size for each RocksDB instance and to absorb sudden spikes or increases in load. The side effect is that the de-duplication window can shrink to under 24 hours, at which point it will page our on-call engineer. 
We periodically age out old keys from RocksDB to keep it from growing to an unbounded size. To do this, we keep a secondary index of the keys based upon sequence number, so that we can delete the oldest received keys first.  
Rather than using the RocksDB TTL, which would require that we keep a fixed TTL when opening the database, we instead delete objects ourselves using the sequence number for each inserted key.
Because the sequence number is stored as a secondary index, we can look up a key by it quickly and ‘mark’ that key as deleted. 
To continue ensuring write speed, RocksDB doesn’t immediately go back and delete a key (remember these SSTables are immutable!). Instead, RocksDB will append a ‘tombstone’ which then gets removed as part of the compaction process. Thus, we can age out quickly with sequential writes, and avoid thrashing our memory by removing old items.
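The size-bound age-out can be sketched with plain dictionaries. This is not the original deletion function, and it does not use the RocksDB API: `AgingStore`, `max_keys`, and the in-memory secondary index are hypothetical stand-ins that only show the ordering logic and the tombstone idea.

```python
TOMBSTONE = object()  # marker meaning "this key was deleted"


class AgingStore:
    def __init__(self, max_keys):
        self.max_keys = max_keys   # hypothetical size bound, in live keys
        self.data = {}             # message id -> sequence number or TOMBSTONE
        self.by_sequence = {}      # secondary index: sequence number -> message id
        self.next_sequence = 0
        self.oldest_sequence = 0
        self.live = 0

    def contains(self, message_id):
        return self.data.get(message_id, TOMBSTONE) is not TOMBSTONE

    def insert(self, message_id):
        """Record a message id; return False if it is already present."""
        if self.contains(message_id):
            return False
        seq = self.next_sequence
        self.next_sequence += 1
        self.data[message_id] = seq
        self.by_sequence[seq] = message_id
        self.live += 1
        # Age out the oldest keys until we are back under the size bound.
        while self.live > self.max_keys:
            self.delete_sequence(self.oldest_sequence)
            self.oldest_sequence += 1
        return True

    def delete_sequence(self, seq):
        message_id = self.by_sequence.pop(seq)
        self.data[message_id] = TOMBSTONE  # mark now, reclaim space later
        self.live -= 1

    def compact(self):
        # Tombstones are physically dropped only during compaction.
        self.data = {k: v for k, v in self.data.items() if v is not TOMBSTONE}
```

When load spikes, the store simply forgets older IDs sooner, which shortens the de-duplication window instead of exhausting memory or disk.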

Ensuring Correctness
The last remaining piece is how we ensure correctness of the data in various failure modes.

EBS-snapshots and attachments

To ensure that our RocksDB instances are not corrupted by a bad code push or underlying EBS outage, we take periodic snapshots of each of our hard drives. While EBS is already replicated under the hood, this step guards against the database becoming corrupted from some underlying mechanism. 
If we need to cycle an instance–the consumer can be paused, and the associated EBS drive detached and then re-attached to the new instance. So long as we keep the partition ID the same, re-assigning the disk is a fairly painless process that still guarantees correctness. 
In the case of a worker crash, we rely on RocksDB’s built-in write-ahead-log to ensure that we don’t lose messages. Messages are not committed from the input topic unless we have a guarantee that RocksDB has persisted the message in the log. 

Reading the output topic

You may notice that, up until this point, there is no ‘atomic’ step which allows us to ensure that we’ve delivered messages just once. It’s possible that our worker could crash at any point: writing to RocksDB, publishing to the output topic, or acknowledging the input messages. 
We need a ‘commit’ point that is atomic–and ensures that it covers the transaction for all of these separate systems. We need some “source of truth” for our data. 
That’s where reading from the output topic comes in. 
If the dedupe worker crashes for any reason or encounters an error from Kafka, when it re-starts it will first consult the “source of truth” for whether an event was published: the output topic.
If a message was found in the output topic, but not RocksDB (or vice-versa) the dedupe worker will make the necessary repairs to keep the output topic and RocksDB in sync. In essence, we’re using the output topic as both our write-ahead-log, and our end source of truth, with RocksDB checkpointing and verifying it. 
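One direction of that repair can be sketched as a replay of the output topic's tail. The list standing in for the Kafka topic, the set standing in for RocksDB, and the `last_checkpoint_offset` parameter are all assumptions made for illustration; the reverse case (an ID recorded locally but never published) is left out.

```python
def recover(output_topic, seen_ids, last_checkpoint_offset):
    """Replay the tail of the output topic and repair the local set of seen IDs.

    output_topic: list of message IDs in publish order (stand-in for a Kafka topic)
    seen_ids: set of IDs the local store believes were published (mutated in place)
    last_checkpoint_offset: offset up to which the store is assumed consistent
    Returns the IDs that had to be added to the local store.
    """
    repaired = []
    for offset in range(last_checkpoint_offset, len(output_topic)):
        message_id = output_topic[offset]
        if message_id not in seen_ids:
            # Published before the crash, but never recorded locally.
            seen_ids.add(message_id)
            repaired.append(message_id)
    return repaired
```

After this runs, a redelivered input message whose ID is already in the output topic is recognized as a duplicate even though the local write was lost in the crash.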
Previously we stored all of our keys in Memcached and used Memcached’s atomic CAS (check-and-set) operator to set keys if they didn’t exist. Memcached served as the commit point and ‘atomicity’ for publishing keys. 
While this worked well enough, it required a large amount of memory to fit all of our keys. Furthermore, we had to decide between accepting the occasional Memcached failures, or doubling our spend with high-memory failover replicas. 
Data stored on disk: keeping a full set of keys or full indexing in-memory was prohibitively expensive. By moving more of the data to disk, and leveraging various levels of files and indexes, we were able to cut the cost of our bookkeeping by a wide margin. We are able to push the failover to cold storage (EBS) rather than running additional hot failover instances. 
Partitioning: of course, in order to narrow our search space and avoid loading too many indexes in memory, we need a guarantee that certain messages are routed to the right workers. Partitioning upstream in Kafka allows us to consistently route these messages so we can cache and query much more efficiently. 
Explicit age-out: with Memcached, we would set a TTL on each key to age them out, and then rely on the Memcached process to handle evictions. This caused us to exhaust our memory in the face of large batches of data, and spike the Memcached CPU in the face of a large number of evictions. By having the client handle key deletion, we’re able to fail gracefully by shortening our ‘window of deduplication’. 
Kafka as the source of truth: to truly avoid duplication in the face of multiple commit points, we have to use a source of truth that’s common to all of our downstream consumers. Using Kafka as that ‘source of truth’ has worked amazingly well. In the case of most failures (aside from Kafka failures), messages will either be written to Kafka, or they won’t. And using Kafka ensures that published messages are delivered in-order, and replicated on-disk across multiple machines, without needing to keep much data in memory. 
Batching reads and writes: by making batched I/O calls to Kafka and RocksDB, we’re able to get much better performance by leveraging sequential reads and writes. Instead of the random access we had before with Memcached, we’re able to achieve much better throughput by leaning into our disk performance, and keeping only the indexes in memory. 
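The partitioning point above depends on the same message ID always landing on the same worker. A minimal sketch of such routing follows; the function name `partition_for` and the use of CRC32 are assumptions for the example, and this is not the partitioner Kafka itself uses.

```python
import zlib


def partition_for(message_id, num_partitions):
    """Map a message ID to a partition in a way that is stable across processes."""
    # Python's built-in hash() is salted per process for strings, so a
    # deterministic checksum is used instead.
    return zlib.crc32(message_id.encode("utf-8")) % num_partitions
```

Because the mapping is deterministic, a duplicate of a message is always checked against the one RocksDB instance that could have seen the original.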

Using Kafka and RocksDB as the primitives for streaming applications has started to become more and more the norm. And we’re excited to continue building atop these primitives to build new distributed applications. 
https://rawgit.com/google/leveldb/master/doc/index.html
http://leveldb.googlecode.com/svn/trunk/doc/impl.html

http://blog.csdn.net/sparkliang/article/details/7260267
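In 2012 I started digging into the leveldb code, a single-machine KV storage system developed by two top Google engineers. It involves a skip list, an in-memory KV table, LRU cache management, table file storage, an operation log system, and more. I'll start by sweeping through the minor pieces around the edges.
Before starting, let's look at Leveldb's basic architecture and its key components, as shown in Figure 1-1.
Figure 1-1
Leveldb is a storage system built on an operation log and a typical implementation of the Log-Structured Merge Tree. The log-structured idea traces back to the classic paper "The Design and Implementation of a Log-Structured File System" published by Ousterhout and Rosenblum in 1991.
Because it uses an op log, it can turn random disk writes into append operations on the op log, which improves I/O efficiency; the newest data is kept in the in-memory memtable.
When the op log file grows past a size limit, a checkpoint is taken: Leveldb creates a new log file and a new memtable, and a background task exports the data of the immutable memtable to disk, forming a new SSTable file. SSTables are produced by repeatedly exporting in-memory data and running compaction on it, and all SSTable files are organized as a hierarchy: the first level is Level 0, the second is Level 1, and so on, with the level number increasing step by step, which is why it is called LevelDb.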


https://medium.com/smyte/rate-limiter-df3408325846


https://github.com/facebook/rocksdb/wiki/RocksDB-Basics#compaction-filter

Compaction Filter

Some applications may want to process keys at compaction time. For example, a database with inherent support for time-to-live (TTL) may remove expired keys. This can be done via an application-defined Compaction Filter. If the application wants to continuously delete data older than a specific time, it can use the compaction filter to drop records that have expired. The RocksDB Compaction Filter gives control to the application to modify the value of a key or to drop a key entirely as part of the compaction process. For example, an application can continuously run a data sanitizer as part of the compaction.
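To make the idea concrete, here is a toy compaction pass that consults an application-supplied predicate. It does not use the RocksDB Compaction Filter interface: the names `ttl_filter` and `run_compaction`, and the `(written_at, payload)` value layout, are hypothetical.

```python
import time


def ttl_filter(ttl_seconds, now=None):
    """Build a predicate that reports whether an entry has expired."""
    now = time.time() if now is None else now

    def should_drop(key, value):
        written_at, _payload = value  # assumed layout: (timestamp, payload)
        return now - written_at > ttl_seconds

    return should_drop


def run_compaction(entries, should_drop):
    """Copy entries into the output run, skipping those the filter rejects."""
    return [(key, value) for key, value in entries if not should_drop(key, value)]
```

Expired records cost nothing extra to remove here, because compaction is already rewriting the data; the filter just decides what gets copied forward.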
https://github.com/facebook/rocksdb/wiki/Merge-Operator

https://www.facebook.com/notes/facebook-engineering/under-the-hood-building-and-open-sourcing-rocksdb/10151822347683920
Secondly, we are starting to see servers with an increasing number of cores and with storage-IOPS reaching millions of requests per second. Lock contention and a high number of context switches in traditional database software prevent it from being able to saturate the storage-IOPS.
https://github.com/facebook/rocksdb/wiki
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics

