Wednesday, February 1, 2017

Kafka Misc
- cqlsh can now connect to older Cassandra versions by downgrading the native
  protocol version. Please note that this is currently not part of our release 
  testing and, as a consequence, it is not guaranteed to work in all cases.
  See CASSANDRA-12150 for more details.

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

builder.stream("streams-file-input").to("streams-pipe-output");
It sounds like you want the min_queued_messages and linger_ms settings on Producer. The default min_queued_messages is very high, so if you're producing a single message and expecting it to be sent right away, you may want to use this kwarg:
producer = topic.get_producer(min_queued_messages=1)
linger_ms can also be helpful here depending on your use case.

auto.offset.reset: What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. Default: latest.
num.stream.threads: The number of threads to execute stream processing. Default: 1.
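As a minimal sketch of how the two settings described above might be supplied, they can be placed in a plain java.util.Properties object before it is handed to the client. The class and method names are illustrative, and the key for the thread-count setting is assumed to be Kafka Streams' num.stream.threads. Only the standard library is used, so nothing here contacts a broker.

```java
import java.util.Properties;

class StreamsSettingsSketch {
    // Builds the two settings with the default values listed above.
    static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("num.stream.threads", "1");
        return props;
    }

    // Overrides the reset policy so that a consumer without a committed
    // offset starts from the earliest available record.
    static Properties startFromEarliest() {
        Properties props = defaults();
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = startFromEarliest();
        System.out.println(props.getProperty("auto.offset.reset")); // prints earliest
        System.out.println(props.getProperty("num.stream.threads")); // prints 1
    }
}
```

Note that the reset policy only takes effect when no valid committed offset exists; it does not rewind a consumer group that already has offsets.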
  1. A “lower-level” processor that provides APIs for data processing, composable processing, and local state storage.
  2. A “higher-level” stream DSL that would cover most processor implementation needs.

Error handling
  1. Avoid wrapping all exceptions in the StreamThread, but instead only StreamsExceptions (this is the top-level exception type of Kafka Streams); other exceptions will be treated as user app code exceptions.
  2. KafkaStreams.start() / close() should not throw any exceptions; all exceptions thrown will be in the StreamThread, which will only stop that thread, and other threads will continue to run until KafkaStreams.close() is called.
About your question: The only way to check if a write to a broker was successful is by enabling acks. If you disable acks, the producer applies a "fire and forget" strategy and does not check if a write was successful and/or if any connection to the Kafka cluster is still established etc.
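A rough sketch of the two modes, using only java.util.Properties so that it runs without a Kafka dependency: the property keys acks and bootstrap.servers are the standard producer settings, while the class name, method name, and broker address are assumptions for illustration.

```java
import java.util.Properties;

class ProducerAcksSketch {
    // Returns producer settings for either acknowledged or fire-and-forget sends.
    static Properties producerProps(boolean waitForAcks) {
        Properties props = new Properties();
        // Assumed address of a local broker.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // "all": wait until the in-sync replicas have acknowledged the write.
        // "0": do not wait for any acknowledgement (fire and forget).
        props.setProperty("acks", waitForAcks ? "all" : "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps(true).getProperty("acks"));  // prints all
        System.out.println(producerProps(false).getProperty("acks")); // prints 0
    }
}
```

With acknowledgements enabled, the result of each send (the returned Future or a callback) is what tells the application whether the write succeeded.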
The first step is to make your application “reset ready”. For this, the only thing you need to do is to include a call to KafkaStreams#cleanUp() in your application code.

Calling cleanUp() is required because resetting a Streams applications consists of two parts: global reset and local reset. The global reset is covered by the new application reset tool (see “Step 2”), and the local reset is performed through the Kafka Streams API. Because it is a local reset, it must be performed locally for each instance of your application. Thus, embedding it in your application code is the most convenient way for a developer to perform a local reset of an application (instance).

// Delete the application's local state.
// Note: In a real application you'd call `cleanUp()` only under certain conditions.
// See Confluent Docs for more details:
streams.cleanUp();

So what would we need to do to restart this application from scratch, i.e., not resume the processing from the point the application was stopped before, but rather to reprocess all its input data again?
First you must stop all running application instances and make sure the whole consumer group is not active anymore (you can use bin/kafka-consumer-groups to list active consumer groups). Typically, the consumer group should become inactive one minute after you stopped all the application instances. This is important because the reset behavior is undefined if you use the reset tool while some application instances are still running — the running instances might produce wrong results or even crash.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
KStream<String, Long> wordCounts = textLines
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, word) -> new KeyValue<>(word, word))
      // Required in Kafka 0.10.0 to re-partition the data because we re-keyed the stream in the `map` step.
      // Upcoming Kafka 0.10.1 does this automatically for you (no need for `through`).
      // (The intermediate topic and store names in the next two lines are illustrative.)
      .through("RekeyedIntermediateTopic")
      .countByKey("Counts")
      .toStream();
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  • Event-at-a-time processing (not microbatch) with millisecond latency
  • Stateful processing including distributed joins and aggregations
  • A convenient DSL
  • Windowing with out-of-order data using a DataFlow-like model
  • Distributed processing and fault-tolerance with fast failover
  • Reprocessing capabilities so you can recalculate output when your code changes
Kafka Streams is a library for building streaming applications, specifically applications that transform input Kafka topics into output Kafka topics (or calls to external services, or updates to databases, or whatever). It lets you do this with concise code in a way that is distributed and fault-tolerant.
These stream processing apps were most often software that implemented core functions in the business rather than computing analytics about the business.

When we looked at how people were building stream processing applications with Kafka, there were two options:
  1. Just build an application that uses the Kafka producer and consumer APIs directly
  2. Adopt a full-fledged stream processing framework
If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.

To catch any unexpected exceptions, you may set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // here you should examine the exception and perform an appropriate action!
    }
});
To stop the application instance, call the close() method:
// Stop the Kafka Streams threads
streams.close();
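2. High-level Stream DSL
To create a processor topology with the Stream DSL, developers can use the KStreamBuilder class, which extends TopologyBuilder. Below is an example from the official documentation; the complete source can be found in the streams/examples package.
  • Create Source Streams from Kafka
A KStream can be created from multiple Kafka topics, whereas a KTable can only be created from a single topic.
KStreamBuilder builder = new KStreamBuilder();
KStream source1 = builder.stream("topic1", "topic2");
KTable source2 = builder.table("topic3");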
kafkaVersion = ''
bin/ config/
- port 2181
bin/ config/

bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/ --list --zookeeper localhost:2181

bin/ --broker-list localhost:9092 --topic test
bin/ --bootstrap-server localhost:9092 --topic test --from-beginning

bin/ --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
bin/ --describe --zookeeper localhost:2181 --topic my-replicated-topic

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
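To make the relationship between "replicas" and "isr" concrete, here is a small sketch that computes which assigned replicas are currently out of sync, given the two comma-separated broker id lists as they appear in the describe output. The class and method names and the sample broker ids are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

class IsrSketch {
    // Parses a comma-separated list of broker ids such as "1,2,0".
    static Set<Integer> parseIds(String csv) {
        Set<Integer> ids = new TreeSet<>();
        for (String part : csv.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                ids.add(Integer.parseInt(trimmed));
            }
        }
        return ids;
    }

    // Returns the replicas assigned to the partition that are not in the ISR.
    static Set<Integer> outOfSync(String replicasCsv, String isrCsv) {
        Set<Integer> lagging = parseIds(replicasCsv);
        lagging.removeAll(parseIds(isrCsv));
        return lagging;
    }

    public static void main(String[] args) {
        // Hypothetical partition: replicas on brokers 1, 2, 0; broker 0 is down or behind.
        System.out.println(outOfSync("1,2,0", "1,2")); // prints [0]
    }
}
```

An empty result means every replica is alive and caught up to the leader.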
Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

bin/ config/ config/ config/

> bin/ org.apache.kafka.streams.examples.wordcount.WordCountDemo
Listeners can be configured to receive the entire batch of messages returned by the consumer.poll() operation, rather than one at a time.


When using the methods with a Message<?> parameter, topic, partition and key information is provided in a message header:
  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION_ID
  • KafkaHeaders.MESSAGE_KEY
with the message payload being the data.
Optionally, you can configure the KafkaTemplate with a ProducerListener to get an async callback with the results of the send (success or failure) instead of waiting for the Future to complete.
public interface ProducerListener<K, V> {

    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

    void onError(String topic, Integer partition, K key, V value, Exception exception);

    boolean isInterestedInSuccess();

}
By default, the template is configured with a LoggingProducerListener which logs errors and does nothing when the send is successful.
For convenience, the abstract ProducerListenerAdapter is provided in case you only want to implement one of the methods. It returns false for isInterestedInSuccess.

If you wish to block the sending thread to await the result, you can invoke the future’s get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter which will cause the template to flush() on each send. Note, however, that flushing will likely significantly reduce performance.

When using a Message Listener Container you must provide a listener to receive data. There are currently four supported interfaces for message listeners:
Two MessageListenerContainer implementations are provided:
  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainers to provide multi-threaded consumption.
  • MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
  • MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
@KafkaListener(id = "baz", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    // process the data, then acknowledge it
    ack.acknowledge();
}
Finally, metadata about the message is available from message headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // process the payload together with the header values
}

In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed or not; that is an application-level function. This is known as the Idempotent Receiver pattern, and Spring Integration provides an implementation of it.
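The idea behind an idempotent receiver can be sketched in a few lines: remember an identifier for every record already handled and skip redeliveries. This is not Spring Integration's implementation; the class name, the in-memory set, and the choice of topic, partition, and offset as the identifier are all assumptions made for illustration. A real application would usually keep the identifiers in a durable store so they survive a restart.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentReceiverSketch {
    // Coordinates of records that were already handled.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    // Counts how often the business logic actually ran.
    private final AtomicInteger processed = new AtomicInteger();

    // Returns true if the record was processed, false if it was a duplicate.
    boolean receive(String topic, int partition, long offset, String payload) {
        String id = topic + ":" + partition + ":" + offset;
        if (!seen.add(id)) {
            return false; // redelivered record, skip it
        }
        processed.incrementAndGet(); // stand-in for the real processing of payload
        return true;
    }

    int processedCount() {
        return processed.get();
    }

    public static void main(String[] args) {
        IdempotentReceiverSketch receiver = new IdempotentReceiverSketch();
        System.out.println(receiver.receive("myTopic", 0, 42L, "hello")); // prints true
        System.out.println(receiver.receive("myTopic", 0, 42L, "hello")); // prints false
        System.out.println(receiver.processedCount());                    // prints 1
    }
}
```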
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private Listener listener;

@Test
public void contextLoads() throws InterruptedException {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic1", "ABC");
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        public void onSuccess(SendResult<String, String> result) { }
        public void onFailure(Throwable ex) { }
    });
    assertThat(this.listener.countDownLatch1.await(60, TimeUnit.SECONDS)).isTrue();
}

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
We need to define the KafkaListenerContainerFactory, which creates the listener containers. In this example, we use the ConcurrentKafkaListenerContainerFactory, whose ConcurrentMessageListenerContainer can consume messages on multiple threads.
The idea is to allow the same message size at every hop, from the Kafka producer to the Kafka broker and on to the Kafka consumer, i.e.
Kafka producer --> Kafka Broker --> Kafka Consumer
Suppose the requirement is to send a 15 MB message; then the producer, the broker, and the consumer all need to be in sync.
Kafka Producer sends 15 MB --> Kafka Broker Allows/Stores 15 MB --> Kafka Consumer receives 15 MB
The settings therefore should be:
A.) On Broker: message.max.bytes=15728640 replica.fetch.max.bytes=15728640
B.) On Consumer: fetch.message.max.bytes=15728640
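The value 15728640 is simply 15 MB expressed in bytes (15 × 1024 × 1024). The sketch below reproduces that arithmetic and collects the three settings named above under one shared limit; the class and method names are hypothetical. The producer-side limit is not listed in the note; in the Java producer it is governed by max.request.size, which is left out of the sketch for that reason.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MessageSizeSketch {
    // Converts megabytes to bytes, failing loudly on integer overflow.
    static int megabytesToBytes(int megabytes) {
        return Math.multiplyExact(megabytes, 1024 * 1024);
    }

    // Returns the broker and consumer settings from the note, all set to the same limit.
    static Map<String, String> sizeSettings(int megabytes) {
        String limit = Integer.toString(megabytesToBytes(megabytes));
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("message.max.bytes", limit);       // broker
        settings.put("replica.fetch.max.bytes", limit); // broker
        settings.put("fetch.message.max.bytes", limit); // consumer
        return settings;
    }

    public static void main(String[] args) {
        System.out.println(megabytesToBytes(15)); // prints 15728640
        sizeSettings(15).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```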

    public void sendMessage(String topic, String message) {
        // the KafkaTemplate provides asynchronous send methods returning a
        // Future
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
                .send(topic, message);

        // you can register a callback with the listener to receive the result
        // of the send asynchronously
        future.addCallback(
                new ListenableFutureCallback<SendResult<Integer, String>>() {

                    @Override
                    public void onSuccess(
                            SendResult<Integer, String> result) {
                        LOGGER.info("sent message='{}' with offset={}",
                                message,
                                result.getRecordMetadata().offset());
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        LOGGER.error("unable to send message='{}'",
                                message, ex);
                    }
                });

        // alternatively, to block the sending thread, to await the result,
        // invoke the future’s get() method
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {//?
        return new Sender();
    }

The @KafkaListener annotation creates a message listener container behind the scenes for each annotated method, using a ConcurrentMessageListenerContainer.

We need to add the @EnableKafka annotation to enable support for the @KafkaListener annotation that was used on the Receiver.
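    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster (a local broker is assumed here, as elsewhere in these notes)
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // deserializers assumed to match the Integer key and String value types used by the sender
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }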
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {

    @Autowired
    private Sender sender;

    @Autowired
    private Receiver receiver;

    @Test
    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    }
}
SendResult<String, WorkUnit> sendResult =
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();
log.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}

public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}
It seems that you put in the wrong port in the following statement. Kafka's
default port is 9092. 2181 is for Zookeeper.
props.put("bootstrap.servers", "localhost:2181");
Figured out that I had to change a setting in
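The producer then hangs indefinitely with no response. If you add the slf4j-to-log4j bridge and set the log level to debug, you will see log output like the following: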
Updated cluster metadata version 2 to Cluster(nodes = [Node(0,, 9092)], partitions = [])
[cas-client-proxy] TRACE [main] org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(374) | Requesting metadata update for topic topic_9527.
org.apache.kafka.clients.producer.KafkaProducer.send(348) | Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Apache Kafka Programming Getting-Started Guide (2): Integrating Spring with Kafka
bin/ --zookeeper localhost:2181 --delete --topic DummyTopic
The command executed successfully but when I run a command to list the topics, I could see that the topic is still there and it shows marked for deletion.
bin/ --list --zookeeper localhost:2181
DummyTopic - marked for deletion
Deletion of a topic has been supported since 0.8.2.x version. You have to enable topic deletion (setting delete.topic.enable to true) on all brokers first.
Follow this step-by-step process for manual deletion of topics:
  1. Stop Kafka server
  2. Delete the topic directory with rm -rf command
  3. Connect to Zookeeper instance
  4. ls /brokers/topics
  5. Remove the topic folder from ZooKeeper using rmr /brokers/topics/yourtopic
  6. Restart Kafka server
  7. Confirm whether it was deleted by using this command: --list --zookeeper yourip:port

