java - Kafka consumer not receiving message -
i have found code kafka consumer. when send message producer, consumer in command prompt receives message consumer code not show message. have attached code below.
public class simplehlconsumer { private final consumerconnector consumer; private final string topic; public simplehlconsumer(string zookeeper, string groupid, string topic) { properties props = new properties(); props.put("zookeeper.connect", zookeeper); props.put("group.id", groupid); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); consumer = consumer.createjavaconsumerconnector( new consumerconfig(props)); this.topic = topic; } public void testconsumer() { map<string, integer> topiccount = new hashmap<string, integer>(); // define single thread topic topiccount.put(topic, new integer(1)); map<string, list<kafkastream<byte[], byte[]>>> consumerstreams = consumer.createmessagestreams(topiccount); list<kafkastream<byte[], byte[]>> streams = consumerstreams.get(topic); (final kafkastream stream : streams) { consumeriterator<byte[], byte[]> consumerite = stream.iterator(); while (consumerite.hasnext()) system.out.println("message single topic :: " + new string(consumerite.next().message())); } if (consumer != null){ system.out.println("shut down"); consumer.shutdown(); } } public static void main(string[] args) { string topic = args[0]; basicconfigurator.configure(); simplehlconsumer simplehlconsumer = new simplehlconsumer("localhost:2181", "testgroup", topic); simplehlconsumer.testconsumer(); } }
thanks
Comments
Post a Comment