使用Calcite解析SQL做维表关联(二)

public class MySqlAsyncFunction extends RichAsyncFunction<Row,Row> {


private Connection connection; private String sqlTemplate;
private String url; private String username; private String password; private String tableName;
private int idx; //条件值在流入数据的位置 private int inLength; //流入字段数 private int outLength; //输出字段数 private int sideLength; //维表查询字段数
public MySqlAsyncFunction(SqlJoin sqlJoin,TableInfo sideTableInfo,TableInfo leftTableInfo){
Properties props=sideTableInfo.getProps(); this.url=props.getProperty("url"); this.username=props.getProperty("username"); this.password=props.getProperty("password"); this.tableName=props.getProperty("tableName");
String rightField=parseCondition(sqlJoin,false); genSqlTemplate(sideTableInfo,rightField);

String leftField=parseCondition(sqlJoin,true); for (int i = leftTableInfo.getFieldNames().length - 1; i >= 0; i--) { if(leftField.equals(leftTableInfo.getFieldNames()[i])){ this.idx=i; break; } } inLength=leftTableInfo.getFieldNames().length; sideLength=sideTableInfo.getFieldNames().length; outLength=inLength+sideLength; }
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.connection= DriverManager.getConnection(url,username,password); } //这里还是一个同步查询,没有使用异步方式,需要使用一部mysql客户端 @Override public void asyncInvoke(Row input, ResultFuture resultFuture) throws Exception {
String v=(String)input.getField(idx); //获取条件值 PreparedStatement preparedStatement=connection.prepareStatement(sqlTemplate); preparedStatement.setString(1,v); ResultSet rs=preparedStatement.executeQuery(); boolean isJoin=false; while (rs.next()){ isJoin=true; Row row=new Row(outLength); for(int i=0;i<input.getArity();i++){ row.setField(i,input.getField(i)); } //直接将维表数据补齐在流数据后面 for(int i=0;i<sideLength;i++){ row.setField(inLength+i,rs.getObject(i+1)); } resultFuture.complete(Collections.singletonList(row)); } if(!isJoin) resultFuture.complete(null); } //解析on 条件的左右表字段名称, 这里只解析了一个关联条件 private String parseCondition(SqlJoin sqlJoin,boolean isLeft){ SqlNode condition=sqlJoin.getCondition(); SqlBasicCall sqlBasicCall=(SqlBasicCall)condition; String name=SqlExec.paserAliasTableName(isLeft?sqlJoin.getLeft():sqlJoin.getRight()); SqlIdentifier sqlIdentifier1=(SqlIdentifier)sqlBasicCall.operands[0]; if(name.equals(sqlIdentifier1.names.get(0))){ return sqlIdentifier1.names.get(1); }
SqlIdentifier sqlIdentifier2=(SqlIdentifier)sqlBasicCall.operands[1]; if(name.equals(sqlIdentifier2.names.get(0))){ return sqlIdentifier2.names.get(1); } return null; }
//查询sql private void genSqlTemplate(TableInfo tableInfo,String condition){ StringBuilder sql=new StringBuilder(); StringBuilder selects=new StringBuilder(); sql.append("select "); for(String field : tableInfo.getFieldNames()){ selects.append(field); selects.append(","); } sql.append(selects.substring(0,selects.lastIndexOf(","))); sql.append(" from ").append(this.tableName); if(condition!=null) sql.append(" where ").append(condition).append("=?"); this.sqlTemplate=sql.toString(); }

}