storm分组策略
内置分组策略
storm 内置了 8 种流分组的方式,通过实现 CustomStreamGrouping 接口可以实现自定义的流分组。InputDeclarer 接口定义了不同的流分组方式,每当TopologyBuilder#setBolt 方法被调用就返回该对象,用于声明一个bolt的输入流以及这些流应当如何分组。该接口定义的所有分组方法如下:
随机分组(
shuffleGrouping
):最常用的分组方式,它随机地分发元组到 bolt 上的任务,这样能保证每个任务得到基本相同数量的元组。例如如果希望 bolt2 读取 spout 和 bolt1 两个组件发送的tuple,则可以定义 bolt2 如下:1
2
3topologyBuilder.setBolt("bolt2", new Bolt2(), 5)
.shuffleGrouping("spout")
.shuffleGrouping("bolt1");无分组(
noneGrouping
):假定你不关心流是如何被分组的,则可以使用这种方式,目前这种分组和随机分组是一样的效果,有一点不同的是 storm 会把这个bolt放到其订阅者的同一个线程中执行。- 本地或随机分组(
localOrShuffleGrouping
):如果目标 Bolt 中的一个或者多个 Task 和当前产生数据的 Task 在同一个Worker 进程里面,那么就走内部的线程间通信,将Tuple直接发给在当前 Worker 进程的目的 Task。否则,同 shuffleGrouping。localOrShuffleGrouping 的数据传输性能优于shuffleGrouping,因为在 Worker 内部传输,只需要通过Disruptor队列就可以完成,没有网络开销和序列化开销。因此在数据处理的复杂度不高, 而网络开销和序列化开销占主要地位的情况下,可以优先使用 localOrShuffleGrouping来代替 shuffleGrouping。 - 字段分组(
fieldsGrouping
):根据指定字段对流进行分组。例如,如果是按 userid 字段进行分组,具有相同 userid 的元组被分发到相同的任务,具有不同userid的元组可能被分发到不同的任务。字段分组是实现流连接和关联、以及大量其他用例的基础,在实现上,字段分组使用取模散列来实现。 - 部分关键字分组(
partialKeyGrouping
):这种方式与字段分组很相似,根据定义的字段来对数据流进行分组,不同的是,这种方式会考虑下游 Bolt 数据处理的均衡性问题,在输入数据源关键字不平衡时会有更好的性能。 - 广播分组(
allGrouping
):流被发送到所有 bolt 的任务中,使用这个分组方式要特别小心。 - 全局分组(
globalGrouping
):全部流被发送到 bolt 的同一个任务中(id最小的任务)。 - 直接分组(
directGrouping
):由元组的生产者组件决定元组消费者的组件,直接分组只能在已经声明为直接流(direct stream)的流中使用,声明方法为在 declareOutFields 方法中使用OutputFieldsDeclarer#declareStream 方法,并且元组必须使用emitDirect 方法来发射。Bolt 通过 TopologyContext 对象或者 OutputCollector 类的 emit 方法的返回值,可以得到其消费者的任务 id 列表(List<Integer>)。
直接分组示例
App中的定义:
1 | TopologyBuilder builder = new TopologyBuilder(); |
CallLogReaderSpout:
1 | private SpoutOutputCollector collector; |
CallLogCreatorBolt
1 |
|
自定义分组策略
可以通过实现 CustomStreamGrouping 接口来创建自定义的流分组(customGrouping)。
使用示例:
1 | topologyBuilder.setBolt("bolt2", new Bolt2(), 5).customGrouping("componentIdxxx", new MyGroupring()); |
MyGroupring
1 | public class MyGrouping implements CustomStreamGrouping { |