Author: JD Retail Zhang Luyao
At present, many functions in the system require delayed processing: payment timeout cancellation, queuing timeout, delayed sending of SMS, WeChat and other reminders, token refresh, membership card expiration, and so on. Handling these as delayed tasks saves considerable system resources, because the database no longer has to be polled for pending work.
At present, most of these functions are implemented with scheduled tasks, of which there are two kinds: quartz and xxljob. If the polling interval is short, for example once per second, it puts a certain pressure on the database and still has an error of up to 1 second. If the polling interval is long, for example once every 30 minutes, the delay can be large: a record inserted at 03:01 should expire at 03:31, but the poll at 03:30 only picks up records that have expired by 03:30, so this record is not scanned until 04:00, which is equivalent to a delay of 29 minutes!
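The 29-minute figure can be checked with a small calculation. This is a minimal sketch, assuming the poller fires on fixed boundaries counted from midnight (03:00, 03:30, 04:00, ...) and only picks up tasks that have already expired at scan time. The class and method names are illustrative.

```java
import java.time.Duration;
import java.time.LocalTime;

class PollingDelay {
    /** Returns how long a task waits after its expiry time until the next scan. */
    static Duration extraDelay(LocalTime expiresAt, Duration interval) {
        long intervalSec = interval.getSeconds();
        long expirySec = expiresAt.toSecondOfDay();
        // First scan boundary at or after the expiry time (round up to a multiple of the interval).
        long nextScanSec = ((expirySec + intervalSec - 1) / intervalSec) * intervalSec;
        return Duration.ofSeconds(nextScanSec - expirySec);
    }

    public static void main(String[] args) {
        // Inserted at 03:01 with a 30-minute timeout, so it expires at 03:31.
        Duration delay = extraDelay(LocalTime.of(3, 31), Duration.ofMinutes(30));
        System.out.println(delay.toMinutes()); // 29
    }
}
```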
1. DelayQueue
1. Implementation method:
DelayQueue is the delayed blocking queue provided by the JDK. It orders tasks with different delay times in a priority queue and blocks on a condition: the consuming thread sleeps for the remaining delay of the head task and then takes it.
When a new task is added, the queue checks whether it is now the first task due to execute. If so, the sleeping thread is woken up, so that a newly added element that is due earlier is not missed by the consuming thread.
2. Existing problems:
1. It runs on a single machine; after the system goes down, tasks cannot be effectively retried
2. No logging or backup of tasks
3. No retry mechanism
4. When the system restarts, all tasks are cleared!
5. Sharded consumption is not supported
3. Advantages: simple implementation, blocks when there is no task (saving resources), and accurate execution time
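The behaviour described in this section can be sketched with java.util.concurrent.DelayQueue. The task class and its names are illustrative assumptions; only DelayQueue and Delayed come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** A task that becomes available delayMillis after it is created. */
class DelayedTask implements Delayed {
    final String name;
    private final long fireAtNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.fireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the task is due.
        return unit.convert(fireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // The internal priority queue keeps the task with the smallest remaining delay at the head.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemo {
    /** Blocks until n tasks have expired and returns their names in expiry order. */
    static List<String> takeNames(DelayQueue<DelayedTask> queue, int n) {
        List<String> names = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                names.add(queue.take().name); // sleeps until the head task is due
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("cancel-order", 200));
        queue.put(new DelayedTask("send-sms", 50)); // added later but due earlier
        System.out.println(takeNames(queue, 2)); // [send-sms, cancel-order]
    }
}
```

Because the queue lives only in JVM memory, everything in it is lost on restart, which is the main limitation listed above.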
2. MQ delay queue
Implementation method: rely on an mq and achieve delayed consumption by setting a delayed consumption time on the message. Both RabbitMQ and jmq allow a delayed consumption time to be set. RabbitMQ implements it by setting an expiration time on the message; once expired, the message is put into a dead letter queue, from which it is consumed.
Existing problems:
1. The time setting is not flexible. Each queue has a fixed expiration time, so every new delay duration requires a new message queue to be created
Advantages: relying on jmq provides effective monitoring, consumption records, and retries; multiple machines can consume at the same time, and downtime is not a concern
3. Scheduled tasks
Scheduled tasks poll for data that meets the conditions
Disadvantages:
1. The business database must be read, which puts a certain pressure on the database.
2. There is a delay
3. When the amount of scanned data is too large, it takes up too many system resources.
4. Sharded consumption is not supported
Advantages:
1. After a consumption failure, the data is picked up again on the next run, so there is a built-in retry capability.
2. Stable consumption capacity
4. redis
Tasks are stored in redis, using a redis zset that is sorted by score. The program runs a thread that continuously fetches due data from the queue and consumes it, which implements the delay queue
Advantages:
1. Querying redis is faster than querying the database; even when the zset grows large, lookups go through its skip list structure, which is highly efficient
2. Redis sorts by timestamp, so you only need to query the tasks whose score falls within the current timestamp
3. No fear of machine restarts
4. Distributed consumption
Disadvantages:
1. Limited by redis performance, roughly 100,000 (10W) concurrent requests
2. Multiple commands cannot guarantee atomicity. Using lua scripts requires all the data involved to be on one redis shard.
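The zset approach can be sketched without a redis server. This is a minimal in-memory stand-in, not a redis client: add plays the role of ZADD, and pollDue plays the role of ZRANGEBYSCORE from 0 to the current timestamp followed by ZREM. The class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** In-memory stand-in for a redis zset whose score is the execution timestamp. */
class InMemoryZset {
    // score -> members with that score, kept sorted by score
    private final TreeMap<Long, TreeSet<String>> byScore = new TreeMap<>();

    /** Plays the role of ZADD (this sketch does not re-score existing members). */
    synchronized void add(long score, String member) {
        byScore.computeIfAbsent(score, s -> new TreeSet<>()).add(member);
    }

    /**
     * Plays the role of ZRANGEBYSCORE 0..maxScore followed by ZREM.
     * Here both steps happen under one lock; in redis they are separate
     * commands unless they are wrapped in a lua script.
     */
    synchronized List<String> pollDue(long maxScore) {
        List<String> due = new ArrayList<>();
        Map<Long, TreeSet<String>> head = byScore.headMap(maxScore, true);
        for (TreeSet<String> members : head.values()) {
            due.addAll(members);
        }
        head.clear(); // removes the returned entries from the underlying map
        return due;
    }

    public static void main(String[] args) {
        InMemoryZset zset = new InMemoryZset();
        zset.add(3000L, "task-c");
        zset.add(1000L, "task-a");
        zset.add(2000L, "task-b");
        System.out.println(zset.pollDue(2000L)); // [task-a, task-b]
        System.out.println(zset.pollDue(2000L)); // []
    }
}
```

The lock in pollDue hides exactly the atomicity problem listed above: with a real redis, another consumer could read the same members between the range query and the removal.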
5. Time wheel
A time wheel also executes delayed tasks inside a single JVM. For example, kafka and netty both implement time wheels, and redisson's watchdog is also built on netty's time wheel.
Disadvantages: not suitable for distributed services, and tasks are lost after downtime.
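The idea behind a time wheel can be sketched as follows. This is a simplified single-level wheel written for illustration, not the netty or kafka implementation: tasks are hashed into slots by their delay in ticks, and a round counter handles delays longer than one revolution. The pointer is advanced manually with tick() so the behaviour is deterministic; a real wheel would advance it from a timer thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SimpleTimeWheel {
    private static final class Timeout {
        final Runnable task;
        int remainingRounds; // full revolutions still to wait
        Timeout(Runnable task, int remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Timeout>> slots = new ArrayList<>();
    private int current = 0; // slot the pointer is currently on

    SimpleTimeWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules task to run after delayTicks ticks (treated as at least 1). */
    void schedule(Runnable task, int delayTicks) {
        int ticks = Math.max(1, delayTicks);
        int slot = (current + ticks) % slots.size();
        int rounds = (ticks - 1) / slots.size();
        slots.get(slot).add(new Timeout(task, rounds));
    }

    /** Advances the pointer by one slot and runs the tasks that are due there. */
    void tick() {
        current = (current + 1) % slots.size();
        List<Runnable> due = new ArrayList<>();
        Iterator<Timeout> it = slots.get(current).iterator();
        while (it.hasNext()) {
            Timeout t = it.next();
            if (t.remainingRounds == 0) {
                it.remove();
                due.add(t.task);
            } else {
                t.remainingRounds--;
            }
        }
        // Run after iterating so a task may safely schedule new work.
        due.forEach(Runnable::run);
    }
}
```

Scheduling and expiry touch only one slot, which is why time wheels are cheap for large numbers of timers, but the wheel lives in memory and shares the restart problem noted above.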
The goal is to stay compatible with the asynchronous event component currently in use while providing a delay component that is more reliable, retryable, recorded, monitored with alarms, and high-performance.
• Message transmission reliability: after a message enters the delay queue, it is guaranteed to be consumed at least once.
• Rich client support: multiple languages are supported.
• High availability: multi-instance deployment is supported. After an instance goes down, a backup instance continues to provide service.
• Real-time: a certain time error is allowed.
• Support message deletion: business users can delete specified messages at any time.
• Support consumption query
• Support manual retry
• Add monitoring of the execution of current asynchronous events
1. Implementation principle
At present, we use jimdb and implement the delay function with a zset: the task id is stored in the zset queue with its execution time as the score. The zset is sorted by score by default, and each time we fetch the task ids whose score lies between 0 and the current time.
When a delayed task is sent, a unique id is generated from timestamp + machine ip + queueName + sequence; the message body is then constructed, encrypted, and put into the zset queue.
A mover thread moves tasks that have reached their execution time to the release queue, where they wait for consumers to fetch them.
Monitoring is integrated with ump.
Consumption records are kept through redis backup + database persistence.
The cache-based approach is only one implementation; which implementation is used can be controlled through parameters, and further ones can be added freely through spi.
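The send and move steps above can be sketched in memory. This is a minimal sketch under stated assumptions: a TreeMap stands in for the zset and an ArrayDeque for the release queue, the ":" separator in the id is invented for illustration, and message body encryption is left out. None of these names are the component's real API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

class DelayStore {
    private final String machineIp;
    private final AtomicLong sequence = new AtomicLong();
    // Stand-in for the zset: execution time (score) -> task ids.
    private final TreeMap<Long, List<String>> delayed = new TreeMap<>();
    // Stand-in for the release queue that consumers read from.
    private final Queue<String> ready = new ArrayDeque<>();

    DelayStore(String machineIp) {
        this.machineIp = machineIp;
    }

    /** Builds an id from timestamp + machine ip + queueName + sequence. */
    String nextId(String queueName, long nowMillis) {
        return nowMillis + ":" + machineIp + ":" + queueName + ":" + sequence.incrementAndGet();
    }

    /** Stores a new task id with its absolute execution time as the score. */
    synchronized String send(String queueName, long nowMillis, long delayMillis) {
        String id = nextId(queueName, nowMillis);
        delayed.computeIfAbsent(nowMillis + delayMillis, k -> new ArrayList<>()).add(id);
        return id;
    }

    /** One pass of the mover thread: ids with a score in [0, now] go to the release queue. */
    synchronized int moveDue(long nowMillis) {
        Map<Long, List<String>> due = delayed.headMap(nowMillis, true);
        int moved = 0;
        for (List<String> ids : due.values()) {
            ready.addAll(ids);
            moved += ids.size();
        }
        due.clear();
        return moved;
    }

    /** Consumer side: the next released id, or null if nothing is ready. */
    synchronized String poll() {
        return ready.poll();
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real mover thread would read the clock itself on each pass.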
2. Message structure
Each Job must contain the following attributes:
•Topic: Job type, that is, QueueName
•Id: the unique identifier of the Job. Used to retrieve and delete the specified Job information.
•Delay: how long the Job should be delayed. Unit: seconds. (The server converts it to an absolute time.)
•Body: the content of the Job, stored in json format for consumers to do specific business processing.
•traceId: the traceId of the sending thread. Once pfinder supports setting a traceId, the consumer can share the same traceId with the sending thread, which is convenient for log tracking.
The specific structure is shown in the figure below:
TTR is designed to ensure the reliability of message transmission.
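The attributes listed above can be sketched as a plain Java class. Field types and the constructor are assumptions made for illustration; the real message class is not shown in this article, and TTR is left out because its representation is not described here.

```java
/** Illustrative Job holder; field names follow the attribute list above. */
class Job {
    final String topic;         // Job type, that is, the queue name
    final String id;            // unique identifier, used to retrieve or delete the Job
    final long delaySeconds;    // requested delay in seconds
    final long executeAtMillis; // absolute execution time derived from the delay
    final String body;          // json payload for the consumer
    final String traceId;       // traceId of the sending thread

    Job(String topic, String id, long delaySeconds, String body, String traceId, long nowMillis) {
        this.topic = topic;
        this.id = id;
        this.delaySeconds = delaySeconds;
        // Relative delay converted to an absolute time, as the server is described to do.
        this.executeAtMillis = nowMillis + delaySeconds * 1000L;
        this.body = body;
        this.traceId = traceId;
    }

    /** True once the absolute execution time has been reached. */
    boolean isDue(long nowMillis) {
        return nowMillis >= executeAtMillis;
    }
}
```

Storing the absolute time rather than the relative delay is what lets it be used directly as the zset score.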
3. Data flow and flow chart
Publishing and consumption are based on redis plus disruptor, so the component can also be used as a message queue. Consumers use the lock-free disruptor queue of the original asynchronous event component, and there are no locks between different applications or between different queues
1. An application may publish only, without consuming, which gives it the function of a message queue.
2. Bucketing is supported. To address the large key problem when there are many events, you can set the number of buckets for the delay queue and the task queue, which reduces the redis blocking caused by large keys.
3. Performance-related behavior can be adjusted through ducc configuration. Currently, only enabling and disabling consumption is supported.
4. A timeout can be configured to prevent consumer threads from executing for too long
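Bucketing, as described in item 2, amounts to spreading task ids over several smaller keys. This is a minimal sketch, assuming ids are routed by hash and that bucket keys are named queueName:index; both the routing rule and the key format are illustrative assumptions, not the component's actual scheme.

```java
class BucketRouter {
    /** Picks the bucket key for a task id so one large zset becomes bucketNum smaller ones. */
    static String bucketKey(String queueName, String id, int bucketNum) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int bucket = Math.floorMod(id.hashCode(), bucketNum);
        return queueName + ":" + bucket;
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, and 97 mod 5 is 2.
        System.out.println(bucketKey("testQueue", "a", 5)); // testQueue:2
    }
}
```

With bucketNum set to 1, every id maps to the single key queueName:0, which corresponds to the unbucketed case.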
Bottleneck: if consumption is slow and production is fast, the ringbuffer queue fills up. When the application is both producer and consumer, the producer then sleeps, so performance depends on the consumption speed. Machines can be scaled out horizontally to improve performance directly. Monitor the length of the redis queue; if it keeps growing, consider adding consumers.
Possible situation: because an application shares one disruptor with 64 consumer threads, if a certain event is consumed too slowly and all 64 threads are busy with it, other events have no consumer thread left, and the producer thread is also blocked, so the consumption of all events is blocked.
Whether this bottleneck actually occurs remains to be observed; if it does, each queue can be given its own consumer thread pool.
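The per-queue thread pool idea can be sketched with plain java.util.concurrent executors. This is an illustrative sketch, not the component's code: the class name, the fixed pool size, and the runsWithin helper are assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PerQueueExecutors {
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();
    private final int threadsPerQueue;

    PerQueueExecutors(int threadsPerQueue) {
        this.threadsPerQueue = threadsPerQueue;
    }

    /** Runs the handler on the pool that belongs to queueName, creating the pool on first use. */
    Future<?> submit(String queueName, Runnable handler) {
        return pools
                .computeIfAbsent(queueName, q -> Executors.newFixedThreadPool(threadsPerQueue))
                .submit(handler);
    }

    /** Returns true if the handler finishes on its queue's pool within the timeout. */
    boolean runsWithin(String queueName, Runnable handler, long timeoutMillis) {
        try {
            submit(queueName, handler).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Stops accepting new work; tasks already submitted still run to completion. */
    void shutdown() {
        pools.values().forEach(ExecutorService::shutdown);
    }
}
```

With separate pools, a slow handler on one queue can exhaust only its own threads, so events on other queues keep being consumed. The cost is more threads overall, which is why it is worth confirming the bottleneck first.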
Add configuration file
Whether the component is enabled is determined by jd.event.enable:true
    <dependency>
        <groupId>com.jd.car</groupId>
        <artifactId>senna-event</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
Configuration:
    jd:
      senna:
        event:
          enable: true
          queue:
            retryEventQueue:
              bucketNum: 1
              handleBean: retryHandle
Consumption code:
    package com.jd.car.senna.admin.event;

    import com.jd.car.senna.event.EventHandler;
    import com.jd.car.senna.event.annotation.SennaEvent;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;

    /**
     * @author zhangluyao
     * @description
     * @create 2022-02-21-9:54 PM
     */
    @Slf4j
    @Component("retryHandle") // bean name referenced by handleBean in the configuration
    public class RetryQueueEvent extends EventHandler {

        // Called for messages pushed without a delay
        @Override
        protected void onHandle(String key, String eventType) {
            log.info("Handler starts consuming: {}", key);
        }

        // Called for delayed messages once their execution time is reached
        @Override
        protected void onDelayHandle(String key, String eventType) {
            log.info("delayHandler starts consuming: {}", key);
        }
    }
Annotation form:
    package com.jd.car.senna.admin.event;

    import com.jd.car.senna.event.EventHandler;
    import com.jd.car.senna.event.annotation.SennaEvent;
    import lombok.extern.slf4j.Slf4j;

    /**
     * @author zhangluyao
     * @description
     * @create 2022-02-21-9:54 PM
     */
    @Slf4j
    @SennaEvent(queueName = "testQueue", bucketNum = 5, delayBucketNum = 5, delayEnable = true)
    public class TestQueueEvent extends EventHandler {

        // Called for messages pushed without a delay
        @Override
        protected void onHandle(String key, String eventType) {
            log.info("Handler starts consuming: {}", key);
        }

        // Called for delayed messages once their execution time is reached
        @Override
        protected void onDelayHandle(String key, String eventType) {
            log.info("delayHandler starts consuming: {}", key);
        }
    }
Sending code:
    package com.jd.car.senna.admin.controller;

    import com.jd.car.senna.event.queue.IEventQueue;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;

    import javax.annotation.Resource;
    import java.util.Date;
    import java.util.concurrent.CompletableFuture;

    /**
     * @author zly
     */
    @RestController
    @Slf4j
    public class DemoController {

        @Lazy
        @Resource(name = "testQueue")
        private IEventQueue eventQueue;

        @ResponseBody
        @GetMapping("/api/v1/demo")
        public String demo() {
            log.info("Sending a message with no delay");
            eventQueue.push("no delay 5000 milliseconds message 3");
            return "ok";
        }

        @ResponseBody
        @GetMapping("/api/v1/demo1")
        public String demo1() {
            log.info("Sending a message delayed by 5 seconds");
            // The second argument is the delay in milliseconds
            eventQueue.push(" delay 5000 milliseconds message,name", 1000 * 5L);
            return "ok";
        }

        @ResponseBody
        @GetMapping("/api/v1/demo2")
        public String demo2() {
            log.info("Sending a message to be executed at 2022-04-02 00:00:00");
            // 1648828800000L is 2022-04-02 00:00:00 in UTC+8; the L suffix is required for a long literal
            eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000L));
            return "ok";
        }
    }
Refer to Youzan Design: https://tech.youzan.com/queuing_delay/
1. Yunxiu: a store queue entry is automatically cancelled after 24 hours in line
2. Meituan: the request token is refreshed periodically
3. Warranty card is generated within 24 hours
5. Delayed generation of statements
6. Delayed SMS sending