LinuxSir.cn,穿越时空的Linuxsir!

 找回密码
 注册
搜索
热搜: shell linux mysql
查看: 320|回复: 0

Flink数据源自定义数据源

[复制链接]
发表于 2023-12-29 16:11:17 | 显示全部楼层 |阅读模式
2.1 源函数
除了内置的数据源之外,用户还可以使用addSource方法来添加自定义的数据源。自定义的数据源必须要实现 SourceFunction 接口,这里以产生 [0 , 1000) 区间内的数据为例,代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new SourceFunction<Long>() {

    private long count = 0L;
    private volatile boolean isRunning = true;

    public void run(SourceContext<Long> ctx) {
        while (isRunning && count < 1000) {
            // 通过collect将输入发送出去
            ctx.collect(count);
            count++;
        }
    }

    public void cancel() {
        isRunning = false;
    }

}).print();
env.execute();
2.2 ParallelSourceFunction 和 RichParallelSourceFunction
上面通过SourceFunction实现的数据源是不具备精度的,即不支持在获取的DataStream上调用setParallelism(n)方法,此时会抛出如下的异常:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
如果你想要实现家具度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图:


ParallelSourceFunction 直接继承自 ParallelSourceFunction,具有家具度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,因此其除了具有家具度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,close() 。

您需要登录后才可以回帖 登录 | 注册

本版积分规则

快速回复 返回顶部 返回列表