使用Calcite解析Sql做维表关联(一)
在Flink1.9中提供了使用sql化方式完成维表关联,只需要实现LookupableTableSource接口即可,可以实现同步或者异步关联。在1.9之前就需要自己实现sql语法解析,然后在转换为API方式,对上层提供sql语法。看一个sql语句:
select * from orders o join gdsInfo g on o.gdsId=g.gdsId
orders表示流表,gdsInfo 表示维表。根据sql解析顺序先 from 部分、然后where 部分、最后select,那么对于join 方式,相当于join生成了一张临时表,然后去select 这张临时表,因此可以确认
sql解析流程:
1. 识别出流表与维表
3. select 临时表
现在使用calcite解析这条语句
public class ParseDemo {
public static void main(String[] args) { //假设gdsInfo就是维表 String sql = "select * from orders o join gdsInfo g on o.gdsId=g.gdsId";
SqlParser.Config config = SqlParser.configBuilder().setLex(Lex.MYSQL).build(); SqlParser sqlParser = SqlParser.create(sql, config); SqlSelect sqlSelect = null; try { sqlSelect = (SqlSelect) sqlParser.parseStmt(); } catch (Exception e) { e.printStackTrace(); }
SqlNode sqlFrom = sqlSelect.getFrom(); boolean isSideJoin = false; String leftTable = ""; String rightTable = ""; String newName = ""; //临时表 SqlJoin sqlJoin = null; //解析join if (sqlFrom.getKind() == SqlKind.JOIN) { sqlJoin = (SqlJoin) sqlFrom; SqlNode left = sqlJoin.getLeft(); SqlNode right = sqlJoin.getRight(); isSideJoin = true; leftTable = paserTableName(left); rightTable = paserTableName(right); } //生成新的select if (isSideJoin) { newName = leftTable + "_" + rightTable; SqlParserPos pos = new SqlParserPos(0, 0); SqlIdentifier sqlIdentifier = new SqlIdentifier(newName, pos); sqlSelect.setFrom(sqlIdentifier); } } //解析表 private static String paserTableName(SqlNode tbl) { if (tbl.getKind() == SqlKind.AS) { SqlBasicCall sqlBasicCall = (SqlBasicCall) tbl; return sqlBasicCall.operands[1].toString(); } return ((SqlIdentifier) tbl).toString(); } }
那么我们需要的就是生成新的select节点与SqlJoin节点,执行逻辑就是根据SqlJoin节点做维表关联之后生成新的表,然后去select这样新的表。
sql解析部分已经完成,既然使用sql化方式,因此也需要定义源表与维表,数据源一般是kafka, 定义源表需要:表名称、字段名称、字段类型、数据格式、topic;维表假设为mysql,需要定义:表名称、字段类型、字段名称、关联方式(同步/异步)、缓存方式(LRU/全部缓存、无缓存)。
源表定义:
CREATE TABLE orders( orderId varchar, gdsId varchar, orderTime varchar )WITH( type = 'kafka', kafka.bootstrap.servers = 'localhost:9092', kafka.topic = 'topic1', kafka.group.id = 'gId1', sourcedatatype ='json' );
维表定义:
CREATE TABLE gdsInfo( gdsId varchar, gdsName varchar, price double )WITH( type='mysql', url='jdbc:mysql://localhost:3306/paul', userName='root', password='123456', tableName='gdsInfo', cache = 'LRU', isSideTable='true' );
现在就是要如何解析这些语句,正则表达式是首选,需要解析出表名称、字段、属性三个部分:creat table xxx (xxx) with(xxx);正则表达式可为:
(?i)create\s+table\s+(\S+)\s*\((.+)\)\s*with\s*\((.+)\)
?i表示后面的匹配忽略大小写,\s+ 表示匹配多个空格,\S+表示匹配多个字符,.+ 表示匹配任意字符。
定义一个table类:
class TableInfo{ private String tableName; // 表名称 private Map<String,String> fieldsInfo; //字段名称->类型 private Properties props; //表属性 private boolean isSideTable; //是否为维表 }
public class ParseCreate {
public static final String REG_CREATE="(?i)create\\s+table\\s+(\\S+)\\s*\\((.+)\\)\\s*with\\s*\\((.+)\\)";
public static void main(String[] args) {
String createSql="CREATE TABLE orders(" + " orderId varchar," + " gdsId varchar," + " orderTime varchar" + " )WITH(" + " type = 'kafka'," + " kafka.bootstrap.servers = 'localhost:9092'," + " kafka.topic = 'topic1'," + " kafka.group.id = 'gId1'," + " sourcedatatype ='json'" + " );"; Pattern pattern=Pattern.compile(REG_CREATE);
TableInfo tableInfo=new TableInfo(); Matcher matcher=pattern.matcher(createSql); if(matcher.find()){ tableInfo.setTableName(matcher.group(1)); String fieldsStr=matcher.group(2); String propsStr=matcher.group(3); tableInfo.setFieldsInfo(parseFiles(fieldsStr)); tableInfo.setProps(parseProps(propsStr)); if(Boolean.valueOf(tableInfo.getProps().getProperty("isSideTable","false"))){ tableInfo.setSideTable(true); } }
}
public static Map<String,String> parseFiles(String fieldsStr){ Map<String,String> fieldsInfo=new HashMap(); String[] fieldsArray=fieldsStr.split(","); for(String field: fieldsArray){ String[] fieldInfo=field.trim().split(" "); fieldsInfo.put(fieldInfo[0],fieldInfo[1]); } return fieldsInfo; }
public static Properties parseProps(String propsStr){ Properties props=new Properties(); String[] propsArray=propsStr.split(","); for(String prop: propsArray){ String[] propInfo=prop.trim().split("="); props.setProperty(propInfo[0],propInfo[1]); } return props; }
}
至此完成了简易的create语句解析,下一篇将介绍如何将解析后的create与维表关联转换为可执行代码。