Apache Kafka - Storm UI improper values and Capillary tool
I am quite new to Apache Storm and have been trying a Trident topology for Kafka, i.e. TransactionalTridentKafkaSpout. Everything works fine except the Storm UI. Even though I have not produced any data to my topic, the Storm UI keeps showing invalid emitted/transferred values, meaning the count keeps on increasing when there is no data in the topic. I have tried deleting the data/logs stored in ZooKeeper, Storm, and Kafka, recreating the Kafka topics, and have also set
topology.stats.sample.rate: 1.0
but the problem still persists.
I also came across a tool called Capillary to monitor the Storm cluster. I am using the properties below:
capillary.zookeepers="192.168.125.20:2181"
capillary.kafka.zkroot="192.168.125.20:/home/storm/kafka_2.11-0.8.2.0"
capillary.storm.zkroot="192.168.125.20:/home/storm/apache-storm-0.9.3"
I am using Kafka's embedded ZooKeeper here. This is also not working; I am getting the exception below.
! @6mbg4bp7l - Internal server error, for (GET) [/] ->
play.api.Application$$anon$1: Execution exception[[JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values
 at [Source: java.io.StringReader@24adb083; line: 1, column: 9]]]
    at play.api.Application$class.handleError(Application.scala:296) ~[com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
    at play.api.DefaultApplication.handleError(Application.scala:402) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:205) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:202) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) [org.scala-lang.scala-library-2.10.4.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values
 at [Source: java.io.StringReader@24adb083; line: 1, column: 9]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:495) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1178) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
Any help on either issue would be great. Thanks in advance.
Configuration and source code snippet:
final Config config = new Config();
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000);
config.setNumWorkers(2);
config.put(Config.NIMBUS_HOST, "192.168.125.20");
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
config.put(Config.DRPC_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.DRPC_PORT, 3772);

final BrokerHosts zkHosts = new ZkHosts("192.168.125.20");
final TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "test_topic", "");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;

final TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
final TridentTopology topology = new TridentTopology();
topology.newStream("spout", kafkaSpout)
        .each(new Fields("str"), new TestFunction(), new Fields("test"))
        .each(new Fields("str"), new PrintFilter());
Topology summary image:
Are you possibly seeing what I'd call UI metric artifacts of Trident? These tuples also show up in the counters of the Storm UI:
Trident executes a batch every 500 ms (by default). A batch involves a bunch of coordination messages going out to the bolts to coordinate the batch (even if the batch is empty). That's what you're seeing.
(Source: Trident Kafka spout - ack count increasing though no messages processed)
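To get a feel for how quickly those counters can climb on an idle topic, here is a rough back-of-the-envelope sketch in plain Java. It assumes Trident starts exactly one batch per emit interval and ignores throttling such as topology.max.spout.pending. The class name, the method names, and the tuplesPerBatch value are hypothetical placeholders for illustration, not numbers or APIs taken from Storm; the real number of coordination tuples per batch depends on the topology.

```java
public class EmptyBatchEstimate {

    // Number of batches started in a time window, assuming one batch
    // per emit interval (topology.trident.batch.emit.interval.millis).
    static long batchesIn(long windowMillis, long emitIntervalMillis) {
        if (emitIntervalMillis <= 0) {
            throw new IllegalArgumentException("emit interval must be positive");
        }
        return windowMillis / emitIntervalMillis;
    }

    // Rough count of coordination tuples in the window. tuplesPerBatch is a
    // hypothetical placeholder for however many coordination tuples a single
    // (possibly empty) batch emits in a given topology.
    static long coordinationTuples(long windowMillis, long emitIntervalMillis, long tuplesPerBatch) {
        return batchesIn(windowMillis, emitIntervalMillis) * tuplesPerBatch;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;

        // Default interval of 500 ms mentioned in the answer.
        System.out.println(batchesIn(tenMinutes, 500));

        // Interval of 3000 ms, as set in the question's configuration.
        System.out.println(batchesIn(tenMinutes, 3000));

        // With an assumed 4 coordination tuples per empty batch.
        System.out.println(coordinationTuples(tenMinutes, 3000, 4));
    }
}
```

Even with the 3000 ms interval from the question, an idle topology would start a couple of hundred batches in ten minutes, so steadily rising emitted/transferred numbers are consistent with the explanation above and do not by themselves indicate that data is being read from the topic.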