Apache Kafka - Storm UI improper values and Capillary tool


I'm quite new to Apache Storm and have been trying a Trident topology with Kafka, i.e. TransactionalTridentKafkaSpout. Everything works fine except the Storm UI. Even though I haven't produced any data to the topic, the Storm UI keeps showing invalid emitted/transferred values, meaning the count keeps increasing even when there is no data in the topic. I've tried deleting the data/logs stored in ZooKeeper, Storm, and Kafka and recreating the Kafka topics, and I have set

topology.stats.sample.rate: 1.0  

but the problem still persists.
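For reference, the same sampling rate can also be set programmatically on the topology config rather than in storm.yaml. A sketch, assuming Storm's Java API constant `Config.TOPOLOGY_STATS_SAMPLE_RATE` for this key:

```java
// Sketch: set the stats sample rate to 1.0 so the UI counts every tuple
// instead of sampling (the default rate is 0.05, i.e. 5% of tuples).
final Config config = new Config();
config.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1.0d);
```

Note that a sample rate of 1.0 makes the counters exact but adds bookkeeping overhead on every tuple, so it is mainly useful while debugging.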

I also came across a tool called Capillary for monitoring a Storm cluster. I am using the properties below:

capillary.zookeepers="192.168.125.20:2181"
capillary.kafka.zkroot="192.168.125.20:/home/storm/kafka_2.11-0.8.2.0"
capillary.storm.zkroot="192.168.125.20:/home/storm/apache-storm-0.9.3"

I am using Kafka's embedded ZooKeeper here. It is not working; I am getting the exception below.

! @6mbg4bp7l - Internal server error, for (GET) [/] ->

play.api.Application$$anon$1: Execution exception[[JsonParseException: Unexpected character ('.' (code 46)): expected space separating root-level values
 at [Source: java.io.StringReader@24adb083; line: 1, column: 9]]]
        at play.api.Application$class.handleError(Application.scala:296) ~[com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at play.api.DefaultApplication.handleError(Application.scala:402) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:205) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:202) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) [org.scala-lang.scala-library-2.10.4.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('.' (code 46)): expected space separating root-level values
 at [Source: java.io.StringReader@24adb083; line: 1, column: 9]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:495) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1178) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
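One thing worth checking (this is an assumption on my part, not something the stack trace proves by itself): Capillary's `zkroot` settings are ZooKeeper chroot paths, not `host:filesystem-path` values, and the host:port already lives in `capillary.zookeepers`. A parse error on a `.` at root level is consistent with Capillary reading something non-JSON back because it looked up a path that does not exist in ZooKeeper. A sketch of the config under that assumption, using the default chroots a stock Kafka/Storm install would register:

```
# Sketch, assuming default ZooKeeper chroots; the zkroot values are
# paths inside ZooKeeper, not filesystem paths on the broker host.
capillary.zookeepers="192.168.125.20:2181"
capillary.kafka.zkroot=""
capillary.storm.zkroot="storm"
```

If Kafka or Storm were configured with a non-default chroot, substitute that path instead.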

Any help on either would be great. Thanks in advance.

Configuration and source code snippet:

final Config config = new Config();
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000);
config.setNumWorkers(2);
config.put(Config.NIMBUS_HOST, "192.168.125.20");
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
config.put(Config.DRPC_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.DRPC_PORT, 3772);

final BrokerHosts zkHosts = new ZkHosts("192.168.125.20");
final TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "test_topic", "");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;

final TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
final TridentTopology topology = new TridentTopology();
topology.newStream("spout", kafkaSpout)
        .each(new Fields("str"), new TestFunction(), new Fields("test"))
        .each(new Fields("str"), new PrintFilter());

(Topology summary image: topology stats)

Are you possibly seeing what I'd call the UI metric artifacts of Trident? These tuples show up in the counters of the Storm UI:

Trident executes a batch every 500 ms (by default). A batch involves a bunch of coordination messages going out to all the bolts to coordinate the batch (even if the batch is empty). That's what you're seeing.

(Source: Trident Kafka spout - ack count increasing even though no messages are processed)
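The 500 ms mentioned above is controlled by the topology setting already present in the question's config. A sketch of tuning it, under the assumption that raising the interval only slows the coordination counters down rather than removing them:

```java
// Sketch: raise the Trident batch emit interval from the 500 ms default
// to 3 s. Coordination tuples are still emitted for every batch (even
// empty ones), so the UI counters keep growing, just 6x more slowly.
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000);
```

In other words, the growing emitted/transferred counts on an idle topic are expected Trident behavior, not a sign of lost or phantom data.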

