LinuxSir.cn,穿越时空的Linuxsir!

 找回密码
 注册
搜索
热搜: shell linux mysql
查看: 331|回复: 0

Flink 改造

[复制链接]
发表于 2023-12-29 22:20:45 | 显示全部楼层 |阅读模式
一、变换分类
Flink 的转换操作主要用于将一个和多个数据流转换成新的数据流。它主要分为以下三类:

DataStream Transformations:进行数据流相关转换操作;
物理分区:物理分区。Flink 提供了底层的 API,允许用户定义数据的分区规则;
任务链和资源组:任务链和资源组。允许用户进行任务链和资源组的细粒度的控制。
以下分别对其主要API进行介绍:

二、数据流转换
2.1 映射[数据流→数据流]
对于一个 DataStream 中的每个元素都执行特定的转换操作:

DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5);
integerDataStream.map((MapFunction<Integer, Object>) value -> value * 2).print();
// 输出 2,4,6,8,10
2.2 FlatMap【数据流→数据流】
FlatMap 与 Map 类似,但是 FlatMap 中的一个输入元素可以被映射成一个或者多个输出元素,示例如下:

String string01 = "one one one two two";
String string02 = "third third third four";
DataStream<String> stringDataStream = env.fromElements(string01, string02);
stringDataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
}).print();
// 输出每一个独立的单词,为节省排版,这里去掉换行,后文亦同
one one one two two third third third four
2.3 过滤器【数据流→数据流】
用于过滤符合条件的数据:

env.fromElements(1, 2, 3, 4, 5).filter(x -> x > 3).print();
2.4 KeyBy和Reduce
KeyBy [DataStream → KeyedStream]:用于将相同Key值的数据分到相同的分区中;
Reduce [KeyedStream → DataStream]:用于对数据执行归约计算。
下面的例子将按照数据键值分区后,滚动进行求和计算:

DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1),
                                                                        new Tuple2<>("a", 2),
                                                                        new Tuple2<>("b", 3),
                                                                        new Tuple2<>("b", 5));
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0);
keyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) ->
                   new Tuple2<>(value1.f0, value1.f1 + value2.f1)).print();

// 持续进行求和计算,输出:
(a,1)
(a,3)
(b,3)
(b,8)
KeyBy 操作存在以下两个限制:

KeyBy 操作用于用户自定义的 POJO 类型时,该自定义类型必须重写 hashCode 方法;
KeyBy 操作不能用于数据库类型。
2.5 聚合[KeyedStream→DataStream]
Aggregations是官方提供的聚合算子,封装了常用的聚合操作,如上面利用Reduce进行求和的操作也可以利用Aggregations中的sum算子重写为下面的形式:

tuple2DataStream.keyBy(0).sum(1).print();
除了 sum 外,Flink 还提供了 min , max , minBy,maxBy 等常用聚合算子:

// 滚动计算指定key的最小值,可以通过index或者fieldName来指定key
keyedStream.min(0);
keyedStream.min("key");
// 滚动计算指定key的最大值
keyedStream.max(0);
keyedStream.max("key");
// 滚动计算指定key的最小值,并返回其对应的元素
keyedStream.minBy(0);
keyedStream.minBy("key");
// 滚动计算指定key的最大值,并返回其对应的元素
keyedStream.maxBy(0);
keyedStream.maxBy("key");
2.6 联合[数据流* → 数据流]
用于连接两个或者多个元素类型相同的DataStream。当然一个DataStream也可以与本生进行连接,此时该DataStream中的每个元素都会被获取两次:

DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1),
                                                                            new Tuple2<>("a", 2));
DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1),
                                                                            new Tuple2<>("b", 2));
streamSource01.union(streamSource02);
streamSource01.union(streamSource01,streamSource02);
2.7 连接[数据流,数据流→连接流]
Connect 操作用于连接两个或者多个类型不同的 DataStream ,其返回的类型是 ConnectedStreams ,此时被连接的多个 DataStream 可以共享同一之间的数据状态。但是需要注意的是由于不同 DataStream 之间的数据类型不同的,如果想要进行后续的计算操作,还需要通过 CoMap 或 CoFlatMap 将 ConnectedStreams 转换回 DataStream:

DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3),
                                                                            new Tuple2<>("b", 5));
DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);
// 使用connect进行连接
ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02);
connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
    @Override
    public Integer map1(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }

    @Override
    public Integer map2(Integer value) throws Exception {
        return value;
    }
}).map(x -> x * 100).print();

// 输出:
300 500 200 900 300
2.8 分割与选择
Split [DataStream → SplitStream]:用于将一个DataStream按照指定规则进行分割为多个DataStream,注意的是这里进行的是逻辑分割,即Split只是将数据贴上不同的类型标签,但最终返回的仍然只是一个 SplitStream;
选择[SplitStream → DataStream]:想要从逻辑分割的SplitStream中获取真实的不同类型的DataStream,需要使用Select算子,示例如下:
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
// 标记
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        output.add(value % 2 == 0 ? "even" : "odd");
        return output;
    }
});
// 获取偶数数据集
split.select("even").print();
// 输出 2,4,6,8
2.9 项目【数据流→数据流】
项目主要用于获取元组中的指定字段集,示例如下:

DataStreamSource<Tuple3<String, Integer, String>> streamSource = env.fromElements(
                                                                         new Tuple3<>("li", 22, "2018-09-23"),
                                                                         new Tuple3<>("ming", 33, "2020-09-23"));
streamSource.project(0,2).print();

// 输出
(li,2018-09-23)
(ming,2020-09-23)

您需要登录后才可以回帖 登录 | 注册

本版积分规则

快速回复 返回顶部 返回列表