Exploration on the Implementation Principle of RocketMQ Delayed Messages

Delayed messages come up in several scenarios in day-to-day development, but the message middleware we currently use, RabbitMQ and Kafka, does not support them well.
Specifically:

  • RabbitMQ implements delayed messages by setting a queue TTL plus a dead letter exchange.
    • Disadvantage: every delay requires two queues. One queue implements the delay; when a message expires, the dead letter exchange routes it to the corresponding business queue for consumption.
    • Alternative: RabbitMQ also provides a delayed message plugin, but it has many drawbacks: 1. Enabling the plugin requires either a restart or bringing up a new cluster; 2. It is not highly available: before delivery, a delayed message is stored only in the internal Mnesia database of the current broker node and is not mirrored or replicated; 3. The delay is unreliable: with a large number of messages, or after running for a long time, the delay becomes inaccurate; 4. Large message volumes are not supported, for the same reason as 3; 5. RAM nodes are not supported (the Mnesia database lives on disk).
  • Kafka delayed messages are implemented on the consumer side: the consumer checks whether a message has reached its consumption time and decides whether to consume it. If the delay has not elapsed, consumption is paused.
    • Disadvantage: a single topic has one fixed delay time, and additional development is required on the consumer side (in fact, most message queues can control the delay on the consumer side this way).
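The consumer-side check described in the list above can be sketched in a few lines. This is a minimal illustration only, not Kafka client code: the class `ConsumerSideDelay` and its method are hypothetical names, and the produce timestamp is assumed to travel with the message.

```java
// Hypothetical helper: decides how long a consumer should still wait
// before handling a message that carries a single, fixed per-topic delay.
final class ConsumerSideDelay {

    /** Returns the remaining wait in milliseconds; 0 means the message is due. */
    public static long remainingDelayMillis(long producedAtMillis,
                                            long fixedDelayMillis,
                                            long nowMillis) {
        long dueAt = producedAtMillis + fixedDelayMillis;
        return Math.max(0L, dueAt - nowMillis);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A message produced 4 seconds ago on a topic with a 10 second delay.
        long remaining = remainingDelayMillis(now - 4_000L, 10_000L, now);
        System.out.println("wait " + remaining + " ms before consuming");
    }
}
```

Because the delay is a property of the topic rather than of the message, every message on the topic waits the same amount of time, which is exactly the limitation noted above.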

Whether it is the RabbitMQ dead letter approach or Kafka consumer-side control, each topic can basically use only one fixed delay time. In practice, though, a single business scenario sometimes needs messages with different delay times:

  • Forced submission at the end of an exam. Different exams have different time limits, and it is impractical to create a new queue for each exam.
  • Retrying failed operations. After an operation fails, several delayed retries at different intervals are needed. (A local thread could also do this, but a delayed retry on the same machine is still likely to fail. In more critical scenarios, delayed messages can be used to distribute the retry to other machines.)

So I looked into other message middleware and found that RocketMQ supports delayed messages. Although it offers only 18 fixed delay levels, that is much better than the fixed per-queue delay of RabbitMQ and Kafka. So I started to study how RocketMQ implements delayed messages.

Main text

First, an overview of RocketMQ

RocketMQ architecture

[Figure: RocketMQ architecture]

The overall structure is shown in the figure. In brief:

  • NameServer provides registry services: it manages brokers and topic routing information.
  • Broker is mainly responsible for message storage, delivery, and query, and for high availability.
  • Producer connects to the NameServer to get broker information, then sends messages to the corresponding broker.
  • Consumer also connects to the NameServer first, queries the topic routing information, and then connects to the broker to consume messages.

Message storage

[Figure: RocketMQ message storage]
As shown in the figure, all RocketMQ messages are stored in the commitlog. The ConsumeQueue, as a logical consumption queue, maintains an index of a topic's messages: each entry records where a message of that topic is located in the commitlog. The storage unit of ConsumeQueue is an 8-byte offset + 4-byte size + 8-byte tags hashcode, 20 bytes in total. For delayed messages, the last 8 bytes store the scheduled delivery time of the message instead.
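To make the 20-byte layout concrete, here is a small sketch that packs and unpacks one entry with `java.nio.ByteBuffer`. The class `ConsumeQueueEntry` is a hypothetical model written for this article, not RocketMQ's own class; only the field widths come from the description above.

```java
import java.nio.ByteBuffer;

// Hypothetical model of one ConsumeQueue storage unit:
// 8-byte commitlog offset + 4-byte message size + 8-byte tags code.
final class ConsumeQueueEntry {
    static final int UNIT_SIZE = 8 + 4 + 8; // 20 bytes

    final long commitLogOffset;
    final int size;
    // Tags hashcode for normal messages; scheduled delivery time for delayed ones.
    final long tagsCode;

    ConsumeQueueEntry(long commitLogOffset, int size, long tagsCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagsCode = tagsCode;
    }

    /** Serializes the entry into exactly UNIT_SIZE bytes. */
    byte[] encode() {
        return ByteBuffer.allocate(UNIT_SIZE)
                .putLong(commitLogOffset)
                .putInt(size)
                .putLong(tagsCode)
                .array();
    }

    /** Reads the three fields back in the same order they were written. */
    static ConsumeQueueEntry decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new ConsumeQueueEntry(buf.getLong(), buf.getInt(), buf.getLong());
    }
}
```

Fixed-size entries are what make it possible to address the n-th entry of a queue by simple multiplication, which the delivery code later relies on when it divides a byte position by the unit size.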

Using RocketMQ delayed messages

RocketMQ supports only 18 fixed delay levels (levels 1 to 18):
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
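Since the levels are just positions in that list, a small lookup table makes them easier to work with. The sketch below is a hypothetical helper (`DelayLevels` is not part of the RocketMQ client): it parses the level string into level-to-milliseconds pairs and picks the smallest level that covers a wanted delay.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that turns the delay level string into a lookup table.
final class DelayLevels {
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Maps level number (starting at 1) to its delay in milliseconds. */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (part.charAt(part.length() - 1)) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + part);
            }
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }

    /** Smallest level whose delay is at least wantedMillis; the highest level if none is. */
    static int levelAtLeast(Map<Integer, Long> table, long wantedMillis) {
        int last = 0;
        for (Map.Entry<Integer, Long> entry : table.entrySet()) {
            last = entry.getKey();
            if (entry.getValue() >= wantedMillis) {
                return entry.getKey();
            }
        }
        return last;
    }
}
```

For example, a wanted delay of 45 seconds has no exact level, so the helper rounds up to level 5 (1m). Rounding up rather than down is a design choice of this sketch, not something RocketMQ does for you.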

Calling setDelayTimeLevel is all that is needed to send a delayed message (there is not even a constant for the delay levels…).

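    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;

    public class ScheduledMessageProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
            // The name server address must be configured (for example with
            // producer.setNamesrvAddr) before the producer is started.
            // Launch producer
            producer.start();
            int totalMessagesToSend = 100;
            for (int i = 0; i < totalMessagesToSend; i++) {
                Message message = new Message("TestTopic", ("Hello scheduled message" + i).getBytes());
                // Level 3 = 10s: this message will be delivered to the consumer 10 seconds later.
                message.setDelayTimeLevel(3);
                // Send the message
                producer.send(message);
            }

            // Shutdown producer after use.
            producer.shutdown();
        }
    }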
 

Implementation of delayed messages

First, the flow chart:
[Figure: delayed message flow chart]

Then the explanation, with code

  • org.apache.rocketmq.store.CommitLog#putMessage
    The source of putMessage shows that before RocketMQ finally writes a message to the commitlog, it first checks whether the message is delayed. If it is, the topic is replaced with SCHEDULE_TOPIC_XXXX, the original topic and queue id are saved in the message properties, and the message is stored in the queue that corresponds to its delay level (the 18 delay levels map to 18 queue ids).

    ...
         // Delay Delivery
         if (msg.getDelayTimeLevel() > 0) {
             // Clamp the level to the highest configured delay level
             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
             }

             topic = ScheduleMessageService.SCHEDULE_TOPIC;
             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

             // Backup real topic, queueId
             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

             // Redirect the message to the internal schedule topic and queue
             msg.setTopic(topic);
             msg.setQueueId(queueId);
         }
     ...
     
  • org.apache.rocketmq.store.schedule.ScheduleMessageService#start
    This is the delayed delivery code. One second after startup, a delivery task is started for each delay queue that has been created. Each task pulls messages from its queue in batches, starting at the saved offset, and checks whether each message has reached its delivery time. If not, the task uses the timer to schedule the next delivery task after the remaining time. If the delivery time has been reached, the task restores the original topic and queue id, calls writeMessageStore.putMessage(msgInner) to write the message to the commitlog again, and moves on to the next message.

    public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = ...

                                        ...
                                        // Delivery time reached: write the message back to the commitlog
                                        ScheduleMessageService.this.writeMessageStore
                                            .putMessage(msgInner);

                                        ....

                                    } catch (Exception e) {
                                        .....
                                    }
                                }
                            } else {
                                // Not due yet: run this level's task again after the remaining countdown
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        // The whole batch was handled: advance the offset and poll again shortly
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {
                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {
                    // The saved offset is behind the queue's minimum offset: resume from the minimum
                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }
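The two steps above, redirecting a delayed message on store and restoring it on delivery, can be modeled without the broker. The following is a simplified sketch, not RocketMQ code: `DelayTopicRewrite` and `Msg` are hypothetical, the property keys are assumed values standing in for PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID, and the queue id is assumed to be the delay level minus one.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the topic swap for delayed messages.
final class DelayTopicRewrite {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String REAL_TOPIC = "REAL_TOPIC"; // assumed property key
    static final String REAL_QID = "REAL_QID";     // assumed property key

    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    /** On store: back up the real destination, then redirect to the schedule topic. */
    static void toScheduleQueue(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, store it as is
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp to the highest level
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // assumption: level N maps to queue N - 1
    }

    /** On delivery: restore the real destination and clear the delay level. */
    static void restore(Msg msg) {
        if (!msg.properties.containsKey(REAL_TOPIC)) {
            return; // nothing was backed up
        }
        msg.topic = msg.properties.remove(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.remove(REAL_QID));
        msg.delayLevel = 0; // so the restored message is not delayed again
    }
}
```

Clearing the delay level on restore matters in this model: the restored message goes through the same store path again, and it must not be redirected to the schedule topic a second time.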
     

Summary

RocketMQ first stores messages in the internal delay queue that corresponds to their delay level, then continuously pulls messages from each delay queue, checks whether they are due, and delivers the due ones to their original topic.

Because each delay level is fixed, all messages in the same queue share the same delay, so they are already ordered by delivery time and never need sorting: it is enough to pull the messages in order and check whether each can be delivered. The trade-off is that the available delay times are limited.
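The "no sorting needed" argument can be shown with a toy in-memory queue. This is only a sketch of the idea under the stated assumption that every entry in one queue has the same delay; `DelayQueueScan` and its methods are hypothetical names, and a `Deque` stands in for the on-disk queue.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of one delay-level queue: entries arrive in store order, and
// because they all share one delay they are also in delivery-time order.
final class DelayQueueScan {
    static final class Entry {
        final String body;
        final long deliverAtMillis;

        Entry(String body, long deliverAtMillis) {
            this.body = body;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    /** Delivers entries from the head and stops at the first one that is not due. */
    static List<String> pollDue(Deque<Entry> queue, long nowMillis) {
        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().deliverAtMillis <= nowMillis) {
            delivered.add(queue.pollFirst().body);
        }
        return delivered;
    }

    /** How long to sleep before the next scan: until the head is due, or idleMillis if empty. */
    static long nextWakeUpMillis(Deque<Entry> queue, long nowMillis, long idleMillis) {
        Entry head = queue.peekFirst();
        return head == null ? idleMillis : Math.max(0L, head.deliverAtMillis - nowMillis);
    }
}
```

Stopping at the first entry that is not due is safe only because nothing behind it can be due earlier. With arbitrary per-message delays that guarantee disappears, and a sorted structure or a timing wheel would be needed instead.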

In addition, once a delayed message is stored in the delay queue it has been written to the commitlog file, so RocketMQ’s high availability mechanism (synchronous or asynchronous replication) copies it to the slave, which keeps delayed messages reliable.

Although RocketMQ does not support arbitrary delay times, its 18 delay levels cover many scenarios and compare well with RabbitMQ’s dead letter approach.

One more note: I later came across qmq, open-sourced by Qunar, which appears to support arbitrary delay times. It looks well worth studying.


This article is from the internet and does not represent the position of 1024programmer. Please indicate the source when reprinting: https://www.1024programmer.com/exploration-on-the-implementation-principle-of-rocketmq-delayed-messages/

author: admin
