FlinkTableAPI&SQL概述Table

Table API 和SQL是Flink提供的相对DataStream更加高级的API,他是对DataStream的封装,它有一下的有点:

  • 支持SQL
  • Table API有更加简单的编程
  • 由于Flink对Table API做了一些优化,所以在使用Table API的时候会更加的高效
  • Table API意在支持统一的批流一体,可以同时兼容流处理以及批处理。

Table API中的table概念

在Table API和SQL中,Flink将数据看成是一张表,流数据就是一张无限追加的表,而批数据就是一张有限的数据表。既然是一张表就有特定的structure,可以是自己指定也可以根据数据源推断。

Table的层级结构

Flink中表的层级结构是:catalog -> database -> table.

catalog可以基于内存的,也是可以基于外部存储系统,比如hive,postgre。

  • 如果是基于内存,那么catalog中的数据库和表就都是临时存在的,一旦会话结束,这些表元数据就被销毁。
  • 如果是基于外部存储系统,那么这些表就是持久化表,可以被多个flink集群共用,可以在多次会话中使用。

当然在Flink中也支持视图。视图可以从现有的Table对象创建,通常用作Table API/SQL查询的结果。而表通常用来描述外部数据,例如文件、数据库表或消息队列。

本地IDE创建Table API的项目

  1. 创建一个maven项目,并添加依赖:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
复制代码
  1. 构建Table API ENV:
//方式1,使用setting的方式创建
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        //.inBatchMode()
        .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

//方式2, 使用StreamExecutionEnvironment来构建table env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

//  table api and sql to process data , including source, transfrom, sink

//execute job
tEnv.execute();
复制代码

Table API 与 DataStream 转换

DataStream to Table

fromDataStream(DataStream, Expression...)
fromDataStream(DataStream, String)
fromDataStream(DataStream, Schema)

//fromDataStream(DataStream), 不指定field name,field name就是f0,f1...
tableEnv.fromDataStream(stream) 
//fromDataStream(DataStream, String) 
tableEnv.fromDataStream(stream, "field-name1, field-name2.sub-name1, field-name3");
//fromDataStream(DataStream, Expression...)  
tableEnv.fromDataStream(stream, $("field-name1"), ${"field-name2.sub-name1"), $("field-name3"));
//fromDataStream(DataStream, Schema)  
tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        //.columnByMetadata("time1","TIMESTAMP_LTZ(3)")
        //.column("field1","STRING”)
        //.column("field2","TIMESTAMP_LTZ(3)”)
        //.column("f0",DataTypes.of(User.class)”)
        //.column("f0",DataTypes.STRUCTURE(User.class,DataTypes.FIELD("name",DataTypes.STRING()))”)
        .watermark("timeField", "timeField - INTERVAL '10' MINUTE")
        //.watermark("timeField", "SOURCE_WATERMARK()")
        .build());

//fromChangelogStream(stream)
//fromChangelogStream(stream,Schema)

//创建view, 与创建table一样
tableEnv.createTemporaryView("table-name", stream)
tableEnv.createTemporaryView("table-name", stream, String);
tableEnv.createTemporaryView("table-name", stream, Expression...);
tableEnv.createTemporaryView("table-name", stream, Schema);

复制代码

Table to DataStream

//toDataStream(table)
tableEnv.toDataStream(table);
//toDataStream(table, Class)
tableEnv.toDataStream(table, User.class);
//toDataStream(table, AbstractDataType)
tableEnv.toDataStream(table, DataTypes.STRUCTURED(User.class,DataTypes.FIELD("name",DataTypes.STRING())));
//Append table, 只能insert
tEnv.toAppendStream(table, Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP));
tEnv.toAppendStream(table, Row.class);
tEnv.toAppendStream(table, new TupleTypeInfo(Types.STRING, Types.INT);
//toChangelogStream(table),支持表更新
tEnv.toChangelogStream(table)  
//toChangelogStream(table,Schema),支持表更新
tEnv.toChangelogStream(table,Schema) 

//Retract table:可以update,会在结果stream中增加一个boolean字段,在更新的时候先删除这条记录(将boolean标记为删除状态),然后再inset一条新的数据进去,新的数据Boolean状态为未删除状态
tEnv.toRetractStream(table, Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP));
tEnv.toRetractStream(table, Row.class);
tEnv.toRetractStream(table, new TupleTypeInfo(Types.STRING, Types.INT);
复制代码