redis消息队列

需求分析

  • 消息保序:消费者需要按照生产者发送消息的顺序来处理消息

  • 处理重复的消息:消费者避免多次处理重复的消息

  • 保证消息可靠性:消费者重启后,可以重新读取消息再次进行处理

基于List

  • LPUSH

把要发送的消息依次写入 List

  • BRPOP

阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列。

  • 消费者程序本身能对重复消息进行判断

消息队列要能给每一个消息提供全局唯一的 ID 号,消费者程序要把已经处理过的消息的 ID 号记录下来。ID号需要生产者程序在发送消息前自行生成,并在LPUSH的时候插入List。

  • BRPOPLPUSH

让消费者程序从一个 List 中读取消息,同时,把这个消息再插入到另一个 List(可以叫作备份 List)留存

缺点:不支持消费组

基于 Streams(Redis 5.0)

Redis Stream有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。

每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。

同一个消费组可以挂接多个消费者,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。

消费者内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为PEL。

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XREADGROUP:按消费组形式读取消息
  • XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。

缺点

在用Redis当作队列或存储数据时,是有可能丢失数据的:AOF同步写盘会降低性能。主从集群切换也可能丢数据。


   转载规则


《redis消息队列》 wangyixin-tom 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
redis6.0 redis6.0
多线程问题单个主线程处理网络请求的速度跟不上底层网络硬件的速度。 优化多个 IO 线程并行处理网络操作,可以提升实例的整体处理性能。 使用单线程执行命令操作,就不用为了保证 Lua 脚本、事务的原子性,额外开发多线程互斥机制了。 具体流程1
2020-12-09
下一篇 
redis事务 redis事务
事务实现基本命令 MULTI 显式地表示一个事务的开启,把这些命令暂存到一个命令队列中 EXEC 实际执行命令队列中的所有命令 DISCARD 主动放弃事务执行,把暂存的命令队列清空(起不到回滚的效果) WATCH 在事务执
2020-11-01
  目录