您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

Redis实现延迟队列的方法是什么

2025/11/1 13:40:55发布31次查看
1、前言1.1、什么是延迟队列延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。延迟队列的结构更像以时间为权重的有序堆结构,而不是传统的队列。
1.2、应用场景在一些业务场景中,我们常常会遇到需要在特定时间节点到达或经过一段时间后才能执行的功能。就比如以下这些场景:
新建一个订单,在规定时间内未支付需要自动取消 外卖或者打车在预计时间到达的前十分钟提醒骑手或者司机即将超时 快递收货后在规定时间内用户没有确认收货会自动确认收货 预定的会议在会议开始前十分钟会去提醒你尽快加入会议 每日周报在截止半小时前会提醒你尽快提交
1.3、为什么要使用延迟队列对于一些数据量小并且对数据的时效性不怎么要求的项目来说,最简单有效的方法就是写一个定时任务去扫描数据库以达到业务的实现。当数据量达到数百万或千万级别时,如果定期扫描数据库,很容易遭受打击。想信大家也有所了解,当数据达到这种地步的时候,还去定时扫表会非常低效,甚至对于那些定时间隔比较小的情景来说,这一遍还没扫完下一遍就要开始了。这时候如果用延迟队列的话或许会很有效。
实现延迟队列的几种途径
quartz 定时任务
delayqueue 延迟队列
redis sorted set redis
过期键监听回调
rabbitmq死信队列
rabbitmq基于插件实现延迟队列
wheel时间轮算法
2、redis sorted set在redis中,zet作为有序集合,可以利用其有序的特性,将任务添加到zset中,将任务的到期时间作为score,利用zset的默认有序特性,获取score值最小的元素(也就是最近到期的任务),判断系统时间与该任务的到期时间大小,如果达到到期时间,就执行业务,并删除该到期任务,继续判断下一个元素,如果没有到期,就sleep一段时间(比如1秒),如果集合为空,也sleep一段时间。
通过zadd命令向队列delayqueue中添加元素,并设置score值表示元素过期的时间;向delayqueue添加三个order1、order2、order3,分别是10秒、20秒、30秒后过期。
zadd delayqueue 3 order3
消费端轮询队列delayqueue,将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key。
/** * 消费消息 */public void pollorderqueue() { while (true) { set<tuple> set = jedis.zrangewithscores(delay_queue, 0, 0); string value = ((tuple) set.toarray()[0]).getelement(); int score = (int) ((tuple) set.toarray()[0]).getscore(); calendar cal = calendar.getinstance(); int nowsecond = (int) (cal.gettimeinmillis() / 1000); if (nowsecond >= score) { jedis.zrem(delay_queue, value); system.out.println(sdf.format(new date()) + " removed key:" + value); } if (jedis.zcard(delay_queue) <= 0) { system.out.println(sdf.format(new date()) + " zset empty "); return; } thread.sleep(1000); }}
我们看到执行结果符合预期:
2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty
3、redis 过期键监听回调redis的key过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。
要启用notify-keyspace-events ex,需要编辑redis.conf文件。 notify-keyspace-events ex
redis监听配置,注入bean redismessagelistenercontainer。
其次,配置redis监听器 最后,编写redis key过期监听回调方法
@configurationpublic class redislistenerconfig {@beanredismessagelistenercontainer container(redisconnectionfactory connectionfactory) { redismessagelistenercontainer container = new redismessagelistenercontainer(); container.setconnectionfactory(connectionfactory); return container; }}
编写redis过期回调监听方法,必须继承keyexpirationeventmessagelistener ,有点类似于mq的消息监听。
@componentpublic class rediskeyexpirationlistener extends keyexpirationeventmessagelistener {public rediskeyexpirationlistener(redismessagelistenercontainer listenercontainer) { super(listenercontainer);}@overridepublic void onmessage(message message, byte[] pattern) { string expiredkey = message.tostring(); system.out.println("监听到key:" + expiredkey + "已过期"); }}
到这代码就编写完成,非常的简单,接下来测试一下效果,在redis-cli客户端添加一个key并给定3s的过期时间。
set xiaofu 123 ex 3
在控制台成功监听到了这个过期的key。
监听到过期的key为:xiaofu
4、quartz定时任务quartz一款非常经典任务调度框架,在redis、rabbitmq还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。
导入quartz依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-quartz</artifactid></dependency>>在启动类中使用@enablescheduling注解开启定时任务功能。```java@springbootapplication@enableschedulingpublic class delayqueueapplication { public static void main(string[] args) { springapplication.run(delayqueueapplication.class, args); }}
编写定时任务
@slf4j@componentpublic class quartzdemo { /** * 每隔五秒开启一次任务 */ @scheduled(cron = "0/5 * * * * ? ") public void process(){ log.info("--------------定时任务测试--------------"); }}
5、delayqueue 延迟队列jdk中提供了一组实现延迟队列的api,位于java.util.concurrent包下delayqueue。
delayqueue是一个blockingqueue(无界阻塞)队列,它本质就是封装了一个priorityqueue(优先队列),priorityqueue内部使用完全二叉堆(不知道的自行了解哈)来实现队列元素排序,我们在向delayqueue队列中添加元素时,会给元素一个delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。 先简单实现一下看看效果,添加三个order入队delayqueue,分别设置订单在当前时间的5秒、10秒、15秒后取消。
要实现delayqueue延时队列,队中元素要implements delayed 接口,这哥接口里只有一个getdelay方法,用于设置延期时间。在order类中,compareto方法的作用是对队列中的元素进行排列。
public class order implements delayed {/** * 延迟时间 */@jsonformat(locale = "zh", timezone = "gmt+8", pattern = "yyyy-mm-dd hh:mm:ss")private long time;string name;public order(string name, long time, timeunit unit) { this.name = name; this.time = system.currenttimemillis() + (time > 0 ? unit.tomillis(time) : 0);}@overridepublic long getdelay(timeunit unit) { return time - system.currenttimemillis();}@overridepublic int compareto(delayed o) { order order = (order) o; long diff = this.time - order.time; if (diff <= 0) { return -1; } else { return 1; }}}
delayqueue的put方法是线程安全的,因为put方法内部使用了reentrantlock锁进行线程同步。delayqueue还提供了两种出队的方法poll()和take() , poll()为非阻塞获取,没有到期的元素直接返回null;take()阻塞方式获取,没有到期的元素线程将会等待。
public class delayqueuedemo {public static void main(string[] args) throws interruptedexception { order order1 = new order("order1", 5, timeunit.seconds); order order2 = new order("order2", 10, timeunit.seconds); order order3 = new order("order3", 15, timeunit.seconds); delayqueue<order> delayqueue = new delayqueue<>(); delayqueue.put(order1); delayqueue.put(order2); delayqueue.put(order3); system.out.println("订单延迟队列开始时间:" + localdatetime.now().format(datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss"))); while (delayqueue.size() != 0) { /** * 取队列头部元素是否过期 */ order task = delayqueue.poll(); if (task != null) { system.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, localdatetime.now().format(datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss"))); } thread.sleep(1000); }}}
上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。
执行后看到结果如下,order1、order2、order3 分别在 5秒、10秒、15秒后被执行,至此就用delayqueue实现了延时队列。
订单延迟队列开始时间:2020-05-06 14:59:09
订单:{order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{order3}被取消, 取消时间:{2020-05-06 14:59:24}
6、rabbitmq 延时队列利用 rabbitmq 做延时队列是比较常见的一种方式,而实际上rabbitmq 自身并没有直接支持提供延迟队列功能,而是通过 rabbitmq 消息队列的 ttl和 dxl这两个属性间接实现的。
先来认识一下 ttl和 dxl两个概念:
time to live(ttl) :
ttl 顾名思义:指的是消息的存活时间,rabbitmq可以通过x-message-tt参数来设置指定queue(队列)和 message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。
rabbitmq 可以从两种维度设置消息过期时间,分别是队列和消息本身
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。可以在队列中为每条消息单独设置过期时间,即使每个消息的ttl不同也可以实现。若队列和队列中消息的ttl同时被设置,则ttl的值以两者中较小的那个为准。如果队列中的消息存储时间超过了预设的ttl过期时间,那么它就会变成dead letter(死信)。
dead letter exchanges(dlx)
dlx即死信交换机,绑定在死信交换机上的即死信队列。rabbitmq的 queue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key(可选),一旦队列内出现了dead letter(死信),则按照这两个参数可以将消息重新路由到另一个exchange(交换机),让消息重新被消费。
x-dead-letter-exchange:队列中出现dead letter后将dead letter重新路由转发到指定 exchange(交换机)。
x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。
队列出现dead letter的情况有:
消息或者队列的ttl过期
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)
下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息a0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息a0001成为了dead letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。
发送消息时指定消息延迟的时间
public void send(string delaytimes) { amqptemplate.convertandsend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> { // 设置延迟毫秒值 message.getmessageproperties().setexpiration(string.valueof(delaytimes)); return message; }); }}
设置延迟队列出现死信后的转发规则
/** * 延时队列 */ @bean(name = "order.delay.queue") public queue getmessagequeue() { return queuebuilder .durable(rabbitconstant.dead_letter_queue) // 配置到期后转发的交换 .withargument("x-dead-letter-exchange", "order.close.exchange") // 配置到期后转发的路由键 .withargument("x-dead-letter-routing-key", "order.close.queue") .build(); }
7、时间轮前面几种实现延迟队列的方法相对简单,比较易于理解。相比之下,时间轮算法稍微有点抽象。kafka、netty都有基于时间轮算法实现延时队列,下边主要实践netty的延时队列讲一下时间轮是什么原理。
先来看一张时间轮的原理图,解读一下时间轮的几个基本概念
wheel :时间轮,图中的圆盘可以看作是钟表的刻度。举个例子,如果一圈round的长度为24秒,共分成8个刻度,那么每个刻度代表3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大,精度越大。
当添加一个定时、延时任务a,假如会延迟25秒后才会执行,可时间轮一圈round 的长度才24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index,也是就任务a会绕一圈指向0格子上,此时时间轮会记录该任务的round和 index信息。指针处于0格,当 round=0 且 index=0 时不会执行任务a,因为 round=0 不符合条件。
所以每一个格子代表的是一些时间,比如1秒和25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和hashmap的数据有些类似。
netty构建延时队列主要用hashedwheeltimer,hashedwheeltimer底层数据结构依然是使用delayedqueue,只是采用时间轮的算法来实现。
下面我们用netty 简单实现延时队列,hashedwheeltimer构造函数比较多,解释一下各参数的含义。
threadfactory :表示用于生成工作线程,一般采用线程池;
tickduration和unit:每格的时间间隔,默认100ms;
ticksperwheel:一圈下来有几格,默认512,而如果传入数值的不是2的n次方,则会调整为大于等于该参数的一个2的n次方数值,有利于优化hash值的计算。
public hashedwheeltimer(threadfactory threadfactory, long tickduration, timeunit unit, int ticksperwheel) { this(threadfactory, tickduration, unit, ticksperwheel, true); }
timertask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。
timeout:一个定时任务提交到timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。 timer:是hashedwheeltimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。
public class nettydelayqueue { public static void main(string[] args) { final timer timer = new hashedwheeltimer(executors.defaultthreadfactory(), 5, timeunit.seconds, 2); //定时任务 timertask task1 = new timertask() { public void run(timeout timeout) throws exception { system.out.println("order1 5s 后执行 "); timer.newtimeout(this, 5, timeunit.seconds);//结束时候再次注册 } }; timer.newtimeout(task1, 5, timeunit.seconds); timertask task2 = new timertask() { public void run(timeout timeout) throws exception { system.out.println("order2 10s 后执行"); timer.newtimeout(this, 10, timeunit.seconds);//结束时候再注册 } }; timer.newtimeout(task2, 10, timeunit.seconds); //延迟任务 timer.newtimeout(new timertask() { public void run(timeout timeout) throws exception { system.out.println("order3 15s 后执行一次"); } }, 15, timeunit.seconds); }}
从执行的结果看,order3、order3延时任务只执行了一次,而order2、order1为定时任务,按照不同的周期重复执行。
order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行
以上就是redis实现延迟队列的方法是什么的详细内容。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product