LinuxSir.cn,穿越时空的Linuxsir!

 找回密码
 注册
搜索
热搜: shell linux mysql
查看: 300|回复: 0

Flink数据源内置数据源

[复制链接]
发表于 2023-12-29 16:10:31 | 显示全部楼层 |阅读模式
Flink 数据源用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据快速获取方法,用于帮助开发者简单地具体构建输入流,如下:

1.1 基于文件构建
1. readTextFile(path):按照TextInputFormat格式读取文本文件,并将其内容以字符串的形式返回。示例如下:

env.readTextFile(filePath).print();
2. readFile(fileInputFormat, path):按照指定格式读取文件。

3. readFile(inputFormat, filePath, watchType, Interval, typeInformation):按照指定格式循环的读取文件。其中各个参数的含义如下:

inputFormat:数据流的输入格式。
filePath:文件路径,可以是本地文件系统上的路径,也可以是HDFS上的文件路径。
watchType:读取方式,有两个可选值,分别是FileProcessingMode.PROCESS_ONCE和FileProcessingMode.PROCESS_CONTINUOUSLY:前面表示对指定路径上的数据只读取一次,然后退出;晚上表示对路径进行定期地扫描和读取。需要注意的是如果 watchType 被设置为PROCESS_CONTINUOUSLY,那么当文件被修改时,其所有的内容(包含原有的内容和新增的内容)都将被重新处理,因此这会打破 Flink 的恰好一次语义。
Interval:定期扫描的时间间隔。
typeInformation:输入流中元素的类型。
使用示例如下:

final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),
             filePath,
             FileProcessingMode.PROCESS_ONCE,
             1,
             BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();
1.2 基于集合构建
1. fromCollection(Collection):基于集合构建,集合中的所有元素必须是相同类型。示例如下:

env.fromCollection(Arrays.asList(1,2,3,4,5)).print();
2. fromElements(T ...):基于元素构建,所有元素必须是相同类型。示例如下:

env.fromElements(1,2,3,4,5).print();
3.generateSequence(from, to):基于给定的序列区间进行构建。示例如下:

env.generateSequence(0,100);
4. fromCollection(Iterator, Class):基于迭代器进行构建。第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型。使用示例如下:

env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();
其中 CustomIterator 为自定义的迭代器,这里以产生 1 到 100 区间内的数据为例,源码如下。注意的是自定义迭代器除了要实现 Iterator 接口外,还必须要实现序列化接口 Serialized ,否则会引发序列化失败的异常:

import java.io.Serializable;
import java.util.Iterator;

public class CustomIterator implements Iterator<Integer>, Serializable {
    private Integer i = 0;

    @Override
    public boolean hasNext() {
        return i < 100;
    }

    @Override
    public Integer next() {
        i++;
        return i;
    }
}
5. fromParallelCollection(SplittableIterator, Class):接收两个参数的方法,第二个参数定义用于输出元素的类型,第一个参数 SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆除分到多个不相交的迭代器中。

1.3 基于Socket构建
Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流,socketTextStream 方法有以下主要参数:

主机名:主机名;
port:端口号,设置为0时,表示端口号自动分配;
delimiter:用于分隔每条记录的分隔符;
maxRetry:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。示例如下:
env.socketTextStream("192.168.0.229", 9999, "\n", 3).print();


您需要登录后才可以回帖 登录 | 注册

本版积分规则

快速回复 返回顶部 返回列表