https://medium.com/netflix-techblog/distributed-delay-queues-based-on-dynomite-6b31eca37fbc
Traditionally, we have been using a Cassandra based queue recipe along with Zookeeper for distributed locks, since Cassandra is the de facto storage engine at Netflix. Using Cassandra for a queue-like data structure is a known anti-pattern. In addition, using a global lock on the queue while polling limits the amount of concurrency on the consumer side, since the lock ensures that only one consumer can poll from the queue at a time. This can be addressed somewhat by sharding the queue, but concurrency is still limited within each shard.
We wanted the following in the queue recipe:
Distributed
No external locks (e.g. Zookeeper locks)
Highly concurrent
At-least-once delivery semantics
No strict FIFO
Delayed queue (message is not taken out of the queue until some time in the future)
Priorities within the shard
A queue is stored as a sorted set (using ZADD, ZRANGE, etc.) within Redis. Redis sorts the members of a sorted set by the provided score. When storing an element in the queue, the score is computed as a function of the message priority and timeout (for timed queues).
For each queue, three Redis data structures are maintained:
A Sorted Set containing queued elements by score.
A Hash that contains the message payload, keyed by message ID.
A Sorted Set containing messages consumed by a client but not yet acknowledged (the un-ack set; a minimal key layout is sketched below).
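As a rough illustration of these structures, here is a minimal sketch of a possible key layout and score function in Java. The key names, the shard suffix, and the exact score formula are assumptions made for illustration; the actual broker goes through the Dyno client and computes its score from the message timeout and priority as described above.

```java
// Sketch: one possible key layout and score function for a single queue shard.
// Key names and the score formula are illustrative assumptions, not the exact
// ones used by the Netflix implementation.
public final class QueueKeys {

    // Sorted set of pending message IDs, scored by delivery time plus a priority offset
    public static String queueKey(String queueName, String shard) {
        return "queue." + queueName + "." + shard;
    }

    // Sorted set of message IDs handed to a consumer but not yet acknowledged
    public static String unackKey(String queueName, String shard) {
        return "unack." + queueName + "." + shard;
    }

    // Hash of message ID -> payload
    public static String payloadKey(String queueName) {
        return "payload." + queueName;
    }

    // Lower scores are dequeued first: an earlier delivery time wins, and priority
    // (assumed here to be 0..99, lower = more urgent) only breaks ties within the
    // same millisecond.
    public static double score(long nowMillis, long timeoutMillis, int priority) {
        return (double) (nowMillis + timeoutMillis) + priority / 100.0;
    }
}
```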
Push
Calculate the score as a function of message timeout (delayed queue) and priority
Add the message ID to the sorted set for the queue with the computed score
Add the message payload to the Redis hash, keyed by message ID (see the sketch below)
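Putting the push steps together, a minimal sketch using the plain Jedis client might look like the following. The real broker talks to Dynomite through Dyno, and the key names, score formula, and sample payload here are assumptions for illustration only.

```java
import java.util.UUID;
import redis.clients.jedis.Jedis;

public class PushExample {
    public static void main(String[] args) {
        // Assumed local Redis for illustration; the real recipe talks to Dynomite via Dyno.
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String queueKey = "queue.workflow_tasks.us-east-1a"; // assumed key naming
            String payloadKey = "payload.workflow_tasks";

            String messageId = UUID.randomUUID().toString();
            long timeoutMillis = 30_000;   // deliver no earlier than 30s from now (delayed queue)
            int priority = 5;              // assumed 0..99, lower = higher priority

            // Score = delivery time plus a small priority offset (illustrative formula)
            double score = (double) (System.currentTimeMillis() + timeoutMillis) + priority / 100.0;

            // 1. Add the message ID to the queue's sorted set with the computed score
            jedis.zadd(queueKey, score, messageId);
            // 2. Store the payload in the hash, keyed by message ID
            jedis.hset(payloadKey, messageId, "{\"task\":\"encode_video\"}");
        }
    }
}
```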
Poll
Calculate max score as current time
Get messages with score between 0 and max
Add the message ID to the un-ack set and remove it from the sorted set for the queue.
If the previous step succeeds, retrieve the message payload from the Redis hash by message ID (see the sketch below)
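A corresponding poll sketch, again with plain Jedis and the same assumed key names. The 60-second un-ack timeout and the ownership check on the ZREM return value are simplifications of the broker's actual contention handling.

```java
import redis.clients.jedis.Jedis;

public class PollExample {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String queueKey = "queue.workflow_tasks.us-east-1a";   // assumed key naming
            String unackKey = "unack.workflow_tasks.us-east-1a";
            String payloadKey = "payload.workflow_tasks";

            // Max score is "now": only messages whose delivery time has passed are visible
            double maxScore = System.currentTimeMillis();

            for (String messageId : jedis.zrangeByScore(queueKey, 0, maxScore)) {
                // Move the ID to the un-ack set, scored by the time it becomes re-deliverable
                double unackScore = System.currentTimeMillis() + 60_000; // assumed 60s un-ack timeout
                jedis.zadd(unackKey, unackScore, messageId);

                // Only the poller whose removal succeeds fetches the payload; a concurrent
                // poller that lost the race gets 0 back and skips this ID.
                if (jedis.zrem(queueKey, messageId) > 0) {
                    String payload = jedis.hget(payloadKey, messageId);
                    System.out.println("polled " + messageId + " -> " + payload);
                }
            }
        }
    }
}
```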
Ack
Remove the message ID from the un-ack set
Remove the payload from the message payload hash (see the sketch below)
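The ack step simply cleans up the two remaining entries for the message; a minimal sketch with the same assumed key naming as above:

```java
import redis.clients.jedis.Jedis;

public class AckExample {
    public static void ack(Jedis jedis, String queueName, String shard, String messageId) {
        String unackKey = "unack." + queueName + "." + shard;   // assumed key naming
        String payloadKey = "payload." + queueName;

        // 1. Remove the message ID from the un-ack sorted set
        jedis.zrem(unackKey, messageId);
        // 2. Delete the payload from the hash; the message is now fully consumed
        jedis.hdel(payloadKey, messageId);
    }
}
```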
Messages that are not acknowledged by the client are pushed back to the queue (at-least-once semantics).
Availability Zone / Rack Awareness
Our queue recipe was built on top of Dynomite’s Java client, Dyno. Dyno provides connection pooling over persistent connections and can be configured to be topology aware (token aware). Moreover, Dyno gives the application local rack affinity (in AWS, a rack is an availability zone, e.g. us-east-1a, us-east-1b) by routing requests to Dynomite nodes in the client’s own rack. A client in us-east-1a will connect to a Dynomite/Redis node in the same AZ (unless the node is not available, in which case the client fails over). This property is exploited for sharding the queues by availability zone.
Sharding
Queues are sharded based on the availability zone. When pushing an element to the queue, the shard is chosen in a round-robin fashion, which ensures that the shards eventually stay balanced. Each shard is a sorted set in Redis whose key is the combination of queueName and AVAILABILITY_ZONE.
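A minimal sketch of round-robin shard selection under these assumptions (the zone list and key naming are illustrative, not the broker's actual configuration):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ShardSelector {
    // Assumed shard names: one per availability zone the cluster spans.
    private final List<String> shards = List.of("us-east-1a", "us-east-1b", "us-east-1c");
    private final AtomicInteger counter = new AtomicInteger();

    // Round-robin over shards so pushes spread evenly across zones over time.
    public String nextShardKey(String queueName) {
        String shard = shards.get(Math.floorMod(counter.getAndIncrement(), shards.size()));
        return "queue." + queueName + "." + shard;   // queueName + AVAILABILITY_ZONE
    }
}
```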
Dynomite consistency
The message broker uses a Dynomite cluster with the consistency level set to DC_SAFE_QUORUM. Reads and writes are propagated synchronously to a quorum of nodes in the local data center and asynchronously to the rest. The quorum is calculated from the number of replicas and rounded down to a whole number, so this consistency level ensures that every write is acknowledged by a majority of the local nodes.
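For intuition, the usual majority-quorum arithmetic looks like this; the post describes the calculation only loosely, so the exact formula below is an assumption on my part rather than Dynomite's documented behavior.

```java
public class QuorumMath {
    // Standard majority quorum: more than half the replicas, e.g. 2 of 3, 3 of 4.
    // Assumed formula; the post only says the quorum is calculated and rounded
    // down to a whole number.
    static int quorum(int replicas) {
        return replicas / 2 + 1;   // integer division rounds down
    }

    public static void main(String[] args) {
        System.out.println(quorum(3)); // 2
        System.out.println(quorum(4)); // 3
    }
}
```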
Avoiding Global Lock
Each node (N1…Nn in the diagram above) has affinity to its availability zone and talks to the Redis servers in that zone.
A Dynomite/Redis node serves only one request at a time. Dynomite can hold thousands of concurrent connections, but requests are processed by a single thread inside Redis. This ensures that when two concurrent calls are issued to poll an element from the queue, they are served sequentially by the Redis server, avoiding any local or distributed locks on the message broker side.
In the event of a failover, the DC_SAFE_QUORUM write ensures that no two client connections are given the same message out of a queue, as the write to the UNACK collection will only succeed for a single node for a given element. If the same element is picked up by two broker nodes (when a connection to Dynomite fails over), only one will be able to add the message to the UNACK collection; the other will receive a failure and move on to peek at another message from the queue to process.
Queue Rebalancing
Rebalancing is useful when queues are not balanced, or when a new availability zone is added or an existing one is removed permanently.
Handling Un-Ack’ed messages
A background process monitors the UNACK collections for messages that are not acknowledged by a client within a given time (configurable per queue). These messages are moved back into the queue.
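A minimal sketch of that background sweep, with plain Jedis and the same assumed key names as earlier: any un-ack entry whose score (its re-delivery deadline) has passed is pushed back onto the queue shard.

```java
import redis.clients.jedis.Jedis;

public class UnackMonitor {
    // Moves timed-out, unacknowledged messages back onto the queue shard.
    // Key names and the re-queue score are illustrative assumptions.
    public static void requeueExpired(Jedis jedis, String queueName, String shard) {
        String unackKey = "unack." + queueName + "." + shard;
        String queueKey = "queue." + queueName + "." + shard;

        double now = System.currentTimeMillis();

        // Entries whose un-ack deadline (stored as the score) has already passed
        for (String messageId : jedis.zrangeByScore(unackKey, 0, now)) {
            // Make the message immediately visible again by scoring it with the current time
            jedis.zadd(queueKey, now, messageId);
            jedis.zrem(unackKey, messageId);
        }
    }
}
```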
Multiple consumers
A modified version can be implemented where a consumer “subscribes” to a message type (a message type being metadata associated with a message) and a message is delivered to all interested consumers.
Ephemeral Queues
Ephemeral queues hold messages with a specified TTL, which are only available to consumers until the TTL expires. Once expired, the messages are removed from the queue and are no longer visible to consumers. The recipe can be modified to add a TTL to messages, thereby creating an ephemeral queue: when adding elements to the Redis collections, they can be given a TTL and removed from the collections upon expiry.
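Since Redis expiry applies to whole keys rather than to individual sorted-set members or hash fields, one way to approximate per-message TTLs is sketched below. The extra expiry sorted set, the key names, and the sweep itself are assumptions for illustration, not necessarily how the broker implements ephemeral queues.

```java
import redis.clients.jedis.Jedis;

public class EphemeralSweep {
    // Assumed scheme: track each message's absolute expiry time in an extra sorted
    // set, then periodically drop expired IDs from every structure.

    // Called at push time: record when this message should disappear.
    public static void trackExpiry(Jedis jedis, String queueName, String messageId, long ttlMillis) {
        jedis.zadd("expiry." + queueName, System.currentTimeMillis() + ttlMillis, messageId);
    }

    // Called periodically (or during polling): purge everything past its TTL.
    public static void expire(Jedis jedis, String queueName, String shard) {
        String expiryKey  = "expiry." + queueName;                 // ID scored by expiry time
        String queueKey   = "queue." + queueName + "." + shard;
        String unackKey   = "unack." + queueName + "." + shard;
        String payloadKey = "payload." + queueName;

        double now = System.currentTimeMillis();
        for (String messageId : jedis.zrangeByScore(expiryKey, 0, now)) {
            jedis.zrem(queueKey, messageId);
            jedis.zrem(unackKey, messageId);
            jedis.hdel(payloadKey, messageId);
            jedis.zrem(expiryKey, messageId);
        }
    }
}
```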
Kafka
Kafka provides a robust messaging solution with at-least-once delivery semantics, and it lends itself well to message streaming use cases. However, Kafka makes it harder to implement the semantics of priority queues and time-based queues (both are required for our primary use case). A case can be made for creating a large number of partitions in a queue to handle client usage, but then again adding a message broker in the middle complicates things further.
SQS
Amazon SQS is a viable alternative and, depending on the use case, might be a good fit. However, SQS does not support priority queues or time-based queues beyond a 15-minute delay.
Disque
Disque is a project that aims to provide distributed queues with Redis-like semantics. At the time we started working on this project, Disque was in beta (an RC is now out).
Zookeeper (or comparable) distributed locks / coordinator based solutions
A distributed queue can be built with Cassandra or a similar backend, with Zookeeper as the global locking solution. However, Zookeeper quickly becomes the bottleneck as the number of clients grows, adding to latencies. And Cassandra itself is known to treat queues as an anti-pattern use case.