2.8 Flink 基础sink算子

2.8 Flink 基础sink算子

2.8.1 打印输出print

打印是最简单的一个Sink,通常是用来做实验和测试时使用。

1
SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = keyed.sum("f1");

2.8.2 文件Sink

以下writeAs... 方法均已被标记为deprecated
writeAsText 以文本格式输出

该方法是将数据以csv格式写入到指定的目录中,目录中的文件名称是该Sink所在subtask的Index+1。可以额外指定一个参数writeMode,默认是WriteMode.NO_OVERWRITE。如果是WriteMode.OVERWRITE,会将以前的文件覆盖。

writeAsCsv 以csv格式输出

该方法是将数据以csv格式写入到指定的目录中,本质上使用的是CsvOutputFormat格式写入的。该Sink并不是将数据实时写入到文件中,而是有一个BufferedOutputStream,默认缓存的大小为4096个字节,只有达到这个大小,才会flush到磁盘。另外程序在正常退出,调用Sink的close方法也会flush到磁盘。

writeUsingOutputFormat 以指定的格式输出

该方法是将数据以指定的格式写入到指定目录中,该方法要传入一个OutputFormat接口的实现类。

writeToSocket 输出到网络端口

该方法是将数据输出到指定的Socket网络地址端口。输出之前,指定的网络端口服务必须已经启动。

2.8.3 StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exactly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

streamFileSink中输出的文件,其生命周期会经历3种状态:

  • In-Progress Files
  • Pending Files
  • Finished Files
Flink图9

需要添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.14.4</version>
</dependency>
代码示例

通过DefaultRollingPolicy这个工具类,指定文件滚动生成的策略。这里设置的文件滚动生成策略有两个,一个是距离上一次生成文件时间超过30秒,另一个是文件大小达到100mb.这两个条件只要满足其中一个即可滚动生成文件。然后StreamingFileSink.forRowFormat方法将文件输出目录,文件写入的编码传入,再调用withRollingPolicy关联上面的文件滚动生成策略,接着调用build方法构建好StreamingFileSink,最后将其作为参数传入到addSink方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class _08_SinkOperator_Demos {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");

env.setParallelism(2);

DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

//输出到控制台
//streamSource.print();

//输出到文件
//streamSource.writeAsText("d:/sink_test2", FileSystem.WriteMode.OVERWRITE);
streamSource.map(bean -> Tuple5.of(bean.getEventId(), bean.getGuid(), bean.getEventInfo(), bean.getSessionId(), bean.getTimeStamp()))
.returns(new TypeHint<Tuple5<String, Long, Map<String, String>, String, Long>>(){});




/**
* 应用StreamFileSink算子,将数据输出到文件系统
*/

/**
* 1. 输出为 行格式
*/
//构造一个FileSink对象
FileSink<String> rowSink = FileSink
.forRowFormat(new Path("d:/filesink/"), new SimpleStringEncoder<String>("utf-8"))
//文件的滚动策略 (间隔时长10s,或文件大小达到1M,就进行文件切换)
.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(1000).withMaxPartSize(1024 * 1024).build())
//分桶的策略 (划分子文件夹的策略)
.withBucketAssigner(new DateTimeBucketAssigner<String>())
.withBucketCheckInterval(5)
//输出文件的文件名相关配置
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("doitedu").withPartSuffix(".txt").build())
.build();

//然后添加到流进行输出
streamSource.map(JSON::toJSONString)
//.addSink()/*SinkFunction的实现类对象,用addSink()来添加*/
.sinkTo(rowSink);/*Sink的实现类对象,用sinkTo()来添加*/



/**
* 2. 输出为 列格式
*/

//详见
// https://www.bilibili.com/video/BV1K44y1g7wA?p=24&vd_source=26668f0ed33317a00612f0d4c98799c9
// P24


env.execute();
}
}