Wednesday, February 1, 2017

Kafka Misc


https://issues.apache.org/jira/browse/CASSANDRA-12150
- cqlsh can now connect to older Cassandra versions by downgrading the native
  protocol version. Please note that this is currently not part of our release 
  testing and, as a consequence, it is not guaranteed to work in all cases.
  See CASSANDRA-12150 for more details.

Examples
https://github.com/apache/kafka/blob/0.10.1/streams/examples
https://github.com/apache/kafka/blob/0.10.1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
https://github.com/apache/kafka/blob/0.10.1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
builder.stream("streams-file-input").to("streams-pipe-output");
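Putting those fragments together, here is a minimal runnable sketch of the 0.10.1 pipe example (the application id is an assumption; the topic names match the demo):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StreamsPipeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe-sketch");   // assumption: any unique app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();
        // copy every record from the input topic to the output topic unchanged
        builder.stream("streams-file-input").to("streams-pipe-output");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // close the Streams instance cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}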

https://github.com/Parsely/pykafka/issues/291
Sounds like you want the min_queued_messages and linger_ms settings on Producer. The default min_queued_messages is very high, so if you're producing a single message and expecting it to be sent right away, you may want to use this kwarg:

producer = topic.get_producer(min_queued_messages=1)
linger_ms can also be helpful here depending on your use case.


Configuration
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
auto.offset.reset - what to do when there is no initial offset in Kafka, or if the current offset no longer exists on the server. Default: latest
num.stream.threads - the number of threads to execute stream processing. Default: 1
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
http://codingjunkie.net/kafka-processor-part1/
  1. A “lower-level” processor that provides APIs for data processing, composable processing, and local state storage (see the sketch below).
  2. A “higher-level” stream DSL that would cover most processor implementation needs.
http://codingjunkie.net/kafka-streams-part2/
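A hedged sketch of option 1, the low-level Processor API, against the 0.10.x org.apache.kafka.streams.processor interfaces (application id, topic names, and the upper-casing logic are placeholders, not from the article):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class ProcessorApiSketch {

    // a trivial processor that upper-cases each value and forwards it downstream
    static class UpperCaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-api-sketch");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // wire source -> processor -> sink by name
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("Source", "input-topic")                      // placeholder topic
               .addProcessor("UpperCase", new ProcessorSupplier<String, String>() {
                   @Override
                   public Processor<String, String> get() {
                       return new UpperCaseProcessor();
                   }
               }, "Source")
               .addSink("Sink", "output-topic", "UpperCase");           // placeholder topic

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}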

Error handling
https://github.com/apache/kafka/pull/809
  1. Avoid wrapping all exceptions in the StreamThread; wrap only StreamsException (the top-level exception type of Kafka Streams). Other exceptions will be treated as user app code exceptions.
  2. KafkaStreams.start() / close() should not throw any exceptions; all exceptions thrown will be in the StreamThread, which will only stop that thread, and other threads will continue to run until KafkaStreams.close() is called.


http://stackoverflow.com/questions/37822252/manual-acknowledgement-of-messages-spring-cloud-stream-kafka
https://github.com/spring-cloud/spring-cloud-stream/issues/575

http://stackoverflow.com/questions/41660933/behaviour-of-kafkaproducer-when-the-connection-to-kafka-stream-is-broken
About your question: The only way to check if a write to a broker was successful is by enabling acks. If you disable acks, the producer applies a "fire and forget" strategy and does not check if a write was successful and/or if any connection to the Kafka cluster is still established etc.
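To make that concrete, a hedged sketch of a producer that enables acks and checks each send result via the callback (broker address and topic are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // "all" waits for the full ISR to acknowledge; "0" is the fire-and-forget mode described above
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    // the write failed (or timed out); with acks=0 broker-side errors never reach this callback
                    exception.printStackTrace();
                } else {
                    System.out.printf("written to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}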

https://cwiki.apache.org/confluence/display/KAFKA/System+Tools

https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
The first step is to make your application “reset ready”. For this, the only thing you need to do is include a call to KafkaStreams#cleanUp() in your application code.

Calling cleanUp() is required because resetting a Streams application consists of two parts: global reset and local reset. The global reset is covered by the new application reset tool (see “Step 2”), and the local reset is performed through the Kafka Streams API. Because it is a local reset, it must be performed locally for each instance of your application. Thus, embedding it in your application code is the most convenient way for a developer to perform a local reset of an application (instance).

// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under certain conditions.
// See Confluent Docs for more details:
// http://docs.confluent.io/3.0.1/streams/developer-guide.html#step-2-reset-the-local-environments-of-your-application-instances
streams.cleanUp();

So what would we need to do to restart this application from scratch, i.e., not resume the processing from the point the application was stopped before, but rather to reprocess all its input data again?
First you must stop all running application instances and make sure the whole consumer group is not active anymore (you can use bin/kafka-consumer-groups to list active consumer groups). Typically, the consumer group should become inactive one minute after you stopped all the application instances. This is important because the reset behavior is undefined if you use the reset tool while some application instances are still running — the running instances might produce wrong results or even crash.
https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
KStream<String, Long> wordCounts = textLines
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, word) -> new KeyValue<>(word, word))
      // Required in Kafka 0.10.0 to re-partition the data because we re-keyed the stream in the `map` step.
      // Upcoming Kafka 0.10.1 does this automatically for you (no need for `through`).
      .through("RekeyedIntermediateTopic")
      .countByKey("Counts")
      .toStream();
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  • Event-at-a-time processing (not microbatch) with millisecond latency
  • Stateful processing including distributed joins and aggregations
  • A convenient DSL
  • Windowing with out-of-order data using a DataFlow-like model
  • Distributed processing and fault-tolerance with fast failover
  • Reprocessing capabilities so you can recalculate output when your code changes
Kafka Streams is a library for building streaming applications, specifically applications that transform input Kafka topics into output Kafka topics (or calls to external services, or updates to databases, or whatever). It lets you do this with concise code in a way that is distributed and fault-tolerant.
these stream processing apps were most often software that implemented core functions in the business rather than computing analytics about the business.

When we looked at how people were building stream processing applications with Kafka, there were two options:
  1. Just build an application that uses the Kafka producer and consumer APIs directly
  2. Adopt a full-fledged stream processing framework
http://docs.confluent.io/3.1.2/streams/index.html
http://docs.confluent.io/3.1.2/streams/developer-guide.html#streams-developer-guide-processor-api
http://docs.confluent.io/3.1.2/streams/developer-guide.html#using-kafka-streams-within-your-application-code
If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started.

To catch any unexpected exceptions, you may set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        // here you should examine the exception and perform an appropriate action!
    }
});
To stop the application instance call the close() method:
// Stop the Kafka Streams threads
streams.close();

http://blog.csdn.net/opensure/article/details/51507698
2. High-level Stream DSL
To create a processor topology with the Stream DSL, developers can use the KStreamBuilder class, which extends TopologyBuilder. Below is an official example; the complete source can be found in the streams/examples package.
  • Create Source Streams from Kafka
A KStream can be created from multiple Kafka topics, while a KTable can only be created from a single topic.
KStreamBuilder builder = new KStreamBuilder();
KStream source1 = builder.stream("topic1", "topic2");
KTable source2 = builder.table("topic3");
https://github.com/jkorab/kafka-streams-example/blob/master/src/main/java/com/ameliant/examples/WordCountStream.java

https://github.com/spring-projects/spring-kafka/blob/master/build.gradle
kafkaVersion = '0.10.1.1'
https://kafka.apache.org/quickstart
bin/zookeeper-server-start.sh config/zookeeper.properties
- port 2181
bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

http://docs.spring.io/spring-kafka/reference/htmlsingle/
Listeners can be configured to receive the entire batch of messages returned by the consumer.poll() operation, rather than one at a time
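A hedged sketch of such a batch listener, assuming spring-kafka 1.1's batch support (the factory bean name and topic are placeholders):

import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class BatchListenerSketch {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);  // hand the whole poll() result to the listener in one call
        return factory;
    }

    @KafkaListener(topics = "some-topic", containerFactory = "batchFactory")
    public void listen(List<String> messages) {
        // invoked once per consumer.poll(), with every record returned by that poll
        System.out.println("received a batch of " + messages.size() + " messages");
    }
}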

template.flush();

When using the methods with a Message<?> parameter, topic, partition and key information is provided in a message header:
  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION_ID
  • KafkaHeaders.MESSAGE_KEY
with the message payload being the data.
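For example, a hedged sketch of building such a Message<?> with Spring's MessageBuilder (topic, key, and partition values are placeholders):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class MessageSendSketch {

    void send(KafkaTemplate<String, String> template) {
        Message<String> message = MessageBuilder
                .withPayload("some payload")                      // the data
                .setHeader(KafkaHeaders.TOPIC, "some-topic")      // placeholder topic
                .setHeader(KafkaHeaders.MESSAGE_KEY, "some-key")  // placeholder key
                .setHeader(KafkaHeaders.PARTITION_ID, 0)          // placeholder partition
                .build();
        template.send(message);
    }
}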
Optionally, you can configure the KafkaTemplate with a ProducerListener to get an async callback with the results of the send (success or failure) instead of waiting for the Future to complete.
public interface ProducerListener<K, V> {

    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

    void onError(String topic, Integer partition, K key, V value, Exception exception);

    boolean isInterestedInSuccess();

}
By default, the template is configured with a LoggingProducerListener which logs errors and does nothing when the send is successful.
For convenience, the abstract ProducerListenerAdapter is provided in case you only want to implement one of the methods. It returns false for isInterestedInSuccess.
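A hedged sketch of such a listener, extending ProducerListenerAdapter so that only the error path needs to be implemented; it would typically be registered on the template via setProducerListener(...):

import org.springframework.kafka.support.ProducerListenerAdapter;

// logs failed sends only; isInterestedInSuccess() stays false, so onSuccess is never invoked
public class ErrorOnlyProducerListener<K, V> extends ProducerListenerAdapter<K, V> {

    @Override
    public void onError(String topic, Integer partition, K key, V value, Exception exception) {
        System.err.println("send to topic " + topic + " failed: " + exception.getMessage());
    }
}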

If you wish to block the sending thread to await the result, you can invoke the future's get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter which will cause the template to flush() on each send. Note, however, that flushing will likely significantly reduce performance.
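A hedged sketch of the blocking variant (topic, value, and timeout are placeholders):

import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

public class BlockingSendSketch {

    void sendAndWait(KafkaTemplate<String, String> template) throws Exception {
        ListenableFuture<SendResult<String, String>> future = template.send("some-topic", "some-value");
        template.flush();  // push any batched records out before blocking on the result
        SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);
        System.out.println("acknowledged at offset " + result.getRecordMetadata().offset());
    }
}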

When using a Message Listener Container you must provide a listener to receive data. There are currently four supported interfaces for message listeners:
MessageListener
AcknowledgingMessageListener
BatchMessageListener
BatchAcknowledgingMessageListener
Two MessageListenerContainer implementations are provided:
  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
  • MANUAL - the message listener is responsible for calling acknowledge() on the Acknowledgment; after which, the same semantics as BATCH are applied.
  • MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
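A hedged sketch of the kafkaManualAckListenerContainerFactory referenced in the next snippet, assuming the spring-kafka 1.x API (consumer factory wiring is elided; enable.auto.commit must be false in the consumer config):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@Configuration
public class ManualAckFactorySketch {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // offsets are committed only when the listener calls Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }
}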
@KafkaListener(id = "baz", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
Finally, metadata about the message is available from message headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed. The framework cannot know whether such a message has been processed or not, that is an application-level function. This is known as the Idempotent Receiver pattern and Spring Integration provides an implementation thereof.
https://www.javacodegeeks.com/2016/11/spring-kafka-producerconsumer-sample.html
https://github.com/bijukunjummen/sample-spring-kafka-producer-consumer

http://msvaljek.blogspot.com/2015/12/stream-processing-with-spring-kafka_44.html

http://howtoprogram.xyz/2016/09/23/spring-kafka-tutorial/
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private Listener listener;

@Test
public void contextLoads() throws InterruptedException {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic1", "ABC");
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("success");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("failed");
        }
    });
    System.out.println(Thread.currentThread().getId());
    assertThat(this.listener.countDownLatch1.await(60, TimeUnit.SECONDS)).isTrue();
}

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
We need to define a KafkaListenerContainerFactory which will create the listener container. In this example, we use ConcurrentKafkaListenerContainerFactory, which creates a ConcurrentMessageListenerContainer that can consume messages in a multi-threaded fashion.

http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
The idea is that the same maximum message size must be supported all the way from the Kafka producer to the Kafka broker and on to the Kafka consumer, i.e.
Kafka producer --> Kafka broker --> Kafka consumer
So if the requirement is to send 15 MB messages, then the producer, the broker, and the consumer all three need to be in sync.
Kafka producer sends 15 MB --> Kafka broker allows/stores 15 MB --> Kafka consumer receives 15 MB
The settings therefore should be:
A.) On the broker: message.max.bytes=15728640, replica.fetch.max.bytes=15728640
B.) On the consumer: fetch.message.max.bytes=15728640
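The quoted settings cover the broker and the old consumer; for the new Java clients, the analogous client-side knobs (to my understanding, stated as an assumption rather than taken from the answer above) are max.request.size on the producer and max.partition.fetch.bytes on the consumer:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class LargeMessageClientConfigSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15728640);          // allow 15 MB produce requests
        return props;
    }

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 15728640); // fetch up to 15 MB per partition
        return props;
    }
}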

Consumer
http://www.source4code.info/2016/09/spring-kafka-consumer-producer-example.html
    public void sendMessage(String topic, String message) {
        // the KafkaTemplate provides asynchronous send methods returning a
        // Future
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
                .send(topic, message);

        // you can register a callback with the listener to receive the result
        // of the send asynchronously
        future.addCallback(
                new ListenableFutureCallback<SendResult<Integer, String>>() {

                    @Override
                    public void onSuccess(
                            SendResult<Integer, String> result) {
                        LOGGER.info("sent message='{}' with offset={}",
                                message,
                                result.getRecordMetadata().offset());
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        LOGGER.error("unable to send message='{}'",
                                message, ex);
                    }
                });

        // alternatively, to block the sending thread, to await the result,
        // invoke the future’s get() method
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

    @Bean
    public Sender sender() {//?
        return new Sender();
    }

The @KafkaListener annotation creates a message listener container behind the scenes for each annotated method, using a ConcurrentMessageListenerContainer

we need to add the @EnableKafka annotation to enable support for the @KafkaListener annotation that was used on the Receiver.
    public Map consumerConfigs() {
        Map props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

        return props;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
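The Receiver itself is not shown in the excerpt; a hedged sketch of what it might look like (the latch and topic match the test below, the rest is an assumption):

import java.util.concurrent.CountDownLatch;

import org.springframework.kafka.annotation.KafkaListener;

public class Receiver {

    // the test below awaits this latch to verify that a message arrived
    private final CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "helloworld.t")
    public void receive(String message) {
        // process the payload, then release the latch
        latch.countDown();
    }
}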
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {

    @Autowired
    private Sender sender;

    @Autowired
    private Receiver receiver;

    @Test
    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        assertThat(receiver.getLatch().getCount()).isEqualTo(0);
    }
}
https://www.javacodegeeks.com/2016/11/spring-kafka-producerconsumer-sample.html
SendResult<String, WorkUnit> sendResult =
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();

LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);


@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}
@Service
public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}
http://stackoverflow.com/questions/39387263/kafka-listener-container-in-spring-boot-with-annotations-does-not-consume-messag

http://users.kafka.apache.narkive.com/n9xzt1Xz/caused-by-org-apache-kafka-common-errors-timeoutexception-failed-to-update-metadata-after-60000-ms
It seems that you put in the wrong port in the following statement. Kafka's default port is 9092; 2181 is for ZooKeeper.
props.put("bootstrap.servers", "localhost:2181");
https://www.mapr.com/blog/getting-started-sample-programs-apache-kafka-09

https://community.cloudera.com/t5/Cloudera-Manager-Installation/Error-publishing-a-message-after-upgrading-kafka-parcel/td-p/37640
Figured out that I had to change a setting in server.properties
listeners=PLAINTEXT://hostname:9092
to
listeners=PLAINTEXT://0.0.0.0:9092

http://blog.csdn.net/xufan007/article/details/51898335

http://www.jianshu.com/p/2db7abddb9e6
The cause: if advertised.host.name is not set, the value published to ZooKeeper defaults to java.net.InetAddress.getCanonicalHostName(), and that value is used by both the producer side and the consumer side. So machines on an external network, or machines without the hostname mapping configured, run into network problems when accessing the Kafka cluster.
In other words, the Kafka client connects to the broker successfully, but the cluster metadata returned after connecting is wrong: it contains the node's hostname. The fix is to manually configure advertised.host.name and advertised.port (both parameters must be set) and restart, which resolves the problem:
advertised.host.name=10.0.0.100
advertised.port=9092
http://ju.outofmemory.cn/entry/289670
"PLAINTEXT" is the protocol; the possible values are PLAINTEXT and SSL. The hostname can be an IP address, or "0.0.0.0" to listen on all network interfaces; if the hostname is empty, only the default network interface is used.
That is, if you do not configure advertised.listeners, the listeners setting is what gets advertised to producers and consumers when they fetch metadata. If neither is configured, the value returned by java.net.InetAddress.getCanonicalHostName() is used, which for IPv4 is basically localhost.
Your producer will then hang with no response. If you bridge slf4j to log4j and turn the log level up to debug, you will see log output like the following:
Updated cluster metadata version 2 to Cluster(nodes = [Node(0, 127.0.0.1, 9092)], partitions = [])
[cas-client-proxy] TRACE [main] org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(374) | Requesting metadata update for topic topic_9527.
You can see the client connecting to 127.0.0.1:9092 instead of the server address you expected!
If you configure the host.name and port properties the 0.8.x way, you get the following error:
org.apache.kafka.clients.producer.KafkaProducer.send(348) | Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Apache Kafka Programming Guide for Beginners (Part 2): Integrating Kafka with Spring
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Error+Handling+and+Logging
http://orchome.com/300

http://stackoverflow.com/questions/33537950/how-to-delete-a-topic-in-apache-kafka
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic DummyTopic
The command executed successfully, but when I ran the command to list the topics, I could see that the topic was still there, marked for deletion.
bin/kafka-topics.sh --list --zookeeper localhost:2181
DummyTopic - marked for deletion
Topic deletion has been supported since version 0.8.2.x. You have to enable topic deletion first by setting delete.topic.enable to true on all brokers.
Follow this step by step process for manual deletion of topics
  1. Stop Kafka server
  2. Delete the topic directory with rm -rf command
  3. Connect to Zookeeper instance
  4. ls /brokers/topics
  5. Remove the topic folder from ZooKeeper using rmr /brokers/topics/yourtopic
  6. Restart Kafka server
  7. Confirm whether it was deleted by running kafka-topics.sh --list --zookeeper yourip:port

