2. FlinkSql 编程概览

2. FlinkSql 编程概览

2.1 FlinkSql程序结构

所需依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.14.4</version>
</dependency>
FlinkSql编程4步曲
  • 创建FlinkSql编程入口
  • 将数据定义成表(视图)
  • 执行sql语义的查询(sql语法或者tableAPI)
  • 将查询结果输出到目标表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Demo1 {
public static void main(String[] args) {
EnvironmentSettings envSettrings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(envSettrings);

//把kafka中的一个topic数据,映射成一张FlinkSql表
tableEnv.executeSql("create table t_kafka "
+ "( "
+ " id int, "
+ " name string, "
+ " age int, "
+ " gender string "
+ ") "
+ "with ( "
+ " 'connector' = 'kafka', "
+ " 'topic' = 'first', "
+ " 'properties.bootstrap.servers' = 'hadoop1:9092', "
+ " 'properties.group.id' = 'g1', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'format' = 'json', "
+ " 'json.fail-on-missing-field' = 'false', "
+ " 'json.ignore-parse-errors' = 'true' "
+ ") ");

tableEnv.executeSql("select gender,avg(age) as avg_age from t_kafka group by gender").print();
}
}

2.2 两种编程方式

FlinkSql提供两种编程方式:

  • Table API
  • Table Sql

类比sparksql的dataframe api编程和sql编程;

2.2.1 Table API方式

Table API是一个与编程语言(java、python、scala)集成的查询API;与SQL不同,查询逻辑不是以字符串表达,而是在“宿主语言”中调用所提供的类、方法等;
复杂的运算可以通过调用多个方法组成,如:table.filter(…).groupBy(…).select(…).join(…).on(…)

1
2
Table table2 = table.groupBy($("gender"))
.select($("gender"), $("age").avg().as("avg_age"))

2.2.2 Table Sql方式

用“sql字符串”形式进行基于表的关系运算逻辑表达

1
2
3
4
5
6
7
table.executeSql(
"insert into RevenueChina" +
"select cId, cName, sum(revenue) asa revSum" +
"from Orders" +
"where cCountry = china" +
"group by cId, cName"
);

2.2.3 混搭方式

表API和SQL查询可以很容易的混合,因为Table对象可以和sql表进行方便的互转:

  • 可以让SQL查询返回Table对象,进而调用Table API
  • 可以用env.from(“sql表名”)引用sql表得到Table对象,进而调用Table API
  • 可以用env.createTemporaryView(“sql表名”, table对象),将Table对象注册成sql表,进而用sql

2.3 Table Environment

FlinkSql的编程,总是从一个入口环境TableEnvironment开始;
TableEnvironment主要功能如下:

  • 注册catalog
  • 向catallog注册表
  • 加载可插拔模块(目前有hive modules,以用于扩展支持hive的语法、函数等)
  • 执行sql查询(sql解析,查询计划生成,job提交)
  • 注册用户自定义函数
  • 提供DataStream和Table之间的转换

创建方式1(直接创建TableEnvironment)

1
2
3
4
5
6
EnvironmentSettings envSettrings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);

创建方式2(从StreamExecutionEnvironment创建,这样便于结合sql和stream编程)

1
2
StreamExecutrionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);