Flink 维表关联系列之自定义异步查询
2009 年 2 月 26 日
点击上方蓝
字关注~
维表关联系列目录:
四、Redis维表关联:实时查询
六、自定义异步查询
在异步IO查询外部存储时,对于提供异步查询的客户端来说可以直接使用,但是对于没有提供异步查询的客户端应该怎么做呢?我们可以将查询请求丢到一个线程池中,将这个线程池看做是一个异步的客户端来帮助我们完成查询请求。
通过线程池方式来帮助我们完成异步请求关键在于线程池的core大小如何设置,如果设置过大,会到导致创建很多个线程,势必会造成CPU的压力比较大,由于大多数情况下集群是没有做CPU隔离策略的,就会影响到其他任务;如果设置过小,在处理的速度上根不上就会导致任务阻塞。可以做一个粗略的估算:假如任务中单个Task需要做维表关联查询的数据每秒会产生1000条,也就是1000的TPS,我们希望能够在1s以内处理完这1000条数据,如果外部单次查询耗时是10ms, 那我们就需要10个并发同时执行,也就是我们需要的coreSize 是10。
以查询mysql为例:
class ExecSideFunction extends RichAsyncFunction[String, String] { var executors: Executor = _ var sqlTemplate: String = _ override def open(parameters: Configuration): Unit = { executors = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue[Runnable](1000)) sqlTemplate = "select value from tbl1 where id=?" } override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = { executors.execute(new Runnable { override def run(): Unit = { val con = ConnectionFactory.getConnection("sourceId").asInstanceOf[Connection] val sql = sqlTemplate.replace("?", parseKey(input)) MysqlUtil.executeSelect(con, sql, rs => { val res = new util.ArrayList[String]() while (rs.next()) { val v = rs.getString("value") res.add(fillData(input, v)) } resultFuture.complete(res) }) con.close() } }) } private def parseKey(input: String): String = { "" } private def fillData(input: String, v: String): String = { "" } }

end
关注回复 Flink 获取更多信息~