学无先后,达者为师

网站首页 编程语言 正文

定时任务和延时任务详解

作者:望京小哥 更新时间: 2022-07-22 编程语言

场景

1、订单成功后,在30分钟内没有支付,自动取消订单

2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。

3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

解决方案

1、DelayQueue 延时队列

代码dome

package com.study.base.delayMessage;

import org.apache.commons.lang3.time.DateFormatUtils;

import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueue_JDKVesion {

    // data  prepare 
    static class DalayData implements Delayed{

        private long times;

        public DalayData(long times){
            this.times = times;
        }

        /**
         * 需实现最低维度为:TimeUnit.NANOSECONDS 级别的剩余时间
         *
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return times - System.nanoTime();
        }

        @Override
        public int compareTo(Delayed o) {
            DalayData od = (DalayData) o;
            return this.times - od.times < -1L ? -1 : this.times - od.times == 0 ? 0 : 1;
        }
    }

    // test
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DalayData> delayQueue = new DelayQueue();
        delayQueue.add(new DalayData(System.nanoTime()));
        delayQueue.add(new DalayData(System.nanoTime() + 5L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 10L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 15L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 20L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 25L * 1000 * 1000 * 1000));

        while (true){
            DalayData dalayData = delayQueue.take();
            System.out.println(DateFormatUtils.format(new Date(), DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT.getPattern()));
        }
    }




}

流程分析

准备数据列队数据 implements Delayed

(列队为优先级列表,到期的放前面),核心方法:delayQueue.take() 阻塞式获取数据

  • 若队列没有元素,进入条件队列Condition,wait方法,线程park挂起

  • 若队列有元素,获取队列peek首个数据,若此个元素时间<=0,poll处理,若>0,加入条件队列Condition,休息awaitNanos(delay)

  • 若此时新增数据是最小的元素,条件队列Condition唤醒第一个线程,来尝试获取数据

流程图

image-20220721192513171

优缺点

总体看,没啥缺点,每个节点,环节,都无所谓的性能浪费

场景限定:jvm中,可能数据流失

2、定时任务 + mysql

优缺点

解决单jvm问题

性能不高

适合小数据量

定时任务 + redis(zset)

优缺点

Queue放在公共节点上,解决单jvm问题

若数据不多,浪费节点对queue空拉取

时效误差,定时任务间隔

相对(定时任务 + mysql)原理一样,体现redis 和 mysql 特性和性能问题

3、netty 时间轮延迟队列

代码dome

@Test
    @org.junit.jupiter.api.Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
    public void testTimerOverflowWheelLength() throws InterruptedException {
        final HashedWheelTimer timer = new HashedWheelTimer(
            Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 32);
        final CountDownLatch latch = new CountDownLatch(3);

        timer.newTimeout(new TimerTask() {
            @Override
            public void run(final Timeout timeout) throws Exception {
                timer.newTimeout(this, 100, TimeUnit.MILLISECONDS);
                latch.countDown();
            }
        }, 100, TimeUnit.MILLISECONDS);

        latch.await();
        assertFalse(timer.stop().isEmpty());
    }

流程分析

创建HashedWheelTimer,核心4个元素(任务thread工厂,间隔时间,间隔时间单位,数组长度)

addTimeOut(添加任务),(若这是第一个任务,HashedWheelTimer还未启动,任务启动,懒加载的方式启动)

HashedWheelTimer 任务启动后,有个work线程,一直轮询时间轮的数据,获取到expire任务放到线程池处理

流程图

image-20220721192341309

优缺点

和jdk 的DelayQueue 比较类似,差别不大,设计思路和netty处理reactor网络模型主从模式思路比较类似

  • 数据结构: 数据 + 链表 vs 完全二叉树 (查询,新增,效率要高点)

  • 获取任务的方式:固定时间轮询到下个节点 vs 完全准时,无时差 (没有jdkDelayQueue 效率高,有数据执行误差,即间隔时间)

3、rocketMQ官方 延迟消息

代码dome

public class ScheduledMessageProducer {
    
     public static void main(String[] args) throws Exception {
         // Instantiate a producer to send scheduled messages
         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
         // Shutdown producer after use.
         producer.shutdown();
     }
        
 }

流程分析

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息

  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s

  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

细节待更新。。。

3、中间件延迟消息【neptune】

RocketMQ 迁移,延迟消息升级指南

neptune是基于rocketmq的延迟消息服务,主要为解决rocketmq无法发送任意时间段的消息。

  1. 支持任意时间(看硬盘情况。默认90天)、精确到秒的的延迟消息。

  2. grpc的外部API,可用于直接存储延迟消息,主要用于发送rocketmq失败后异步的重试. 消息查询等。

  3. 存储基于rocksdb

  4. 支持HA,主背复制,主背切换

  5. 监控指标直接report到influxdb和log中,采用micrometer,可以很方便到report到别的时序数据库中

推荐使用RocketMQ的延时队列,功能强大,但是也要看具体场景

原文链接:https://blog.csdn.net/leige07112033/article/details/125919093

栏目分类
最近更新