Tuesday, February 13, 2018

Discord architecture



https://blog.discordapp.com/how-discord-stores-billions-of-messages-7fa6ec7ee4c7

https://blog.discordapp.com/how-discord-indexes-billions-of-messages-e3d5e9be866f
  • Lazily Indexed: Not everyone uses search — we shouldn’t index messages unless someone attempts to search them at least once. Additionally, if an index fails, we needed to be able to re-index servers on the fly.

Elasticsearch had the edge:
  • Node discovery on Solr requires ZooKeeper. We run etcd, and did not want to have additional infrastructure specifically for Solr. Elasticsearch’s Zen Discovery is self contained.
  • Elasticsearch supports automatic shard rebalancing, which would let us add new nodes to the cluster, fulfilling the linearly scalable requirement out of the box.
  • Elasticsearch has a structured query DSL built-in, whereas you’d have to programmatically build a query string with Solr using a third-party library (a rough sketch of the DSL appears after this list).
  • Engineers on the team had more experience working with Elasticsearch.
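
For context, a structured search in Elasticsearch's query DSL is just a nested dict/JSON body. The sketch below is illustrative only (field names follow the message mapping shown later in this post; the exact queries Discord runs aren't published):

    # Hypothetical query body: full-text match on message content,
    # filtered to one server and one channel, newest messages first.
    search_body = {
        'query': {
            'bool': {
                'must': [
                    {'match': {'content.lang_analyzed': 'deployment checklist'}}
                ],
                'filter': [
                    {'term': {'guild_id': 123456789012345678}},   # example IDs, not real ones
                    {'term': {'channel_id': 234567890123456789}}
                ]
            }
        },
        'sort': [{'id': 'desc'}],   # message IDs are snowflakes, so this sorts newest-first
        'size': 25
    }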


We wanted to avoid these cumbersome, large clusters, so we came up with the idea of delegating sharding and routing to the application layer, allowing us to index messages into a pool of smaller Elasticsearch clusters. This meant that in the event of a cluster outage, only the Discord messages on the affected cluster would be unavailable for searching. It also gave us the advantage of being able to throw away an entire cluster's data should it become unrecoverable (the system is able to lazily re-index the Discord server the next time a user performs a search).

Elasticsearch likes it when documents are indexed in bulk. This meant that we couldn't index messages as they were posted in real time. Instead, we designed a queue in which a worker grabs a bunch of messages and indexes them in a single bulk operation. We decided that this small delay between when a message was posted and when it became searchable was a perfectly reasonable constraint. After all, most users search for messages posted in the past, not something said moments ago.
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
The bulk API makes it possible to perform many index/delete operations in a single API call. This can greatly increase the indexing speed.

  • Message Queue: We needed a queue that we can put messages into as they are posted in real time (to be consumed by a pool of workers).
  • Index Workers: Workers that do the actual routing and bulk inserts into Elasticsearch from the queue (a rough sketch follows).
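
A rough sketch of what an index worker's bulk insert might look like, using the elasticsearch-py bulk helper (the queue, the cluster pool, and lookup_shard are placeholders, not Discord's actual code):

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    ES_CLUSTERS = {'cluster-a': Elasticsearch(['http://es-a:9200'])}  # hypothetical pool of small clusters

    def index_batch(messages):
        """Take a batch of messages off the queue and bulk-index each one
        into the cluster/index (Shard) its Discord server is mapped to."""
        by_cluster = {}
        for msg in messages:
            cluster, index = lookup_shard(msg['guild_id'])  # hypothetical Redis/Cassandra lookup (sketched below)
            by_cluster.setdefault(cluster, []).append({
                '_index': index,
                '_type': 'message',          # single document type, per the index template
                '_id': msg['id'],
                '_source': msg,
            })
        for cluster, actions in by_cluster.items():
            bulk(ES_CLUSTERS[cluster], actions)  # one bulk call per target cluster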

We also needed a quick and easy mapping of which Elasticsearch cluster and index a Discord server’s messages would reside on. We call this “cluster + index” pair a Shard (not to be confused with Elasticsearch’s native shards within an index). The mapping we created comes in two layers:
  • Persistent Shard Mapping: We put this on Cassandra, our primary data store for persistent data, as the source of truth.
  • Shard Mapping Cache: When we're ingesting messages on our workers, querying Cassandra for a Shard is a slow operation. We cache these mappings in Redis so that we can do mget operations to quickly figure out where a message needs to be routed (sketched below).
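
A minimal sketch of the two-layer lookup, assuming Shard mappings are stored as "cluster|index" strings keyed by guild ID (the key format and the Cassandra helper are hypothetical):

    import redis

    r = redis.StrictRedis()  # hypothetical connection to the mapping cache

    def lookup_shards(guild_ids):
        """Resolve guild_id -> (cluster, index) Shards: Redis first, Cassandra on a miss."""
        cached = r.mget(['shard:%d' % gid for gid in guild_ids])  # one round trip for the whole batch
        mappings = {}
        for gid, value in zip(guild_ids, cached):
            if value is None:
                value = fetch_shard_from_cassandra(gid)   # hypothetical source-of-truth read
                r.set('shard:%d' % gid, value)            # warm the cache for later batches
            elif isinstance(value, bytes):
                value = value.decode()
            cluster, index = value.split('|')             # e.g. 'cluster-a|m-1234'
            mappings[gid] = (cluster, index)
        return mappings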


    When a server is being indexed for the first time, we also needed a way to select which Shard would hold the Discord server's messages. Since our Shards are an application-layer abstraction, we can be a bit smart about how to allocate them. By harnessing the power of Redis, we used a sorted set to build a load-aware shard allocator.

    • Shard Allocator: Using a sorted set in Redis, we keep a set of the Shards with a score that represents their load. The Shard with the lowest score is the one that should be allocated next. The score gets incremented with each new allocation, and each message indexed into Elasticsearch also has a chance of incrementing the score of its Shard. As Shards accumulate more data, they become less likely to be allocated to a new Discord server (a sketch follows).
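
A minimal sketch of that allocator, assuming a Redis sorted set keyed by Shard name (the key name, sampling rate, and persistence helper are guesses, not Discord's values):

    import random

    SHARD_LOADS = 'shard_loads'  # hypothetical sorted set: member = Shard, score = load

    def allocate_shard(r, guild_id):
        """Pick the least-loaded Shard for a newly indexed server and bump its score."""
        shard = r.zrange(SHARD_LOADS, 0, 0)[0]    # lowest score = least-loaded Shard
        r.zincrby(SHARD_LOADS, 1.0, shard)        # each allocation adds load (redis-py 3.x argument order)
        save_shard_mapping(guild_id, shard)       # hypothetical: persist to Cassandra and cache in Redis
        return shard

    def on_message_indexed(r, shard):
        """Give each indexed message a small chance of bumping its Shard's score."""
        if random.random() < 0.01:                # sampling rate is illustrative
            r.zincrby(SHARD_LOADS, 1.0, shard)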

    Since we handled all of the sharding logic at the application level (our Shards), having Elasticsearch do the sharding for us didn't really make sense. However, we could use it to do replication and balancing of the indices between nodes in the cluster. In order to have Elasticsearch automatically create an index using the correct configuration, we used an index template, which contained the index configuration and data mapping. The index configuration was pretty simple:
    • The index should only contain one shard (don’t do any sharding for us)
    • The index should be replicated to one node (be able to tolerate the failure of the primary node the index is on)
    • The index should only refresh once every 60 minutes (why we had to do this is explained below).
    • The index contains a single document type: message

    INDEX_TEMPLATE = {
        'template': 'm-*',
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 1,
            'index.refresh_interval': '3600s'
        },
        'mappings': {
            'message': {
                '_source': {
                    'includes': [
                        'id',
                        'channel_id',
                        'guild_id'
                    ]
                },
                'properties': {
                    # This is the message_id; we index by this to allow for greater than/less than
                    # queries, so we can search before, on, and after.
                    'id': {
                        'type': 'long'
                    },
                    # Lets us search with the "in:#channel-name" modifier.
                    'channel_id': {
                        'type': 'long'
                    },
                    # Lets us scope a search to a given server.
                    'guild_id': {
                        'type': 'long'
                    },
                    # Lets us search "from:Someone#0001".
                    'author_id': {
                        'type': 'long'
                    },
                    # Is the author a user, bot or webhook? Not yet exposed in client.
                    'author_type': {
                        'type': 'byte'
                    },
                    # Regular chat message, system message...
                    'type': {
                        'type': 'short'
                    },
                    # Who was mentioned, "mentions:Person#1234".
                    'mentions': {
                        'type': 'long'
                    },
                    # Was "@everyone" mentioned (only true if the author had permission to @everyone at the time).
                    # This accounts for the case where "@everyone" could be in a message, but it had no effect,
                    # because the user doesn't have permissions to ping everyone.
                    'mention_everyone': {
                        'type': 'boolean'
                    },
                    # Array of [message content, embed title, embed author, embed description, ...]
                    # for full-text search.
                    'content': {
                        'type': 'text',
                        'fields': {
                            'lang_analyzed': {
                                'type': 'text',
                                'analyzer': 'english'
                            }
                        }
                    },
                    # An array of shorts, specifying what type of media the message has. "has:link|image|video|embed|file".
                    'has': {
                        'type': 'short'
                    },
                    # An array of normalized hostnames in the message, traversed up to the domain. Not yet exposed in client.
                    # "http://foo.bar.com" gets turned into ["foo.bar.com", "bar.com"].
                    'link_hostnames': {
                        'type': 'keyword'
                    },
                    # Embed providers as returned by oembed, i.e. "Youtube". Not yet exposed in client.
                    'embed_providers': {
                        'type': 'keyword'
                    },
                    # Embed type as returned by oembed. Not yet exposed in client.
                    'embed_types': {
                        'type': 'keyword'
                    },
                    # File extensions of attachments, i.e. "fileType:mp3".
                    'attachment_extensions': {
                        'type': 'keyword'
                    },
                    # The filenames of the attachments. Not yet exposed in client.
                    'attachment_filenames': {
                        'type': 'text',
                        'analyzer': 'simple'
                    }
                }
            }
        }
    }
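
    With the template in place, registering it so that every new m-* index picks up this configuration might look like the sketch below, using the elasticsearch-py client of that era (the template name and cluster address are illustrative):

    from elasticsearch import Elasticsearch

    es = Elasticsearch(['http://localhost:9200'])  # hypothetical cluster address

    # Any index created with a name matching 'm-*' gets one shard, one replica,
    # a one-hour refresh interval, and the message mapping above.
    es.indices.put_template(name='message-index-template', body=INDEX_TEMPLATE)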


    These fields, however, aren't actually "stored" in Elasticsearch; they live only in the inverted index. The only fields that are actually stored and returned are the message, channel, and server IDs of the message. This means that message data is not duplicated in Elasticsearch. The tradeoff is that we have to fetch the message from Cassandra when returning search results, which is perfectly okay, because we'd have to pull the message context (2 messages before & after) from Cassandra to power the UI anyway. Keeping the actual message object out of Elasticsearch means we don't have to pay for additional disk space to store it. However, it also means we can't use Elasticsearch to highlight matches in search results; we'd have to build the tokenizers and language analyzers into our client to do the highlighting (which was really easy to do).
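
    Because Elasticsearch only hands back the stored IDs, assembling a results page might look roughly like the sketch below (the Cassandra reads are hypothetical helpers; Discord's actual search path isn't published):

    def search_messages(es, index, query_body):
        """Run the search, then hydrate the full messages (plus context) from Cassandra."""
        hits = es.search(index=index, body=query_body)['hits']['hits']
        results = []
        for hit in hits:
            src = hit['_source']                                          # only id, channel_id, guild_id are stored
            message = fetch_message(src['channel_id'], src['id'])         # hypothetical Cassandra read
            context = fetch_context(src['channel_id'], src['id'], n=2)    # 2 messages before & after
            results.append({'message': message, 'context': context})
        return results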

    For indexing a server's historical messages, we created a historical index job that performs a unit of work and returns the next job needed to continue indexing that server. Each job represents a cursor into a server's message history and a fixed unit of execution (in this case defaulting to 500 messages). The job returns a new cursor pointing to the next batch of messages to be indexed, or None if there is no more work to be done. To return results quickly for a large server, we split historical indexing into two phases, an "initial" phase and a "deep" phase. The "initial" phase indexes the last 7 days of messages on the server and makes the index available to the user. After that, we index the entire history in the "deep" phase, which executes at a lower priority. The support article linked below shows what this looks like to the user. These jobs are executed in a pool of Celery workers, allowing them to be scheduled alongside the other tasks those workers run.
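
    A minimal sketch of that job loop, assuming hypothetical helpers for reading history from Cassandra and handing batches to the index workers:

    BATCH_SIZE = 500  # fixed unit of execution per job, as described above

    def run_historical_index_job(guild_id, cursor):
        """Index one batch of a server's history; return the next cursor, or None when finished."""
        messages = fetch_messages_before(guild_id, cursor, limit=BATCH_SIZE)  # hypothetical Cassandra read
        if not messages:
            return None                      # no more work: the deep index is complete
        queue_for_indexing(messages)         # hypothetical hand-off to the bulk index workers
        return messages[-1]['id']            # cursor pointing at the next (older) batch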


    Testing It Out On Production
    By default, Elasticsearch has its index refresh interval set to 1 second. This is what provides the "near real-time" search ability in Elasticsearch. Every second (across a thousand indexes), Elasticsearch was flushing the in-memory buffer to a Lucene segment and opening the segment to make it searchable. Overnight, while idle, Elasticsearch merged the massive number of tiny segments it had generated into much larger (but more space-efficient) ones on disk.
    Testing this out was pretty simple: we dropped all the indexes on the cluster, set the refresh interval to an arbitrarily large number, and then scheduled the same servers to be indexed. CPU usage was down to almost nothing while the documents were being ingested, and disk usage was not growing at an alarmingly high rate.
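
    Changing the refresh interval on indices that already exist goes through the index settings API; a sketch with elasticsearch-py (the index pattern and interval are taken from the template above, the cluster address is illustrative):

    from elasticsearch import Elasticsearch

    es = Elasticsearch(['http://localhost:9200'])  # hypothetical cluster address

    # Stop Elasticsearch from opening a new Lucene segment every second on the message indices.
    es.indices.put_settings(index='m-*', body={'index': {'refresh_interval': '3600s'}})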

    At some point, we will spin up more clusters so that new Discord servers being indexed land on them (thanks to our weighted shard distribution system). On our existing clusters, we'll need to limit the number of master-eligible nodes as we add more data nodes. We watch four metrics to decide when to grow a cluster:

    1. heap_free: (i.e. heap_committed - heap_used) When we run out of free heap space, the JVM is forced to do a full stop-the-world GC to quickly reclaim space. If it fails to reclaim enough space, the node will crash and burn. Before that, the JVM gets into a state where it's doing stop-the-world GCs constantly as the heap fills up and too little memory is freed during each full GC. We look at this along with GC stats to see how much time is spent garbage collecting (a sketch of pulling this from the stats API follows the list).
    2. disk_free: Obviously when we run out of disk space, we’ll need to add more nodes, or more disk space to handle new documents being indexed. This is very easy on GCP as we can just grow the size of the disk without rebooting the instance. Choosing between adding a new node or resizing disks depends on how the other metrics mentioned here look. For example, if disk usage is high, but the other metrics are at acceptable levels, we’ll choose to add more disk space rather than a new node.
    3. cpu_usage: Whether CPU usage reaches a threshold during peak hours.
    4. io_wait: Whether the IO operations on the cluster are getting too slow.
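
    These numbers come out of Elasticsearch's standard stats APIs; a hedged sketch of computing heap_free per node with elasticsearch-py:

    from elasticsearch import Elasticsearch

    es = Elasticsearch(['http://localhost:9200'])  # hypothetical cluster address

    def heap_free_by_node(es):
        """heap_free = heap_committed - heap_used, per node, from the nodes stats API."""
        stats = es.nodes.stats(metric='jvm')
        free = {}
        for node in stats['nodes'].values():
            mem = node['jvm']['mem']
            free[node['name']] = mem['heap_committed_in_bytes'] - mem['heap_used_in_bytes']
        return free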
    https://support.discordapp.com/hc/en-us/articles/115000414847-What-is-Server-Indexing-
    While you can view the initial messages in the results, the rest of the server will still be indexing.

