2.6 Flink 基础source算子

2.6 Flink 基础source算子

source 是用来获取外部数据的算子,按照获取数据的方式,可以分为:

  • 基于集合的Source

  • 基于Socket网络端口的Source

  • 基于文件的Source

  • 第三方Connector Source

  • 自定义Source

从并行的角度,source又可分为非并行的source和并行的source

  • 非并行source:并行度只能为1,即只有一个运行时实例,在读取大量数据时效率比较低,通常是用来做一些实验或测试,例如Socket Source。
  • 并行的source:并行度可以是1到多个,在计算资源足够的前提下,并行度越大,效率越高,例如Kfaka Source。

2.6.1 基于集合的Source(测试用)

可将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据流DataStream

fromElements
1
2
3
4
5
6
7
8
9
10
11
public class SourceOperator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Integer> fromElements = env.fromElements(1, 2, 3, 4, 5);

fromElements.map(x -> x+10).print();

env.execute();
}
}
fromCollection

非并行的 Source,可以将一个Collection作为参数传入到该方法中,返回一个DataStreamSource

1
2
3
4
List<String> dataList = Arrays.asList("a", "b", "c", "d");

DataStreamSource<String> fromCollection = env.fromCollection(dataList);
fromCollection.map(String::toUpperCase).print();
fromParallelCollection

该方法是一个并行的 Source,该方法需要传入两个参数,第一个是实现 SplittableIterator 的实现类的迭代器,第二个是迭代器中数据的类型。

1
2
DataStreamSource<LongValue> parallelCollection = env.fromParallelCollection(new LongValueSequenceIterator(1, 100), TypeInformation.of(LongValue.class));
parallelCollection.map(v -> v.getValue()+100).print();
generateSequence

并行的 Source(并行度也可以通过调用该方法后,再通过调用 setParallelism() 方法来设置),通过指定的起始值和结束值来生成数据序列流。

1
2
3
//调用env的generateSequence生成并行的DataSource,输出的数字是1到100
DataStreamSource<Long> sequence = env.generateSequence(1, 100).setParallelism(3);
sequence.map(x -> x - 1).print();

2.6.2 基于Socket的Source(测试用)

非并行的 Source,通过Socket通信获取数据得到数据流。
该方法还有多个重载的方法,如
socketTextStream(String hostname, int port, String delimiter, long maxRetry)
可以指定分隔符和最大重新连接次数。

1
DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);

注意,socketSource是一个非并行source,如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,在Linux中输入nc -lk 9999

2.6.3 基于文件的Source

基于文件的Source,本质上就是使用指定的FileInputFormat格式读取数据,可以指定TextInputFormat,CsvInputFormat,BinaryInputFormat等格式。
底层都是ContinuousFileMonitoringFunction,这个类继承了RichSourceFunction,都是非并行的Source。

readFile

readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType)方法可以指定读取文件的FileInputFormat格式,
参数FileProcessingMode,可取值:

  • PROCESS_ONCE:只读取文件中的数据一次,读取完成后,程序退出。
  • PROCESS_CONTINUOUSLY:会一直监听指定的文件,文件的内容发生变化后,会将以前的内容和新的内容全部读取出来,进而造成数据重复读取。
1
2
String path = "file://a.txt";
DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
readTextFile

readTextFile(String filePath) 可以从指定的目录或文件读取数据,默认使用的是 TextInputFormat 格式读取数据,还有一个重载的方法 readTextFile(String filePath, String charsetName) 可以传入读取文件指定的字符集,默认是UTF-8编码。该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行,该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE

1
DataStreamSource<String> lines = env.readTextFile(path);

2.6.4 基于Kfaka的Source (生产常用)

在实际生产环境中,为了保证flink可以高效地读取数据源中的数据,通常是跟一些分布式消息中间件结合使用,例如Kakfa。Kafka的特点是分布式、多副本、高可用、高吞吐、可以记录偏移量等。Flink和Kafka整合可以高效的读取数据,并且可以保证Exactly Once(精确一次性语义)。

首先在maven项目的pom.xml文件中导入Flink跟Kakfa整合的依赖。

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.4</version>
</dependency>
老版本API

核心类FlinkKafkaConsumer,需指定三个参数:

  • topic 名称
  • 反序列化 Schema,SimpleStringSchema 指定的是读取Kafka中的数据反序列化成String格式
  • Properties 实例
1

注意,目前这种方式无法保证 Exactly Once,Flink的Source消费完数据后,将偏移量定期的写入到Kafka的__consumer_offsets中,这种方式虽然可以记录偏移量,但是无法保证 Exactly Once

新版本API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("auto.offset.commit", "true");

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setTopics("tp01")
.setGroupId("gp01")
.setBootstrapServers("localhost:9092")
//消费起始位移选择之前的所提交的偏移量,(如果没有,则重置为LASTEST)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
//.setStartingOffsets(OffsetsInitializer.earliest())消费起始位移直接选择为最早
//.setStartingOffsets(OffsetsInitializer.latest())消费起始位移直接选择为最新
//.setStartingOffsets(OffsetsInitializer.offsets(Map<TopicPartition, Long >))消费起始位移选择为方法所传入的每个分区对应的起始偏移量
.setValueOnlyDeserializer(new SimpleStringSchema())

//开启了kafka底层消费者的自动位移提交机制,它会把最新的消费者位移提交到kafka的consumer_offsets中
//就算把自动位移提交机制开启,KafkaSource依然不依赖自动位移提交机制(宕机重启时,优先从flink自己的状态中去获取偏移量<更可靠>)
.setProperties(properties)
//把本Source算子设置成 BOUNDED属性(有界流),将来本Source去读取数据的时候,读到指定的位置,就停止读取并退出
//常用于补数或者重跑某一段历史数据
//.setBounded(OffsetsInitializer.committedOffsets())


//把本Source算子设置成 UNBOUNDED属性(无界流),但并不会一直读数据,而是到达指定的位置就停止读取,但程序不退出
//主要应用场景:需要从kakfa中读取某一段固定长度的数据,然后拿这一段数据去跟另外一个真正的无界流联合处理
.setUnbounded(OffsetsInitializer.latest())
.build();

//env.addSource();
DataStreamSource<String> kafkaStreamSource = (DataStreamSource<String>) env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

新版本API中,Flink会把Kafka消费者的消费位移记录在算子状态中,这样就实现了消费位移状态的容错,从而可以支持端到端的 Exactly Once

2.6.5 自定义 Source

Flink的 DataStream API 可以让开发者根据实际需要,灵活的自定义Source,本质上就是定义一个类,实现 SourceFunction 或继承 RichParallelSourceFunction,实现 run 方法和 cancel 方法。

代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package cn.yiyuyyds.flink.java.demo;

import lombok.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;

public class CustomSourceFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
dataStreamSource.print();

env.execute();
}
}


class MySourceFunction implements SourceFunction<EventLog> {

volatile boolean flag = true;

@Override
public void run(SourceContext<EventLog> sourceContext) throws Exception {
EventLog eventLog = new EventLog();
String[] events = {"appLaunch", "pageLoad", "adShow", "adClick", "itemShare", "itemCollect", "wakeUp", "appClose"};
Map<String, String> eventInfoMap = new HashMap<>();
while (flag) {
eventLog.setGuid(RandomUtils.nextLong(1, 1000));
eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
eventLog.setTimeStamp(System.currentTimeMillis());
eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);

eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
eventLog.setEventInfo(eventInfoMap);

sourceContext.collect(eventLog);

eventInfoMap.clear();

Thread.sleep(RandomUtils.nextInt(500, 1000));
}
}

@Override
public void cancel() {
flag = false;
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@Setter
@Getter
class EventLog {
private long guid;
private String sessionId;
private String eventId;
private long timeStamp;
private Map<String, String> eventInfo;
}