Fink:使用Table api和SQL
这几天在学习和使用flink中的table api和SQL部分,准备使用之前的数据做一个查询的小实验,简而言之,就是在接收到一条车辆流水数据之后,判断他是否是一条出站数据,如果是的话,就查询到之前与它对应的如站数据。然后,也可以使用table做一些其他的事情。
通过定义一个datastream或者dataset,然后使用tableenv.fromdataset方法将一个datastream或者dataset注册成一个table,然后在这个table上执行sql的查询操作。
批处理和流式处理的所有Table API和SQL程序遵循相同的模式。 以下代码示例显示了Table API和SQL程序的常见结构。
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create a TableEnvironment // for batch programs use BatchTableEnvironment instead of StreamTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...); // or tableEnv.registerExternalCatalog("extCat", ...); // create a Table from a Table API query Table tapiResult = tableEnv.scan("table1").select(...); // create a Table from a SQL query Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... "); // emit a Table API result Table to a TableSink, same for SQL result tapiResult.writeToSink(...); // execute env.execute();
通过定义一个datastream或者dataset,然后使用tableenv.fromdataset方法将一个datastream或者dataset注册成一个table,然后在这个table上执行sql的查询操作。
Flink与Hive
本来设计的是在fllink任务中使用API直接连接一个线程的hive进行一些SQL命令的查询等操作,但后来发现flink不支持这种操作。如果想在flink中读取一个 hive table中的数据,可以使用readcsvfile方法,即在已经知道该table在hdfs上的位置的情况下直接将该文件读进来,作为一个table,如下面的代码所示,可以参见stackoverflow中的这个flink interact with hive
DataSet<Record> csvInput = env .readCsvFile("hdfs://app/hive/warehouse/mydb/mytable/data.csv") .pojoType(MyClass.class, "col1", "col2", "col3"); Table mytable = tableEnv.fromDataSet(csvInput); tableEnv.registerTable("mytable", mytable );
实验设计
将输入的datastream注册成一个table,将不断接收的数据放到表中,这个实验是将不断获取的入站数据放到table中,然后对后面每一条接收到的出站数据都在这个表格中进行查找匹配,进行相关处理后输出。
评论
发表评论