Redis - 时间序列数据类型的保存方案和消息队列实现
- 一. 用 Redis 保存时间序列类型数据方案
- 1.1 内存和范围查找的支持性问题
- 1.2 聚合操作的支持性问题(仅供参考)
- 二. 用 Redis 实现消息队列
- 2.1 消息保序的实现
- 2.2 重复消费问题解决
- 2.3 消息可靠性保证
- 2.4 Redis 做中间件的优劣势
一. 用 Redis 保存时间序列类型数据方案
我们日常开发中,有很多这种类似的场景,记录某一个时刻下,某个目标的相关属性或者状态。 那么常规的,我们可以用时间戳作为Key
,而这个目标的相关属性我们可以将其转化为JSON
串或者通过字符串拼接的方式来保存为String
类型,然后作为Value
。
但是,这样的存储是针对于一个时间戳而言的,而实际环境中,往往需要记录非常多的这样的数据,甚至可能需要对这种数据进行统计、聚合、范围查找等操作。那么可想而知,Redis
中的String
类型,虽然可以提供存储功能,但是却难以提供统计、聚合、范围查找等这些复杂操作。 除此之外,String
类型我在String内存开销问题以及基本/扩展数据类型的使用这篇文章有提到,如果存储的数据量太大,那么内存的占用是非常庞大的。耗内存。
同时用时间戳作为Key
的数据,往往也有以下特点:
- 数据的插入比较频繁。
- 读操作的查询模式的种类比较多(统计、聚合…)。
1.1 内存和范围查找的支持性问题
场景:按照一定的时间间隔记录某一个设备集群中每台机器的温度。
针对内存占用问题,我们可以选择Hash
类型去替代String
。
set 1659947381000 31
set 1659947392000 35
set 1659947413000 28
set temperature 1659947381000 31 1659947392000 35 1659947413000 28
虽然Hash
结构弥补了String
类型在内存开销上的短板。但是仅仅这样是无法满足数据的一些范围查询或者操作的。
那么针对范围查找问题:因此我们可以再使用Sorted Set
去保存相同的一份数据。这里做个解释,为何要同时用两个数据结构来存储相同的数据:
-
Hash
结构:用来提供单值查询。
-
Sorted Set
:提供范围查询。虽然将范围的前后指定为一个相同的值,看起来像是单个值查询,但是本质上依旧是范围查询,效率不如人家Hash
来的快。
那么此时就可以用这样的命令来进行查找:
zrangebyscore temperature 1659947381000 1659947413000
此外,我们既然使用了两种数据结构来保存相同的数据,就应该保证数据一致性。我们应该保证两个数据结构中的数据是完全一样的,不能出现哪个结构中的数据有少或者不一致的情况。因此我们可以在进行数据插入的时候,保障两个操作的原子性。即使用简单的事务操作:
-
multi
:事务开始。之后的操作将会放入一个队列中,而不会真正的去执行。
-
exec
:事务结束,开始执行队列中的一系列命令。
例如:
multi
hset temperature 1659947381000 25
zadd temperature 1659947381000 25
exec
那么Java
对应的操作就是:
Transaction multi = jedis.multi();
multi.hset("temperature", "1659947381000", "25");
System.out.println("事务执行中:hash:" + multi.hget("temperature", "1659947381000"));
multi.zadd("temperatureZSet", 1659947381000L, "25");
System.out.println("事务执行中:sorted set:" + multi.zrangeByScore("temperatureZSet", 1659947381000L, 1659947381000L));
multi.exec();
System.out.println("**************事务执行完毕******************");
System.out.println("事务执行中:hash:" + jedis.hget("temperature", "1659947381000"));
System.out.println("事务执行中:sorted set:" + jedis.zrangeByScore("temperatureZSet", 1659947341000L, 1659947392000L));
注意:
hash
和sorted set
两个集合使用的key
不能是同一个。
- 并且事务中不能使用普通的
jedis
对象。multi
对象拥有和jedis
对象同样的API
操作。因为Redis
中事务开启后,执行的操作是放到队列中的,并不是马上执行的,因此需要做区分。
结果如下:

到这里,内存问题和范围查找的问题已经解决了。虽然我们用了两个数据类型来保存相同的一份数据,但是整体的内存消耗,是比全部用String
类型存储要节省的。那么接下来要解决的,就是聚合操作问题。
备注:
- 到这里为止,如果业务上只涉及到时序的范围查找,是可以同时用
Hash
和Sorted Set
去替代传统的String
的。如果仅仅限于此,我个人建议1.2节可以不看。
-
Redis
中对于事务的使用,在文章中提到的原子性问题也是有一定缺陷的。因为Redis
中的事务并不像Mysql
那样,倘若在一个事务中,先后执行了A和B操作,但是在执行C操作的时候发生了错误,A和B的操作是不会回滚的。
-
Redis
主要还是拿来做缓存比较多,这种专门的时序数据处理最好交给专门的时序数据库处理,例如influxDB
。
- 1.2节内容仅供参考,并且实用性和实际操作起来是否简单这个问题上,有待商榷,因为并不容易实现。(至少我写这篇文章的时候,关于
RedisTimeSeries
的JavaAPI
操作没有找到)
1.2 聚合操作的支持性问题(仅供参考)
首先,我们当然可以在客户端将相关的数据全部读取过来,然后再客户端自行完成聚合操作。但是倘若有这么几个点:
- 数据量很大。
- 聚合操作的频率很高。
那么这种情况下,就会有很多请求(包含了大量数据)在Redis
和客户端之间来回穿梭,就会造成资源的竞争,降低Redis
的性能。
那么针对聚合操作问题:我们可以使用RedisTimeSeries
,它支持在Redis
实例上对时间维度进行聚合计算。
但是使用这个,却比较麻烦,需要了解这么几个点:
-
RedisTimeSeries
是Redis
的扩展模块,原生Redis
并不支持。
- 使用的时候需要将
Redis
源码单独编译成动态链接库 redistimeseries.so
,再使用 loadmodule
命令进行加载。
loadmodule redistimeseries.so
那么针对上述的聚合场景,使用RedisTimeSeries
的大致流程如下:
ts.create temperature retention 800000 labels uid 1
ts.add temperature 1659947381000 25
ts.get temperature
ts.mget FILTER uid = xxx
ts.range temperature 1659947371000 1659947381000 AGGREGATION avg 180000
二. 用 Redis 实现消息队列
前言:Redis
是可以做消息队列的,但是对于一些不允许出现消息丢失的情形下,例如金融支付操作。不要用Redis
作为中间件,请使用专门的中间件去做存储。例如Kafka、ActiveMQ、RabbitMQ
等。具体原因下面分析。
首先消息队列需要解决三个问题:
- 消息保序。
- 消息的重复消费问题。
- 消息的可靠性保证。
2.1 消息保序的实现
那么如何用Redis
作为消息队列呢?利用Redis
中的List
数据结构。
-
List
这个数据结构本身就是FIFO
,先进先出的顺序对数据进行存储的。
- 实际操作上,生产者通过
lpush
命令将数据写入List
中。消费者端则通过rpop
命令将其弹出。
这是一般的操作。但是光凭这样的操作并不满足一个合格的消息中间件具备的条件。因为在生产者向Redis
中写入数据的时候,Redis
并不会主动地通知消费者有新消息写入了。此时消费者只能通过这样的伪代码来实现轮询:
while(true){
String json = jedis.rpop('key');
process(json);
}
问题:这样的无限循环,会导致CPU
一直消耗在这里执行rpop
命令。造成性能损失。
解决:建议使用brpop
命令,即阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。
2.2 重复消费问题解决
对于消息的重复消费问题,我们只需要提供一个唯一标识,然后消费的时候做判断即可。
- 生产者端:发送消息的时候,给消息里面塞一个唯一标识。
- 消费者端:将消费完成的消息的唯一标识记录下来。在后续消费的时候,都要反查一遍先。
lpush key 1000001:title:helloworld
2.3 消息可靠性保证
背景:当消费者程序从Redis
中读取一条消息并做处理,但是还没处理完成的时候就发生了宕机,那么Redis
中这条数据已经被剔除,但这个数据并没有被真正的消费掉。怎么办?
解决:生产者在推消息给Redis
的时候,使用 BRPOPLPUSH
命令,其作用如下:
- 在生产者推消息的时候,
Redis
会把这个消息再插入到另一个 List
留存。
- 这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份
List
中重新读取消息并处理。
综上所述,常规情况下:
- 生产者端使用
BRPOPLPUSH
命令往Redis
中推数据,同时塞入唯一标识。
- 消费者端使用
brpop
命令。防止无限循环调用rpop()
命令。将消费过的消息的唯一标识做数据存储。
- 消费者倘若消费某个消息成功,由于生产者端往两个
List
都插入了数据,此时最好将备份队列中的消息删除,避免备份队列中存储过多过期数据,造成内存浪费。
2.4 Redis 做中间件的优劣势
先来说下Redis
做中间件的优势:
- 用
Redis
作为消息队列,由于Redis
的特性,在内存上操作,因此性能高。
-
API
操作起来非常方便,没有复杂的操作,部署轻量。 Kafka
的操作相比之下就会复杂许多。维护成本也要更高点。
Redis
做中间件的劣势:可能出现数据丢失。 有这么个几个场景:
-
AOF
策略为每秒写盘。该过程为异步,若Redis
发生宕机,会丢失1秒的数据。若改为同步写盘,则会导致性能下降。
- 在主从集群下,倘若写操作的频率非常大,那么主从的数据同步就会存在延迟,那么在进行主从切换的时候,也可能存在数据丢失问题。详细可以看Redis - Redis主从数据一致性和哨兵机制。
- 无法保证数据的完整性,而像Kafka这样的专业中间件,副本等机制保证了数据的可靠性。哪怕集群的某个节点挂掉了,也不会丢失数据。详细可以参考Kafka复习计划 - Kafka基础知识以及集群参方案和参数。