2.9 Flink Extended Sink Operators

2.9.1 KafkaSink

Flink can integrate with multiple Kafka versions, such as 0.11.x, 1.x, 2.x, and so on;
starting with Flink 1.9, the Kafka 2.2 client is used.

Core classes
  • KafkaStringSerializationSchema – serialization schema
  • FlinkKafkaProducer – the producer (i.e. the sink)

(These belong to the legacy producer API; the example below uses the newer KafkaSink API.)

Add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
New API (KafkaSink)
public class _10_KafkaSinkOperator_Demo1 {
    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");

        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // write the data to Kafka
        // 1. build a Kafka sink operator
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.0.219:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("event-log")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setTransactionalIdPrefix("doitedu-")
                .build();

        // 2. send the data stream to the sink operator built above
        streamSource
                .map(JSON::toJSONString)
                .sinkTo(kafkaSink);

        env.execute();
    }
}

KafkaSink integrates with Flink's checkpoint mechanism to support end-to-end consistency (exactly-once) semantics;
under the hood, this of course relies on the Kafka producer's transaction mechanism.
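
The example above uses AT_LEAST_ONCE. A minimal exactly-once variant is sketched below; it only changes the delivery guarantee (the transactional-id prefix, already set above, is required in this mode). Two operational assumptions worth noting: the producer's transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms, and downstream consumers should read with isolation.level=read_committed so they do not see uncommitted records.

KafkaSink<String> eosKafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.0.219:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("event-log")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        // records are written inside Kafka transactions that are committed
        // when the corresponding checkpoint completes
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // each sink subtask derives its transactional.id from this prefix
        .setTransactionalIdPrefix("doitedu-")
        .build();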

2.9.2 JdbcSink

Add the following dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
Non-exactly-once JdbcSink
public class _11_JdbcSinkOperator_Demo1 {
    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");

        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // build a JDBC sink operator
        // this is the variant that does not guarantee exactly-once
        // (at-least-once; the upsert statement keeps replayed writes idempotent)
        SinkFunction<EventLog> jdbcSink = JdbcSink.sink(
                "insert into t_eventlog values(?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=?",
                new JdbcStatementBuilder<EventLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                        preparedStatement.setLong(1, eventLog.getGuid());
                        preparedStatement.setString(2, eventLog.getSessionId());
                        preparedStatement.setString(3, eventLog.getEventId());
                        preparedStatement.setLong(4, eventLog.getTimeStamp());
                        preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));

                        preparedStatement.setString(6, eventLog.getSessionId());
                        preparedStatement.setString(7, eventLog.getEventId());
                        preparedStatement.setLong(8, eventLog.getTimeStamp());
                        preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
                    }
                },
                JdbcExecutionOptions.builder().withBatchIntervalMs(5).withMaxRetries(2).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUsername("root")
                        .withPassword("123")
                        .withUrl("jdbc:mysql://")
                        .build()
        );

        streamSource.addSink(jdbcSink);

        env.execute();
    }
}
Exactly-once JdbcSink
public class _11_JdbcSinkOperator_Demo1 {
    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");

        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /**
         * Build a JDBC sink operator
         * This is the exactly-once variant;
         * under the hood it relies on XA transactions of the target database
         */
        SinkFunction<EventLog> eosJdbcSink = JdbcSink.exactlyOnceSink(
                "insert into t_eventlog values(?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=?",
                new JdbcStatementBuilder<EventLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                        preparedStatement.setLong(1, eventLog.getGuid());
                        preparedStatement.setString(2, eventLog.getSessionId());
                        preparedStatement.setString(3, eventLog.getEventId());
                        preparedStatement.setLong(4, eventLog.getTimeStamp());
                        preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));

                        preparedStatement.setString(6, eventLog.getSessionId());
                        preparedStatement.setString(7, eventLog.getEventId());
                        preparedStatement.setLong(8, eventLog.getTimeStamp());
                        preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
                    }
                },
                // the XA (exactly-once) sink requires maxRetries = 0; retries could otherwise produce duplicates
                JdbcExecutionOptions.builder().withBatchSize(5).withMaxRetries(0).build(),
                // MySQL's XA implementation supports only one open transaction per connection
                JdbcExactlyOnceOptions.builder().withTransactionPerConnection(true).build(),
                new SerializableSupplier<XADataSource>() {
                    @Override
                    public XADataSource get() {
                        MysqlXADataSource xaDataSource = new MysqlXADataSource();
                        xaDataSource.setUser("root");
                        xaDataSource.setUrl("jdbc:mysql://");
                        xaDataSource.setPassword("123");
                        return xaDataSource;
                    }
                }
        );

        streamSource.addSink(eosJdbcSink);
        env.execute();
    }
}

2.9.3 RedisSink

For details, see
https://www.bilibili.com/video/BV1K44y1g7wA?p=33&vd_source=26668f0ed33317a00612f0d4c98799c9
(P33)

Redis is an in-memory, extremely high-performance NoSQL database; its data can also be persisted to disk, and its fast reads and writes make it well suited for key-value data. Beyond simple key-value pairs, Redis also provides list, set, zset, hash and other data structures. Results computed by Flink in real time need to be written out quickly, so the target storage system must be fast enough that data does not pile up; Redis is a very good choice for this.

Add the following dependency:

Note: this dependency is not available in the official Maven repository. You need to download the bahir source code, build it, and install it into your local Maven repository; only then can it be referenced as follows:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
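
With the dependency installed, a minimal usage sketch might look like the following. It reuses the EventLog stream and MySourceFunction from the earlier examples; the class name, the Redis host, and the hash name "event-logs" are illustrative assumptions, while RedisSink, FlinkJedisPoolConfig, RedisMapper, RedisCommandDescription and RedisCommand are the bahir connector classes involved.

public class _12_RedisSinkOperator_Demo1 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // connection pool configuration for the target Redis instance (host/port are placeholders)
        FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("192.168.0.219")
                .setPort(6379)
                .build();

        // the RedisMapper tells the sink which command to run and how to extract key/value from each record
        streamSource.addSink(new RedisSink<>(jedisConfig, new RedisMapper<EventLog>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // write every record as a field of the hash "event-logs"
                return new RedisCommandDescription(RedisCommand.HSET, "event-logs");
            }

            @Override
            public String getKeyFromData(EventLog eventLog) {
                return String.valueOf(eventLog.getGuid());
            }

            @Override
            public String getValueFromData(EventLog eventLog) {
                return JSON.toJSONString(eventLog);
            }
        }));

        env.execute();
    }
}

Since this sink does not take part in checkpoint transactions, it provides at-least-once semantics; writing each record under a stable key (the guid here) keeps replays idempotent.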