|
Flink 数据源用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据快速获取方法,用于帮助开发者简单地具体构建输入流,如下:
1.1 基于文件构建
1. readTextFile(path):按照TextInputFormat格式读取文本文件,并将其内容以字符串的形式返回。示例如下:
env.readTextFile(filePath).print();
2. readFile(fileInputFormat, path):按照指定格式读取文件。
3. readFile(inputFormat, filePath, watchType, Interval, typeInformation):按照指定格式循环的读取文件。其中各个参数的含义如下:
inputFormat:数据流的输入格式。
filePath:文件路径,可以是本地文件系统上的路径,也可以是HDFS上的文件路径。
watchType:读取方式,有两个可选值,分别是FileProcessingMode.PROCESS_ONCE和FileProcessingMode.PROCESS_CONTINUOUSLY:前面表示对指定路径上的数据只读取一次,然后退出;晚上表示对路径进行定期地扫描和读取。需要注意的是如果 watchType 被设置为PROCESS_CONTINUOUSLY,那么当文件被修改时,其所有的内容(包含原有的内容和新增的内容)都将被重新处理,因此这会打破 Flink 的恰好一次语义。
Interval:定期扫描的时间间隔。
typeInformation:输入流中元素的类型。
使用示例如下:
final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),
filePath,
FileProcessingMode.PROCESS_ONCE,
1,
BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();
1.2 基于集合构建
1. fromCollection(Collection):基于集合构建,集合中的所有元素必须是相同类型。示例如下:
env.fromCollection(Arrays.asList(1,2,3,4,5)).print();
2. fromElements(T ...):基于元素构建,所有元素必须是相同类型。示例如下:
env.fromElements(1,2,3,4,5).print();
3.generateSequence(from, to):基于给定的序列区间进行构建。示例如下:
env.generateSequence(0,100);
4. fromCollection(Iterator, Class):基于迭代器进行构建。第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型。使用示例如下:
env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();
其中 CustomIterator 为自定义的迭代器,这里以产生 1 到 100 区间内的数据为例,源码如下。注意的是自定义迭代器除了要实现 Iterator 接口外,还必须要实现序列化接口 Serialized ,否则会引发序列化失败的异常:
import java.io.Serializable;
import java.util.Iterator;
public class CustomIterator implements Iterator<Integer>, Serializable {
private Integer i = 0;
@Override
public boolean hasNext() {
return i < 100;
}
@Override
public Integer next() {
i++;
return i;
}
}
5. fromParallelCollection(SplittableIterator, Class):接收两个参数的方法,第二个参数定义用于输出元素的类型,第一个参数 SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆除分到多个不相交的迭代器中。
1.3 基于Socket构建
Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流,socketTextStream 方法有以下主要参数:
主机名:主机名;
port:端口号,设置为0时,表示端口号自动分配;
delimiter:用于分隔每条记录的分隔符;
maxRetry:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。示例如下:
env.socketTextStream("192.168.0.229", 9999, "\n", 3).print();
|
|