Thursday, October 29, 2015

Message Brokers



https://streaml.io/resources/tutorials/concepts/understanding-batch-microbatch-streaming
https://streaml.io/resources/tutorials/concepts/messaging-and-queuing
Technologies such as Apache ActiveMQ, Amazon SQS, IBM Websphere MQ, RabbitMQ, and RocketMQ were developed primarily for message queuing use cases, while systems such as Amazon SNS, Apache Kafka and Google Cloud Pub/Sub were designed primarily for publish-subscribe use cases.

https://mp.weixin.qq.com/s/ad7jibTb5nTzh3nDQYKFeg
The functionality dimension can be broken down into several sub-dimensions, roughly the following:
  • Priority queue
A priority queue differs from a first-in-first-out queue in that messages with higher priority are consumed first, which lets you offer different levels of service to downstream consumers. This priority only matters under one precondition, though: if consumers consume faster than producers produce and there is no backlog on the message-middleware server (usually just called the Broker), then setting a priority on outgoing messages has no practical meaning, because each message is consumed as soon as it is produced; the Broker holds at most one message at a time, and priority is meaningless for a single message.
  • Delayed queue
Have you ever seen a prompt like "the order will be cancelled automatically if it is not paid within thirty minutes" while shopping online? That is a typical use case for a delayed queue. A delayed queue stores delayed messages: messages that, once sent, should not be visible to consumers immediately, but only after a specified delay. Delayed queues generally come in two flavors: per-message delay and per-queue delay. With per-message delay, every message carries its own delay, so the queue has to be re-sorted whenever a new message arrives, which hurts performance badly. In practice most systems use per-queue delay instead: queues are set up for different delay levels, such as 5s, 10s, 30s, 1min, 5min, and 10min, and all messages in a given queue share the same delay. This avoids the cost of re-sorting; a simple scanning policy (for example, a timer) is enough to deliver the messages whose delay has expired.
  • Dead-letter queue
When a message cannot be delivered correctly for some reason, it is usually placed in a queue with a special role, commonly called a dead-letter queue, so that it is not silently dropped. A related concept is the back-out queue. Imagine a consumer that throws an exception while processing a message: it will not acknowledge (ack) that delivery, the message is rolled back and ends up at the head of the queue again, and it is then processed and rolled back over and over, putting the queue into an endless loop. To solve this you can configure a back-out queue for each queue; back-out queues and dead-letter queues are both mechanisms for handling abnormal messages. In practice, the role of the back-out queue can be played by the dead-letter queue together with the retry queue.
  • Retry queue
A retry queue can be seen as a kind of back-out queue: when consumption fails on the consumer side, the message is rolled back to the Broker so that it is not lost. Unlike a plain back-out queue, a retry queue is usually split into multiple retry levels, each with its own redelivery delay; the more retries, the longer the delay. For example: a message that fails its first consumption goes into retry queue Q1, whose redelivery delay is 5s, and is redelivered after 5s; if it fails again it goes into retry queue Q2, whose redelivery delay is 10s, and is redelivered after 10s, and so on. Because the delay grows with each retry, an upper bound is needed: once the retry count is exceeded, the message goes to the dead-letter queue. Retry queues and delayed queues both need delay levels, but they differ in two ways: a delayed queue is triggered internally while a retry queue is triggered externally by the consumer; and a delayed queue acts once, while the effect of a retry queue carries over to subsequent retries.
  • Consumption model
Consumption comes in push and pull models. In push mode the Broker actively pushes messages to the consumer; latency is low, but some flow-control mechanism is needed so that the pushed messages do not overwhelm the consumer. In pull mode the consumer actively requests messages from the Broker (usually on a timer or in fixed batches); latency is worse than in push mode, but the consumer can size its pulls to its own processing capacity.
  • Broadcast consumption
Messages are generally delivered in one of two models: point-to-point (P2P) and publish/subscribe (Pub/Sub). In the point-to-point model a message is no longer stored in the queue once it has been consumed, so a consumer can never see a message that has already been consumed; a queue may have multiple consumers, but each message is consumed by exactly one of them. The publish/subscribe model defines how messages are published to and subscribed from a content node called a topic. The topic acts as an intermediary: publishers publish messages to a topic and subscribers receive messages from it, which keeps publishers and subscribers independent of each other without any direct contact; this model is used for one-to-many broadcast. RabbitMQ is a typical point-to-point system while Kafka is a typical publish/subscribe system, but RabbitMQ can achieve publish/subscribe (and hence broadcast consumption) by choosing an appropriate exchange type, and Kafka can also be consumed in a point-to-point fashion: you can think of its consumer group as playing the role of a queue. Comparatively, thanks to its message-rewind capability, Kafka's support for broadcast consumption is stronger than RabbitMQ's.
  • Message rewind
Normally, once a message has been consumed it is gone and can never be consumed again. Message rewind is the opposite: after a message has been consumed, you can still go back and consume it again. The recurring problem with messaging is "message loss", and it is usually hard to tell whether a message was lost because of a defect in the middleware or because of misuse by the application; if the middleware supports message rewind, you can replay the "lost" messages and track the problem to its source. Rewind is useful for far more than that: it also enables index recovery and local-cache rebuilding, and some business compensation schemes can be implemented by rewinding as well.
  • Message accumulation + persistence

  • Message tracing
Anyone working with distributed architectures is familiar with request tracing. For messaging middleware, message-level tracing (message tracing for short) matters just as much. In the simplest terms, message tracing means knowing where a message came from, where it is stored, and where it is going. On top of this capability we can trace the full path of a sent or consumed message, which makes it possible to locate and diagnose problems quickly.
  • Message filtering
Message filtering means delivering only messages of specified categories to downstream consumers according to predefined filtering rules. Taking Kafka as an example, you can simply send messages of different categories to different topics, which achieves a form of filtering, or classify messages within a single topic by partition. Filtering in the stricter sense, however, applies a rule to each individual message; in Kafka this can be done with the client-side ConsumerInterceptor interface or with the filter operation in Kafka Streams.

  • Flow control
Flow control addresses the mismatch between the sender's and the receiver's speeds: it throttles the send rate so that the receiving application can keep up. Common flow-control approaches include stop-and-wait, sliding windows, and token buckets.
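As a toy illustration of the last of those approaches, here is a minimal token-bucket rate limiter in Java. It is my own sketch, not tied to any particular broker: tokens refill at a fixed rate, and a producer spends one token per send, backing off when none are available.

    /** Minimal token-bucket rate limiter: refill at a fixed rate, spend one token per message sent. */
    public final class TokenBucket {
        private final long capacity;         // maximum burst size
        private final double refillPerNano;  // tokens added per nanosecond
        private double tokens;
        private long lastRefill;

        public TokenBucket(long capacity, double tokensPerSecond) {
            this.capacity = capacity;
            this.refillPerNano = tokensPerSecond / 1_000_000_000.0;
            this.tokens = capacity;
            this.lastRefill = System.nanoTime();
        }

        /** Returns true if the caller may send one message now, false if it should back off. */
        public synchronized boolean tryAcquire() {
            long now = System.nanoTime();
            tokens = Math.min(capacity, tokens + (now - lastRefill) * refillPerNano);
            lastRefill = now;
            if (tokens >= 1.0) {
                tokens -= 1.0;
                return true;
            }
            return false;
        }
    }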
Feature comparison from the same article:

Feature | Kafka (1.1.0) | RabbitMQ (3.6.10)
Priority queue | Not supported | Supported; keeping priority values between 0 and 10 is recommended
Delayed queue | Not supported | Supported
Dead-letter queue | Not supported | Supported
Retry queue | Not supported | Not supported; in RabbitMQ a retry queue can be built on top of delayed queues with a simple wrapper, whereas in Kafka you would first have to implement delayed queues, which is considerably more complex
Consumption model | Pull | Push and pull
Broadcast consumption | Supported; Kafka's support for broadcast consumption is comparatively more natural | Supported, but weaker than Kafka's
Message rewind | Supported, by offset or by timestamp | Not supported; once a message is acknowledged as consumed it is marked for deletion
Message accumulation | Supported | Supported; in-memory accumulation usually hurts performance once it passes a certain threshold, although this is not absolute; factoring in throughput, Kafka's accumulation is far more efficient overall
Persistence | Supported | Supported
Message tracing | Not supported; tracing can be added through external systems, but not at the granularity a built-in mechanism would give | Supported, via Firehose or the rabbitmq_tracing plugin; enabling rabbitmq_tracing hurts performance badly and is not recommended in production, whereas Firehose combined with an external tracing system can provide fine-grained message tracing
Message filtering | Supported at the client level | Not supported, but easy to add with a thin wrapper
Multi-tenancy | Not supported | Supported
Protocol support | Only its own custom protocol, and there are compatibility issues between some of the current major versions | Natively implements AMQP and also supports MQTT, STOMP, and other protocols
Cross-language support | Written in Scala and Java; clients are available for many languages | Written in Erlang; clients are available for many languages
Flow control | Supported at the client and user level; it can be explicitly configured to act on producers or consumers | Credit-based; an internally triggered protective mechanism that acts on producers
Message ordering | Supported per partition | The preconditions are strict: single-threaded publishing, single-threaded consumption, and none of the advanced features such as delayed or priority queues; in a sense this hardly counts as ordering support
Security | Authentication (TLS/SSL, SASL) and read/write authorization | Similar to Kafka
Idempotence | Supported for a single producer on a single partition within a single session | Not supported
Transactional messages | Supported | Supported
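To make the RabbitMQ side of the table concrete, here is a minimal sketch using the RabbitMQ Java client that declares a priority queue whose rejected or expired messages are dead-lettered. The exchange and queue names are invented for illustration; the x-max-priority and x-dead-letter-exchange queue arguments are the actual RabbitMQ mechanisms.

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.util.HashMap;
    import java.util.Map;

    public class DeclareQueues {
        public static void main(String[] args) throws Exception {
            Connection conn = new ConnectionFactory().newConnection();
            Channel ch = conn.createChannel();

            // Dead-letter exchange and queue for messages that are rejected or expire.
            ch.exchangeDeclare("orders.dlx", "fanout", true);
            ch.queueDeclare("orders.dead", true, false, false, null);
            ch.queueBind("orders.dead", "orders.dlx", "");

            // Main queue: priorities 0-10, rejected/expired messages go to the dead-letter exchange.
            Map<String, Object> queueArgs = new HashMap<>();
            queueArgs.put("x-max-priority", 10);
            queueArgs.put("x-dead-letter-exchange", "orders.dlx");
            ch.queueDeclare("orders", true, false, false, queueArgs);

            ch.close();
            conn.close();
        }
    }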
https://segment.com/blog/exactly-once-delivery/
The single requirement of all data pipelines is that they cannot lose data. Data can usually be delayed or re-ordered–but never dropped. 
To satisfy this requirement, most distributed systems guarantee at-least-once delivery. The techniques to achieve at-least-once delivery typically amount to: "retry, retry, retry". You never consider a message 'delivered' until you receive a firm acknowledgement from the consumer.
But as a user, at-least-once delivery isn’t really what I want. I want messages to be delivered once. And only once.
Unfortunately, achieving anything close to exactly-once delivery requires a bullet-proof design. Each failure case has to be carefully considered as part of the architecture–it can’t be “bolted on” to an existing implementation after the fact. And even then, it’s pretty much impossible to have messages only ever be delivered once. 
In the past three months we’ve built an entirely new de-duplication system to get as close as possible to exactly-once delivery, in the face of a wide variety of failure modes. 
Most of Segment’s internal systems handle failures gracefully using retries, message re-delivery, locking, and two-phase commits. But, there’s one notable exception: clients that send data directly to our public API. 
Clients (particularly mobile clients) have frequent network issues, where they might send data, but then miss the response from our API.
In these cases, clients retry and re-send the same events to Segment’s API, even though the server has technically already received those exact messages.
De-duplicating our messages
For each message in our stream, we first check if we’ve seen that particular message, keyed by its id (which we assume to be unique). If we’ve seen a message before, discard it. If it’s new, we re-publish the message and commit the message atomically. 
To avoid storing all messages for all time, we keep a ‘de-duplication window’–defined as the time duration to store our keys before we expire them. As messages fall outside the window, we age them out. We want to guarantee that there exists only a single message with a given ID sent within the window.
To achieve this, we’ve created a ‘two-phase’ architecture which reads off Kafka, and de-duplicates all events coming in within a 4-week window.
First, each incoming message is tagged with a unique messageId, generated by the client. In most cases this is a UUIDv4 (though we are considering a switch to ksuids). If a client does not supply a messageId, we'll automatically assign one at the API layer.
Individual messages are logged to Kafka for durability and replay-ability. They are partitioned by messageId so that we can ensure the same messageId will always be processed by the same consumer.
This is an important piece when it comes to our data processing. Instead of searching a central database for whether we've seen a key amongst hundreds of billions of messages, we're able to narrow our search space by orders of magnitude simply by routing to the right partition.
Each worker stores a local RocksDB database on its EBS volume. RocksDB is an embedded key-value store developed at Facebook, and is optimized for incredibly high performance.
Whenever an event is consumed from the input topic, the consumer queries RocksDB to determine whether we have seen that event's messageId.
If the message does not exist in RocksDB, we add the key to RocksDB and then publish the message to the Kafka output topic.
If the message already exists in RocksDB, the worker does not publish it to the output topic; it simply updates the offset of the input partition, acknowledging that it has processed the message.
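A minimal sketch of that check-then-publish step in Java, assuming a surrounding Kafka consumer loop drives it and commits the input offsets; the output topic name and the wiring are invented for illustration.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    /** Core of the de-duplication step: forward only messageIds we have not seen before. */
    public class Deduplicator {
        private final RocksDB seen;                       // local key-value store of seen messageIds
        private final KafkaProducer<String, byte[]> out;  // producer for the de-duplicated output topic

        public Deduplicator(RocksDB seen, KafkaProducer<String, byte[]> out) {
            this.seen = seen;
            this.out = out;
        }

        public void handle(ConsumerRecord<String, byte[]> record) throws RocksDBException {
            byte[] id = record.key().getBytes();          // messageId is the Kafka key (same id -> same partition)
            if (seen.get(id) == null) {                   // unseen: remember it, then forward it
                seen.put(id, new byte[0]);
                out.send(new ProducerRecord<>("deduped-events", record.key(), record.value()));
            }
            // already seen: drop it; the surrounding loop still commits the input offset
        }
    }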
Our database has to satisfy three very separate query patterns:
  1. detecting existence of random keys that come in, but likely don't exist in our DB. These may be found anywhere within our keyspace.
  2. writing new keys at a high write throughput
  3. aging out old keys that have passed outside of our 'de-duplication window'
RocksDB is a log-structured merge-tree (LSM) database, meaning that it is constantly appending new keys to a write-ahead log on disk, as well as storing the sorted keys in memory as part of a memtable.

https://blog.iron.io/smart-endpoints-smart-pipes-smarter/
Persistence is a key trait that makes a message queue evolve from being "dumb" to being "smart", but what does that really mean? Message queues are not databases or caches in the traditional sense, but they can provide persistence by writing to disk during the queueing process. The general pattern is write once, read once, and then delete when acknowledged by the consumer. The acknowledgement, or ack, is a critical step in ensuring the persistence is working as intended.
https://bravenewgeek.com/smart-endpoints-dumb-pipes/
People are easily seduced by "fat" middleware—systems with more features, more capabilities, more responsibilities—because they think it makes their lives easier, and it might at first. Pushing off more responsibility onto the infrastructure makes the application simpler, but it also makes the infrastructure more complex, more fragile, and slower. Take exactly-once message delivery, for example. Lots of people want it, but the pursuit of it introduces a host of complexity, overhead (in terms of development, operations, and performance), and risk. The end result is something that, in addition to these things, requires all downstream systems to not introduce duplicates and be mindful about their side effects. That is, everything in the processing pipeline must be exactly-once or nothing is. So typically what you end up with is an approximation of exactly-once delivery. You make big investments to lower the likelihood of duplicates, but you still have to deal with the problem. This might make sense if the cost of having duplicates is high, but that doesn't seem like the common case. My advice is to always opt for the simple solution. We tend to think of engineering challenges as technical problems when, in reality, they're often just mindset problems. Usually the technical problems have already been solved if we can just adjust our mindset.
There are a couple things to keep in mind here. The first thing to consider is simply capability lock-in. As you push more and more logic off onto more and more specialized middleware, you make it harder to move off it or change things. The second is what we already hinted at. Even with smart middleware, problems still leak out and you have to handle them at the edge—you’re now being taxed twice. This is essentially the end-to-end argument. Push responsibility to the edges, smart endpoints, dumb pipes, etc. It’s the idea that if you need business-level guarantees, build them into the business layer because the infrastructure doesn’t care about them.
NATS was the first system in the space that really turned the way we did messaging on its head (outside of maybe ZeroMQ). It didn’t provide any strong delivery guarantees, transactions, message persistence, or other responsibilities usually assumed by message brokers (there is a layer that provides some of these things, but it’s not baked into the core technology). Instead, NATS prioritized availability, simplicity, and performance over everything else. 

RPC is a bad abstraction for building distributed systems. Use simple, versatile primitives and embrace asynchrony and messaging.

Be careful about relying on strong semantics, because experience shows few things are guaranteed when working with distributed systems at scale. Err on the side of simple. Make few assumptions about your middleware. Push work out of your infrastructure and to the edges if you care about performance and scalability, because nothing is harder to scale (or operate) than slow infrastructure that tries to do too much.
https://bravenewgeek.com/fifo-exactly-once-and-other-costs/
Kevin shows how FIFO delivery is really only meaningful when you have one single-threaded publisher and one single-threaded receiver. Amazon’s FIFO queues allow you to control how restrictive this requirement is by applying ordering on a per-group basis. In other words, we can improve throughput if we can partition work into different ordered groups rather than a single totally ordered group. However, FIFO still effectively limits throughput on a group to a single publisher and single subscriber. If there are multiple publishers, they have to coordinate to ensure ordering is preserved with respect to our application’s semantics. On the subscriber side, things are simpler because SQS will only deliver messages in a group one at a time in order amongst subscribers.
Amazon’s FIFO queues also have an exactly-once processing feature which deduplicates messages within a five-minute window. Note, however, that there are some caveats with this, the obvious one being duplicate delivery outside of the five-minute window. A mission-critical system would have to be designed to account for this possibility. My argument here is if you still have to account for it, what’s the point unless the cost of detecting duplicates is prohibitively expensive? But to be fair, five minutes probably reduces the likelihood enough to the point that it’s useful and in those rare cases where it fails, the duplicate is acceptable.
The more interesting caveat is that FIFO queues do not guarantee exactly-once delivery to consumers (which, as we know, is impossible). Rather, they offer exactly-once processing by guaranteeing that once a message has successfully been acknowledged as processed, it won’t be delivered again. It’s up to applications to ack appropriately. When a message is delivered to a consumer, it remains in the queue until it’s acked. The visibility timeout prevents other consumers from processing it. With FIFO queues, this also means head-of-line blocking for other messages in the same group.
We might do this by using a database transaction to atomically process and acknowledge the messages. An alternative, yet similar, approach might be to use a write-ahead-log-like strategy whereby the consuming system reads messages from SQS and transactionally stores them in a database for future processing. Once the messages have been committed, the consumer deletes the messages from SQS. In either of these approaches, we’re basically shifting the onus of exactly-once processing onto an ACID-compliant relational database.
Note that this is really how Kafka achieves its exactly-once semantics. It requires end-to-end cooperation for exactly-once to work. State changes in your application need to be committed transactionally with your Kafka offsets.
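A minimal sketch of that cooperation with Kafka's Java producer: the records produced and the offsets consumed commit in the same transaction, or not at all. The topic, group id, and offset map are placeholders; the producer is assumed to have been created with transactional.id set and initTransactions() called once at startup.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ExactlyOnceStep {
        /** Process one batch: output records and input offsets are committed atomically. */
        static void processBatch(KafkaProducer<String, String> producer,
                                 Map<TopicPartition, OffsetAndMetadata> consumedOffsets,
                                 String consumerGroupId) {
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("output-topic", "key", "transformed-value"));
                // The consumed offsets ride in the same transaction as the produced records.
                producer.sendOffsetsToTransaction(consumedOffsets, consumerGroupId);
                producer.commitTransaction();
            } catch (RuntimeException e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }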
As Kevin points out, FIFO SQS queues offer exactly-once processing only if 1) publishers never publish duplicate messages wider than five minutes apart and 2) consumers never fail to delete messages they have processed from the queue. Solving either of these problems probably requires some
Where I see FIFO and exactly-once semantics being useful is when talking to systems which cannot cooperate with the end-to-end transaction

It turns out relational databases are really good at ensuring invariants like exactly-once.
They see “exactly-once FIFO queues” in SQS or “exactly-once delivery” in Kafka and take it at face value. They don’t read beyond the headline. They don’t look for the caveats. 


Web Scalability for Startup Engineers
Asynchronous Processing
Synchronous processing makes it hard to build responsive applications because there is no way to guarantee how long it will take for a blocking operation to complete.

A callback is a construct of asynchronous processing where the caller does not block while waiting for the result of the operation, but provides a mechanism to be notified once the operation is finished. A callback is a function, an object, or an endpoint that gets invoked whenever the asynchronous call is completed.

Instead of all the steps happening within a single execution thread, we can have ClientCode, Callback, Queue, and QueueConsumer execute in separate threads. They could also execute on different servers as different processes.
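A minimal Java illustration of that idea (the names and the fake workload are mine): the caller hands work to an executor together with a callback and returns immediately; the callback fires on a worker thread when the operation completes.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;

    public class AsyncCall {
        private static final ExecutorService WORKERS = Executors.newFixedThreadPool(4);

        /** Non-blocking: schedules the work and returns immediately; the callback runs later on a worker thread. */
        static void sendEmailAsync(String address, Consumer<String> callback) {
            WORKERS.submit(() -> {
                // ... slow, blocking work (e.g. talking to an SMTP server) would happen here ...
                callback.accept("sent to " + address);
            });
        }

        public static void main(String[] args) {
            sendEmailAsync("user@example.com", result -> System.out.println("callback: " + result));
            System.out.println("caller continues without waiting");
            WORKERS.shutdown();
        }
    }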

Message Queue
A message queue is a component that buffers and distributes asynchronous requests.
The separation of producers and consumers using a queue gives us the benefit of nonblocking communication between producer and consumer

Another benefit of this separation is that now producers and consumers can be scaled separately. This means that we can add more producers at any time without overloading the system. Messages that cannot be consumed fast enough will just begin to line up in the message queue. We can also scale consumers separately, as now they can be hosted on separate machines and the number of consumers can grow independently of producers

Not having to know how consumers are implemented, what technologies they use, or even if they are available are signs of strong decoupling (which is a very good thing).

Message Broker
permissions control, routing, or failure recovery
A message broker may be referred to as message-oriented middleware (MOM) or enterprise service bus (ESB), depending on the technology used.

direct worker queue, publish/subscribe, and custom routing rules.

Direct Worker Queue Method
you can have one or more consumers competing for messages. Each message arriving at the queue is routed to only one consumer.

Publish/Subscribe Method

Producers publish messages to a topic, not a queue. Messages arriving at a topic are then cloned for each consumer that has a declared subscription to that topic. If there are no consumers at the time of publishing, messages can be discarded altogether (though this behavior may depend on the configuration of the message broker).

Each consumer can then consume messages independently from other consumers, as it has a private queue with copies of all the messages that were published to the selected topic.

most brokers allow for competing consumers, in which case multiple consumers subscribe to the same queue and messages are distributed among them, rather than a single consumer having to process all of the messages.

Custom Routing Rules
in RabbitMQ you can use the concept of bindings to create flexible routing rules (based on text pattern matching); in ActiveMQ you can use the Camel extension to create more advanced routing rules.

Messaging Protocols
A messaging protocol defines how client libraries connect to a message broker and how messages are transmitted.
AMQP (Advanced Message Queuing Protocol)
STOMP (Streaming Text-Oriented Messaging Protocol)
Prefetch is a great way of increasing throughput because messages are received in batches instead of one message at a time.
advanced features have to be implemented as extensions using custom headers, since the protocol defines no standard headers for them

BENEFITS OF MESSAGE QUEUES
Enabling asynchronous processing
Easier scalability
Evening out traffic spikes
Isolating failures and self-healing
Decoupling
Removing low-value processing from the critical path

Problems
No Message Ordering
Limit the number of consumers to a single thread per queue. Some message queues guarantee ordered delivery (First In First Out [FIFO]) as long as you consume messages one at a time by a single client. Unfortunately, this is not a scalable solution and not all messaging systems support it.
Build the system to assume that messages can arrive in random order.

It is best to depend on the message broker to deliver messages in the right order by using a partial message guarantee (ActiveMQ) or topic partitioning (Kafka)

Partial message ordering is a clever mechanism provided by ActiveMQ called message groups. Messages can be published with a special “label” called a message group ID.

The group ID is defined by the application developer (for example, it could be a customer ID). Then all messages belonging to the same group are guaranteed to be consumed in the same order they were produced. Figure 7-19 shows how messages belonging to different groups get queued up separately for different consumers. Whenever a message with a new group ID gets published, the message broker maps the new group ID to one of the existing consumers. From then on, all the messages belonging to the same group are delivered to the same consumer. This may cause other consumers to wait idly without messages as the message broker routes messages based on the mapping rather than random distribution.
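A minimal sketch of publishing with a message group via the JMS API against ActiveMQ; the broker URL, queue name, and group key are invented for illustration, but JMSXGroupID is the property ActiveMQ uses for message groups.

    import javax.jms.Connection;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class GroupedPublisher {
        public static void main(String[] args) throws Exception {
            Connection conn = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue("orders"));

            TextMessage msg = session.createTextMessage("order #42 updated");
            // All messages carrying the same JMSXGroupID are routed to the same consumer, in order.
            msg.setStringProperty("JMSXGroupID", "customer-1234");
            producer.send(msg);

            conn.close();
        }
    }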

Message Requeueing (another issue: out-of-order delivery)
A strategy worth considering is to depend on at-least-once delivery instead of exactly-once delivery. 
By allowing messages to be delivered to your consumers more than once, you make your system more robust and reduce constraints put on the message queue and its workers. For this approach to work, you need to make all of your consumers idempotent.

make consumers idempotent whenever it is practical, but remember that enforcing it across the system may not always be worth the effort.
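A toy illustration of the difference, entirely my own: an "increment" handler is not idempotent because redelivery changes the result, whereas a handler that sets an absolute value can safely process the same message twice.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class Handlers {
        private final Map<String, Integer> balances = new ConcurrentHashMap<>();

        /** NOT idempotent: processing the same message twice adds the amount twice. */
        void onPaymentReceived(String accountId, int amount) {
            balances.merge(accountId, amount, Integer::sum);
        }

        /** Idempotent: the message carries the resulting balance, so redelivery is harmless. */
        void onBalanceChanged(String accountId, int newBalance) {
            balances.put(accountId, newBalance);
        }
    }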

Race Conditions Become More Likely
MESSAGE QUEUE–RELATED ANTI-PATTERNS
Treating the Message Queue as a TCP Socket
Some message brokers allow you to create return channels. A return channel becomes a way for the consumer to send a message back to the producer. If you use it a lot, you may end up with an application that is more synchronous than asynchronous.

Treating Message Queue as a Database
Coupling Message Producers with Consumers
a flawed implementation I saw involved serializing an entire object and adding it to the message body. This meant that the consumer had to have this particular class available, and it was not able to process the message without executing the serialized object’s code. Even worse, it meant that the consumer had to be implemented in the same technology as the producer and its deployment had to be coordinated to prevent class mismatches. Messages should not have “logic” or executable code within.

Lack of Poison Message Handling
Hope for the best, prepare for the worst.

In ActiveMQ you can use dead-letter queue policies out of the box. All you need to do is set limits for your messages, and they will be automatically removed from the queue after a certain number of failures. If you use Amazon SQS, you can implement poison message handling in your own code by using an approximate delivery counter. Every time a message is redelivered, SQS increments its approximate delivery counter so that your application can easily recognize poison messages and route them to a custom dead-letter queue or simply discard them.
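An SDK-agnostic sketch of that pattern (the threshold and helper methods are placeholders, not SQS API calls): read the approximate delivery count reported with each delivery and divert the message once it has failed too many times.

    public class PoisonMessageGuard {
        private static final int MAX_DELIVERIES = 5;   // arbitrary threshold for this sketch

        /** Returns true if the message was handled or diverted and may be deleted from the queue. */
        boolean handle(String body, int approximateReceiveCount) {
            if (approximateReceiveCount > MAX_DELIVERIES) {
                sendToDeadLetterQueue(body);           // or simply discard it
                return true;
            }
            return process(body);
        }

        private boolean process(String body) { /* business logic */ return true; }

        private void sendToDeadLetterQueue(String body) { /* publish to a custom dead-letter queue */ }
    }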

MVP - Minimum viable product
Amazon Simple Queue Service
All you need to do when integrating with SQS is to make sure your publishers and consumers are not coupled directly to SQS SDK code. I recommend using thin wrappers and your own interfaces

RabbitMQ
Flexibility is actually the thing that makes RabbitMQ really stand out.
RabbitMQ supports two main messaging protocols—AMQP and STOMP.

The most attractive feature of RabbitMQ is the ability to dynamically configure routes and completely decouple publishers from consumers. In regular messaging, the consumer has to be coupled by a queue name or a topic name. This means that different parts of the system have to be aware of one another to some extent. In RabbitMQ, publishers and consumers are completely separated because they interact with separate endpoint types. RabbitMQ introduces a concept of an exchange.

An exchange is just an abstract named endpoint to which publishers address their messages. Publishers do not have to know topic names or queue names as they publish messages to exchanges. Consumers, on the other hand, consume messages from queues.

Publishers have to know the location of the message broker and the name of the exchange, but they do not have to know anything else. Once a message is published to an exchange, RabbitMQ applies routing rules and sends copies of the message to all applicable queues. Once messages appear in queues, consumers can consume them without knowing anything about exchanges.

The trick is that routing rules can be defined externally using a web administration interface, AMQP protocol, or RabbitMQ’s REST API.
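A minimal sketch of that separation with the RabbitMQ Java client; the exchange, queue, and routing key names are invented for illustration.

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class ExchangeDemo {
        public static void main(String[] args) throws Exception {
            Connection conn = new ConnectionFactory().newConnection();
            Channel ch = conn.createChannel();

            // Consumer side: bind a queue to the exchange; the routing rule lives in the binding.
            ch.exchangeDeclare("user-events", "topic", true);
            ch.queueDeclare("welcome-emails", true, false, false, null);
            ch.queueBind("welcome-emails", "user-events", "user.signup");

            // Publisher side: only the exchange name and a routing key are needed, never a queue name.
            ch.basicPublish("user-events", "user.signup", null, "user 42 signed up".getBytes());

            ch.close();
            conn.close();
        }
    }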

RabbitMQ does not support scheduled message delivery. Its only important drawbacks are the lack of partial message ordering and poor poison message support.
RabbitMQ can be fully configured and monitored using a REST API.

ActiveMQ - written in Java
Camel is an integration framework designed to implement enterprise integration patterns.
ActiveMQ implements message groups mentioned earlier, which allow you to partially guarantee ordered message delivery.

RabbitMQ performs better in such a scenario, as it has a built-in backpressure feature. If messages are published faster than they can be processed or persisted, RabbitMQ begins to throttle producers to avoid message loss and running out of memory. The benefit of that approach is increased stability and reliability, but it can cause unexpected delays on the publisher side, as publishing messages slows down significantly whenever backpressure is triggered.

event-driven architecture (EDA)
Event-driven architecture (EDA) is an architecture style where most interactions between different components are realized by announcing events that have already happened instead of requesting work to be done. On the consumer side, EDA is about responding to events that have happened somewhere in the system or outside of it. EDA consumers do not behave as services; they do not do things for others. They just react to things happening elsewhere.

Request/Response Interaction
Direct Worker Queue Interaction
Event-Based Interaction

In a pure EDA, all the interactions are based on events. This leads to an interesting conclusion: if all of the interactions are asynchronous and all the interactions are carried out using events, you could use events to re-create the state of the entire system by simply replaying events. This is exactly what event sourcing allows us to do.

Event sourcing is a technique where every change to the application state is persisted in the form of an event. Events are usually stored on disk in the form of event log files or some data store. At the same time, an application is built of event consumers, which process events passed to them. As a result, you can restore the system to an old state (for example, using a daily snapshot) and replay events to reach the same end state.
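A toy sketch of the idea (the domain and event type are invented): every state change is an event appended to a log, and the current state can be rebuilt at any time by replaying the log from the beginning.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class EventSourcedAccounts {
        /** An immutable event describing something that already happened. */
        record Deposited(String accountId, long amount) {}

        private final List<Deposited> eventLog = new ArrayList<>();    // a file or data store in practice
        private final Map<String, Long> balances = new HashMap<>();    // derived, disposable state

        void deposit(String accountId, long amount) {
            Deposited event = new Deposited(accountId, amount);
            eventLog.add(event);     // persist the event first...
            apply(event, balances);  // ...then update the derived state
        }

        /** Rebuild the state from scratch by replaying every event in order. */
        Map<String, Long> replay() {
            Map<String, Long> rebuilt = new HashMap<>();
            for (Deposited e : eventLog) {
                apply(e, rebuilt);
            }
            return rebuilt;
        }

        private static void apply(Deposited e, Map<String, Long> state) {
            state.merge(e.accountId(), e.amount(), Long::sum);
        }
    }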

MySQL replication is done in a similar way, as every data modification is recorded in the binary log right after the change is made on the master server. Since all state changes are in the binary log, the state of the slave replica server can be synchronized by replaying the binary log. The only difference is that consumers of these events are replicating slaves. Having all events persisted in a log means that you can add a new event consumer and process historical events, so it would look like it was running from the beginning of time.

The important limitation of event sourcing is the need for a centralized state and event log. To be able to reconstruct the state of the application based on event log alone, you need to be processing them in the same order.
event sourcing requires sequential processing of all events.

Index
Cardinality is the number of unique values stored in a particular field. Fields with high cardinality are good candidates for indexes, as they allow you to reduce the data set to a very small number of rows.

The reason why low-cardinality fields are bad candidates for indexes is that they do not narrow down the search enough. After traversing the index, you still have a lot of rows left to inspect.

The first rule of thumb when creating indexes on a data set is that the higher the cardinality, the better the index performance.
Another important factor is the item distribution.
The second rule of thumb when creating indexes is that equal distribution leads to better index performance.

A compound index, also known as a composite index, is an index that contains more than one field. You can use compound indexes to increase search efficiency where cardinality or distribution of values of individual fields is not good enough.

When you create a compound index, you effectively create a sorted list ordered by multiple columns.
Depending on the order of columns in the compound index, the sorting of data changes.
you need to know exactly what types of searches you are going to perform to choose which one is better for your application.

if you want to use NoSQL data stores, you need to stop thinking of data as if it were stored in tables and think of it as if it were stored in indexes.

MODELING DATA
When you use NoSQL data stores, you need to get used to thinking of data as if it were an index.
The main challenge when designing and building the data layer of a scalable web application is identifying access patterns and modeling your data based on these access patterns.

When designing a relational database schema, you would usually start by looking at the data itself. You would ask yourself, “What is the data that I need to store?” You would then go through all of the bits of information that need to be persisted and isolate entities (database tables). You would then decide which pieces of information should be stored in which table. You would also create relationships between tables using foreign keys. You would then iterate over the schema design, trying to reduce the amount of redundant data and circular relationships.

NoSQL
you need to start with queries in mind
Once you optimize your data model for particular types of queries, you usually lose the ability to perform other types of queries. Designing a NoSQL data model is much more about tradeoffs and data layout optimization than it is about normalization.

When designing a data model for a NoSQL data store, you want to identify all the queries and access patterns first.

Flexibility is one of the most important attributes of good architecture. To quote Robert C. Martin again, “Good architecture maximizes the number of decisions not made.” By denormalizing data and optimizing for certain access patterns, you are making a tradeoff. You sacrifice some flexibility for the sake of performance and scalability. It is critical to be aware of these tradeoffs and make them very carefully.

Key-value data stores These data stores support only the most simplistic access patterns. To access data, you need to provide the key under which data was stored. Key-value stores have a limited programming interface—basically all you can do is set or get objects based on their key. Key-value stores usually do not support any indexes or sorting (other than the primary key). At the same time, they have the least complexity and they can implement automatic sharding based on the key, as each value is independent and the only way to access values is by providing their keys. They are good for fast one-to-one lookups, but they are impractical when you need sorted lists of objects or when you need to model relationships between objects. Examples of key-value stores are Dynamo and Riak. Memcached is also a form of a key-value data store, but it does not persist data, which makes it more of a key-value cache than a data store. Another data store that is sometimes used as a key-value store is Redis, but it has more to offer than just key-value mappings.

Wide columnar data stores These data stores allow you to model data as if it were a compound index.
You can build sorted lists. There is no concept of a join, so denormalization is a standard practice, but in return wide columnar stores scale very well. They usually provide data partitioning and horizontal scalability out of the box. They are a good choice for huge data sets like user-generated content, event streams, and sensory data. Examples of wide columnar data stores are BigTable, Cassandra, and HBase.

Document-oriented data stores These data stores allow more complex objects to be stored and indexed by the data store. Document-based data stores use a concept of a document as the most basic building block in their data model. Documents are data structures that can contain arrays, maps, and nested structures just as a JSON or XML document would. Although documents have flexible schemas (you can add and remove fields at will on a per-document basis), document data stores usually allow for more complex indexes to be added to collections of documents. Document stores usually offer a fairly rich data model, and they are a good use case for systems where data is difficult to fit into a predefined schema (where it is hard to create a SQL-like normalized model) and at the same time where scalability is required. Examples of document-oriented data stores are MongoDB, CouchDB, and Couchbase.

Each row has a row key, which is a primary key and at the same time a sharding key of the table. The row key is a string—it uniquely identifies a single row and it is automatically indexed by Cassandra. Rows are distributed among different servers based on the row key, so all of the data that needs to be accessed together in a single read needs to be stored within a single row.
rows are indexed based on the row key and columns are indexed based on a column name.

The way Cassandra organizes and sorts data in tables is similar to the way compound indexes work. Any time you want to access data, you need to provide a row key and then column name, as both of these are indexed. Because columns are stored in sorted order, you can perform fast scans on column names to retrieve neighboring columns. Since every row lives on its own and there is no table schema definition, there is no way to efficiently select multiple rows based on a particular column value.

Finding a flexible data model that supports all known access patterns and provides maximal flexibility is the real challenge of NoSQL.
An alternative way to deal with the NoSQL indexing challenge is to use a dedicated search engine for more complex queries rather than trying to satisfy all use cases with a single data store.

The order of command execution on each server becomes irrelevant, and you can issue the same command multiple times without affecting the end result (making these commands idempotent).

set data under column named “$time|$item_id” for a row “$user_email” in table user_bids
set data under column named “$time|$user_email” for a row “$item_id” in table item_bids
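A toy sketch of how those composite column names might be built and written, purely my own illustration of the wide-row idea rather than a real Cassandra client call:

    public class BidWriter {
        /** Composite column name: time comes first so columns sort chronologically within a row. */
        static String bidColumn(long epochMillis, String secondKey) {
            return epochMillis + "|" + secondKey;
        }

        void recordBid(String userEmail, String itemId, long time, String bidJson) {
            // Both writes are plain "set" operations, so replaying them is harmless (idempotent).
            set("user_bids", userEmail, bidColumn(time, itemId), bidJson);
            set("item_bids", itemId, bidColumn(time, userEmail), bidJson);
        }

        private void set(String table, String rowKey, String columnName, String value) {
            // placeholder for the actual data-store write
        }
    }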

Once you think your data model is complete, it is critical to validate it against the list of known use cases. This way, you can ensure that your data model can, in fact, support all of the access patterns necessary.

Search Engine
This is why search engines need so much memory to operate: with millions of documents, each containing thousands of words, an inverted index grows in size faster than a normal index would, because each word in each document must be indexed.

Elasticsearch does not require any predefined schema.

A common pattern for indexing data in a search engine is to use a job queue (especially since search engines are near real time anyway). Anytime anyone modifies car metadata, they submit an asynchronous message for this particular car to be reindexed. At a later stage, a queue worker picks up the message from the queue, builds up the JSON document with all the information, and posts to the search engine to overwrite previous data.
https://dzone.com/articles/exploring-message-brokers
RabbitMQ is written in Erlang, not a widely used programming language but well adapted to such tasks. 
Kafka
What is special about Kafka is its architecture: it stores the messages in flat files, and consumers request messages based on an offset.
The server is pretty simple and does not care much about the consumers. That simplicity makes it very fast and light on resources. Old messages can be retained on a time basis (like expire_logs_days) and/or on a storage-usage basis.
So, if the server doesn't keep track of what has been consumed on each topic, how can you have multiple consumers? The missing element here is ZooKeeper. The Kafka server uses ZooKeeper for cluster membership and routing, while the consumers can also use ZooKeeper (or something else) for synchronization. The sample consumer provided with the server uses ZooKeeper, so you can launch many instances and they will synchronize automatically.
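A minimal sketch of that offset-driven consumption with the Kafka Java client; the broker address, topic, partition, and starting offset are arbitrary here. The point is that the consumer, not the broker, decides where to start reading.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetReader {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition partition = new TopicPartition("events", 0);
                consumer.assign(Collections.singletonList(partition));
                consumer.seek(partition, 42L);   // start (or re-start) reading from an arbitrary offset

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
                }
            }
        }
    }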
ActiveMQ(Java)
HA can be provided by the storage backend; LevelDB supports replication.
With a mesh of brokers, you connect to one of the members and publish or consume a message. You don't know on which node(s) the queue is located; the broker you connect to knows and routes your request. To further help, you can specify all the brokers in the connection string and the client library will simply reconnect to another one if the one you are connected to goes down.
Kestrel
Written in Scala, the Kestrel broker speaks the memcached protocol. Basically, the key becomes the queue name and the object is the message. Kestrel is very simple: queues are defined in a configuration file, but you can specify, per queue, storage limits, expiration, and behavior when limits are reached. With a setting like "discardOldWhenFull = true", my requirement of never blocking the publishers is easily met.
RabbitMQ vs Kafka


RabbitMQ uses message acknowledgments to ensure delivery state on the broker itself.
Kafka doesn't have per-message acknowledgments on the broker; it expects the consumer to keep track of the delivery state itself (via offsets).


Kafka uses ZooKeeper to maintain its state across a cluster; RabbitMQ does not use ZooKeeper and relies on Erlang's built-in clustering instead.


RabbitMQ was not designed for large volume and would fall over if the consumers were too slow.
However, post 2.0, RabbitMQ claims to handle slow batch consumers as well.
Kafka was designed from the beginning to handle both online and batch consumers.
So it can handle 100k+ events per second.
RabbitMQ's claim to that attribute is around 20k+ events per second.


Kafka only offers topic-style routing.
RabbitMQ offers a richer routing model, with several exchange types (direct, fanout, topic, headers) on top of plain queues.


Kafka provides message ordering inside partitions.
So for strict ordering across partitions, Kafka consumers have to be smart enough to resolve the ordering across partitions themselves.


Kafka persists messages on disk and replicates them within the cluster to prevent data loss.
Each broker can handle terabytes of messages without performance impact.
Kafka has been tested to provide close to 200k messages/sec for writes and 3M messages/sec for reads.

Advanced Message Queuing Protocol (AMQP)

The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for message-oriented middleware.
The defining features of AMQP are:
Message orientation
Queuing
Routing (including point-to-point and publish-and-subscribe)
Reliability and
Security

It originated in 2003 at JPMorgan Chase.

AMQP mandates the behavior of the messaging provider and client to the extent that implementations from different vendors are interoperable, in the same way that SMTP, HTTP, FTP, etc. have created interoperable systems. Previous standardization efforts, such as JMS, focused on standardizing the API level. AMQP, however, is a wire-level protocol: it is a description of the data format that is sent across the network as a stream of octets. Thus it allows implementations to have different programming interfaces as long as they conform to the wire-level format.

