|
2.1 源函数
除了内置的数据源之外,用户还可以使用addSource方法来添加自定义的数据源。自定义的数据源必须要实现 SourceFunction 接口,这里以产生 [0 , 1000) 区间内的数据为例,代码如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<Long>() {
private long count = 0L;
private volatile boolean isRunning = true;
public void run(SourceContext<Long> ctx) {
while (isRunning && count < 1000) {
// 通过collect将输入发送出去
ctx.collect(count);
count++;
}
}
public void cancel() {
isRunning = false;
}
}).print();
env.execute();
2.2 ParallelSourceFunction 和 RichParallelSourceFunction
上面通过SourceFunction实现的数据源是不具备精度的,即不支持在获取的DataStream上调用setParallelism(n)方法,此时会抛出如下的异常:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
如果你想要实现家具度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图:
ParallelSourceFunction 直接继承自 ParallelSourceFunction,具有家具度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,因此其除了具有家具度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,close() 。
|
|