【Impala】架构原理

这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战

一、组件

Impala 是一个分布式, 大规模并行处理(MPP)数据库引擎, 它包括多个进程。

ImpalaHive 类似不是数据库而是 数据分析工具

# 在 linux123 中执行

$ ps -ef | grep impala
复制代码

如图:
2020-08-2610:15_1.png

  1. impalad
  • 角色名称为 Impala Daemon , 是在每个节点上运行的进程, 是 Impala 的核心组件, 进程名是 Impalad;
  • 作用: 负责读写数据文件, 接收来自 Impala-shell , JDBC , ODBC 等的查询请求, 与集群其它 Impalad 分布式并行完成查询任务, 并将查询结果返回给中心协调者。
  • 为了保证 Impalad 进程了解其它 Impalad 的健康状况, Impalad 进程会一直与 statestore 保持通信。
  • Impalad 服务由三个模块组成: Query PlannerQuery CoordinatorQuery Executor , 前两个模块组成前端, 负责接收 SQL 查询请求, 解析 SQL 并转换成执行计划, 交由后端执行。
  1. statestored
  • statestore 监控集群中 Impalad 的健康状况, 并将集群健康信息同步给 Impalad
  • statestore 进程名为 statestored
  1. catalogd
  • Impala 执行的 SQL 语句引发元数据发生变化时, catalog 服务负责把这些元数据的变化同步给其它 Impalad 进程(日志验证, 监控 statestore 进程日志)
  • catalog 服务对应进程名称是 catalogd
  • 由于一个集群需要一个 catalogd 以及一个 statestored 进程, 而且 catalogd 进程所有请求都是经过 statestored 进程发送, 所以官方建议让 statestored 进程与 catalogd 进程安排同个节点。

二、查询

查询流程如下图:
2020-08-2610:21.png

  1. Client 提交任务

Client 发送一个 SQL 查询请求到任意一个 Impalad 节点, 会返回一个 queryId 用于之后的客户端操作。

  1. 生成单机和分布式执行计划

SQL 提交到 Impalad 节点之后, Analyser 依次执行 SQL 的词法分析、语法分析、语义分析等操作;
MySQL 元数据库中获取元数据, 从 HDFS 的名称节点中获取数据地址, 以得到存储这个查询相关数据的所有数据节点

  • 单机执行计划: 根据上一步对 SQL 语句的分析, 由 Planner 先生成单机的执行计划, 该执行计划是有 PlanNode 组成的一棵树, 这个过程中也会执行一些 SQL 优化, 例如 Join 顺序改变、谓词下推等。
  • 分布式并行物理计划: 将单机执行计划转换成分布式并行物理执行计划, 物理执行计划由一个的 Fragment 组成, Fragment 之间有数据依赖关系, 处理过程中需要在原有的执行计划之上加入一些 ExchangeNodeDataStreamSink 信息等。
  • Fragment : sql 生成的分布式执行计划的一个子任务;
  • DataStreamSink : 传输当前的 Fragment 输出数据到不同的节点
  1. 任务调度和分发

CoordinatorFragment (子任务) 根据数据分区信息发配到不同的 Impalad 节点上执行。Impalad 节点接收到执行 Fragment 请求交由 Executor 执行。

  1. Fragment 之间的数据依赖

每一个 Fragment 的执行输出通过 DataStreamSink 发送到下一个 Fragment , Fragment 运行过程中不不断向 coordinator 节点汇报当前运行状态。

  1. 结果汇总

查询的 SQL 通常情况下需要有一个单独的 Fragment 用于结果的汇总, 它只在 Coordinator 节点运行, 将多个节点的最终执行结果汇总, 转换成 ResultSet 信息。

  1. 获取结果

客户端调用获取 ResultSet 的接口, 读取查询结果。

(1)查询计划示例

select t1.n1, t2.n2, count(1) as c
from t1 join t2 on t1.id = t2.id
join t3 on t1.id = t3.id
where t3.n3 between ‘a’ and ‘f’
group by t1.n1, t2.n2
order by c desc
limit 100;
复制代码

(2)单机执行计划

QueryPlanner 生成单机的执行计划。

如图:
2020-08-2610:30.png

分析上面的单机执行计划

  1. 先去扫描 t1 表中需要的数据, 如果数据文件存储是列式存储,可以便利的扫描到所需的列 id

  2. n1 需要与 t2 表进行 Join 操作, 扫描 t2 表与 t1 表类似获取到所需数据列 id , n2

  3. t1t2 表进行关联, 关联之后再与 t3 表进行关联, 这里 Impala 会使用谓词下推扫描 t3 表只取 join 所需数据

  4. group by 进行相应的 aggregation 操作, 最终是排序取出指定数量的数据返回。

(3)分布式并行计划

所谓的分布式并行化执行计划: 就是在单机执行计划基础之上结合数据分布式存储的特点, 按照任务的计算要求把单机执行计划拆分为多段子任务, 每个子任务都是可以并行执行的。

上面的单机执行计划 转为 分布式并行执行计划。

分布式并行执行计划,如图:
2020-08-2611:52.png

流程图,如下:
2020-08-2610:35.png

分布式执行计划中涉及到多表的 Join , Impala 会根据表的大小来决定 Join 的方式。

主要有两种: Hash JoinBroadcast Join

上面分布式执行计划中可以看出 T1,T2 表大一些, 而 T3 表小一些, 所以对于 T1T2Join Impala 选择使用 Hash Join
对于 T3 表选择使用 Broadcast 方式, 直接把 T3 表广播到需要 Join 的节点上。

分布式并行计划流程:

  1. T1T2 使用 Hash join , 此时需要按照 id 的值分别将 T1T2 分散到不同的 Impalad 进程, 但是相同的 id 会散列到相同的 Impalad 进程, 这样每一个 Join 之后是全部数据的一部分

  2. T1T2 Join 之后的结果数据再与 T3 表进行 Join , 此时 T3 表采用 Broadcast 方式把自己全部数据(id 列) 广播到需要的 Impala 节点上

  3. T1 , T2 , T3 Join 之后再根据 Group by 执行本地的预聚合, 每一个节点的预聚合结果只是最终结果的一部分(不同的节点可能存在相同的 group by 的值), 需要再进行一次全局的聚合。

  4. 全局的聚合同样需要并行, 则根据聚合列列进行 Hash 分散到不同的节点执行 Merge 运算(其实仍然是一次聚合运算), 一般情况下为了较少数据的网络传输, Impala 会选择之前本地聚合节点做全局聚合工作。

  5. 通过全局聚合之后, 相同的 key 只存在于一个节点, 然后对于每一个节点进行排序和 TopN 计算, 最终将每一个全局聚合节点的结果返回给 Coordinator 进行合并、排序、limit 计算, 返回结果给用户。