51信用卡是中国最大的线上信用卡管理平台, 业务涵盖个人负债管理、信用卡科技服务、线上信贷撮合及投资理财服务。目前在51人品贷借贷场景,用户需要先导入个人资料,主要包括信用卡账单、运营商、通讯录等数据,然后触发一系列风控模型计算,包括实时的授信出额、A卡信用评分准入(要求秒级完成),以及近线的反欺诈审核流程(要求30分钟内完成)。风控模型依托大量变量,这些变量从开发到上线在以前的研发流程如下:
- 风控模型同学先基于离线数据源使用Hive SQL挖掘一系列变量,经过建模筛选后将变量需求提给开发同学;
- 开发同学与模型同学一起核对数据源以及变量实现逻辑,将每个变量用Groovy代码实现;
- 在线灰度运行,与离线Hive T+1 产出的变量以及模型分数做一致性比对,满足较高的一致性和稳定性后才最终发布;
这个过程中主要存在两个问题,严重影响了模型的交付速度与质量:
- 1、在线/离线数据源不一致: 实时数据源的清洗逻辑与离线数据源清洗逻辑不一致,同样是信用卡账单数据源,数据字段的标准和格式并不统一;
- 2、在线/离线变量实现逻辑不一致: 离线Hive SQL变量逻辑较复杂,一般会经过多层ETL才产出,对应的Groovy代码会迅速膨胀,当出现结果变量不一致时,需要按依赖逐层排查,十分耗时;
在2018年初为了上线一版全新的风控主模型,前后花了将近3个月时间,其中大部分时间耗在修复数据偏差上,开发同学苦不堪言,所以对于风控计算能力以及效率的提升,迫在眉睫。
2018下半年,CTO启动了"光锥"风控平台项目,旨在从根本上提升金融风控整体迭代效率(范围包括但不限于变量计算、策略模型部署、变量快速分析、策略仿真实验、变量/策略/模型监控等)。
光锥首先完成了在线/离线数据源的统一,对借贷用户下单时刻的主要风控数据源做关联快照,业务人员基于快照做离线挖掘,离线数据源标准向在线数据源看齐,保持一致。在变量计算方面,为了复用模型人员提供的HiveSQL,减少代码翻译,我们在反欺诈审核环节引入了基于SparkSQL的近线计算引擎(SparkSQL基本兼容HiveSQL),在单机Local模式下创建多个SparkSession并行处理微批订单,与微服务无缝融合,基本上分钟级别可以完成任务。但是对于实时性要求较高的授信以及订单准入场景,Spark处理性能达不到要求,我们需要一个实时的高性能SQL计算引擎,而且最好能在一定程度上兼容HiveSQL。
在方案调研上,首先想到的是内存数据库如H2、Apache Ignite, 随便拿几个业务SQL做了测试,发现稍微复杂点有嵌套的SQL语句就解析失败,直接放弃。然后重点放在了目前火热的流计算框架上,SparkStreaming、Flink。不管是Spark还是Flink,在对流计算的SQL支持上都比较弱(当时Flink1.4版本的Streaming SQL连LEFT OUTER JOIN也不支持)。虽然我们已经在一些风控指标计算场景上线了SparkStreaming和Flink任务,但是主要的风控变量计算任务并不是一个典型的Streaming场景,业务上用户可以先导入信用卡账单进行管理,等1个月后再来下单触发风控计算,如果实时拉取数据再加载到Streaming系统节点显得多余。另外在计算过程中通过外部接口获取数据后可以直接进行内存计算,并不需要中间存储,如果单机计算足够快,也不需要引入分布式计算,避免产生额外开销。
后来我们分别调研了Flink和Apache Beam在Batch模式下的单机本地SQL执行框架,Flink里是DataSet+CollectionEnvironment,Beam里是PCollection+DirectRunner,两者都能将Java集合数据直接映射为表,然后联合多表进行SQL查询。由于解析完SQL就生成执行代码直接运行,没有Job生成调度逻辑,也没有像Spark里的Stage划分,Executor回收清理等操作,执行效率较高,在我们的真实业务数据集上性能表现非常不错。Flink和Beam都使用了热门的SQL引擎项目Apache Calcite进行SQL解析和逻辑计划生成,然后实现自己的物理计划以及生成执行代码运行(在分布式模式下,Beam会将计划适配代理给指定Runner执行)。Calcite也被多个其他Apache项目使用,其中Hive和阿里的MaxCompute单独使用Calcite来做基于代价的优化,Hive对Calcite的使用方式如下:
Hive通过Antlr解析SQL生成抽象语法树(ASTNode),然后翻译成Calcite的逻辑计划(LogicPlan),基于代价优化完成等价转换,再翻译回Hive能处理的Optimized ASTNode。实际上Calcite的SQL语法并不兼容HiveSQL,在数据类型和操作符(Operator)支持上也有很大不同。但是在关系代数(Relational algebra)层面是相同的,所以可以完成上述转换。当然Hive也有Local运行模式,由于生成的执行计划面向MR任务,在低延时高性能计算场景,无法满足要求。
虽然Calcite并不兼容HiveSQL,但它对工业级的SQL高级特性有较好的支持(如复杂类型、窗口函数、表函数等OLAP特性),同时它默认内置了一套基于Linq4j的本地SQL执行框架,用于执行代码生成和运行。我们的目标是计算引擎要在业务上兼容HiveSQL,不管是选择Flink还是Beam都需要对Calcite做二次改造和者扩展,考虑到Calcite本身的简洁设计以及较强的扩展性,我们决定直接基于Calcite二次开发,实现一套在业务上兼容HiveSQL的本地实时SQL计算引擎。
先简单介绍下Calcite的架构和SQL处理流程:
对于一次Sql查询,一般经过以下流程:
- 先由Parser解析生成SqlNode节点树;
- 由Validator完成节点类型推导以及必要的表达式验证优化;
- 由SqlToRelConverter将SqlNode节点树转化为代表逻辑计划的RelNode;
- 在查询优化器(QueryOptimizer)里内置了100+的转换规则,分别用于逻辑计划优化以及物理计划转换;
- 生成的物理计划RelNode被代码实现器遍历处理,各子节点由对应的实现器生成执行代码,最后组装成一个执行Class;
Calcite会为每个SQL动态生成一个实现了Bindable接口的Class,然后动态编译创建实例后传入数据集执行计算。这里的数据集可以通过Schema进行灵活定义,我们在业务上针对每一次订单请求会创建一份Schema,存储当前请求处理过程的所有表数据,处理完成后堆内存后面会被回收。
受益于Calcite的可插拔架构,大多数项目会使用Calcite先生成LogicPlan,然后添加一系列rules来完成第三方物理计划转换与执行代码生成。考虑到研发成本,我们希望尽可能重用Calcite默认的物理计划和执行代码生成逻辑。为了支持HiveSQL的语法和操作符,我们主要做了以下改造和扩展:
- 1、替换默认的语法解析器Parser.jj,支持部分的Hive语法:
Calcite使用JavaCC来做语法解析,有一套内建的关键字和操作符。我们会按需添加一些业务要用到的关键字,如rlike、regexp:
select * from t where c1 rlike '^\\d+$'
-
2、优先使用我们自定义的操作符注册表来查找函数:
在计算引擎初始化阶段,先通过扫描Hive Class,将Hive函数自动桥接并添加到注册表,大概有200+的UDF、30+的UDAF以及部分常用的UDTF,另外还有我们数仓自定义的业务函数,在桥接适配中,会保留内建操作符的某些特性,以便查询优化器能正确识别优化; -
3、支持操作符类型动态推导与验证:
Calcite的SqlValidator会对操作符节点以及上下文参数做类型推导与验证。但是它的类型系统和Hive�并不相同,我们需要对两者的类型系统做映射转换才能完成Hive操作符的类型推导验证。另外Hive函数有个很棒的特性:一个特定函数的参数和返回值可以支持不确定类型,函数内部会完成不兼容类型的强制转换。例如函数:
add_months(start_date,add_months)
start_date可以是date类型,也可以是string。 我们需要先将Calcite数据类型转换为Hive的数据类型,然后桥接Hive实现动态类型推导验证,再将Hive函数返回类型转换回Calcite类型。这部分类似SparkSQL Catalyst连接Hive的处理方式,主要数据类型的映射关系如下:
CalciteSqlType | JavaStorageType | HiveObjectInspector |
---|---|---|
BIGINT | Long | LongObjectInspector |
INTEGER | Int | IntObjectInspector |
DOUBLE | Double | DoubleObjectInspectors |
DECIMAL | BigDecimal | HiveDecimalObjectInspector |
VARCHAR | String | StringObjectInspector |
DATE | Int | DateObjectInspector |
TIMESTAMP | Long | TimestampObjectInspector |
ARRAY | List | StandardListObjectInspector |
...... | ...... | ...... |
对于复杂类型,目前我们只按需支持了Array;
- 4、为Hive操作符实现执行代码:
除了一般函数外,还需要对一些特殊操作符的代码实现器做替换,例如数组取下标
select split('a,b,c',',')[0]
Calcite默认索引从[1]开始,而Hive是从[0]开始,我们为代码实现器的注册表添加了扩展机制,允许在初始化注册后为操作符覆盖或者添加新的实现器。对于UDF、UDAF、UDTF等分别实现了通用的代码实现器,可以生成相应的执行代码;
- 5、隐式类型转换增强:
Calcite已经具备了一定的隐式类型转换,但是相比Hive还不够。前面已经提到Hive函数自身能够处理类型转换,但在其他场景,如Join时,Hive支持对不兼容类型做等值连接:
select t1.*,t2.* from t1 join t2 on t1.intVal=t2.stringVal
我们参考Hive规则对HashJoin算法的Key进行了隐式转换。另外在强制转换的过程中遇到无法转换的情况,如cast('' as int) ,Calcite会报错,而Hive会转换成null;
在功能基本满足后,我们做了相应的Benchmark对比测试,在性能优化上主要做了几点:
- 1、为相同的SQL查询缓存执行对象实例: Calcite在1.18.0版本后对生成的Class做到了无状态,所以不用担心线程安全问题;
- 2、Join算法优化:
在基本的Projection、Join、Aggregate场景和不同量级的数据集上,相同sql的执行耗时和Flink Local模式差不多,但是对于包含非等值谓语条件的Join,在10000*10000数据集上耗时却相差几十倍,如
SELECT t1.*,t2.* FROM item1 t1 LEFT OUTER JOIN item2
t2 ON t1.i_item_sk=t2.i_item_sk and t1.i_item_sk <15000
由于包含了谓语条件t1.i_item_sk <15000
,Calcite对这类Join采用NestedLoopJoin(嵌套循环)算法实现,没有做优化。而Flink和Spark在Shuffle后数据会自然排序好,所以采用了更高效的Sort-Merge-Join算法实现。在我们优化了相应的转换rule以及HashJoin算法后,此类Join性能提升了几十倍(已提交patch给社区)。在真实的业务场景里,SQL往往较复杂,有多层嵌套,由于可以复用完整的执行实例,我们的计算性能表现比Flink的Local模式更优。
使用方面,我们参考Flink实现了一套简洁的Table API:
TableEnv tableEnv = HiveTableEnv.getTableEnv();
DataTable t1 = tableEnv.fromJavaPojoList(pojoList);
DataTable t2 = tableEnv.fromJdbcResultSet(resultSet);
tableEnv.addSubSchema("test");
tableEnv.registerTable("test","t1",t1);
tableEnv.registerTable("test","t2", t2);
DataTable queryResult = tableEnv.sqlQuery("select * from test.t1 join test.t2 on t1.id=t2.id");
那么在真实的业务系统里,我们如何利用这个内存SQL计算引擎?
在金融风控场景,大多数用户的单数据域的数据量一般在千到万级,所以单条查询的性能基本在百毫秒级或者以内,但是有两类典型场景无法使用纯内存计算:
- 黑名单匹配: 由于黑名单量较大,而且会频繁更新,无法放入计算引擎管理,我们也不希望引入额外的分布式Join方案。针对这个case,我们引入了外部接口函数,例如:
isHitBlackMobileList("t1",500)
在运行时会将t1临时表的数据传入isHitBlackMobileList函数,然后以500行每批并行调用外部接口,将匹配到的黑名单merge后返回成一个虚拟表,这样做的好处是,名单存储匹配可以由业务方灵活控制,背后可能采用HBase或者其他基于布隆过滤器的方案;
- 关系图谱场景下的关联查询: 和黑名单类似,我们可能需要通过用户的手机号码关联出通讯录里的借贷用户的还款记录,然后计算模型变量。可以先通过外部接口获取对应的数据,背后可能是图数据库查询,最后将未加工的数据放入内存里做SQL计算。
为了方便业务人员快速开发变量,我们沉淀了数据源管理和变量计算平台,提供了用于在线开发调试的SQL IDE, 业务人员能方便地自主部署自己的SQL任务,并基于历史数据完成数据校验比对。当SQL任务审批发布上线后,配套的变量监控任务也会启动,包括T+1的在线/离线一致性比对,PSI以及缺失率等指标监控。
在任务运行时,计算引擎会根据SQL提取表名,解析表与表的依赖形成执行DAG,对于一个SQL节点,只要上游依赖表的任务完成,就会被提交到线程池执行。
部署方面,在51信用卡,Kubernetes是通常的应用部署方式,对这类计算型应用,我们开启HPA功能进行自动伸缩容,并使用CPU Manager特性来保证性能优先的调度策略,这样可以避免因单个任务耗时而引发的排队等待。
经过一段时间试运行和验证,我们于2019年初在主业务的A卡全新模型上全面使用了自主研发的实时计算引擎,并逐步将其他模型的变量往新的计算平台迁移。在性能表现上,除开数据源拉取,完成一次订单请求的所有变量的计算平均耗时稳定在1s以内,而在线/离线数据的一致率达到99%以上。过去长达2个月的一版主模型的迭代周期,现在能缩短到2周以内,这从根本上提高了风控模型的迭代效率与质量。
为了进一步丰富计算引擎的功能以及适用更多业务场景,我们决定将核心库开源https://github.com/51nb/marble,另外也提交了部分issue和patch给Calcite社区,有兴趣的同学可以关注。
作者介绍: 周来,现负责51信用卡风控计算引擎以及风控数据智能分析产品等研发。