如何在Hive/SparkSQL处理引擎下使用BloomFi

前言

BloomFilter,布隆过滤器,作为一个成熟的算法、数据结构和优化手段,已经在多个MPP数据库下有实现,可以直接被调用,比如在Doris可以作为指标列类型的一种——bitmap, 支持将多行指标值存到bitmap中。但是在发展早、成熟早、且作为批处理架构的Hive中,BloomFilter无法直接被使用,这就让以Hive或SparkSQL为离线计算处理引擎的大数据开发为难,因为明知BloomFilter这种优秀的优化手段却无法使用。笔者在重构公司的老任务时(仍旧使用Hive做处理引擎)就遇到了类似问题,故调研了下在Hive技术栈下的BloomFilter使用方法,在此将调研的收获一一呈现,希望能给后来的人一点帮助。

技术栈选择

如果你们公司的技术栈选择较灵活的话,在需要用到BloomFilter的场景下,推荐优先选择目前社区流行的有实现BloomFilter的MPP数据库,其次选择Spark on Scala(注意,不是SparkSQL,且Spark版本要是2.x+),那样的话使用成本也很低,当你能选择的技术栈只有Hive或SparkSQL时,你就需要通过UDF函数来支持你的任务使用BloomFilter。我将在下面的文章介绍两种技术栈下的实现,以及SQL平台下目前受欢迎的几个BloomFilter的不同实现版本,并说明在2个场景下如何使用,及其中的优劣。

BloomFilter在Spark on Scala的实现

对于公司有Spark2.x+Scala开发环境的大数据开发来说,用Scala实现此复杂场景是最最省事的,Spark2.x的官方库已经提供了BloomFilter的类(在spark.sql.Dataset目录下),只需要知道如何使用就可以了。

大表关联小表/中表

此处小表指仍无法放到广播变量的小表(远大于25M),且大表关联小表后关联上的数据较少的场景。
假设你有2个表要做关联,df2为大表,df1为小表/中表,那么你的思路就是,将df1和df2关联的主键中,粒度最细的字段key1取出来,灌到BloomFilter里面,形成一个去重后的数据集,在关联前将df2用该布隆过滤器筛选(时间复杂度O(N),且只是Map操作),这样则关联前,df2在筛选后数据比之前大大降低,在join时大大降低了无效关联占用的缓存内存、shuffle内存和溢出磁盘发生的概率,这样省下的时间远远大于建立布隆过滤器的时间。下图代码中BloomFilter传入的参数分别为关联key、预期需要储存的唯一id数,和容错率(因hash函数的原因可能将不属于该集合的数据标记为属于该集合)。

//实现布隆过滤器
val df1_bloomfilter = df1.stat.bloomFilter($"key1", 25000, 0.05)
val udf_df1_bloomfilter = udf((x: String) => df1_bloomfilter.mightContain(x))

val _df2 = df2.filter(udf_df1_bloomfilter($"key1")) //关联前用布隆过滤器筛数据
var final_df = df1.join(_df2, Seq("key1"), "left_outer").na.fill("default")
复制代码

大数据精准去重

此处只简单说明下场景和思路。
假设你要统计一个大数据集合下的唯一id数,这个id每天都有增量新增,且上层DWS层有基于该id(原子指标)的繁多的复合指标要产出,且这些复合指标统计的不是每天增量或有活跃的那部分id,而不是历史上一段时期,或全历史的id,则我们可以通过RoaringBitmap替我们高效计算此类uv数。
第一步,引入RoaringBitmap:import org.roaringbitmap.RoaringBitmap
第二步,我们先维护一套次id的序号,为什么要维护可以参考bitmap的介绍:mp.weixin.qq.com/s/xxauNrJY9… ,我们将这套id按照id本身做字典排序,排序后赋予从0到数量N的序号,之后每天有新增id时重新排序并为新id赋予序号。这样,我们就有了一个id和序号的映射表。
第三步,我们将各种id对应序号回关联到带有各种统计维度的以id为主要主键,以天为分区的DWD大宽表。然后,当在DWS层需要以各种统计维度进行聚合,统计每个维度下的该id去重数时,我们都new一个Roaring64NavigableMap数据结构,将该天该维度组合下的序号放入该数据结构(可以简单理解为一个long array),这时统计uv就成了速度非常快的位计算(在该类中该方法为getLongCardinality()),当我们需要统计多天的去重uv时,直接将多天对应的Roaring64NavigableMap数据结构执行或操作即可(在该类中该方法为or())。

BloomFilter在Hive/SparkSQL的实现

Brickhouse

Brickhouse的BloomFilter使用方法如下:
第一步,注册BloomFilter构建函数和是否在BloomFilter内判断函数,我们可以clone Brickhouse的仓库以得知类所在目录,并依次创建临时UDF函数。比如要使用brickhouse的布隆过滤器就需要依次注册这3个函数::

CREATE TEMPORARY FUNCTION bloom AS 'brickhouse.udf.bloom.BloomUDAF' USING JAR 'tmp/xxx/brickhouse-0.7.1-SNAPSHOT.jar';
CREATE TEMPORARY FUNCTION distribute_bloom AS 'brickhouse.udf.bloom.DistributedBloomUDF' USING JAR 'tmp/xxx/brickhouse-0.7.1-SNAPSHOT.jar';
CREATE TEMPORARY FUNCTION bloom_contains AS 'brickhouse.udf.bloom.BloomContainsUDF' USING JAR 'tmp/xxx/brickhouse-0.7.1-SNAPSHOT.jar';
复制代码

第二步,建立BloomFilter。先将一个表的key通过聚合传入构建BloomFilter的UDAF函数bloom(key_id),再将该BloomFilter数据结构作为临时文件写到执行计算的这台机器的本地磁盘中insert overwrite local directory bloomfile(为什么要做为文件来使用而不是当成一个中间数据集关联使用?这个在讲Hivemall对比的时候大家就能明白),因为已写到磁盘,所以还需要把文件数据读到当前job做缓存,为了能让分布在多个机器的tasks都能使用该文件数据做筛选,我们将数据分发到多个分布式的缓存里,代码如下:distributed_bloom('mybloom')
第三步,使用已经被放到分布式缓存的BloomFilter来筛主表数据,我们使用UDF函数:bloom_contains( key, distributed_bloom('mybloom') ) == true即可将只存在于该BloomFilter的记录保留下来。
该BloomFilter实现继承的是Filter类,使用的数据结构是byte array,该array会随着元素的增加而不断扩展,添加新的byte到数组。byte8个bit位,所以每个byte元素只能存8个唯一元素,相比long的64位要差不少,但理论上这两种数组占的空间大小应该是一致的。这也导致另一个问题,当你的数据集足够大(比如5000万去重id),则需要存到本地机器的BloomFilter文件可能也特别大,当他成规模落文件时会挤占该本地机器上正常服务运转需要的空间,不管是存储还是物理内存,比如本地hiveserver在本地写log的功能,可能会把hiverserver干到shutdown(别问我是怎么知道的)。
题外话,这里测试了下继承Filter即使用byte array的布隆过滤器存储500万左右的id占用的空间约为65MB,看其他人的说法,RoaringBitmap应该能做到2000万元素的集合只占3M左右,所以有条件还是尽量用最先进的RoaringBitmap。

Hivemall其一

Hivemall的BloomFilter使用方法如下:
第一步,注册相应BloomFilter函数,同上
第二步,将需要放入BloomFilter的表读出来,做聚合操作,将相应key放入BloomFilter,代码例子如下:

bloom_cuid AS (
SELECT app,bloom_build(cuid) AS bloom_key
FROM (
    SELECT 
        cuid,
        app
    FROM actcuid_info
) tmp
GROUP BY app
),
复制代码

注意这里BloomFilter就算建设完成了,我们不仅可以按照聚合分组,给每个分组单独生成一个BloomFilter,还可以把布隆过滤器当做另一个表/数据集来使用,不用再花费读写I/O去把数据存到本地再读出来,听着是不是比上一个方案好多了?先别高兴,继续往下看。
第三步,将大表和该BloomFilter用key做关联,对关联上的每条记录调用UDF函数:bloom_contains(),将大表数据打上标签,即是否能和小表以key关联上,代码如下:

select
CASE WHEN b.bloom_key IS NULL THEN false ELSE bloom_contains(b.bloom_key, a.cuid) END AS is_updated
from (
    ...//大表读数逻辑
) a left join (
        SELECT
            bloom_key,
            app
        FROM bloom_cuid
) b 
on (a.app = b.app)
复制代码

当你写的时候,你会惊讶于这个BloomFilter实现的便利性,和Hive代码很好的融合在一起。但是当你运行代码时,boooom,报错了?为什么我在构建BloomFilter的过程中必然报错?因为key太长?因为数据量太大?当我看该类实现的源码时,发现这个BloomFilter仍然是继承Filter类实现的(这个仓库建成时间比BrickHouse晚不少),该类在初始化布隆过滤器时,有这么一行代码调用:return newDynamicBloomFilter(DEFAULT_BLOOM_FILTER_SIZE, DEFAULT_ERROR_RATE, NUM_HASHES);,令人尴尬的是,这里的预期储存唯一id数即“DEFAULT_BLOOM_FILTER_SIZE”,源码里给的默认值是1024*1024,只有100万左右!且不支持你在UDF函数传入这个值,当id数大时怎么可能不报错呢。无奈的我在BloomFilterUDAF类初始化时将这个值改为4千万,打完jar包后总算迅速的跑出来了。
BloomFilter构建好后,其二就是关联大表,这里吊诡的事情又出现了,我发现在BloomFilter关联大表的reducer阶段,计算引擎只给分配了1个task!须知即使BloomFilter判断效率再高,一个task遍历几亿数据也是很慢的。为什么只分配一个task,因为一个维度组合下,你只生成了一条记录,key是维度组合,value是BloomFilter,在该实现里为一个很长很长的字符串,长到它占的空间也无法装入广播变量,所以没有使用broadcast join,只能common join,那么几亿数据关联1条记录,因为只会hash到一个分区,就必然也只分配一个reducer task了。到这时候,你就明白BrickHouse实现的先进性了,他等于是模拟了将BloomFilter放到广播变量的过程,还避开了Hive处理引擎对广播变量大小的限制,当BloomFilter被分布到多个存有大表数据的机器上时,bloom_contains做的筛选操作是Map操作,不涉及到开销大的shuffle阶段,这无疑比Hivemall好多了(当然占用空间大和写本地是硬伤)。

Hivemall其二

解决Hivemall的common join问题有2个思路,一个是将该只有1条的BloomFilter的长字符串作为中间变量储存到该job的cache中,一个是将BloomFilter和大表的关联分散到多个tasks去计算。第一个思路,我暂时还没有摸索出什么好的解法,这里先说第二个,如果大家有好的答案,请不吝在下面评论分享。
想要将大表关联1条记录的join分发,这想想不会感觉和我们大数据经常遇到的某类问题很相似嘛?没错,数据倾斜问题!这个问题就等同于你关联一个大表和小表,小表是极度的数据倾斜,大部分key都是一个值,导致太多的记录被hash到一个reducer而处理缓慢。我们可以在BloomFilter只有1条记录的基础上,将该条记录复制100份,每份对应一个新key即序号,再为大表的每条记录生成一个从0到99的随机序号(加盐解决倾斜的思路),关联时带上这个新key即能让数据充分分配到各个reducer了,且因为每个reducer的BloomFilter数据集都是一样的,故计算结果不会有任何偏移。代码示例如下:

select
...
from (
    select
        floor(rand()*100) AS salt_key,
        ...
        ...//大表读数逻辑
    ) a LEFT JOIN (
        SELECT
            bloom_key,
            app,
            salt_key
        FROM bloom_cuid AS x 
        lateral view explode(split('0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99',',')) y AS salt_key
    ) b 
    ON (a.app = b.app AND a.salt_key = b.salt_key)
复制代码

但是这个操作还有一个问题,那就是它无疑大大加大了计算的代价,1个BloomFilterUDAF可能就占用几十到几百M了,你还要复制100份,最终可能在整个job中占用几G到十几G的内存。更勿论关联导致的shuffle操作,即最终把数据再merge到一起的shuffle操作,也是多出来的资源消耗。

仓库地址

两个Hive UDF所在仓库地址:
BrickHouse:github.com/klout/brick…
Hivemall:github.com/apache/incu…

谢谢看完!