在某个集合中判断一个元素是否存在是日常开发工作中经常会遇到的问题, 最直观的做法是将集合的元素存到哈希表中, 这样通过元素的哈希值来判断该元素是否在集合中, 但哈希表的代价比较高, 一般来说哈希表都将要装载因子控制在某个阈值以下, 否则将要花费大量的计算浪费在处理冲突上 (见之前博客 [Blog 4 - HashTable 的实现]), 因此哈希表需要留有一定的空余空间, 当元素数量很大时, 哈希表所占用的空间将非常大, 布隆过滤器 (Bloom Filter) 是解决该问题的一种方法, 布隆过滤器可以判断某个元素一定不在某集合中, 以及判断一个元素可能在某集合中, 对于存在性它有一定的误判率 (False Positive Rate), 本文讨论布隆过滤器的原理, 并给出相应的误判概率推导

21.1 布隆过滤器的原理

布隆过滤器维护了一个长度为 m 的比特向量, 初始状态下, 比特向量的每一位都是 0, 同时设置了 k 个哈希函数, 当新的元素插入集合时, 首先依次用 k 个哈希函数分别对元素做哈希运算生成 k 个哈希值 (不考虑哈希冲突), 然后分别将这 k 个哈希值在比特向量中对应的位设置为 1, 每当有一个新的元素插入集合时便重复上述操作, 当然, 新的元素的哈希值对应的位可能早已经是 1 了, 此时跳过即可, 以上便是布隆过滤器插入元素的过程

当要查询某个元素是否在集合中时, 只需要同样用 k 个哈希函数分别对待查找的元素做哈希运算, 得到 k 个哈希值, 然后检查比特向量对应的这个 k 个位的值是否为 1, 其中只要有一位是 0 则说明该元素一定不在集合中, 若 k 个哈希值对应的比特位都为 1, 则只能说明该元素可能在集合中, 因为不同元素的哈希值可能相同

以上便是布隆过滤器的插入和查找的过程, 可以看到, 布隆过滤器只使用了长度为 m 的比特向量, 所以占用的空间非常小, 它的优点在于空间占用小 / 查询效率高, 但缺点是只能 100% 的判断一个元素不在集合中, 无法 100% 的判定一个元素一定在集合中

21.2 对布隆过滤器的定量分析

以上是对布隆过滤器的一个定性分析, 我们只知道布隆过滤器存在误判概率, 但是在实际应用的过程中, 需要对该误判概率有一个实际的判断, 若误判概率过高则可能无法在工程中应用, 从上面布隆过滤器的插入和查询过程可以知道, 布隆过滤器的误判概率实际上取决于以下几个参数:

  • 比特向量的长度 m

  • 哈希函数的个数 k

  • 插入元素的个数 n

我们假设哈希函数是足够均匀的, 即任何一个输入元素经过哈希函数映射到比特向量中的每一个比特位的概率都是 , 从比特向量中任取一个比特位, 经过 k 个哈希函数之后, 该位没有置 1 的概率为

插入 n 个元素之后, 该位仍没有被置为 1 的概率为

反过来, 插入 n 个元素之后, 该位被置为 1 的概率为

判断元素是否在集合即计算 k 个元素对应的哈希值所对应的比特位是否全为 1, 这个概率便是误判概率

当 m 充分大时, 误判概率近似等于下式

通过上面的估计式也可以估计在给定的比特向量长度 m 以及插入次数 n 下使得误判率最小的哈希函数的个数, 考察关于 k 的函数 (其中 n 和 m 是常数)

由于 n 和 m 都是常数, 因此我们不妨将 看做一个整体, 记为 t, 即 , 于是

将上式两边取对数, 得

对上式两边求导, 有

令上式等于 0, 解得

于是,

即当 时, 布隆过滤器的误判概率最低, 此时, 误判概率为

21.3 布隆过滤器的实现 (Golang)

布隆过滤器的原理很简单, 它的难点在于根据需要的误判概率确定过滤器的参数值 (比特向量长度 n, 哈希函数个数 k), 根据上一节的推导, 我们可以使用如下的结论:

  • 在给定误判概率 P 下, k 应取

  • 在给定的元素数 n 和哈希函数个数 k 下, m 应取

shadowsocks 中使用了布隆过滤器用来拒绝使用重复 salt 的数据包, 从而避免重放攻击, 我们使用 shadowsocks 的例子来讨论布隆过滤器的具体实现方式

首先定义布隆过滤器的结构体, 如下所示

type classicFilter struct {
    // 布隆过滤器的比特向量
    b []byte
    // 哈希函数的个数
    k int
    // 哈希函数, 这里使用的是 double hash, 即输入一段字节序列, 返回两个不同的哈希值
    h func([]byte) (uint64, uint64)
}

计算哈希函数个数 k, 比特向量位数 m, 使用求得的参数初始化布隆过滤器

// 按照我们刚刚得到的结论, 确定布隆过滤器的参数 k 和 m
// 函数的参数 n 为放入布隆过滤器的元素数, p 为误判概率, h 为 double hash 函数
func New(n int, p float64, h func([]byte) (uint64, uint64)) Filter {
    // 哈希函数的个数
    k := -math.Log(p) * math.Log2E
    // 比特向量的位数
    m := float64(n) * k * math.Log2E
    return &classicFilter{b: make([]byte, int(m/8)), k: int(k), h: h}
}

getOffset() 函数根据哈希函数的返回值计算其在比特向量中的 offset:

func (f *classicFilter) getOffset(x, y uint64, i int) uint64 {
    return (x + uint64(i)*y) % (8 * uint64(len(f.b)))
}

Add() 函数向布隆过滤器中添加数据, shadowsocks 只使用了一个哈希函数, 该哈希函数返回两个不同的哈希值, 其通过将第二个哈希值按递增倍数进行偏移 (我们将两个哈希值记为 v1 和 v2, 第一次将 v1 对应的比特置为 1, 第二次将 v1 + 1 * v2 对应的比特置位 1, 第三次将 v1 + 2 * v2 对应的比特置为 1, 以此类推) 来对比特向量赋值 (或运算)

func (f *classicFilter) Add(b []byte) {
    x, y := f.h(b)
    for i := 0; i < f.k; i++ {
        offset := f.getOffset(x, y, i)
        f.b[offset/8] |= 1 << (offset % 8)
    }
}

Test() 函数与 Add() 函数的逻辑基本相同, 差别在于 Test() 函数在定位到比特位之后不是将比特位赋值为 1, 而是检查比特位是否等于 0, 若比特位为 0 说明元素一定不在集合中 (与运算)

func (f *classicFilter) Test(b []byte) bool {
    x, y := f.h(b)
    for i := 0; i < f.k; i++ {
        offset := f.getOffset(x, y, i)
        if f.b[offset/8]&(1<<(offset%8)) == 0 {
            return false
        }
    }
    return true
}

所以总的来看, 布隆过滤器的关键在于理解如何确定相应的参数, 本身的实现原理非常简单, 有兴趣的读者自行推导一遍上面的结论


如果文章对您有帮助, 不妨请作者喝杯咖啡