LinuxSir.cn,穿越时空的Linuxsir!

 找回密码
 注册
搜索
热搜: shell linux mysql
查看: 273|回复: 0

弗林克接收器

[复制链接]
发表于 2023-12-29 16:09:26 | 显示全部楼层 |阅读模式
一、数据接收器
在使用 Flink 进行数据处理时,数据通过 Data Source 流入,然后通过一系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提供了几个比较简单的 Sink API 用于日常的开发,具体如下:

1.1 写为文本
writeAsText用于将计算结果以文本的方式写入到指定文件夹下,除了路径参数是必选外,该方法还可以通过指定第二个参数来定义输出模式,它有以下两个可选值:

WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作;
WriteMode.OVERWRITE:物质指定路径上是否存在文件,都执行写出操作;如果原来存在文件,则进行覆盖。
使用示例如下:

streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);
以上写出任务的方式写出到多个文件,如果想要将输出结果全部写出到一个文件,需要设置其任务度为1:

streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
1.2 写为Csv
writeAsCsv用于将计算结果以 CSV 的文件格式写出到指定目录,除了路径参数是必选外,该方法还支持创建输出模式,行分隔符,和字段分隔符三个额外的参数,其方法定义如下:

writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter)
1.3 打印\printToErr
print \ printToErr是测试中最常用的方式,用于将计算结果以标准输出流或错误输出流的方式打印到控制台上。

1.4 使用输出格式进行写入
采用自定义的输出格式将计算结果写出,上面介绍的writeAsText及其writeAsCsv底层调用的都是该方法,源码如下:

public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
    TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
    tof.setWriteMode(writeMode);
    return writeUsingOutputFormat(tof);
}
1.5 写入套接字
writeToSocket用于将计算结果以指定的格式写出到 Socket 中,使用示例如下:

streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());
二、流媒体连接器
除了 API 之外,Flink 中还内置了一系列的 Connector 连接器,用于将计算结果输入到常用的存储系统或者消息中间件中,具体如下:

Apache Kafka(支持源和接收器)
Apache Cassandra(接收器)
Amazon Kinesis Streams(源/接收器)
Elasticsearch(接收器)
Hadoop 文件系统(接收器)
RabbitMQ(源/接收器)
Apache NiFi(源/接收器)
Google PubSub(源/接收器)
除了内置的连接器之外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为多元化的数据分析系统(如 Spark,Flink)等提供功能上的扩展,目前其支持的与 Flink Sink 相关的连接器如下:

Apache ActiveMQ(源/接收器)
Apache Flume(接收器)
Redis(接收器)
阿卡(水槽)
这里连接在Data Sources章节介绍整合Kafka Source的基础上,将Kafka Sink也一并进行整合,具体步骤如下。

三、整合Kafka Sink
3.1 添加接收器
Flink 提供了 addSink 方法用来调用自定义的 Sink 或者第三方的连接器,想要将计算结果写出到 Kafka 中,需要使用该方法来调用 Kafka 的生产者 FlinkKafkaProducer,具体代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1.指定Kafka的相关配置属性
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.200.0:9092");

// 2.接收Kafka上的数据
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));

// 3.定义计算结果到 Kafka ProducerRecord 的转换
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        return new ProducerRecord<>("flink-stream-out-topic", element.getBytes());
    }
};
// 4. 定义Flink Kafka生产者
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
                                                                    kafkaSerializationSchema,
                                                                    properties,
                                               FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
// 5. 将接收到输入元素*2后写出到Kafka
stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
env.execute("Flink Streaming");
3.2 创建输出主题
创建用于输出测试的主题:

bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic flink-stream-out-topic

# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3.3 启动消费者
启动一个Kafka Consumer,用于查看Flink程序的输出情况:

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic
3.4 测试结果
在Kafka生产者上发送消息到Flink程序,观察Flink程序转换后的输出情况,具体如下:


可以看到 Kafka 生成者发出的数据已经被 Flink 程序正常接收到,并经过转换后又输出到 Kafka 对应的 Topic 上。

四、自定义水槽
除了使用内置的第三方连接器之外,Flink 还支持使用自定义的 Sink 来满足多样化的输出需求。想要实现自定义的 Sink ,需要直接或者继承实现 SinkFunction 接口。通常情况下,我们都是实现其抽象类 RichSinkFunction,相比于 SinkFunction ,其提供了更多的与生命周期相关的方法。两者间的生命如下:


这里我们自定义一个 FlinkToMySQLSink 来将计算结果写出到 MySQL 数据库中,具体步骤如下:

4.1 导入依赖
首先需要导入MySQL相关的依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>
4.2 自定义水槽
继承自 RichSinkFunction,实现自定义的 Sink :

public class FlinkToMySQLSink extends RichSinkFunction<Employee> {

    private PreparedStatement stmt;
    private Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
                                           "?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
                                           "root",
                                           "123456");
        String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
        stmt = conn.prepareStatement(sql);
    }

    @Override
    public void invoke(Employee value, Context context) throws Exception {
        stmt.setString(1, value.getName());
        stmt.setInt(2, value.getAge());
        stmt.setDate(3, value.getBirthday());
        stmt.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

}
4.3 使用自定义Sink
想要使用自定义的Sink,同样是需要调用addSink方法,具体如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Date date = new Date(System.currentTimeMillis());
DataStreamSource<Employee> streamSource = env.fromElements(
    new Employee("hei", 10, date),
    new Employee("bai", 20, date),
    new Employee("ying", 30, date));
streamSource.addSink(new FlinkToMySQLSink());
env.execute();

您需要登录后才可以回帖 登录 | 注册

本版积分规则

快速回复 返回顶部 返回列表