How to send the output of two bolts to a single bolt in Storm?
What is the easiest way to send the output of BoltA and BoltB to BoltC? Do I have to use joins, or is there a simpler solution? A and B have the same fields (ts, metric_name, metric_count).
    // kafkaspout --> logdecoder
    builder.setBolt(logdecoder_bolt_id, logdecoderbolt, 10).shuffleGrouping(kafka_spout_id);

    // logdecoder --> countbolt
    builder.setBolt(count_bolt_id, countbolt, 10).shuffleGrouping(logdecoder_bolt_id);

    // logdecoder --> httprescodecountbolt
    builder.setBolt(http_res_code_count_bolt_id, http_res_code_count_bolt, 10).shuffleGrouping(logdecoder_bolt_id);

    // Now I want to send the output of countbolt and httprescodecountbolt to an aggregator bolt.

    // countbolt --> aggregatebolt
    builder.setBolt(aggregate_bolt_id, aggregatebolt, 5).fieldsGrouping((count_bolt_id), new Fields("ts"));

    // httprescodecountbolt --> aggregatebolt
    builder.setBolt(aggregate_bolt_id, aggregatebolt, 5).fieldsGrouping((http_res_code_count_bolt_id), new Fields("ts"));
Is this possible?
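Yes. Call setBolt once for the aggregator bolt, then add a stream ID ("stream1" and "stream2" below) to each fieldsGrouping call. This assumes the upstream bolts declare and emit on streams with those IDs (for example via declareStream in declareOutputFields).

    BoltDeclarer bd = builder.setBolt(aggregate_bolt_id, aggregatebolt, 5);
    bd.fieldsGrouping((count_bolt_id), "stream1", new Fields("ts"));
    bd.fieldsGrouping((http_res_code_count_bolt_id), "stream2", new Fields("ts"));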
Then, in the execute() method of BoltC, you can test which stream the tuple came from:

    public void execute(Tuple tuple) {
        if ("stream1".equals(tuple.getSourceStreamId())) {
            // came from stream1
        } else if ("stream2".equals(tuple.getSourceStreamId())) {
            // came from stream2
        }
    }
Since you now know which stream the tuple came from, the tuples on the two streams don't need to have the same shape. You simply de-marshall each tuple according to its stream ID.
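To make the de-marshalling idea concrete, here is a minimal, self-contained sketch that does not depend on Storm. `FakeTuple` is a hypothetical stand-in for Storm's `Tuple`, `StreamDemux` is an invented helper, and the layout used for "stream2" (`ts`, `res_code`, `count`) is purely an assumption for illustration; only the "stream1" layout comes from the question.

```java
// Hypothetical stand-in for Storm's Tuple: just a stream ID plus positional values.
final class FakeTuple {
    private final String sourceStreamId;
    private final Object[] values;

    FakeTuple(String sourceStreamId, Object... values) {
        this.sourceStreamId = sourceStreamId;
        this.values = values.clone();
    }

    String getSourceStreamId() { return sourceStreamId; }

    Object getValue(int i) { return values[i]; }
}

final class StreamDemux {
    // Interpret the positional values according to the stream the tuple arrived on.
    static String describe(FakeTuple t) {
        if ("stream1".equals(t.getSourceStreamId())) {
            // Layout from the question: (ts, metric_name, metric_count)
            return "metric " + t.getValue(1) + "=" + t.getValue(2) + " @" + t.getValue(0);
        } else if ("stream2".equals(t.getSourceStreamId())) {
            // Assumed layout for illustration: (ts, res_code, count)
            return "http " + t.getValue(1) + " x" + t.getValue(2) + " @" + t.getValue(0);
        }
        return "unknown stream " + t.getSourceStreamId();
    }

    public static void main(String[] args) {
        System.out.println(describe(new FakeTuple("stream1", 100L, "requests", 7)));
        System.out.println(describe(new FakeTuple("stream2", 100L, 404, 3)));
    }
}
```

In a real bolt the same branching would sit inside execute(), reading fields from the actual Storm tuple instead of the stand-in.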
You can also check which component the tuple came from (as I type this, I think that might be more appropriate in your case), as well as the instance of the component (the task) that emitted the tuple.
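Storm's `Tuple` exposes this origin information through `getSourceComponent()` and `getSourceTask()`. The sketch below mimics only those two accessors on a hypothetical `OriginTuple` class so that it runs without Storm; `OriginCounter` and the component IDs "count-bolt" and "http-bolt" are made-up names for illustration, not values from the question.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Storm's Tuple, exposing only the origin metadata.
final class OriginTuple {
    private final String sourceComponent;
    private final int sourceTask;

    OriginTuple(String sourceComponent, int sourceTask) {
        this.sourceComponent = sourceComponent;
        this.sourceTask = sourceTask;
    }

    String getSourceComponent() { return sourceComponent; }

    int getSourceTask() { return sourceTask; }
}

final class OriginCounter {
    // Number of tuples seen per "component#task", kept sorted by key.
    private final Map<String, Integer> seen = new TreeMap<>();

    // Mirrors the shape of a bolt's execute(): inspect where the tuple came from.
    void execute(OriginTuple t) {
        String key = t.getSourceComponent() + "#" + t.getSourceTask();
        seen.merge(key, 1, Integer::sum);
    }

    Map<String, Integer> counts() { return seen; }

    public static void main(String[] args) {
        OriginCounter counter = new OriginCounter();
        counter.execute(new OriginTuple("count-bolt", 3));
        counter.execute(new OriginTuple("http-bolt", 7));
        counter.execute(new OriginTuple("count-bolt", 3));
        System.out.println(counter.counts());
    }
}
```

Branching on the source component means the upstream bolts can keep emitting on their default stream, at the cost of tying the aggregator to specific component IDs.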