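Storm Series 3: The Topology Creation Process

The Topology Creation Process

  A Topology is a complete Storm workflow made up of components such as spouts and bolts. This post walks through how a Topology is created.

Entry function

  A Topology is usually built in the application's entry function by calling TopologyBuilder, as shown below: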

TopologyBuilder builder = new TopologyBuilder();
// Register the spout
builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
// Register the bolts and declare how each one subscribes to its upstream component
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

TopologyBuilder

  Its most important instance variables:

// Every registered bolt is stored in _bolts
private Map<String, IRichBolt> _bolts = new HashMap<>();
// Every registered spout is stored in _spouts
private Map<String, IRichSpout> _spouts = new HashMap<>();
// The ComponentCommon of every spout and bolt in the topology is stored in _commons
private Map<String, ComponentCommon> _commons = new HashMap<>();

setSpout

public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException {
    // Check that the id is unique, mainly by looking for an existing key in the instance maps
    validateUnusedId(id);
    // Build and initialize a ComponentCommon object, then store it in _commons
    initCommon(id, spout, parallelism_hint);
    // Store the spout in _spouts
    _spouts.put(id, spout);
    return new SpoutGetter(id);
}
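
The uniqueness check mentioned in the first comment can be illustrated with a small self-contained model. This is only a sketch, not Storm's source: the class name `IdRegistry`, the `addSpout`/`addBolt` helpers, the `Object` value type, and the exact error messages are assumptions made for the example. It shows the idea of rejecting an id that already exists as a key in any of the component maps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the id bookkeeping in TopologyBuilder; not Storm code.
class IdRegistry {
    private final Map<String, Object> bolts = new HashMap<>();
    private final Map<String, Object> spouts = new HashMap<>();

    // Reject an id that is already used by any registered component.
    void validateUnusedId(String id) {
        if (bolts.containsKey(id)) {
            throw new IllegalArgumentException("Bolt has already been declared for id " + id);
        }
        if (spouts.containsKey(id)) {
            throw new IllegalArgumentException("Spout has already been declared for id " + id);
        }
    }

    void addSpout(String id, Object spout) {
        validateUnusedId(id);
        spouts.put(id, spout);
    }

    void addBolt(String id, Object bolt) {
        validateUnusedId(id);
        bolts.put(id, bolt);
    }

    public static void main(String[] args) {
        IdRegistry registry = new IdRegistry();
        registry.addSpout("sentence-spout", new Object());
        try {
            // Reusing a spout's id for a bolt is rejected as well.
            registry.addBolt("sentence-spout", new Object());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check looks at both maps, ids share one namespace across spouts and bolts in this model: a bolt cannot reuse a spout's id.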

initCommon


private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
    ComponentCommon common = new ComponentCommon();
    // Start with an empty map of inputs (source stream -> grouping)
    common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
    // Set the parallelism hint, if one was given
    if (parallelism != null) {
        int dop = parallelism.intValue();
        if (dop < 1) {
            throw new IllegalArgumentException("Parallelism must be positive.");
        }
        common.set_parallelism_hint(dop);
    }
    // Store the component's configuration as JSON
    Map conf = component.getComponentConfiguration();
    if (conf != null) common.set_json_conf(JSONValue.toJSONString(conf));
    _commons.put(id, common);
}
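
The parallelism handling in initCommon can be isolated into a tiny runnable sketch. The class `ParallelismHint` and the method `resolve` are hypothetical names introduced here; only the null check, the `intValue()` conversion, and the `dop < 1` rejection come from the listing above.

```java
// Hypothetical helper that mirrors the parallelism check in initCommon; not Storm code.
class ParallelismHint {
    // Returns null when no hint was given, otherwise the validated integer hint.
    static Integer resolve(Number parallelism) {
        if (parallelism == null) {
            return null; // the hint is simply left unset
        }
        int dop = parallelism.intValue(); // fractional values are truncated toward zero
        if (dop < 1) {
            throw new IllegalArgumentException("Parallelism must be positive.");
        }
        return dop;
    }

    public static void main(String[] args) {
        System.out.println(resolve(4));    // a plain integer hint
        System.out.println(resolve(2.9));  // truncated by intValue()
        System.out.println(resolve(null)); // no hint supplied
        try {
            resolve(0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One consequence of accepting a `Number` is that a value such as 0.5 truncates to 0 and is rejected, just like 0 itself.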

setBolt

  setBolt works much like setSpout:


public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
    validateUnusedId(id);
    initCommon(id, bolt, parallelism_hint);
    _bolts.put(id, bolt);
    return new BoltGetter(id);
}

Differences between setSpout and setBolt

The two methods appear to do essentially the same thing, but their return values differ.

Source of SpoutGetter:


protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
    public SpoutGetter(String id) {
        super(id);
    }
}

Source of BoltGetter, by contrast:


protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
    private String _boltId;

    public BoltGetter(String boltId) {
        super(boltId);
        _boltId = boltId;
    }

    public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {
        return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);
    }

    public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) {
        return grouping(componentId, streamId, Grouping.fields(fields.toList()));
    }

    public BoltDeclarer globalGrouping(String componentId) {
        return globalGrouping(componentId, Utils.DEFAULT_STREAM_ID);
    }

    // A global grouping is expressed as a fields grouping over an empty field list
    public BoltDeclarer globalGrouping(String componentId, String streamId) {
        return grouping(componentId, streamId, Grouping.fields(new ArrayList<String>()));
    }

    public BoltDeclarer shuffleGrouping(String componentId) {
        return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
    }

    public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
        return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));
    }

    .....
}

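As the listing shows, BoltGetter also implements the various grouping methods, all of which delegate to a private grouping method: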


private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {
    _commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);
    return this;
}

In essence, declaring a grouping looks up the ComponentCommon object for the given boltId in _commons and adds an entry to its inputs attribute.
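
The bookkeeping described above can be modelled without Storm on the classpath. The following is a simplified sketch under stated assumptions: `MiniBuilder`, `StreamKey` (standing in for GlobalStreamId), `Declarer`, and the use of plain strings to describe a grouping are all hypothetical names chosen for illustration. What it preserves from the source is the shape of the logic: each grouping call adds one entry to the bolt's inputs map and returns `this` so calls can be chained.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of how a bolt declarer records groupings; not Storm code.
class MiniBuilder {
    static final String DEFAULT_STREAM_ID = "default";

    // Plays the role of GlobalStreamId: an (upstream component, stream) pair.
    static final class StreamKey {
        final String componentId;
        final String streamId;

        StreamKey(String componentId, String streamId) {
            this.componentId = componentId;
            this.streamId = streamId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StreamKey)) return false;
            StreamKey other = (StreamKey) o;
            return componentId.equals(other.componentId) && streamId.equals(other.streamId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(componentId, streamId);
        }
    }

    // Plays the role of _commons: boltId -> (input stream -> grouping description).
    final Map<String, Map<StreamKey, String>> inputsByBolt = new HashMap<>();

    Declarer setBolt(String boltId) {
        inputsByBolt.put(boltId, new HashMap<>());
        return new Declarer(boltId);
    }

    class Declarer {
        private final String boltId;

        Declarer(String boltId) {
            this.boltId = boltId;
        }

        Declarer shuffleGrouping(String componentId) {
            return grouping(componentId, DEFAULT_STREAM_ID, "shuffle");
        }

        Declarer fieldsGrouping(String componentId, String... fields) {
            return grouping(componentId, DEFAULT_STREAM_ID, "fields" + Arrays.toString(fields));
        }

        // Every public grouping method funnels into this one map update.
        private Declarer grouping(String componentId, String streamId, String grouping) {
            inputsByBolt.get(boltId).put(new StreamKey(componentId, streamId), grouping);
            return this;
        }
    }

    public static void main(String[] args) {
        MiniBuilder builder = new MiniBuilder();
        builder.setBolt("count-bolt").fieldsGrouping("split-bolt", "word");
        System.out.println(builder.inputsByBolt.get("count-bolt").size() + " input(s) recorded");
    }
}
```

Keying the inputs by (component, stream) means that subscribing twice to the same upstream stream overwrites the earlier grouping in this model rather than adding a second one.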

createTopology()

  TopologyBuilder has one more important method, createTopology(), which performs the final packaging step.


public StormTopology createTopology() {
    // Collection of bolts
    Map<String, Bolt> boltSpecs = new HashMap<>();
    // Collection of spouts
    Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
    maybeAddCheckpointSpout();
    for (String boltId : _bolts.keySet()) {
        // Look up the bolt by its id
        IRichBolt bolt = _bolts.get(boltId);
        bolt = maybeAddCheckpointTupleForwarder(bolt);
        // Fill in the streams attribute of the matching ComponentCommon
        // (the output fields of each stream and whether it is a direct stream)
        ComponentCommon common = getComponentCommon(boltId, bolt);
        try {
            maybeAddCheckpointInputs(common);
            // Put the serialized bolt together with its common into the bolt collection
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        } catch (RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException(
                    "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                    "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                    "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    // Spouts are handled in essentially the same way as bolts
    for (String spoutId : _spouts.keySet()) {
        IRichSpout spout = _spouts.get(spoutId);
        ComponentCommon common = getComponentCommon(spoutId, spout);
        try {
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
        } catch (RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException(
                    "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                    "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                    "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }

    StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                    boltSpecs,
                                                    new HashMap<String, StateSpoutSpec>());

    stormTopology.set_worker_hooks(_workerHooks);

    return Utils.addVersions(stormTopology);
}

In the end, every bolt and spout we registered is wrapped into a StormTopology.
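
The error messages in createTopology point at a common pitfall: each component is Java-serialized when the topology is created, so any non-serializable field that already holds a value at that moment makes serialization fail. The sketch below reproduces this with plain `java.io` serialization. All class names (`SerializationSketch`, `Connection`, `EagerBolt`, `LazyBolt`) and the `prepare()` method here are hypothetical stand-ins, not Storm types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demonstration of the serialization pitfall; not Storm code.
class SerializationSketch {
    // Stand-in for a resource such as a client or connection; deliberately not Serializable.
    static class Connection { }

    // The field is created eagerly, so it is part of the serialized object graph.
    static class EagerBolt implements Serializable {
        private final Connection conn = new Connection();
    }

    // The field is transient and only created later, in a prepare()-style hook.
    static class LazyBolt implements Serializable {
        private transient Connection conn;

        void prepare() {
            conn = new Connection();
        }

        boolean isPrepared() {
            return conn != null;
        }
    }

    // Plain Java serialization into a byte array.
    static byte[] javaSerialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // True if the object serializes, false if a non-serializable field gets in the way.
    static boolean canSerialize(Object obj) {
        try {
            javaSerialize(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager bolt serializable: " + canSerialize(new EagerBolt()));
        System.out.println("lazy bolt serializable: " + canSerialize(new LazyBolt()));
    }
}
```

This is why the message recommends instantiating such fields within the prepare method at the earliest: by then the component has already been serialized and shipped, so the field never has to pass through serialization.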

Summary

  In short, TopologyBuilder connects the spout and bolt nodes according to their declared groupings to form a topology.


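Author: 小建儿

Published: 2018-08-22, 16:08

Last updated: 2018-08-22, 16:08

Original link: http://yajian.github.io/storm系列3-Topology创建过程/

License: Attribution-NonCommercial-NoDerivatives 4.0 International. Please retain the original link and author when reposting.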