2. Flink Programming Basics

2.1 Basic dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

To use the Scala API instead, replace
flink-java with flink-scala_2.12
flink-streaming-java_2.12 with flink-streaming-scala_2.12
(the _2.12 suffix is the Scala version the artifact is built against, so it must be consistent across all Flink artifacts):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

2.2 Flink's DataStream abstraction
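
A DataStream represents an immutable, possibly unbounded stream of elements of a given type. Transformations (map, flatMap, filter, keyBy, ...) never modify a stream in place; each call derives a new DataStream, so a Flink program is a chain of derivations from sources to sinks that only runs once execute() is called. A minimal sketch of this chaining (the fromElements source and all names here are illustrative, not from the original text):

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A small bounded source, used only to keep the sketch self-contained
        DataStreamSource<String> lines = env.fromElements("hello flink", "hello world");

        // Each transformation returns a NEW stream; 'lines' itself is unchanged
        SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);
        SingleOutputStreamOperator<String> hellos = upper.filter(s -> s.startsWith("HELLO"));

        hellos.print();
        env.execute(); // nothing runs until the job is submitted here
    }
}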

2.3 Flink programming template

A typical Flink program follows a fixed skeleton: obtain an execution environment, create a source, apply transformations, attach a sink, and call execute() to submit the job. For example:

package cn.yiyuyyds.flink.java.demo;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamBatchWordCount {
    public static void main(String[] args) throws Exception {

        // Entry point of the stream-processing environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        // Execute in batch mode
        streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        //streamEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); // execute in streaming mode
        //streamEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // let Flink decide

        // Read a file to obtain a DataStream
        DataStreamSource<String> streamSource = streamEnv.readTextFile("input");

        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split("\\s+");
                        for (String word : words) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                .sum(1)
                .print();

        streamEnv.execute();
    }
}
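
The file source above is only one choice; any other source can be swapped in without touching the rest of the pipeline. For instance, a sketch reading lines from a TCP socket (hostname and port are placeholders, not from the original example):

// Alternative source: read lines from a TCP socket instead of a file
DataStreamSource<String> streamSource = streamEnv.socketTextStream("localhost", 9999);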

The same program written with lambda expressions:

package cn.yiyuyyds.flink.java.demo;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountLambda {
    public static void main(String[] args) throws Exception {
        // Entry point of the stream-processing environment
        StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> streamSource = envStream.readTextFile("input");

        // Convert each line to upper case
        //streamSource.map(value -> value.toUpperCase());
        // Since the lambda body above is a single method call taking no extra
        // arguments, a method reference can be used instead
        SingleOutputStreamOperator<String> upperCased = streamSource.map(String::toUpperCase);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = upperCased
                .flatMap((String s, Collector<Tuple2<String, Integer>> collector) -> {
                    String[] words = s.split("\\s+");
                    for (String word : words) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                })
                //.returns(new TypeHint<Tuple2<String, Integer>>(){})
                //.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndOne.keyBy(value -> value.f0);

        keyedStream.sum(1).print();

        envStream.execute();
    }
}

Note the following when using the lambda style: the flatMap's return type is Tuple2<String, Integer>. Flink can infer the raw Tuple2, but its type parameters <String, Integer> are erased by the Java compiler, so in the line

collector.collect(Tuple2.of(word, 1));

the generic information inside Tuple2 is lost. In other words, whenever a lambda's result type carries generic parameters, that information is gone after compilation and Flink cannot infer the output type on its own. To solve this, use Flink's returns(...) API to declare the return type explicitly.
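
The three returns(...) variants shown (two of them commented out) in the code above are equivalent. The TypeHint variants rely on the "super type token" trick: the anonymous subclass created by new TypeHint<Tuple2<String, Integer>>(){} keeps its generic superclass, including the <String, Integer> parameters, in its class metadata, where Flink can recover it by reflection. A fragment illustrating the alternatives (pick exactly one; the variable here stands for the raw flatMap result, before any type has been declared):

// Anonymous TypeHint subclass: the generics survive erasure in class metadata
wordAndOne.returns(new TypeHint<Tuple2<String, Integer>>() {});
// Same hint, wrapped explicitly in a TypeInformation
//wordAndOne.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
// Factory method that constructs the TypeInformation directly
//wordAndOne.returns(Types.TUPLE(Types.STRING, Types.INT));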

2.4 Parallelism concepts in Flink programs

  • In a Flink program, every operator can become an independent task
  • Depending on how data is distributed between upstream and downstream operators, on their parallelism, and on slot-sharing settings, several operators may be chained together into a single task
  • At runtime, each task can have multiple parallel running instances (subtasks)
  • The parallelism of each operator task can be set explicitly in code (see the sketch after the figure below)

[Figure 8]
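
A minimal sketch of the last point, reusing streamEnv and streamSource from the template in 2.3: the environment-level setting is the default, and setParallelism(...) on an individual operator overrides it for that operator only.

// Default parallelism for every operator in this job
streamEnv.setParallelism(4);

// Override for a single operator: this map runs as 2 parallel subtasks
streamSource.map(String::toUpperCase)
        .setParallelism(2)
        .print();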

2.5 Flink program entry points

Batch processing entry point:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Stream processing entry point:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Unified stream-batch entry point:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Select the execution mode on the environment
// (BATCH / STREAMING / AUTOMATIC, as shown in section 2.3)
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Entry point with the Web UI enabled:
// Explicitly declare a local execution environment, with the Web UI enabled
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8082);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

This additionally requires the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
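
With this dependency on the classpath, the Web UI of the locally running job is served while it executes, on the port configured via rest.port above (here http://localhost:8082).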