Wednesday, February 1, 2017

Kafka Misc

Idempotence: Exactly-once in order semantics per partition

An idempotent operation is one that can be performed many times without causing a different effect than being performed only once. The producer send operation is now idempotent. In the event of an error that causes a producer retry, the same message (which is still sent by the producer multiple times) will only be written to the Kafka log on the broker once. For a single partition, idempotent producer sends remove the possibility of duplicate messages due to producer or broker errors. To turn on this feature and get exactly-once semantics per partition (meaning no duplicates, no data loss, and in-order semantics), configure your producer to set "enable.idempotence=true".
Each batch of messages sent to Kafka contains a sequence number that the broker uses to dedupe any duplicate send. Unlike TCP, though, which provides guarantees only within a transient in-memory connection, this sequence number is persisted to the replicated log, so even if the leader fails, any broker that takes over will also know if a resend is a duplicate. The overhead of this mechanism is quite low: it's just a few extra numeric fields with each batch of messages.
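The deduplication idea can be sketched as a toy model in plain Java. This is only an illustration, not the broker's actual implementation: the class name `SequenceDedupLog`, the `producerId` parameter and the in-memory map are assumptions made for the example, and a real broker would also persist this state to the replicated log and reject out-of-order gaps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one partition that drops resent batches by sequence number. */
public class SequenceDedupLog {
    // Highest sequence number written so far, per (hypothetical) producer id.
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Appends the batch unless its sequence number was already written; returns true if appended. */
    public boolean append(long producerId, int sequence, String batch) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // duplicate resend: nothing is written a second time
        }
        lastSequence.put(producerId, sequence);
        log.add(batch);
        return true;
    }

    /** Returns the batches written to this toy partition, in order. */
    public List<String> contents() {
        return log;
    }

    public static void main(String[] args) {
        SequenceDedupLog partition = new SequenceDedupLog();
        partition.append(7L, 0, "batch-0");
        partition.append(7L, 1, "batch-1");
        partition.append(7L, 1, "batch-1"); // producer retry after a lost acknowledgement
        System.out.println(partition.contents());
    }
}
```

The retry in `main` is sent three times in total but only two batches end up in the log, which is the per-partition "no duplicates" behavior described above.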

Transactions: Atomic writes across multiple partitions

Second, Kafka now supports atomic writes across multiple partitions through the new transactions API. This allows a producer to send a batch of messages to multiple partitions such that either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers. This feature also allows you to commit your consumer offsets in the same transaction along with the data you have processed, thereby allowing end-to-end exactly-once semantics.

The real deal: Exactly-once stream processing in Apache Kafka

Building on idempotency and atomicity, exactly-once stream processing is now possible through the Streams API in Apache Kafka. All you need to do to make your Streams application employ exactly-once semantics is to set the config "processing.guarantee=exactly_once". This causes all of the processing to happen exactly once, including both the processing itself and all of the materialized state that the processing job writes back to Kafka.
“This is why the exactly-once guarantees provided by Kafka’s Streams API are the strongest guarantees offered by any stream processing system so far. It offers end-to-end exactly-once guarantees for a stream processing application that extends from the data read from Kafka, any state materialized to Kafka by the Streams app, to the final output written back to Kafka. Stream processing systems that only rely on external data systems to materialize state support weaker guarantees for exactly-once stream processing. Even when they use Kafka as a source for stream processing and need to recover from a failure, they can only rewind their Kafka offset to reconsume and reprocess messages, but cannot rollback the associated state in an external system, leading to incorrect results when the state update is not idempotent.”
In order to work against multiple broker versions, clients need to know which versions of the various APIs a broker supports. The broker exposes this information, as described in KIP-35. Clients should use the supported API versions information to choose the highest API version supported by both client and broker. If no such version exists, an error should be reported to the user.
./ localhost:2181
get /brokers/topics/__consumer_offsets/partitions/44/state
cZxid = 0x4e
echo "exclude.internal.topics=false" > /tmp/consumer.config
./ --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
Go to $GOPATH/src/ and save the burrow.cfg as burrow.cfg.orig then edit burrow.cfg to match the environment then copy this file to $GOBIN for simplification. You can also specify the path where the config file is stored.
  $ $GOPATH/bin/burrow --config path/to/burrow.cfg
Another note, you've omitted the [general] section from the config file you posted. I'm not sure if this is because you're not specifying it, or if you omitted it from what you posted for other reasons, but normally there is a group blacklist config that includes console consumers.
And yes, David is referring to under the kafka section of the config. If you do not explicitly set zookeeper-offsets to true, Burrow will not check Zookeeper committed offsets.
I needed to set the dual.commit.enabled and config params and follow Todd's advice to bounce the consumers here (#7), and then my missing consumer group appeared.
        Properties config = new Properties();
        config.put("zookeeper.connect", defaultKafkaZk);
        config.put("", defaultKafkaZkTimeout);
        config.put("", defaultKafkaConsumerGroup);
        // this is required in order for Burrow to get the offsets
        config.put("dual.commit.enabled", "false");
        config.put("", "kafka");
We're running Secor against Kafka 10 without any issues. You have to configure Kafka to store offsets in ZK, and not in Kafka.
To your original question, as to how Burrow lists consumer groups, it only knows about a consumer group when it commits offsets. Burrow does not use any other means of getting a full group list. If a consumer commits an offset, the group exists. If there is no offset committed, then as far as Burrow is concerned the group does not exist. If you have enabled Zookeeper offset checking for Kafka, Burrow also enumerates the /consumers tree in ZK for groups, but it will only evaluate groups there that have a /consumers//offsets tree.

From the command I can see that you are using --new-consumer to describe the consumer group. When the new consumer is used, the tool tries to fetch the consumer group info from the consumer offsets topic, which is created in the Kafka log directory.
Try using --zookeeper instead of --new-consumer, e.g.:
  $ /usr/bin/ --zookeeper <zookeeper-hostname>:2181 --describe --group <consumer-group>
Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data, so this may disturb consumers if they rely on that partitioning. That is, if data is partitioned by hash(key) % number_of_partitions, then this partitioning will potentially be shuffled by adding partitions, but Kafka will not attempt to automatically redistribute data in any way.  -- the script doesn't exist
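To make the shuffling concrete, here is a small sketch of hash(key) % number_of_partitions in plain Java. It is an illustration only: `partitionFor` is a hypothetical helper, and it uses `String.hashCode()` as the hash function, which is an assumption for the example and not the hash Kafka's own partitioner uses.

```java
/** Shows how the key-to-partition mapping changes when the partition count grows. */
public class PartitionShuffleDemo {
    /** hash(key) % number_of_partitions, using String.hashCode() as a stand-in hash. */
    static int partitionFor(String key, int numberOfPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numberOfPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "c", "d"}) {
            System.out.printf("key %s: partition %d with 2 partitions, partition %d with 3 partitions%n",
                    key, partitionFor(key, 2), partitionFor(key, 3));
        }
    }
}
```

Records already written for a key stay in their old partition, while new records for the same key may land in a different one, so per-key ordering across the change is no longer guaranteed.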
./bin/ --alter --zookeeper localhost:2181 --topic my-topic --partitions 3
 --partitions: the new number of partitions
We can use this command to increase the number of partitions. Kafka does not support decreasing the partition count of an existing topic.
You can verify whether partitions have been increased by using describe command as follows - 
./bin/ --describe --zookeeper localhost:2181 --topic my-topic
By default, when a consumer is started for the very first time, it ignores all existing data in a topic and only consumes new data coming in after the consumer is started. If this is the case, try sending some more data after the consumer is started. Alternatively, you can configure the consumer by setting auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest" for the old consumer.
Kafka Streams currently supports at-least-once processing guarantees in the presence of failure. This means that if your stream processing application fails, no data records are lost and fail to be processed, but some data records may be re-read and therefore reprocessed.
Beyond catching and adding some more context to the exceptions, I'm not sure what else we can do.
The only exception I was thinking about for post-KIP4 is consumer.commitSync, when it throws CommitFailedException, which usually indicates that a rebalance has happened behind the scenes and this task is no longer owned by the consumer. Today we throw this exception all the way to the users as well, but we can actually auto-handle it by just closing the task, since we know it has already migrated to another instance.
auto.create.topics.enable: Enable auto creation of topic on the server (type: boolean, default: true)
kafka-topics --zookeeper localhost:2181 --delete --topic $1
> bin/ --zookeeper localhost:2181 --topic test --delete --zookeeper localhost:2181 --topic your_topic_name

  Topic your_topic_name is marked for deletion.
  Note: This will have no impact if delete.topic.enable is not set to true.
Kafka topics are "distributed and partitioned append only logs". The parameter log.dir defines where topics (i.e., data) are stored.
log.dir or log.dirs in the config/ specify the directories in which the log data is kept. The server log directory is kafka_base_dir/logs by default. You could modify it by specifying another directory for 'kafka.logs.dir' in
./ --describe --zookeeper localhost:2181 --topic topic_name
You should see what you need under PartitionCount.
Topic:topic_name        PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: topic_name       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001

# The default number of log partitions per topic.
  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, i.e., after a commit, the next commit does not happen before this time has passed. Basically, Kafka Streams tries to commit ASAP after this time has passed, but there is no guarantee whatsoever how long it will actually take to do the next commit.
  • StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify the maximum blocking time of the poll() call.
Thus, both values are not helpful to heartbeat more often.
Kafka Streams follows a "depth-first" strategy when processing records. This means that after a poll(), all operators of the topology are executed for each record. Let's assume you have three consecutive maps; then all three maps will be called for the first record before the second record gets processed.
Thus, the next poll() call will be made after all records of the first poll() have been fully processed. If you want to heartbeat more often, you need to make sure that a single poll() call fetches fewer records, so that processing all records takes less time and the next poll() is triggered earlier.
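The depth-first order can be illustrated with a toy loop in plain Java. This is a sketch of the ordering only, not of Kafka Streams internals; the class `DepthFirstDemo`, the record names and the operator names are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy illustration of depth-first processing of the records returned by one poll(). */
public class DepthFirstDemo {
    /** Runs every operator on one record before touching the next record; returns the call order. */
    static List<String> process(List<String> polledRecords, List<String> operators) {
        List<String> trace = new ArrayList<>();
        for (String record : polledRecords) {       // outer loop: records from a single poll()
            for (String operator : operators) {     // inner loop: the whole topology for that record
                trace.add(operator + "(" + record + ")");
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        List<String> trace = process(
                Arrays.asList("r1", "r2"),
                Arrays.asList("map1", "map2", "map3"));
        System.out.println(trace);
    }
}
```

The number of operator calls between two poll() calls is the number of polled records times the number of operators, which is why fetching fewer records per poll() shortens the gap between polls.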
You can use configuration parameters for KafkaConsumer that you can specify via StreamsConfig to get this done:
streamConfig.put(ConsumerConfig.XXX, VALUE);
  • max.poll.records: if you decrease this value, fewer records will be polled
  • if you increase this value, there is more time for processing data (adding this for completeness because it is actually a client setting and not a server/broker side configuration -- even if you are aware of this solution and do not like it :))
As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and producer configs within the streams config. This avoids parameter conflicts, as some parameter names are used for both consumer and producer and cannot be distinguished otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
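A dependency-free sketch of what prefixing achieves is shown below. The two helper methods are stand-ins written for this example; they assume that StreamsConfig#consumerPrefix() and StreamsConfig#producerPrefix() prepend "consumer." and "producer." to the parameter name. The values are arbitrary placeholders.

```java
import java.util.Properties;

/** Stand-in sketch of prefixed Streams configuration, using only java.util.Properties. */
public class PrefixedConfigSketch {
    // Assumed to mirror StreamsConfig.consumerPrefix(): prepends "consumer."
    static String consumerPrefix(String name) {
        return "consumer." + name;
    }

    // Assumed to mirror StreamsConfig.producerPrefix(): prepends "producer."
    static String producerPrefix(String name) {
        return "producer." + name;
    }

    static Properties build() {
        Properties streamsConfig = new Properties();
        // Consumer-only setting: fetch fewer records per poll() (placeholder value).
        streamsConfig.put(consumerPrefix("max.poll.records"), "100");
        // receive.buffer.bytes exists for both clients; the prefix keeps the two values apart.
        streamsConfig.put(consumerPrefix("receive.buffer.bytes"), "65536");
        streamsConfig.put(producerPrefix("receive.buffer.bytes"), "32768");
        return streamsConfig;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application you would call the StreamsConfig methods directly instead of these helpers.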
One more thing to add: the scenario described in this question is a known issue, and there is already KIP-62, which introduces a background thread for KafkaConsumer that sends heartbeats, thus decoupling heartbeats from poll() calls. Kafka Streams will leverage this new feature in upcoming releases.
Allow consumer to send heartbeats in background thread (KIP-62)
bin/ --zookeeper localhost:2181 --delete --topic test
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config
While the accepted answer is correct, that method has been deprecated. Topic configuration should now be done via kafka-configs.
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config --entity-name MyTopic
Configurations set via this method can be displayed with the command
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
- cqlsh can now connect to older Cassandra versions by downgrading the native
  protocol version. Please note that this is currently not part of our release 
  testing and, as a consequence, it is not guaranteed to work in all cases.
  See CASSANDRA-12150 for more details.

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");"streams-file-input").to("streams-pipe-output");
Sounds like you want the min_queued_messages and linger_ms settings on Producer. The default min_queued_messages is very high, so if you're producing a single message and expecting it to be sent right away, you may want to use this kwarg:
producer = topic.get_producer(min_queued_messages=1)
linger_ms can also be helpful here depending on your use case.

auto.offset.reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (default: latest)
The number of threads to execute stream processing (default: 1)
  1. A "lower-level" processor that provides APIs for data-processing, composable processing and local state storage.
  2. A “higher-level” stream DSL that would cover most processor implementation needs.

Error handling
  1. Avoid wrapping all exceptions in the StreamThread, but instead only StreamsExceptions (this is the top-level exception type of Kafka Streams); other exceptions will be treated as user app code exceptions.
  2. KafkaStreams.start() / close() should not throw any exceptions; all exceptions thrown will be in the StreamThread, which will only stop that thread, and other threads will continue to run until KafkaStreams.close() is called.
About your question: The only way to check if a write to a broker was successful is by enabling acks. If you disable acks, the producer applies a "fire and forget" strategy and does not check if a write was successful and/or if any connection to the Kafka cluster is still established etc.
The first step is to make your application “reset ready”. For this, the only thing you need to do is to include a call to KafkaStreams#cleanUp() in your application code 

Calling cleanUp() is required because resetting a Streams applications consists of two parts: global reset and local reset. The global reset is covered by the new application reset tool (see “Step 2”), and the local reset is performed through the Kafka Streams API. Because it is a local reset, it must be performed locally for each instance of your application. Thus, embedding it in your application code is the most convenient way for a developer to perform a local reset of an application (instance).

// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under certain conditions.
// See Confluent Docs for more details:

So what would we need to do to restart this application from scratch, i.e., not resume the processing from the point the application was stopped before, but rather to reprocess all its input data again?
First you must stop all running application instances and make sure the whole consumer group is not active anymore (you can use bin/kafka-consumer-groups to list active consumer groups). Typically, the consumer group should become inactive one minute after you stopped all the application instances. This is important because the reset behavior is undefined if you use the reset tool while some application instances are still running — the running instances might produce wrong results or even crash.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines =, stringSerde, "TextLinesTopic");
KStream<String, Long> wordCounts = textLines
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, word) -> new KeyValue<>(word, word))
      // Required in Kafka 0.10.0 to re-partition the data because we re-keyed the stream in the `map` step.
      // Upcoming Kafka 0.10.1 does this automatically for you (no need for `through`).
      .toStream();, longSerde, "WordsWithCountsTopic");

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  • Event-at-a-time processing (not microbatch) with millisecond latency
  • Stateful processing including distributed joins and aggregations
  • A convenient DSL
  • Windowing with out-of-order data using a DataFlow-like model
  • Distributed processing and fault-tolerance with fast failover
  • Reprocessing capabilities so you can recalculate output when your code changes
Kafka Streams is a library for building streaming applications, specifically applications that transform input Kafka topics into output Kafka topics (or calls to external services, or updates to databases, or whatever). It lets you do this with concise code in a way that is distributed and fault-tolerant.
These stream processing apps were most often software that implemented core functions in the business rather than computing analytics about the business.

When we looked at how people were building stream processing applications with Kafka, there were two options:
  1. Just build an application that uses the Kafka producer and consumer APIs directly
  2. Adopt a full-fledged stream processing framework
If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.

To catch any unexpected exceptions, you may set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // here you should examine the exception and perform an appropriate action!
    }
});
To stop the application instance call the close() method:
// Stop the Kafka Streams threads
streams.close();
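2. High-level Stream DSL
To create a processor topology with the Stream DSL, developers can use the KStreamBuilder class, which extends TopologyBuilder. Below is an example from the official documentation; the complete source code can be found in the streams/examples package.
  • Create Source Streams from Kafka
A KStream can be created from multiple Kafka topics, whereas a KTable can only be created from a single topic.
KStreamBuilder builder = new KStreamBuilder();
KStream source1 = builder.stream("topic1", "topic2");
KTable source2 = builder.table("topic3");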
kafkaVersion = ''
bin/ config/
- port 2181
bin/ config/

bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/ --list --zookeeper localhost:2181

bin/ --broker-list localhost:9092 --topic test
bin/ --bootstrap-server localhost:9092 --topic test --from-beginning

bin/ --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
bin/ --describe --zookeeper localhost:2181 --topic my-replicated-topic

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

bin/ config/ config/ config/

> bin/ org.apache.kafka.streams.examples.wordcount.WordCountDemo
Listeners can be configured to receive the entire batch of messages returned by the consumer.poll() operation, rather than one at a time


When using the methods with a Message<?> parameter, topic, partition and key information is provided in a message header:
  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION_ID
  • KafkaHeaders.MESSAGE_KEY
with the message payload being the data.
Optionally, you can configure the KafkaTemplate with a ProducerListener to get an async callback with the results of the send (success or failure) instead of waiting for the Future to complete.
public interface ProducerListener<K, V> {

    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

    void onError(String topic, Integer partition, K key, V value, Exception exception);

    boolean isInterestedInSuccess();

}

By default, the template is configured with a LoggingProducerListener which logs errors and does nothing when the send is successful.
For convenience, the abstract ProducerListenerAdapter is provided in case you only want to implement one of the methods. It returns false for isInterestedInSuccess.

If you wish to block the sending thread to await the result, you can invoke the future's get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter which will cause the template to flush() on each send. Note, however, that flushing will likely significantly reduce performance.

When using a Message Listener Container you must provide a listener to receive data. There are currently four supported interfaces for message listeners:
Two MessageListenerContainer implementations are provided:
  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainers to provide multi-threaded consumption.
  • MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
  • MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
@KafkaListener(id = "baz", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
Finally, metadata about the message is available from message headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed or not; that is an application-level function. This is known as the Idempotent Receiver pattern, and Spring Integration provides an implementation thereof.
public class KafkaProducerConfig {

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

private KafkaTemplate<String, String> kafkaTemplate;
private Listener listener;

public void contextLoads() throws InterruptedException {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic1", "ABC");
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        public void onSuccess(SendResult<String, String> result) {
        }

        public void onFailure(Throwable ex) {
        }
    });
    assertThat(this.listener.countDownLatch1.await(60, TimeUnit.SECONDS)).isTrue();
}

public class KafkaConsumerConfig {

    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    public Listener listener() {
        return new Listener();
    }
}
We need to define the KafkaListenerContainerFactory which will create KafkaListenerContainer. In this example, we will use ConcurrentKafkaListenerContainer which can consume messages in multi-threaded style.
The idea is to have equal size of message being sent from Kafka Producer to Kafka Broker and then received by Kafka Consumer i.e.
Kafka producer --> Kafka Broker --> Kafka Consumer
Suppose the requirement is to send a 15 MB message; then the Producer, the Broker and the Consumer all need to be in sync.
Kafka Producer sends 15 MB --> Kafka Broker Allows/Stores 15 MB --> Kafka Consumer receives 15 MB
The settings should therefore be:
A.) On Broker: message.max.bytes=15728640, replica.fetch.max.bytes=15728640
B.) On Consumer: fetch.message.max.bytes=15728640
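The figure 15728640 is simply 15 MB expressed in bytes. The following sketch, with hypothetical helper names, shows the arithmetic and the "all limits must agree" condition described above; it does not talk to Kafka at all.

```java
/** Checks that a message size fits every limit between producer and consumer. */
public class MessageSizeCheck {
    /** Converts megabytes to bytes (1 MB = 1024 * 1024 bytes). */
    static int megabytes(int mb) {
        return mb * 1024 * 1024;
    }

    /** True when the message passes the broker, replica-fetch and consumer-fetch limits. */
    static boolean fitsEndToEnd(int messageBytes,
                                int messageMaxBytes,        // broker: message.max.bytes
                                int replicaFetchMaxBytes,   // broker: replica.fetch.max.bytes
                                int fetchMessageMaxBytes) { // consumer: fetch.message.max.bytes
        return messageBytes <= messageMaxBytes
                && messageBytes <= replicaFetchMaxBytes
                && messageBytes <= fetchMessageMaxBytes;
    }

    public static void main(String[] args) {
        int limit = megabytes(15);
        System.out.println("15 MB in bytes: " + limit);
        System.out.println("fits with matching limits: " + fitsEndToEnd(limit, limit, limit, limit));
        // If only the broker limit is raised, the consumer cannot fetch the message.
        System.out.println("fits with 1 MB consumer limit: " + fitsEndToEnd(limit, limit, limit, megabytes(1)));
    }
}
```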

    public void sendMessage(String topic, String message) {
        // the KafkaTemplate provides asynchronous send methods returning a
        // Future
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
                .send(topic, message);

        // you can register a callback with the listener to receive the result
        // of the send asynchronously
                new ListenableFutureCallback<SendResult<Integer, String>>() {

                    public void onSuccess(
                            SendResult<Integer, String> result) {
              "sent message='{}' with offset={}",

                    public void onFailure(Throwable ex) {
                        LOGGER.error("unable to send message='{}'",
                                message, ex);

        // alternatively, to block the sending thread, to await the result,
        // invoke the future’s get() method

    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());

    public Sender sender() {//?
        return new Sender();

The @KafkaListener annotation creates a message listener container behind the scenes for each annotated method, using a ConcurrentMessageListenerContainer

we need to add the @EnableKafka annotation to enable support for the @KafkaListener annotation that was used on the Receiver.
    public Map consumerConfigs() {
        Map props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kakfa cluster
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

        return props;

    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());

    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

        return factory;

    public Receiver receiver() {
        return new Receiver();
public class SpringKafkaApplicationTests {

    private Sender sender;

    private Receiver receiver;

    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
SendResult<String, WorkUnit> sendResult =
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
log.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}

public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}
It seems that you put in the wrong port in the following statement. Kafka's
default port is 9092. 2181 is for Zookeeper.
props.put("bootstrap.servers", "localhost:2181");
Figured out that I had to change a setting in
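Your producer will then hang indefinitely with no response. If you add the slf4j-to-log4j bridge and set the log level to debug, you will see the following log output: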
Updated cluster metadata version 2 to Cluster(nodes = [Node(0,, 9092)], partitions = [])
[cas-client-proxy] TRACE [main] org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(374) | Requesting metadata update for topic topic_9527.
org.apache.kafka.clients.producer.KafkaProducer.send(348) | Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Apache Kafka Programming Getting Started Guide (2): Integrating Spring with Kafka
bin/ --zookeeper localhost:2181 --delete --topic DummyTopic
The command executed successfully but when I run a command to list the topics, I could see that the topic is still there and it shows marked for deletion.
bin/ --list --zookeeper localhost:2181
DummyTopic - marked for deletion
Deletion of a topic has been supported since version 0.8.2.x. You have to enable topic deletion (by setting delete.topic.enable to true) on all brokers first.
Follow this step by step process for manual deletion of topics
  1. Stop Kafka server
  2. Delete the topic directory with rm -rf command
  3. Connect to Zookeeper instance
  4. ls /brokers/topics
  5. Remove the topic folder from ZooKeeper using rmr /brokers/topics/yourtopic
  6. Restart Kafka server
  7. Confirm whether it was deleted by using this command: --list --zookeeper yourip:port
the `CommitFailedException` itself should not kill the process

Topic: A user-defined category (or feed name) to which messages are published.
Producer: A process that publishes messages to one or more topics.
Consumer: A process that subscribes to one or more topics and processes the feeds of messages from those topics.
Broker: A Kafka server that manages the persistence and replication of message data (i.e., the commit log).

Topics consist of one or more partitions. Kafka appends new messages to a partition in an ordered, immutable sequence. Each message in a partition is assigned a unique, sequential ID called an offset.

Kafka Producers publish messages to topics. The producer determines which message to assign to which partition within the topic. Assignment can be done in a round-robin fashion to balance load, or it can be based on a semantic partition function.
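The two assignment strategies can be sketched in plain Java. This is an illustration only, not Kafka's partitioner: the class PartitionChooser is hypothetical, and String.hashCode() stands in for whatever hash function the real client uses.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger();

    // Round-robin: cycle through partitions to balance load.
    int roundRobin(int numPartitions) {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    // Semantic partitioning: the same key always maps to the same partition.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int byKey(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionChooser chooser = new PartitionChooser();
        for (int i = 0; i < 4; i++) {
            System.out.println("round-robin -> partition " + chooser.roundRobin(3));
        }
        System.out.println("key user-42 -> partition " + byKey("user-42", 3));
    }
}
```

Key-based assignment matters when per-key ordering is required, because ordering is only guaranteed within a single partition.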

Kafka Consumers keep track of which messages have already been consumed, or processed, by keeping track of an offset, a sequential id number that uniquely identifies a message within a partition. Because Kafka retains all messages on disk for a configurable amount of time, Consumers can rewind or skip to any point in a partition simply by supplying an offset value.
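To make the offset model concrete, here is a small in-memory simulation. It is not the Kafka client: PartitionLogDemo and its methods are hypothetical names, and the list index plays the role of the offset. In the real Java client the equivalent of seek here is KafkaConsumer.seek(TopicPartition, long).

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionLogDemo {
    private final List<String> log = new ArrayList<>(); // list index == offset
    private long position = 0;                          // offset owned by the consumer

    // Appends a message and returns the offset it was assigned.
    long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Returns the message at the current position and advances, or null when caught up.
    String poll() {
        if (position >= log.size()) {
            return null;
        }
        return log.get((int) position++);
    }

    // Rewinds or skips by simply supplying an offset.
    void seek(long offset) {
        if (offset < 0 || offset > log.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    public static void main(String[] args) {
        PartitionLogDemo partition = new PartitionLogDemo();
        partition.append("a");
        partition.append("b");
        partition.append("c");
        System.out.println(partition.poll()); // reads offset 0
        System.out.println(partition.poll()); // reads offset 1
        partition.seek(0);                    // rewind to the beginning
        System.out.println(partition.poll()); // reads offset 0 again
    }
}
```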

Partition support within topics provides parallelism within a topic. In addition, because writes to a partition are sequential, the number of hard disk seeks is minimized. This reduces latency and increases performance.
Kafka Brokers scale and perform well in part because Brokers are not responsible for keeping track of which messages have been consumed. The message Consumer is responsible for this. In traditional messaging systems such as JMS, the Broker bears this responsibility, which severely limits the system’s ability to scale as the number of Consumers increases. Kafka's design eliminates the potential for back-pressure when consumers process messages at different rates.

The process of replicating data between Kafka clusters is called "mirroring", to differentiate cross-cluster replication from replication among nodes within a single cluster. A common use for mirroring is to maintain a separate copy of a Kafka cluster in another data center.
Kafka's MirrorMaker tool reads data from topics in one or more source Kafka clusters, and writes corresponding topics to a destination Kafka cluster (using the same topic names).
2016-11-02 15:43:47 INFO StreamRunner:59 - Started http server successfully.
2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state directory due to an unexpected exception
java.nio.file.NoSuchFileException: /data/1/kafka-streams/myapp-streams/7_21/.lock
at sun.nio.fs.UnixException.translateToIOException(
at sun.nio.fs.UnixException.rethrowAsIOException(
at sun.nio.fs.UnixException.rethrowAsIOException(
We see a deadlock state when a streams thread takes longer than MAX_POLL_INTERVAL_MS_CONFIG to process a task. In this case the thread's partitions, including the RocksDB lock, are assigned to some other thread. When the original thread tries to process the next task it cannot get the RocksDB lock and simply keeps waiting for that lock forever.
In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs = 50L.
If the thread does not get the lock, we simply increase the time by 10x and keep trying inside the while(true) loop.
We need an upper bound for this backoffTimeMs. If the time is greater than MAX_POLL_INTERVAL_MS_CONFIG and the thread still hasn't got the lock, its partitions have been moved somewhere else and it may never get the lock again.
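The bounded retry proposed above could look roughly like the following. This is a sketch of the idea, not the Kafka Streams code: BoundedBackoff, its parameters, and the BooleanSupplier standing in for the lock attempt are all assumptions; only the 10x growth and the idea of a cap come from the issue text.

```java
import java.util.function.BooleanSupplier;

final class BoundedBackoff {
    // Retries tryLock with a backoff that grows 10x per failure.
    // Gives up (returns false) once the next wait would exceed maxBackoffMs,
    // instead of looping forever.
    static boolean retryWithBackoff(BooleanSupplier tryLock, long initialBackoffMs, long maxBackoffMs) {
        long backoffMs = initialBackoffMs;
        while (true) {
            if (tryLock.getAsBoolean()) {
                return true;
            }
            if (backoffMs > maxBackoffMs) {
                return false; // upper bound reached; the lock has likely moved elsewhere
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
            backoffMs *= 10;
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical lock that becomes available on the third attempt.
        boolean locked = retryWithBackoff(() -> ++attempts[0] == 3, 1L, 1_000L);
        System.out.println("locked=" + locked + " after " + attempts[0] + " attempts");
    }
}
```

In the scenario from the issue, the cap would be derived from the configured max poll interval, so a thread stops waiting once a rebalance must already have happened.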
Running multiple Kafka Streams instances causes one or more instances to get into file contention

Having multiple Kafka Streams application instances causes one or more instances to get into file lock contention, and the instance(s) become unresponsive with an uncaught exception.
The exception is below:
22:14:37.621 [StreamThread-7] WARN o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
22:14:37.621 [StreamThread-13] WARN o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
22:14:37.623 [StreamThread-18] WARN o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
22:14:37.625 [StreamThread-7] ERROR n.a.a.k.t.KStreamTopologyBase - Uncaught Exception:org.apache.kafka.streams.errors.ProcessorStateException: task directory [/data/kafka-streams/rtp-kstreams-metrics/0_119] doesn't exist and couldn't be created
at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(
at org.apache.kafka.streams.processor.internals.StateDirectory.lock(
at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(
at org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
This happens within a couple of minutes after the instances are up, when there is NO data being sent to the broker yet and the streams app is started with auto.offset.reset set to "latest".
StateDirectory fails to create directory if any parent directory does not exist
The method directoryForTask attempts to create a task directory but will silently fail to do so as it calls taskDir.mkdir(); which will only create the leaf directory.
Calling taskDir.mkdirs(); (note the 's') will create the entire path if any parent directory is missing.
The constructor also attempts to create a bunch of directories using the former method and should be reviewed as part of any fix.
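The difference between the two calls is easy to reproduce with java.io.File alone. The sketch below is a standalone demonstration, not the StateDirectory code; MkdirDemo and the directory names are made up for the example.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class MkdirDemo {
    // Creates an empty scratch directory to play the role of the state directory root.
    static File newTempBase() {
        try {
            return Files.createTempDirectory("state-dir-demo").toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // mkdir() creates only the leaf directory; it returns false if a parent is missing.
    static boolean createLeafOnly(File taskDir) {
        return taskDir.mkdir();
    }

    // mkdirs() creates the leaf and any missing parent directories.
    static boolean createWithParents(File taskDir) {
        return taskDir.mkdirs();
    }

    public static void main(String[] args) {
        File base = newTempBase();
        // Neither "app-a" nor "app-b" exists yet under base.
        System.out.println("mkdir:  " + createLeafOnly(new File(base, "app-a/0_119")));
        System.out.println("mkdirs: " + createWithParents(new File(base, "app-b/0_119")));
    }
}
```

Because mkdir() signals failure only through its boolean return value, ignoring that value is what makes the failure silent.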

Timestamp Extractor (timestamp.extractor): A timestamp extractor extracts a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.
The default extractor is ConsumerRecordTimestampExtractor. This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client (introduced in Kafka, see KIP-32: Add timestamps to Kafka message). Depending on the setting of Kafka’s log.message.timestamp.type parameter, this extractor will provide you with:
  • event-time processing semantics if log.message.timestamp.type is set to CreateTime aka “producer time” (which is the default). This represents the time when the Kafka producer sent the original message.
  • ingestion-time processing semantics if log.message.timestamp.type is set to LogAppendTime aka “broker time”. This represents the time when the Kafka broker received the original message.
Another built-in extractor is WallclockTimestampExtractor. This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock, which effectively means Streams will operate on the basis of the so-called processing-time of events.
--create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

