开了掘金的专栏: https://juejin.im/post/5c2dd18af265da61285a3e08
因为掘金的 Markdown 编辑器还是很好用的。后面团队里面别的小伙伴和老司机们的文章也会在上面陆续发布。欢迎关注。知乎的专栏得公司申请认证,还在走流程。
《 Scala 实用指南》快要第二次印刷了,勘误还是太少,希望 V 友们如果买了,多多反馈。另外,我在知乎的 Scala 专栏已经写了很久了,作为图书的补充,欢迎大家阅读。Enzyme SQL 就是我翻译完《 Scala 实用指南》用 Scala 编写的,在我的序(序和前面的章节在异步社区和微信阅读都是免费的)里面,大家可以看到相关信息。
Enzyme 是挖财数据团队自研的 SQL 执行引擎,适用于小规模或者中型数据集的快速计算。基于 Spark Catalyst 实现,Enzyme SQL 在查询层面 和 Spark SQL 完全兼容。至于 Dataframe,在 Enzyme 中有对应的 Protein。在 API 的层次上,Protein 和 Spark Dataframe 几乎完全一致。
应用
Enzyme SQL 目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用 Java 代码实现。这种方式比较原始,研发的链路和周期也相对冗长。故而,我们使用 SQL 作为一种加工变量的 DSL,提供在离线和实时两个平台上的一致语义。
为什么要使用 SQL 呢?首先,自研 DSL 需要做很多设计,包括易用性、实现层面的性能等等;其次,自研的 DSL 最终被接受被高效使用,不可避免会有一个相对较长的磨合周期;最后,SQL 作为数据分析师的看家本领,没有使用的障碍和语义上的歧义,其实现也已经有大量现有的代码可供参考。
Enzyme SQL 引擎极致的性能表现和非常低的 CPU 占用与内存消耗,有效地支撑了变量中心庞大的计算量(一个用户就会触发数以千计的变量计算)。
实践
Enzyme 设计之初就是以兼容 Spark SQL 为目标的,故而在使用上,和 Spark SQL 的 API 大体是一致的。EnzymeSession 即 SparkSession,Protein 即 Dataframe。
我们从构建一个 Protein 数据集开始:
// a session for computing
val conf = new EnzymeConf
val session = new EnzymeSession(conf)
// construct a protein from rows and schemas
val schema = StructType(Seq(
StructField("x", LongType),
StructField("y", StringType),
StructField("z", DoubleType),
StructField("in", IntegerType)
))
val rows = Seq(Row(1L, "234L", 1.1, 12),
Row(2L, "23L", 23.4, 4245),
Row(2L, "65L", 5244.2, 234),
Row(null, "7L", 245.234, 5245),
Row(4L, "7L", 245.234, 5245))
val df = new Protein(session, rows, schema)
这样的一个数据集可以直接展示:
> df.show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
|null| 7L|245.234|5245|
| 4| 7L|245.234|5245|
+----+----+-------+----+
如果要使用 SQL,首先我们要把这个数据集和一个表名关联起来:
> session.register(tableName = "a", df)
> session.sql("select * from a").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
|null| 7L|245.234|5245|
| 4| 7L|245.234|5245|
+----+----+-------+----+
上面的代码中session.sql()的结果还是一个 Protein。除了使用 SQL,我们还可以使用 Protein 里面丰富的 API:
> session.sql("select * from a order by x asc").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
|null| 7L|245.234|5245|
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
| 4| 7L|245.234|5245|
+----+----+-------+----+
> df.sort("x").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
|null| 7L|245.234|5245|
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
| 4| 7L|245.234|5245|
+----+----+-------+----+
更多用法的细节可以查看 Spark SQL 的文档,也可以查看 Enzyme 的文档。
实现
Enzyme 基于 Spark Catalyst 实现,而 Catalyst 对标的开源项目是 Apache Calcite。Apache Phoenix 和 Apache Hive 等众多项目都在使用 Calcite。因为我们的目标是兼容 Spark SQL,自然而然选择了 Catalyst,作为 SQL 的解析器、逻辑计划的执行器和优化器。
Spark Catalyst 概览
- SQL Text
- (parse): Unresolved Logical Plan
- (analyze): Resolved Logical Plan
- (optimize): Optimized Logical Plan(s) ----- RBO
- (planning): Physical Planning ------ CBO
- (optimize): Optimized Logical Plan(s) ----- RBO
- (analyze): Resolved Logical Plan
- (parse): Unresolved Logical Plan
上面的层次结构简明地概括了一个 SQL 从最原始的 SQL 文本,到最后执行的各个阶段。其中加粗的部分是 Enzyme 中所实现的,未加粗的部分是 Catalyst 所提供的功能。
解析,就是用 Antlr4 将 SQL 文本变成一棵 AST 树,这个 AST 树经过转换,变成了最原始的逻辑计划。在这样的逻辑计划中,我们是不知道*所表示的字段究竟是哪些。
分析,就是结合 Catalog 中的元数据信息,将原始的逻辑计划中各个未确定的部分(比如*)和元数据匹配确定下来。如果发现类型无法满足或者所引用的字段根本不存在,就直接抛出 AnalysisException。
优化,即通过逻辑计划的等价变换,转换得到最优的逻辑计划。Catalyst 中内置了一系列既有的优化规则,比如谓词下推和列剪裁。我们也可以通过 Catalyst 提供的接口,将自己研发的优化规则加入其中。这里的优化就是 RBO,基于规则的优化。
最后是物理计划的生成,一个优化过后的逻辑计划其实可以生成多种等效的物理计划,数据最终决定了其中一个物理计划是最优的。在没有时光机的当下,我们无法将所有物理计划都运行一遍,再选择最优的那个。所以通常的做法是,收集一些关于底层表的统计信息,依据这些信息,预判出执行效率最高的物理计划。这就是所谓的 CBO,基于代价的优化。
一个 SQL 的一生
SELECT *
FROM employee
INNER JOIN department
ON employee.DepartmentID = department.DepartmentID
我们用上面这个 SQL 来详细了解一下上述各个阶段。
分析阶段
Project [*]
+- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
:- 'UnresolvedRelation `employee`
+- 'UnresolvedRelation `department`
Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- SubqueryAlias employee
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- SubqueryAlias department
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
我们看到*已经被展开成了四个明确的字段,而且每个字段都有明确的 ID 标志,从而可以明确判定这个字段来自于哪一个表。当我们需要对 Spark SQL 做精确到字段级别的权限控制的时候,我们所需要的其实就是经过分析的逻辑计划。
优化
Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- SubqueryAlias employee
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- SubqueryAlias department
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- Filter isnotnull(DepartmentID#7L)
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- Filter isnotnull(DepartmentID#0L)
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
因为这是一个 inner join,所以这里的一个优化点其实是在做 join 之前,把 join key 为 null 的行过滤掉。
物理计划的生成
我们模仿 Spark SQL 中 SparkPlan 的实现,提供了简化的 EnzymePlan:
abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
def iterator: Iterator[InternalRow]
override def output: Seq[Attribute]
...
}
trait LeafExecNode extends EnzymePlan {
override final def children: Seq[EnzymePlan] = Nil
}
trait UnaryExecNode extends EnzymePlan {
def child: EnzymePlan
override final def children: Seq[EnzymePlan] = child :: Nil
}
trait BinaryExecNode extends EnzymePlan {
def left: EnzymePlan
def right: EnzymePlan
override final def children: Seq[EnzymePlan] = Seq(left, right)
}
在这个代码片段中,EnzymePlan 是核心,其中 output 表示一个物理计划的节点上结果集的元数据信息,而 iterator 则是调用这个物理计划节点的入口。我们看到有三类物理计划:
- LeafExecNode: LocalTableScanExec, LazyLocalTableScanExec
- UnaryExecNode: ProjectExec, LimitExec, FilterExec
- BinaryExecNode: HashJoinExec, NestedLoopExec
Enzyme 中的部分物理计划实现分类之后,如上所示。物理计划整体上是一棵树,数据实际上是从叶节点(Leaf)开始,经过过滤或者转换(Unary)或者合流(Binary),最终汇聚到根节点,得到计算结果。叶节点就是我们的数据源。有两个输入源的是 Union 或者 Join,而只有一个输入源的就是 Projection,Filter,Sort 等算子。
上一节中优化之后的逻辑计划可以生成这样的物理计划:
HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
, BuildRight, Inner
:- FilterExec isnotnull(DepartmentID#11L)
: +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
employee, catalog@60dcf9ec
+- FilterExec isnotnull(DepartmentID#4L)
+- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
department, catalog@60dcf9ec
计算通过在根节点调用 iterator 方法,层层回溯:
HashJoinExec.iterator
+ FilterExec.iterator
+ LazyLocalTableScan(employee).iterator
+ FilterExec.iterator
+ LazyLocalTableScan(department).iterator
性能调优
首先,我们需要定位性能瓶颈。JVM 生态中有很多做 Profiling 的工具。Enzyme 在优化过程中,使用的是 JDK 中自带的 jmc 命令和 FlightRecord。通过 jmc 的分析,可以定位到热点的方法,耗时的方法等有帮助的信息。我们有两种优化的策略。
- 其一,直接替换掉慢的部分
- 其二,对无法优化的部分做必要的缓存
- 其三,逻辑计划优化
优化点一:动态代码生成调优
Spark 的钨丝计划引入了动态代码生成的技术,比较有效地解决了三方面的问题(详见参考资料 2 ):
- 大量虚函数调用,生成的实际代码不再需要执行表达式系统中统一定义的虚函数
- 判断数据类型和操作算子等内容的大型分支选择语句
- 常数传播限制,生成的代码能够确定性地折叠常量
对于 Enzyme 的使用场景,动态代码生成并不一定有性能优化的效果,我们使用 JMH 做基准测试,将一部分使得性能变差的代码生成关闭掉。
数以千计的 SQL 会生成大量 Java 类,在引擎中编译并缓存,会带来一些内存上的占用和 CPU 的消耗,也是我们做取舍的其中一个原因。
优化点二:缓存
我们做的最主要的缓存就是从 Unresolve Logical Plan 到 Physical Plan 的生成。为什么不是直接从 SQL 到 Physical Plan 呢?因为 SQL 解析的开销实际上很小,而且略有差异的 SQL 所生成的 Unresolved Logical Plan 可能是一模一样的。
在物理计划的缓存中,还有两点需要注意:
- 其一,物理计划必须和数据隔离
- 其二,物理计划的计算不能有副作用
只有这样,我们的缓存才是有效的、正确无误的。另外,在表的 schema 发生改变的时候,我们还需要让所缓存的相关物理计划失效。
优化点三:新增逻辑计划优化规则
Catalyst 中的优化器提供了可扩展的接口,使得我们可以自定义逻辑计划优化的规则。Databricks 在 Spark Summit 上做过一个题为 A Deep Dive into Spark SQL's Catalyst Optimizer 的讲座,其中有细节的介绍。
具体的接口如下:
spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil
我们利用这个接口,针对我们的业务数据,专门定制了一系列额外的优化规则,极大地提升了引擎的性能。
Enzyme 的未来
- 开源
- 做更多针对小数据集的优化,进一步改善性能
- 基于 Enzyme,做一些上层生态的扩展
对于第三点,我们想做的实际上是让 Enzyme 和其他生态更好地结合。比如如何将 Enzyme 运用到 Spark Streaming 或者 Flink Streaming 中,如何在 Spring Boot 中更加方便地使用 Enzyme,如何在机器学习中使用 Enzyme。
参考资料
- Spear: A playground for experimenting ideas that may apply to Spark SQL/Catalyst
- Scala Benchmark Starter
- 《 Spark SQL 内核剖析》
- 《高性能 Scala 》
作者简介
忍冬,挖财数据研发工程师,负责 Spark SQL 在挖财的落地,自研了兼容 Spark SQL 适用于单机小数据集的 Enzyme SQL 引擎。译有《 Scala 实用指南》,业余时间是 GNU TeXmacs 项目的维护者之一。