06-应用 4:四两拨千斤 —— HyperLogLog
课程
1
开篇:授人以鱼不若授人以渔 —— Redis 可以用来做什么?
学习时长: 5分21秒
2
基础:万丈高楼平地起 —— Redis 基础数据结构
上次学到
学习时长: 16分14秒
3
应用 1:千帆竞发 —— 分布式锁
学习时长: 7分47秒
4
应用 2:缓兵之计 —— 延时队列
学习时长: 8分9秒
5
应用 3:节衣缩食 —— 位图
学习时长: 8分52秒
6
应用 4:四两拨千斤 —— HyperLogLog
学习时长: 14分17秒
7
应用 5:层峦叠嶂 —— 布隆过滤器
学习时长: 17分54秒
8
应用 6:断尾求生 —— 简单限流
学习时长: 4分37秒
9
应用 7:一毛不拔 —— 漏斗限流
学习时长: 7分22秒
10
应用 8:近水楼台 —— GeoHash
学习时长: 7分52秒
11
应用 9:大海捞针 —— Scan
学习时长: 8分42秒
12
原理 1:鞭辟入里 —— 线程 IO 模型
学习时长: 4分1秒
13
原理 2:交头接耳 —— 通信协议
学习时长: 3分34秒
14
原理 3:未雨绸缪 —— 持久化
学习时长: 5分27秒
15
原理 4:雷厉风行 —— 管道
学习时长: 3分51秒
16
原理 5:同舟共济 —— 事务
学习时长: 6分36秒
17
原理 6:小道消息 —— PubSub
学习时长: 7分7秒
18
原理 7:开源节流 —— 小对象压缩
学习时长: 7分14秒
19
原理 8:有备无患 —— 主从同步
学习时长: 4分9秒
20
集群 1:李代桃僵 —— Sentinel
学习时长: 3分52秒
21
集群 2:分而治之 —— Codis
学习时长: 7分28秒
22
集群 3:众志成城 —— Cluster
学习时长: 8分38秒
23
拓展 1:耳听八方 —— Stream
学习时长: 13分40秒
24
拓展 2:无所不知 —— Info 指令
学习时长: 4分4秒
25
拓展 3:拾遗补漏 —— 再谈分布式锁
学习时长: 2分18秒
26
拓展 4:朝生暮死 —— 过期策略
学习时长: 2分21秒
27
拓展 5:优胜劣汰 —— LRU
学习时长: 4分34秒
28
拓展 6:平波缓进 —— 懒惰删除
学习时长: 2分13秒
29
拓展 7:妙手仁心 —— 优雅地使用 Jedis
学习时长: 6分35秒
30
拓展 8:居安思危 —— 保护 Redis
学习时长: 2分19秒
31
拓展 9:隔墙有耳 —— Redis 安全通信
学习时长: 6分34秒
32
拓展 10:法力无边 —— Redis Lua 脚本执行原理
学习时长: 9分24秒
33
拓展 11:短小精悍 —— 命令行工具的妙用
学习时长: 9分21秒
34
源码 1:丝分缕析 —— 探索「字符串」内部
学习时长: 5分20秒
35
源码 2:循序渐进 —— 探索「字典」内部
学习时长: 7分24秒
36
源码 3:挨肩迭背 —— 探索「压缩列表」内部
学习时长: 10分42秒
37
源码 4:风驰电掣 —— 探索「快速列表」内部
学习时长: 3分49秒
38
源码 5:凌波微步 —— 探索「跳跃列表」内部
学习时长: 9分57秒
39
源码 6:破旧立新 —— 探索「紧凑列表」内部
学习时长: 2分42秒
40
源码 7:金枝玉叶 —— 探索「基数树」内部
学习时长: 5分36秒
41
源码 8:精益求精 —— LFU vs LRU
学习时长: 8分4秒
42
源码 9:如履薄冰 —— 懒惰删除的巨大牺牲
学习时长: 9分53秒
43
源码 10:跋山涉水 —— 深入字典遍历
学习时长: 9分24秒
44
源码 11:见缝插针 —— 探索 HyperLogLog 内部
学习时长: 13分3秒
45
尾声:百尺竿头 —— 继续深造指南
学习时长: 2分32秒
juejin_logo copyCreated with Sketch.

应用 4:四两拨千斤 —— HyperLogLog

在开始这一节之前,我们先思考一个常见的业务问题:如果你负责开发维护一个大型的网站,有一天老板找产品经理要网站每个网页每天的 UV 数据,然后让你来开发这个统计模块,你会如何实现?

如果统计 PV 那非常好办,给每个网页一个独立的 Redis 计数器就可以了,这个计数器的 key 后缀加上当天的日期。这样来一个请求,incrby 一次,最终就可以统计出所有的 PV 数据。

但是 UV 不一样,它要去重,同一个用户一天之内的多次访问请求只能计数一次。这就要求每一个网页请求都需要带上用户的 ID,无论是登陆用户还是未登陆用户都需要一个唯一 ID 来标识。

你也许已经想到了一个简单的方案,那就是为每一个页面一个独立的 set 集合来存储所有当天访问过此页面的用户 ID。当一个请求过来时,我们使用 sadd 将用户 ID 塞进去就可以了。通过 scard 可以取出这个集合的大小,这个数字就是这个页面的 UV 数据。没错,这是一个非常简单的方案。

但是,如果你的页面访问量非常大,比如一个爆款页面几千万的 UV,你需要一个很大的 set 集合来统计,这就非常浪费空间。如果这样的页面很多,那所需要的存储空间是惊人的。为这样一个去重功能就耗费这样多的存储空间,值得么?其实老板需要的数据又不需要太精确,105w 和 106w 这两个数字对于老板们来说并没有多大区别,So,有没有更好的解决方案呢?

这就是本节要引入的一个解决方案,Redis 提供了 HyperLogLog 数据结构就是用来解决这种统计问题的。HyperLogLog 提供不精确的去重计数方案,虽然不精确但是也不是非常不精确,标准误差是 0.81%,这样的精确度已经可以满足上面的 UV 统计需求了。

HyperLogLog 数据结构是 Redis 的高级数据结构,它非常有用,但是令人感到意外的是,使用过它的人非常少。

使用方法

HyperLogLog 提供了两个指令 pfadd 和 pfcount,根据字面意义很好理解,一个是增加计数,一个是获取计数。pfadd 用法和 set 集合的 sadd 是一样的,来一个用户 ID,就将用户 ID 塞进去就是。pfcount 和 scard 用法是一样的,直接获取计数值。

127.0.0.1:6379> pfadd codehole user1
(integer) 1
127.0.0.1:6379> pfcount codehole
(integer) 1
127.0.0.1:6379> pfadd codehole user2
(integer) 1
127.0.0.1:6379> pfcount codehole
(integer) 2
127.0.0.1:6379> pfadd codehole user3
(integer) 1
127.0.0.1:6379> pfcount codehole
(integer) 3
127.0.0.1:6379> pfadd codehole user4
(integer) 1
127.0.0.1:6379> pfcount codehole
(integer) 4
127.0.0.1:6379> pfadd codehole user5
(integer) 1
127.0.0.1:6379> pfcount codehole
(integer) 5
127.0.0.1:6379> pfadd codehole user6
(integer) 1
127.0.0.1:6379> pfcount codehole
(integer) 6
127.0.0.1:6379> pfadd codehole user7 user8 user9 user10
(integer) 1
127.0.0.1:6379> pfcount codehole
(integer) 10

简单试了一下,发现还蛮精确的,一个没多也一个没少。接下来我们使用脚本,往里面灌更多的数据,看看它是否还可以继续精确下去,如果不能精确,差距有多大。人生苦短,我用 Python!Python 脚本走起来!😄

# coding: utf-8

import redis

client = redis.StrictRedis()
for i in range(1000):
    client.pfadd("codehole", "user%d" % i)
    total = client.pfcount("codehole")
    if total != i+1:
        print total, i+1
        break

当然 Java 也不错,大同小异,下面是 Java 版本:

public class PfTest {
  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    for (int i = 0; i < 1000; i++) {
      jedis.pfadd("codehole", "user" + i);
      long total = jedis.pfcount("codehole");
      if (total != i + 1) {
        System.out.printf("%d %d\n", total, i + 1);
        break;
      }
    }
    jedis.close();
  }
}

我们来看下输出:

> python pftest.py
99 100

当我们加入第 100 个元素时,结果开始出现了不一致。接下来我们将数据增加到 10w 个,看看总量差距有多大。

# coding: utf-8

import redis

client = redis.StrictRedis()
for i in range(100000):
    client.pfadd("codehole", "user%d" % i)
print 100000, client.pfcount("codehole")

Java 版:

public class JedisTest {
  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    for (int i = 0; i < 100000; i++) {
      jedis.pfadd("codehole", "user" + i);
    }
    long total = jedis.pfcount("codehole");
    System.out.printf("%d %d\n", 100000, total);
    jedis.close();
  }
}

跑了约半分钟,我们看输出:

> python pftest.py
100000 99723

差了 277 个,按百分比是 0.277%,对于上面的 UV 统计需求来说,误差率也不算高。然后我们把上面的脚本再跑一边,也就相当于将数据重复加入一边,查看输出,可以发现,pfcount 的结果没有任何改变,还是 99723,说明它确实具备去重功能。

pfadd 这个 pf 是什么意思?

它是 HyperLogLog 这个数据结构的发明人 Philippe Flajolet 的首字母缩写,老师觉得他发型很酷,看起来是个佛系教授。

pfmerge 适合什么场合用?

HyperLogLog 除了上面的 pfadd 和 pfcount 之外,还提供了第三个指令 pfmerge,用于将多个 pf 计数值累加在一起形成一个新的 pf 值。

比如在网站中我们有两个内容差不多的页面,运营说需要这两个页面的数据进行合并。其中页面的 UV 访问量也需要合并,那这个时候 pfmerge 就可以派上用场了。

注意事项

HyperLogLog 这个数据结构不是免费的,不是说使用这个数据结构要花钱,它需要占据一定 12k 的存储空间,所以它不适合统计单个用户相关的数据。如果你的用户上亿,可以算算,这个空间成本是非常惊人的。但是相比 set 存储方案,HyperLogLog 所使用的空间那真是可以使用千斤对比四两来形容了。

不过你也不必过于担心,因为 Redis 对 HyperLogLog 的存储进行了优化,在计数比较小时,它的存储空间采用稀疏矩阵存储,空间占用很小,仅仅在计数慢慢变大,稀疏矩阵占用空间渐渐超过了阈值时才会一次性转变成稠密矩阵,才会占用 12k 的空间。

HyperLogLog 实现原理

HyperLogLog 的使用非常简单,但是实现原理比较复杂,如果读者没有特别的兴趣,下面的内容暂时可以跳过不看。

为了方便理解 HyperLogLog 的内部实现原理,我画了下面这张图

这张图的意思是,给定一系列的随机整数,我们记录下低位连续零位的最大长度 k,通过这个 k 值可以估算出随机数的数量。 首先不问为什么,我们编写代码做一个实验,观察一下随机整数的数量和 k 值的关系。

import math
import random

# 算低位零的个数
def low_zeros(value):
    for i in xrange(1, 32):
        if value >> i << i != value:
            break
    return i - 1


# 通过随机数记录最大的低位零的个数
class BitKeeper(object):

    def __init__(self):
        self.maxbits = 0

    def random(self):
        value = random.randint(0, 2**32-1)
        bits = low_zeros(value)
        if bits > self.maxbits:
            self.maxbits = bits


class Experiment(object):

    def __init__(self, n):
        self.n = n
        self.keeper = BitKeeper()

    def do(self):
        for i in range(self.n):
            self.keeper.random()

    def debug(self):
        print self.n, '%.2f' % math.log(self.n, 2), self.keeper.maxbits


for i in range(1000, 100000, 100):
    exp = Experiment(i)
    exp.do()
    exp.debug()

Java 版:

public class PfTest {

  static class BitKeeper {
    private int maxbits;

    public void random() {
      long value = ThreadLocalRandom.current().nextLong(2L << 32);
      int bits = lowZeros(value);
      if (bits > this.maxbits) {
        this.maxbits = bits;
      }
    }

    private int lowZeros(long value) {
      int i = 1;
      for (; i < 32; i++) {
        if (value >> i << i != value) {
          break;
        }
      }
      return i - 1;
    }
  }

  static class Experiment {
    private int n;
    private BitKeeper keeper;

    public Experiment(int n) {
      this.n = n;
      this.keeper = new BitKeeper();
    }

    public void work() {
      for (int i = 0; i < n; i++) {
        this.keeper.random();
      }
    }

    public void debug() {
      System.out.printf("%d %.2f %d\n", this.n, Math.log(this.n) / Math.log(2), this.keeper.maxbits);
    }
  }

  public static void main(String[] args) {
    for (int i = 1000; i < 100000; i += 100) {
      Experiment exp = new Experiment(i);
      exp.work();
      exp.debug();
    }
  }

}

运行观察输出:

36400 15.15 13
36500 15.16 16
36600 15.16 13
36700 15.16 14
36800 15.17 15
36900 15.17 18
37000 15.18 16
37100 15.18 15
37200 15.18 13
37300 15.19 14
37400 15.19 16
37500 15.19 14
37600 15.20 15

通过这实验可以发现 K 和 N 的对数之间存在显著的线性相关性:

N=2^K  # 约等于

如果 N 介于 2^K 和 2^(K+1) 之间,用这种方式估计的值都等于 2^K,这明显是不合理的。这里可以采用多个 BitKeeper,然后进行加权估计,就可以得到一个比较准确的值。

import math
import random

def low_zeros(value):
    for i in xrange(1, 32):
        if value >> i << i != value:
            break
    return i - 1


class BitKeeper(object):

    def __init__(self):
        self.maxbits = 0

    def random(self, m):
        bits = low_zeros(m)
        if bits > self.maxbits:
            self.maxbits = bits


class Experiment(object):

    def __init__(self, n, k=1024):
        self.n = n
        self.k = k
        self.keepers = [BitKeeper() for i in range(k)]

    def do(self):
        for i in range(self.n):
            m = random.randint(0, 1<<32-1)
            # 确保同一个整数被分配到同一个桶里面,摘取高位后取模
            keeper = self.keepers[((m & 0xfff0000) >> 16) % len(self.keepers)]
            keeper.random(m)

    def estimate(self):
        sumbits_inverse = 0  # 零位数倒数
        for keeper in self.keepers:
            sumbits_inverse += 1.0/float(keeper.maxbits)
        avgbits = float(self.k)/sumbits_inverse  # 平均零位数
        return 2**avgbits * self.k  # 根据桶的数量对估计值进行放大


for i in range(100000, 1000000, 100000):
    exp = Experiment(i)
    exp.do()
    est = exp.estimate()
    print i, '%.2f' % est, '%.2f' % (abs(est-i) / i)

下面是 Java 版:

public class PfTest {

  static class BitKeeper {
    private int maxbits;

    public void random(long value) {
      int bits = lowZeros(value);
      if (bits > this.maxbits) {
        this.maxbits = bits;
      }
    }

    private int lowZeros(long value) {
      int i = 1;
      for (; i < 32; i++) {
        if (value >> i << i != value) {
          break;
        }
      }
      return i - 1;
    }
  }

  static class Experiment {
    private int n;
    private int k;
    private BitKeeper[] keepers;

    public Experiment(int n) {
      this(n, 1024);
    }

    public Experiment(int n, int k) {
      this.n = n;
      this.k = k;
      this.keepers = new BitKeeper[k];
      for (int i = 0; i < k; i++) {
        this.keepers[i] = new BitKeeper();
      }
    }

    public void work() {
      for (int i = 0; i < this.n; i++) {
        long m = ThreadLocalRandom.current().nextLong(1L << 32);
        BitKeeper keeper = keepers[(int) (((m & 0xfff0000) >> 16) % keepers.length)];
        keeper.random(m);
      }
    }

    public double estimate() {
      double sumbitsInverse = 0.0;
      for (BitKeeper keeper : keepers) {
        sumbitsInverse += 1.0 / (float) keeper.maxbits;
      }
      double avgBits = (float) keepers.length / sumbitsInverse;
      return Math.pow(2, avgBits) * this.k;
    }
  }

  public static void main(String[] args) {
    for (int i = 100000; i < 1000000; i += 100000) {
      Experiment exp = new Experiment(i);
      exp.work();
      double est = exp.estimate();
      System.out.printf("%d %.2f %.2f\n", i, est, Math.abs(est - i) / i);
    }
  }

}

代码中分了 1024 个桶,计算平均数使用了调和平均 (倒数的平均)。普通的平均法可能因为个别离群值对平均结果产生较大的影响,调和平均可以有效平滑离群值的影响。

观察脚本的输出,误差率控制在百分比个位数:

100000 97287.38 0.03
200000 189369.02 0.05
300000 287770.04 0.04
400000 401233.52 0.00
500000 491704.97 0.02
600000 604233.92 0.01
700000 721127.67 0.03
800000 832308.12 0.04
900000 870954.86 0.03
1000000 1075497.64 0.08

真实的 HyperLogLog 要比上面的示例代码更加复杂一些,也更加精确一些。上面的这个算法在随机次数很少的情况下会出现除零错误,因为 maxbits=0 是不可以求倒数的。

pf 的内存占用为什么是 12k?

我们在上面的算法中使用了 1024 个桶进行独立计数,不过在 Redis 的 HyperLogLog 实现中用到的是 16384 个桶,也就是 2^14,每个桶的 maxbits 需要 6 个 bits 来存储,最大可以表示 maxbits=63,于是总共占用内存就是2^14 * 6 / 8 = 12k字节。

思考 & 作业

尝试将一堆数据进行分组,分别进行计数,再使用 pfmerge 合并到一起,观察 pfcount 计数值,与不分组的情况下的统计结果进行比较,观察有没有差异。

扩展阅读

  • HyperLogLog 复杂的公式推导请阅读 Count-Distinct Problem,如果你的概率论基础不好,那就建议不要看了(另,这个 PPT 需要翻墙观看)。
留言
Ctrl + Enter
全部评论(111)
BinK_1783的头像
删除
遗憾的不是人,总是时间
点赞
回复
淦饭人的头像
删除
www.yuque.com
有能力的可以看下,我先跑了
1
回复
luoluocaihong的头像
删除
开发经理
虽然大学学的数学专业,但还是直接跳过算法部分了....因为都忘光光了...
急于快速的浏览下整本小册
点赞
回复
optional的头像
删除
开发狗 @ haha
bit和hyperloglog基本都有专业的的买点工具了
点赞
回复
羅小生的头像
删除
用嘴编程 @ 日本ストラタス
HyperLogLog可以用于海量数据的近似去重统计,误差在个位数,主要提供了三个命令:pfadd key value,pfcount key,pfmerage oldkey1 oldkey2 newkey。HyperLogLog数据量大的时候,会消耗固定的12k内存资源。
1
1
删除
pfmerge的指令应该是:
pfmerge newkey oldkey1 oldkey2
点赞
回复
晴了个天的头像
删除
Java
默默点击了下一章,跳过算法部分。。。。。
点赞
回复
亽閑屁亊哆的头像
删除
Java Siege Lion @ IT Zoo
HyperLogLog用于统计,误差在0.81%,可以去重,合并去重
点赞
回复
乔同志40508的头像
删除
留言记录下:所以HyperLogLog数据结构使用的场景为,面对海量的数据,如果要做精确度要求不高的去重统计,则推荐使用HyperLogLog,而不是set。因为HyperLogLog总共只占用12K的内存。而set会随着数据增长而线性增长。
2
回复
兵长爱学习的头像
删除
TouchFish Engineer @ 加里敦科技股份有限公司
blog.csdn.net 评论中之前推荐的链接挂了,这是个转载的链接
点赞
回复
嘿嘿嘿hhh的头像
删除
java @ 同程艺龙
pfmerge 合并了怎么获取这个合并的数据值,我看redisTemplate中 源码中是
public Long union(K destination, K... sourceKeys) {
final byte[] rawDestinationKey = this.rawKey(destination);
final byte[][] rawSourceKeys = this.rawKeys(Arrays.asList(sourceKeys));
return (Long)this.execute(new RedisCallback() {
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.pfMerge(rawDestinationKey, rawSourceKeys);
return connection.pfCount(new byte[][]{rawDestinationKey});
}
}, true);
}
命令中如何获取合并后的值呢?请问
展开
1
1
删除
PFMERGE c3 c1 c2 合并计算 c1 c2的值,然后赋给c3 key 然后通过pfcount c3可以输出,另外pfmerge 合并的是c1 和 c2的并集。
点赞
回复
刚锅儿的头像
删除
知道统计可以用这个就好了,算法对我来说太深奥了
1
回复
timesculptor的头像
删除
在讲实现原理的时候仅仅说明了随机数的数量和连续0的个数K之间的关系,可以结合真实的实现讲一下吗,谢谢
点赞
回复
野猪佩奇酱的头像
删除
跳过
8
回复
怪兽的实验室的头像
删除
具体怎么用呢?
点赞
回复
林冠宏_指尖下的幽灵的头像
删除
《区块链以太坊DApp开发实战》作者 @ -
说得不行,没到点上
23
回复
鱼丸粗面不想说话的头像
删除
java开发 @ 保密
自己怎么实现合并
点赞
1
删除
PFMERGE destkey sourcekey [sourcekey ...]
将旧的统计数据sourcekey合并为新的destkey
点赞
回复
Ta'的头像
删除
我测试跑1000w数据 , pfcount之后有1005w , 误差还会向上的咩 ?
点赞
1
删除
好像是哎。我跑了1000条count1004
点赞
回复
Share猿的头像
删除
如果用户id是连续的,也可以用bitmap来统计UV,时间+网页标示作为key,用户id为偏移量
点赞
回复
void重名了的头像
删除
请问代码中的“0xfff0000”是否应该是“0xffff0000”4个f?另外最后调和平均的举例图中第二行的101应该为104
点赞
回复
无声的告别的头像
删除
这说的也太伤自尊了吧,什么叫概率论基础不好,那就建议不要看了
点赞
回复

查看全部 111 条回复