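Requirement
Simulate counting, in real time, how many calls each caller-callee pair has made, based on call log records.
Hands-on Storm programming
Creating the spout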
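Main methods of the IRichSpout interface:
- open(conf, context, collector): initialization method; receives the configuration, the topology context, and the output collector
- nextTuple(): emits tuples to downstream components through the collector
- close(): called when the spout is shut down
- declareOutputFields(declarer): declares the output schema of the emitted tuples
- ack(msgId): confirms that the tuple with this message ID has been fully processed
- fail(msgId): signals that the tuple with this message ID was not fully processed; Storm does not replay it on its own, so the spout has to re-emit it if a retry is wanted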
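Storm only invokes ack(msgId) and fail(msgId) as callbacks; whether a failed tuple is emitted again is left to the spout. The following is a minimal sketch of the bookkeeping a spout could keep for that purpose. `PendingTuples` and all of its methods are hypothetical names introduced for illustration; they are not part of Storm and are not used by the CallLogReaderSpout in this post.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical helper (not a Storm class): remembers emitted tuples by message ID
// so that a spout could replay them after fail(msgId).
class PendingTuples {
    private final Map<Object, String> pending = new HashMap<>();  // msgId -> payload
    private final Queue<Object> retryQueue = new ArrayDeque<>();  // msgIds waiting for replay

    // Call right after the spout emits a tuple with a message ID.
    void emitted(Object msgId, String payload) {
        pending.put(msgId, payload);
    }

    // Call from ack(msgId): the tuple was fully processed, so forget it.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Call from fail(msgId): queue the tuple for replay if it is still tracked.
    void fail(Object msgId) {
        if (pending.containsKey(msgId)) {
            retryQueue.add(msgId);
        }
    }

    // Call from nextTuple(): returns the next payload to replay, or null if there is none.
    String nextRetry() {
        Object msgId;
        while ((msgId = retryQueue.poll()) != null) {
            String payload = pending.get(msgId);
            if (payload != null) {
                return payload;
            }
        }
        return null;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingTuples tracker = new PendingTuples();
        tracker.emitted("m1", "num1-num2");
        tracker.emitted("m2", "num2-num3");
        tracker.ack("m1");   // m1 is done
        tracker.fail("m2");  // m2 should be replayed
        System.out.println(tracker.nextRetry()); // prints "num2-num3"
    }
}
```

A replayed tuple stays in the pending map until it is finally acked, so repeated failures keep being retried; a real spout would probably also cap the number of retries.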
CallLogReaderSpout
implements IRichSpout
    // Fields used below are declared elsewhere in the class:
    // context, collector, randomGenerator, i

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        List<String> mobiles = new ArrayList<String>();
        mobiles.add("num1");
        mobiles.add("num2");
        mobiles.add("num3");

        // Emit at most 100 simulated call records.
        if (this.i < 100) {
            String from = mobiles.get(randomGenerator.nextInt(mobiles.size()));
            String to = mobiles.get(randomGenerator.nextInt(mobiles.size()));
            // Draw the callee again until it differs from the caller.
            while (from.equals(to)) {
                to = mobiles.get(randomGenerator.nextInt(mobiles.size()));
            }
            int duration = randomGenerator.nextInt(60);
            // The second argument is the message ID passed back to ack()/fail().
            this.collector.emit(new Values(from, to, duration), from + to + System.currentTimeMillis());
            this.i++;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("from", "to", "duration"));
    }

    @Override
    public void ack(Object msgId) {
        NCUtil.write2NC(this, "ack: " + msgId.toString());
    }

    @Override
    public void fail(Object msgId) {
        NCUtil.write2NC(this, "fail: " + msgId.toString());
    }
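Creating the bolts
Main methods of the IRichBolt interface:
- prepare(stormConf, context, collector): initialization method
- execute(tuple): receives a tuple from upstream, processes it, and emits the result to downstream components
- cleanup(): called when the bolt is shut down
- declareOutputFields(declarer): declares the output schema of the emitted tuples
- getComponentConfiguration(): declares component-specific configuration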
CallLogCreatorBolt
implements IRichBolt
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String from = input.getString(0);
        String to = input.getString(1);
        Integer duration = input.getInteger(2);
        // Anchor the new tuple to the input, then acknowledge the input.
        collector.emit(input, new Values(from + "-" + to, duration));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call", "duration"));
    }
CallLogCounterBolt
implements IRichBolt
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counterMap = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String call = input.getString(0);
        // Start at 1 for a new caller-callee pair, otherwise increment.
        Integer count = 1;
        if (counterMap.containsKey(call)) {
            count = counterMap.get(call) + 1;
        }
        counterMap.put(call, count);
        collector.ack(input);
    }

    @Override
    public void cleanup() {
        // Print the final counts. Storm only guarantees that cleanup() is called in local mode.
        counterMap.forEach((k, v) -> {
            System.err.println(k + ": " + v);
            NCUtil.write2NC(this, k + ": " + v);
        });
    }
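The counting step in execute() can be tried outside Storm. The sketch below reproduces it in plain Java, using `Map.merge` in place of the containsKey/get/put sequence; `CallCounter`, `record`, and `count` are hypothetical names introduced for illustration and do not appear in the original project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone version of the counting done in CallLogCounterBolt.execute().
class CallCounter {
    private final Map<String, Integer> counterMap = new HashMap<>();

    // Builds the same "from-to" key as CallLogCreatorBolt and adds 1 to its count.
    void record(String from, String to) {
        counterMap.merge(from + "-" + to, 1, Integer::sum);
    }

    // Returns how many calls were recorded for the key, or 0 if none.
    int count(String call) {
        return counterMap.getOrDefault(call, 0);
    }

    public static void main(String[] args) {
        CallCounter counter = new CallCounter();
        counter.record("num1", "num2");
        counter.record("num1", "num2");
        counter.record("num2", "num3");
        System.out.println(counter.count("num1-num2")); // prints 2
    }
}
```

Note that "num1-num2" and "num2-num1" are different keys, so calls are counted per direction, just as in the bolt.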
Writing the App
App
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setDebug(false);

        // Wire the topology: spout -> creator bolt -> counter bolt.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call_log_reader_spout", new CallLogReaderSpout(), 1);
        builder.setBolt("call_log_creator_bolt", new CallLogCreatorBolt(), 1)
               .shuffleGrouping("call_log_reader_spout");
        // Group by "call" so that the same caller-callee pair always reaches the same task.
        builder.setBolt("call_log_counter_bolt", new CallLogCounterBolt(), 1)
               .fieldsGrouping("call_log_creator_bolt", new Fields("call"));

        if (args.length > 0) {
            // Cluster mode: the first argument is the topology name.
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Local mode: run for 5 seconds, then shut down.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("call_log_analysis_topo", config, builder.createTopology());
            Thread.sleep(5 * 1000);
            localCluster.shutdown();
        }
    }
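The fields grouping on "call" starts to matter once the counter bolt runs with a parallelism greater than 1: tuples with the same "call" value are always routed to the same task, so each task's map holds the complete count for its keys. The sketch below models that routing with a simple hash-modulo rule. This is a simplified assumption for illustration, not Storm's internal implementation, and `FieldsGroupingSketch` and `taskFor` are hypothetical names.

```java
// Hypothetical model of fields grouping: the target task depends only on the field value.
class FieldsGroupingSketch {

    // Picks a task index in [0, numTasks) from the grouping field's hash code.
    static int taskFor(String call, int numTasks) {
        return Math.floorMod(call.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        String[] calls = {"num1-num2", "num2-num3", "num1-num2", "num3-num1"};
        for (String call : calls) {
            // Both "num1-num2" tuples print the same task index.
            System.out.println(call + " -> task " + taskFor(call, numTasks));
        }
    }
}
```

With shuffle grouping instead, the two "num1-num2" tuples could land on different tasks, and each task would hold only a partial count.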
Result of the local run
Packaging and running on the cluster
    $ mvn clean package -Dmaven.test.skip=true
    $ storm jar zdemo-storm-0.0.1-SNAPSHOT.jar test01.App call_log_analysis_topo
Topology diagram of the Storm application: