Fink:使用Table api和SQL

这几天在学习和使用flink中的table api和SQL部分,准备使用之前的数据做一个查询的小实验,简而言之,就是在接收到一条车辆流水数据之后,判断他是否是一条出站数据,如果是的话,就查询到之前与它对应的如站数据。然后,也可以使用table做一些其他的事情。

Flink中的Table

批处理和流式处理的所有Table API和SQL程序遵循相同的模式。 以下代码示例显示了Table API和SQL程序的常见结构。
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register a Table
tableEnv.registerTable("table1", ...)            // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);

// execute
env.execute();

通过定义一个datastream或者dataset,然后使用tableenv.fromdataset方法将一个datastream或者dataset注册成一个table,然后在这个table上执行sql的查询操作。

Flink与Hive

本来设计的是在fllink任务中使用API直接连接一个线程的hive进行一些SQL命令的查询等操作,但后来发现flink不支持这种操作。如果想在flink中读取一个 hive table中的数据,可以使用readcsvfile方法,即在已经知道该table在hdfs上的位置的情况下直接将该文件读进来,作为一个table,如下面的代码所示,可以参见stackoverflow中的这个flink interact with hive
DataSet<Record> csvInput = env
            .readCsvFile("hdfs://app/hive/warehouse/mydb/mytable/data.csv")
            .pojoType(MyClass.class, "col1", "col2", "col3");

Table mytable = tableEnv.fromDataSet(csvInput);
tableEnv.registerTable("mytable", mytable );

实验设计

将输入的datastream注册成一个table,将不断接收的数据放到表中,这个实验是将不断获取的入站数据放到table中,然后对后面每一条接收到的出站数据都在这个表格中进行查找匹配,进行相关处理后输出。

评论