Flink 维表关联系列之自定义异步查询

点击上方蓝

字关注~

维表关联系列目录:

一、维表服务与Flink异步IO

二、Mysql维表关联:全量加载

三、Hbase维表关联:LRU策略

四、Redis维表关联:实时查询

五、kafka维表关联:广播方式

六、自定义异步查询

在异步IO查询外部存储时,对于提供异步查询的客户端来说可以直接使用,但是对于没有提供异步查询的客户端应该怎么做呢?我们可以将查询请求丢到一个线程池中,将这个线程池看做是一个异步的客户端来帮助我们完成查询请求。

通过线程池方式来帮助我们完成异步请求关键在于线程池的core大小如何设置,如果设置过大,会到导致创建很多个线程,势必会造成CPU的压力比较大,由于大多数情况下集群是没有做CPU隔离策略的,就会影响到其他任务;如果设置过小,在处理的速度上根不上就会导致任务阻塞。可以做一个粗略的估算:假如任务中单个Task需要做维表关联查询的数据每秒会产生1000条,也就是1000的TPS,我们希望能够在1s以内处理完这1000条数据,如果外部单次查询耗时是10ms, 那我们就需要10个并发同时执行,也就是我们需要的coreSize 是10。

以查询mysql为例:

 
class ExecSideFunction extends RichAsyncFunction[String, String] {
 
var executors: Executor = _
var sqlTemplate: String = _
 
override def open(parameters: Configuration): Unit = {
executors = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue[Runnable](1000))
sqlTemplate = "select value from tbl1 where id=?"
}
 
override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
 
executors.execute(new Runnable {
override def run(): Unit = {
val con = ConnectionFactory.getConnection("sourceId").asInstanceOf[Connection]
val sql = sqlTemplate.replace("?", parseKey(input))
MysqlUtil.executeSelect(con, sql, rs => {
val res = new util.ArrayList[String]()
while (rs.next()) {
val v = rs.getString("value")
res.add(fillData(input, v))
}
resultFuture.complete(res)
})
con.close()
}
})
}
 
private def parseKey(input: String): String = {
""
}
 
private def fillData(input: String, v: String): String = {
""
}
}

end

关注回复 Flink 获取更多信息~