2.7 Flink 基础transformation算子

2.7 Flink 基础transformation算子

2.7.1 映射算子

map 映射(DataStream -> DataStream)

map(new MapFunction)
MapFunction : (x) -> y [1条变1条]

1
2
3
4
5
6
7
8
9
10
11
12
13
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> words = env.fromElements("hadoop", "spark", "flink", "hbase", "flink");
//在map方法中传入MapFunction实现类实例,重写map方法
SingleOutputStreamOperator<String> upperWords = words.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
//将单词转为大写
return s.toUpperCase();
}
});
//调用Sink将数据打印在控制台
upperWords.print();
flatMap 扁平化映射(DataStream -> DataStream)

flatMap(new FlatMapFunction)
FlatMapFunction : x -> x1,x2,x3 [1条变多条,并展平]

1
2
3
4
5
6
7
8
9
SingleOutputStreamOperator<Tuple2<String, Integer>> words = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] split = value.split("\\s+");
for (String word : split) {
out.collect(Tuple2.of(word, 1));
}
}
});

如果是flatMap方法时传入Lambda表达式,需要在调用flatMap方法后,再调用returns方法指定返回的数据类型,否则Flink无法自动推断出返回的数据类型,会出现异常。

project 投影(DataStream -> DataStream)

该算子只能对 Tuple 类型数据使用,project方法的功能类似sql中的”select”字段,该方法只有java API才有,scala API没有该方法

1
2
3
DataStreamSource<Tuple3<String, String, Integer>> users = env.fromElements(Tuple3.of("佩奇", "女", 5), Tuple3.of("乔治", "男", 3));
//只要第0个和第2个字段
SingleOutputStreamOperator<Tuple> result = users.project(0, 2);

2.7.2 过滤算子

filter 过滤(DataStream -> DataStream)

filter(new FilterFunction)
FilterFunction : x -> true/false

1
2
3
4
5
6
7
8
DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//过滤奇数,保留偶数
SingleOutputStreamOperator<Integer> even = numbers.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer integer) throws Exception {
return integer % 2 == 0;
}
});

2.7.3 分组算子

keyBy 按key分组(DataStream -> KeyedStream)
1
2
3
4
5
6
KeyedStream<Tuple2<String, Integer>, String> keyed = words.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});

2.7.4 滚动聚合算子

  • 此处所说的滚动聚合算子,是多个聚合算子的统称,有 sum、min、minBy、max、maxBy
  • 这些算子的底层逻辑都是维护一个聚合值,并使用每条流入的数据对聚合值进行滚动更新
  • 这些算子都只能在 KeyedStream 上调用 (就是必须 keyBy 后调用)
sum

该算子实现实时滚动相加的功能,即新输入的数据和历史数据进行相加。

min、minBy

这两个算子都是求最小值,min和minBy的区别在于:

  • min的返回值,除了最小值字段以外,其它字段是第一条输入数据的值
  • minBy的返回值,就是最小值字段所在的那条数据
max、maxBy

这两个算子都是求最大值,用法和min、minBy一样

reduce 归约

它的滚动聚合逻辑没有写死,而是由用户通过ReduceFunction来传入

iterate 迭代 (DataStream -> IterativeStream -> DataStream)