Presto 运行时浅析
Presto Modules
一图胜千言,先看看官方提供的架构
如上图所示,Presto主要由三个模块构成
1)Coordinator
该模块作为Presto的Master(或者称为Server),其作用有三个,一是接收并处理用户提交的查询请求,并将查询结果返回给用户;二是负责具体查询请求的解析,执行计划的生成以及后续DAG执行的管理;三是负责整个集群的资源管理和调度(这里指的集群不是资源管理系统的集群概念)。
2)Client
该模块可以理解为Presto提供给用户的SDK,通过该SDK,用户可以方便的提交Query并获取结果。
3)Worker
Worker为物理执行计划落地和执行的场所,负责所有Query相关数据的计算工作。
图2
上图展示了Presto三个模块的动态交互过程,Client向Coordinator发起查询,Query创建后在Coordinator中经过Parser,Planner和Scheduler几个模块的依次处理后,下发给Worker执行,上图逻辑上是有一点错误,Worker执行完成的数据不是直接返回给Client,而是先经过Coordinator后,再经过Coordinator间接返回给Client,而Client与Worker之间没有交互。
Resource Management & Scheduler
Hadoop采用的是进程模型,在本地所能利用的是一整台物理机或者一个容器的全部计算资源,因此在大规模生产场景下需要外围的资源管理系统协同工作来启动worker,而Presto采用的是线程模型,worker为long running进程,并发的查询会以线程的方式同时运行在一个presto worker中,因此presto天生就需要解决多租户的问题。
图3
如上图所示,Presto对于worker资源的管理采用双向通信机制,首先由worker向coordinator注册服务,coordinator收集到可服务的worker节点后,再反向探测worker的状态,最终按照worker的状态分类存储,只有处于active状态的worker才可以被正常调度。
Coordinator通过定期收集worker节点的内存状况,从而汇总得到整个集群中当前Total以及Free的内存大小;另一方面Coordinator还通过不断检查集群中所有正在运行Query的内存使用情况,判断如果出现内存不足,会从所有Running的Query中挑选当前内存占用最多的query,并将其强制终止执行(有开关控制)。
在Worker层面,Presto又将本地内存分成三部分,分别为:1) General Memory 2) System Memory 3) Reserved Memory。
Presto对于资源管控另一个维度是CPU,这也是在Presto中为了在用户态解决多租户问题而非常有特色的一点。先看几行代码:
public ListenableFuture> processFor(Duration duration)
{
checkLockNotHeld("Can not process for a duration while holding the driver lock");
requireNonNull(duration, "duration is null");
long maxRuntime = duration.roundTo(TimeUnit.MILLISECONDS);
Optional
>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> { driverContext.startProcessTimer();
try {
long start = System.currentTimeMillis();
do {
ListenableFuture> future = processInternal();
if (!future.isDone()) {
return future;
}
}
while (System.currentTimeMillis() - start < maxRuntime && !isFinishedInternal());
}
finally {
driverContext.recordProcessed();
}
return NOT_BLOCKED;
});
return result.orElse(NOT_BLOCKED);
}
worker中由于采用线程模型,因此Query对应算子的执行会从线程池中分配一个线程负责,而该线程对于Operator的执行时长为duration表示,超过duration所表示的时间,该线程就有机会切换执行其他query的算子,从而从粗粒度实现用户态CPU资源的分时调度,进而实现cpu层面的多租户。
DAG Pipeline
和其他数据处理框架类似,SQL通过Client提交给Coordinator后便产生了对应的Query,经过Analyzer,Planner等模块处理,Query被拆分成Stage,Stage再被拆分成Task,即在Presto中查询按照Query -> Stage -> Task三级进行管理,Query/Stage/Task分别由各自的StateMachine驱动,并通过向下监听的方式感知下级状态变更,同时驱动本级状态的改变,最终驱动Query的状态从QUEUED -> FINISHED。
而Presto之所以相对更快的原因之一是并行数据加载以及流水线计算,如下图所示:
图4
Worker中Task的数据来源有两类1) Partitioned Source 2) Remote Source
Partitioned Source通过SPI中提供的Connector,Cursor等API与各种数据源进行对接,完成数据的流式读取,如图中Source Task,如果对接的数据源具有数据分区的机制,则可以通过Source Task的横向扩展来提高数据的并行加载能力;而Remote Source顾名思义是通过网络shuffle的方式获取数据,对应上图中的Intermediate Task,最终计算结果统一汇总到Output Task中,通过Coordinator返回给Client。Presto为Pipeline上各Stage的调度提供了可插拔的Policy API,目前提供两种策略实现:PhasedExecutionPolicy和AllAtOnceExecutionPolicy,而每一种策略中又可以对Stage启动的先后顺序进行定制。
在启动Source Task的时候会根据数据源的Metadata计算出需要扫描的Split并按照Partition的Locality(如果有)分配给对应Worker的Task;而启动Intermediate Task的时候会按照对应shuffle规则的partition映射关系将remote source的uri信息赋给对应的Intermediate Task,有了上下游数据的相关信息,无论Task的启动先后顺序如何,都可以迅速搭建起一条Pipeline并开始计算,只要Task中的算子从上游Task中可以获取到数据,便开始执行计算,计算产出的数据直接放入该Task对应的OutputBuffer中等待下游Task来访问,也就是说在执行非聚合算子的情况下,Client是可以源源不断的流式获取到计算结果,而不用等到所有数据都计算完成。
Page Based Data Processing
Worker中算子对于数据的计算是以Page为单位,可以从Operator的API中看出Page的重要性:
public interface Operator { void addInput(Page page); Page getOutput(); }
Page可以理解为一张Table的一个数据子集,Page是由多个Block组成,每个Block为单一类型,例如Int Block, Long Block, Varchar Block等,其代表了Table中的某一列中的部分数据,因此Presto也被称为列式存储,Page的大致结构如下图所示:
图5
接着来看下Page的使用方式,Page是通过PageBuilder生成,以RecordPageSource为例:
public RecordPageSource(List types, RecordCursor cursor) { this.cursor = requireNonNull(cursor, "cursor is null"); this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null"))); this.pageBuilder = new PageBuilder(this.types); }
PageBuilder在初始化RecordPageSource的时候被创建
@Override
public Page getNextPage()
{
if (!closed) {
int i;
for (i = 0; i < ROWS_PER_REQUEST; i++) {
if (pageBuilder.isFull()) {
break;
}
if (!cursor.advanceNextPosition()) {
closed = true;
break;
}
pageBuilder.declarePosition();
for (int column = 0; column < types.size(); column++) {
BlockBuilder output = pageBuilder.getBlockBuilder(column);
if (cursor.isNull(column)) {
output.appendNull();
}
else {
Type type = types.get(column);
Class> javaType = type.getJavaType();
if (javaType == boolean.class) {
type.writeBoolean(output, cursor.getBoolean(column));
}
else if (javaType == long.class) {
type.writeLong(output, cursor.getLong(column));
}
else if (javaType == double.class) {
type.writeDouble(output, cursor.getDouble(column));
}
else if (javaType == Slice.class) {
Slice slice = cursor.getSlice(column);
type.writeSlice(output, slice, 0, slice.length());
}
else {
type.writeObject(output, cursor.getObject(column));
}
}
}
}
}
// only return a page if the buffer is full or we are finishing
if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
return null;
}
Page page = pageBuilder.build();
pageBuilder.reset();
return page;
}
通过调用RecordPageSource的方法getNextPage获取新的Page,每次在Page中填充的行数为ROWS_PER_REQUEST(1024),而Page是由Block构成,Block的构建是通过BlockBuilder完成,每种数据类型都有自己的BlockBuidler,例如LongArrayBlockBuidler, IntArrayBlockBuidler等,PageBuilder在构建的时候根据传入的types参数确认需要创建的BlockBuidler类型。
BlockBuilder在初始化的时候会通过PageBuilder计算自己的初始化空间大小,具体公式为
min(DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, maxBlockSizeInBytes)
其中参数maxBlockSizeInBytes为
maxBlockSizeInBytes = (int) (1.0 * DEFAULT_MAX_PAGE_SIZE_IN_BYTES / types.size());
有了大小,在开辟具体空间的时候还是需要精确到大小,而根据类型的不同,估算方式也有差别,对于定长类型(eg. Long/Int)而言,计算公式为
expectedEntries = maxBlockSizeInBytes / fixedSize
对于varchar这种变长类型,由于不知道每次写入的大小,暂且预估为32bytes,后续写入的时候,再根据需要进行扩容。
Data Shuffle
Presto在Task之间的数据传输采用Pull的模式,即上游Task计算输出的数据会存储在Task的临时空间中,等待下游Task来消费,这里涉及到一些关键的设计,例如计算/存储分离,内存管理,反压,数据分区等。
图6
Task的整个生命周期包含两项主要的工作,一是计算数据,二是存储数据。无论是Source Task还是Intermediate Task,算子都被封装在Driver中,Driver中的第一个算子是SourceOperator,最后一个算子是TaskOutputOperator,在SourceOperator获取输入数据并经过后续其余算子计算完成后,经过TaskOutputOperator处理后,Page会被存入OutputBuffer中,同时下游Task的SourceOperator会通过ExchangeClient将数据拖走,这里涉及到比较复杂的标记和数据删除的机制,有机会下次专题再说。
总结
Presto无疑是一款优秀的MPP框架,架构优良,工程清晰,简单易用,社区活跃,为国内外一众互联网厂商作为OLAP分析的利器,然而对于高并发和毫秒级的场景,还有很大的差异,也成为后续优化的重点,尤其在高性能Pipeline,内存管理,超大规模数据,多租户等方面在大规模实战场景下都存在很大的提升空间。