很多时候我们会有延时处理一个任务的需求,比如说:
- 2个小时后给用户发送短信。
- 15分钟后关闭网络连接。
- 2分钟后再次尝试回调。
下面我们来分别探讨一下几种实现方案:
Java中的DelayQueue
Java中的DelayQueue位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列。
放入队列的元素需要实现Delayed接口:
public interface Delayed extends Comparable{ /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit);}
通过实现这个接口,来完成对队列中元素,按照时间延迟先后排序的目的。
从队列中取元素:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
可以看到,在这段代码里,在第一个元素的延迟时间还没到的情况下:
- 如果当前没有其他线程等待,则阻塞当前线程直到延迟时间。
- 如果有其他线程在等待,则阻塞当前线程。
向队列中放入元素:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return true * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
在放入元素的时候,会唤醒等待中的读线程。
如果我们不考虑分布式运行和任务持久化的话,Java中的DelayQueue是一个很理想的方案,精巧好用。
但是如果我们需要分布式运行和任务持久化,就需要引入一些外部组件。
使用Redis实现
前文我们看到,可以通过优先级队列来实现延迟队列的功能。
Redis提供了很多数据结构,其中的zset是一种有序的数据结构;我们可以通过Redis中的zset来实现一个延迟队列。
基本的方法就是使用时间戳作为元素的score存入zset。
redis> ZADD delayqueue"messsage"
获取所有已经“就绪”的message,并且删除message。
redis> MULTIredis> ZRANGEBYSCORE delayqueue 0redis> ZREMRANGEBYSCORE delayqueue 0 redis> EXEC
但是这个方案也有一些问题:
Redis事务虽然保证了一致性和隔离性,但是并没有提供回滚功能。消息处理失败是不能被恢复的,如果处理某条消息的线程崩溃或机器宕机,这条未被处理不能被自动的再次处理。
也有考虑过将分为TODO和Doing两条队列:
先从TODO队列中取出任务,放入Doing中,再开始处理;如果停留在Doing队列总过久,则重新放入TODO队列。
但是由于Redis的事务特性,并不能做到完全可靠;并且检查Doing超时的逻辑也略复杂。
那么有没有一个成熟的消息队列可以支持延迟投递消息的功能呢?
答案当然是有的,本文的标题就是使用RabbitMQ实现DelayQueue。
使用RabbitMQ实现
这是RabbitMQ众多隐藏的强大特性中的一个,可以轻松的降低代码的复杂度,实现DelayQueue的功能。
我们需要两个队列,一个用来做主队列,真正的投递消息;另一个用来延迟处理消息。
ConnectionFactory factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);connection = factory.newConnection();channel = connection.createChannel();channel.queueDeclare("MAIN_QUEUE", true, false, false, null);channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE");HashMaparguments = new HashMap ();arguments.put("x-dead-letter-exchange", "amq.direct");arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE");channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);
放入延迟消息:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();AMQP.BasicProperties properties = builder.expiration(String.valueOf(task.getDelayMillis())).deliveryMode(2).build();channel.basicPublish("", "DELAY_QUEUE", properties, SerializationUtils.serialize(task));
而关键点,就在于 x-dead-letter-exchange 和 x-dead-letter-routing-key 两个参数上。这两个参数说明了:消息过期后的处理方式 --> 投递到我们指定的MAIN_QUEUE;然后我们只需要在MAIN_QUEUE中等待消息投递即可。
RabbitMQ本身提供了消息持久化和没有收到ACK的重投递功能,这样我们就可以实现一个高可靠的分布式延迟消息队列了。
PS
上面讲述的RabbitMQ定时任务方案有问题, 中写道:
Caveats
While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).
per-queue TTL不会有问题,因为快要过期的消息总是在队列的前边;但是如果使用per-message TTL的话,过期的消息有可能会在未过期的消息后边,直到前边的消息过期或者被消费。因为RabbitMQ保证过期的消息一定不会被消费者消费,但是不能保证消息过期就会从队列中移除。
ActiveMQ
ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.
可以支持定时、延迟投递、重复投递和Cron调度。
在配置文件中,启用<broker ... schedulerSupport="true">
选项后即可使用。
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");long delay = 30 * 1000;long period = 10 * 1000;int repeat = 9;message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);producer.send(message);
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");producer.send(message);
PS
由于ActiveMQ采用的是类似于Java中DelayQueue的方式,通过先将消息排序再定时触发的方式来实现延迟消息。在往队列中投递大量(10w+)定时消息之后,ActiveMQ的性能将会变得接近不可用,大量的消息挤压得不到投递。
其他一些可能的实现方式
-
RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。
-
通过MySQL等数据库记录消息应该被投递的时间,然后循环进行查找,并把当前时间应该投递的消息放入普通的消息队列。