3-6. FlinkSql 表相关

3. FlinkSql 表的概念及类别

3.1 表的标识结构

每一个表的标识由3部分组成:

  • catalog name (常用于标识不同的“源”,比如hive catalog,inner catalog等)
  • database name (通常语义中的“库”)
  • table name (通常语义中的“表”)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
TableEnvironment tEnv = ...;
tEnv.useCatalog("a_catalog");
tEnv.useDatabase("db1");

Table table = ...;

//注册在默认catalog的默认database中
tEnv.createTemporaryView("a_view", table);

//注册在默认catalog的指定database中
tEnv.createTemporaryView("db2.a_view", table);

//注册在指定catalog的指定database中
tEnv.createTemporaryView("x_catalog.db3.a_view", table);

一个FlinkSql程序在运行时,tableEnvironment通过持有一个map结构来记录所注册的catalog;

1
2
3
4
public final class CatalogManager {
private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
private final Map<String, Catalog> catalogs;
private final Map<ObjectIdentifier, CatalogBaseTable> temporaryTables;

3.2 表与视图

FlinkSql中的表,可以是virtual的(view视图)和regular的(table常规表)

  • table描述了一个物理上的外部数据源,如文件、数据库表、kafka消息topic
  • view则基于表创建,代表一个或多个表上的一段计算逻辑(就是对一段查询计划的逻辑封装);

不管是table还是view,在Table API中得到的都是Table对象;

3.2 临时与永久

  • 临时表(视图):创建时带temporary关键字(create temporary view、create temporary table)- - 永久表(视图):创建时不带temporary关键字(create view、create table)
1
2
3
4
5
6
7
8
9
10
11
12
13
//sql定义方式
tableEnv.executeSql("create view view_1 as select ... from projectedTable")
tableEnv.executeSql("create temporary view view_2 as select ... from projectedTable")

tableEnv.executeSql("create table (id int, ...) with ('connector' = '...')")
tableEnv.executeSql("create temporary table (id int, ...) with ('connector' = '...')")

//table api方式
tenv.createTable("t_1", tableDescriptro);
tenv.createTemporaryTable("t_1", tableDescriptor);

tenv.createTemporaryView("v_1", dataStream, schema);
tenv.createTemporaryView("v_1", table);

4. FlinkSql 表定义概览

4.1 [Table API] Table创建概览

FlinkSql图14

table对象获取方式解析:

  • 从已注册的表
  • 从TableDescriptor(连接器/format/schema/options)
  • 从DataStream
  • 从Table对象上的查询api生成
  • 从测试数据

涉及的核心参数

  • 已注册的表名(catalog_name.database_name.object_name)
  • TableDescriptor(表描述器,核心是connector连接器)
  • DataStream(底层流)
  • 测试数据值

4.2 [Table API] Table创建示例

通过已注册的表名生成Table对象

1
Table t1 = tenv.from("t1");

通过DataStream生成Table对象

  • 自动推断schema(反射手段)
1
Table t1 = tenv.fromDataStream(dataStream1);
  • 手动定义schema
1
2
3
4
5
6
7
Table table = tenv.fromDataStream(dataStream2, Schema.newBuilder()
.column("f0", DataTypes.STRUCTURED(
DataBean.class,
DataTypes.FIELD("guid", DataTypes.INT()),
DataTypes.FIELD("uuid", DataTypes.STRING()),
DataTypes.FIELD("ts", DataTypes.BIGINT())
)));

通过tableEnv的fromValues方法生成Table对象(快速测试用)

1
2
3
4
5
6
7
8
9
10
Table table = tenv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("number", DataTypes.DOUBLE())
),
Row.of(1, "zs", 18.2),
Row.of(2, "ls", 28.2),
Row.of(3, "cc", 16.2)
);

通过TableEnv上调用查询api,生成新的Table对象(本质上就是view)

1
Table table = table3.select($("guid"), $("uuid"));

4.3 [Table Sql] Table创建概览

FlinkSql图15

注册sql表(视图)方式解析:

  • 从已存在的DataStream注册
  • 从已存在的Table对象注册
  • 从TableDescriptor(连接器)注册
  • 执行Sql的DDL语句来注册

4.4 [Table Sql] Table创建示例

将已存在的Table对象注册成sql视图

1
tenv.createTemporaryView("t1", table);

将DataStream注册成sql视图

1
2
3
4
5
6
7
8
9
10
11
12
DataBean bean1 = new DataBean(1, "s1", 1000);
DataStreamSource<DataBean> dataStream1 = env.fromElements(bean1);

//1.自动推断schema
tenv.createTemporaryView("t1", dataStream1);

//2.也可以收到指定schema
Schema schema = Schema.Builder.column...build();
tenv.createTemporaryView("t1", dataStream1, schema);

tenv.executeSql("desc t1");
tenv.executeSql("select * from t1");

通过connector注册sql表

1
2
3
4
5
6
7
8
9
tenv.createTable("t1", TableDescriptor.forConnector("filesystem")
.option("path", "file:///d:/a.txt")
.format("csv")
.schema(Schema.newBuilder()
.column("guid", DataTypes.STRING())
.column("name", DataTypes.STRING())
.column("age", DataTypes.STRING())
.build())
.build());

通过sql DDl语句定义sql表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
tenv.executeSql("create Table age_info" +
"(" +
"id int," +
"name string" +
"gender, string" +
"age int" +
"primary key(id) not enforced" +
") with (" +
"'connector' = 'mysql-cdc'," +
"'hostname' = '192.168.0.219'," +
"'port' = '3306'," +
"'username' = 'root'," +
"'password' = '123456'," +
"'database-name' = 'abc'," +
"'table-name' = 'age-info'" +
")");

6. FlinkSql catalog详解