The author is a loyal fan of RocketMQ. In the process of reading the source code, I learned a lot of programming skills.

In this article, the author combines the RocketMQ source code to share the relevant knowledge points of the three artifacts of concurrent programming.

CountDownLatch is a synchronization tool class, which is used to coordinate the synchronization between multiple threads. It can make a thread continue to execute after waiting for other threads to complete their work.

The following figure is the core method of CountDownLatch:

We can think of it as a built-in counter, and the constructor initializes the counter value. Whenever the thread executes the countDown method, the value of the counter will be reduced by one. When the value of the counter is 0, it means that all tasks have been executed, and then the thread waiting on CountDownLatch can resume the next task.

For example, the database has 1 million pieces of data to be processed, and single-thread execution is relatively slow. We can divide the task into 5 batches, and the thread pool executes each batch. When the 5 batches are executed as a whole, print out the task time of execution.

 long start = System.currentTimeMillis();
 ExecutorService executorService = Executors.newFixedThreadPool(10);
 int batchSize = 5;
 CountDownLatch countDownLatch = new CountDownLatch(batchSize);
 for (int i = 0; i < batchSize; i++) {
   final int batchNumber = i;
   executorService.execute(new Runnable() {
      @Override
      public void run() {
        try {
           doSomething(batchNumber);
        } catch (Exception e) {
           e.printStackTrace();
        } finally {
           countDownLatch.countDown();
        }
      }
   });
}
countDownLatch.await();
System.out.println("任务执行耗时:" + (System.currentTimeMillis() - start) + "毫秒");

After reviewing the knowledge points of CountDownLatch, return to the RocketMQ source code.

Before the author was exposed to network programming, I was always wondering, How is the network synchronization request implemented?

Synchronous requests refer to:After the client thread initiates the call, it needs to wait for the response result within the specified timeout period before completing the call.If no result is obtained within the timeout period, a timeout exception will be thrown.

RocketMQ’s synchronous message sending interface is shown in the figure below:

Tracing the source code, the method to actually send the request iscommunication moduleSynchronous request method for invokeSyncImpl .

Overall process:

  1. After the Netty channel object of the sending message thread calls the writeAndFlush method, its essence is to send the data packet to the kernel through the Netty read and write thread, and this process itself is asynchronous;
  2. A CountDownLatch object is built into the ResponseFuture class. The responseFuture object calls the waitRepsone method, and the sending thread will be blocked;

  1. After the client receives the response command, it executes the processResponseCommand method, and the core logic is to execute the putResponse method of ResponseFuture.

The essence of this method is to fill the response object and call the countDown method of countDownLatch, so that the sending thread will no longer be blocked.

CountDownLatch is a very practical skill to implement network synchronization requests. In many open source middleware, such as Metaq and Xmemcached, there are similar implementations.

A read-write lock is a lock divided into two parts: a read lock and a write lock. The read lock allows multiple threads to obtain it at the same time, while the write lock is a mutual exclusion lock.

Its rules are: read and read are not mutually exclusive, read and write are mutually exclusive, and write and write are mutually exclusive , suitable for business scenarios with more reads and fewer writes.

We generally use ReentrantReadWriteLock, which implements ReadWriteLock. The ReadWriteLock interface is also very simple, and it mainly provides two methods inside, which return the read lock and write lock respectively.

 public interface ReadWriteLock {
    //获取读锁
    Lock readLock();
    //获取写锁
    Lock writeLock();
}

Read-write locks are used as follows:

  1. Create a ReentrantReadWriteLock object. When using ReadWriteLock, it is not used directly, but to obtain its internal read lock and write lock, and then call the lock / unlock method respectively;
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  1. read shared data;
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
   // TODO 查询共享数据
} finally {
   readLock.unlock();
}
  1. write shared data;
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
   // TODO 修改共享数据
} finally {
   writeLock.unlock();
}

The RocketMQ architecture is mainly divided into four parts, as shown in the figure below:

  1. Producer: The role of message publishing. Producer selects the corresponding Broker cluster queue for message delivery through the MQ load balancing module. The delivery process supports fast failure and low latency.

  2. Consumer: The role of message consumption, which supports two modes of push and pull to consume messages.

  3. BrokerServer: Broker is mainly responsible for the storage, delivery and query of messages and the high availability guarantee of services.

  4. NameServer: The name service is a very simple topic routing registry, its role is similar to the zookeeper in Dubbo, and it supports the dynamic registration and discovery of Broker.

NameServer is an almost stateless node that can be deployed in a cluster without any information synchronization between nodes. After the Broker starts, it will periodically (every 30s) send to all NameServersheartbeat packet(routing information), NameServer will periodically scan the Broker survival list, if it exceeds 120s If there is no heartbeat, the related information of this Broker will be removed, which means offline.

So how does NameServer save routing information?

Routing information is saved through several HashMaps. When Broker sends a heartbeat packet (routing information) to Nameserver, Nameserver needs to update the data of HashMap, but we all know that HashMap is not thread-safe. In high concurrency scenarios, 100% CPU is prone to occur. Problem, so you need to lock when updating HashMap, RocketMQ uses JDK’s read-write lock ReentrantReadWriteLock.

  1. Update routing information, operate write lock

  1. Query topic information, operate read lock

Read-write locks are suitable for scenarios with more reads and fewer writes, such as name services and configuration services.

In the RocketMQ master-slave architecture, the data synchronization/replication methods between the master node and the slave node are as follows:synchronous double writeandasynchronous replicationTwo modes.

Asynchronous replication means that after the message is successfully placed on the master node, it will tell the client that the message has been sent successfully. There is no need to wait for the message to be copied from the master node to the slave node. The copy of the message is completed by other threads.

Synchronous double writing means that after the master node successfully writes the message to the disk, it needs to wait for the slave node to copy it successfully, and then tell the client that the message has been sent successfully.

The synchronous double-write mode is blocking. According to the source code of RocketMQ 4.6.1, the author sorts out the timing diagram of the master node processing a request to send a message.

Overall process:

  1. The producer sends the message to the Broker, and after the Broker receives the message, it sends the execution thread pool of the message processor SendMessageProcessor SendMessageExecutor Thread pool to process sending message commands;

  2. Execute the putMessage method of ComitLog;

  3. ComitLog internally executes the appendMessage method first;

  4. Then submit a GroupCommitRequest to the synchronous replication service HAService, and wait for HAService to notify the GroupCommitRequest to complete;

  5. Return the write result and respond to the client.

We can see: The execution thread that sends the message needs to wait for the message to be copied from the node, and send the message Return to the producer to begin processing the next message.

In the source code of RocketMQ 4.6.1, the number of threads executing the thread pool is 1. If the thread processing master-slave synchronization speed is slow, the system cannot process new message sending requests at this moment, causing CPU resources to be unable to be fully utilized. At the same time, the system’s Throughput is also reduced.

So what about optimizing synchronous double write?

Starting from RocketMQ 4.7, RocketMQ introduced CompletableFuture implementationAsynchronous message processing .

  1. The thread of execution that sends the messageno longer waitThe message is copied to the slave node before processing the new request, butgenerate in advance CompletableFuture and return ;
  2. After the thread in HAService is copied successfully, it calls the complete method of CompletableFuture to notify the remoting module to respond to the client (thread pool: PutMessageExecutor).

Let’s analyze the core code of RocketMQ 4.9.4:

  1. After the Broker receives the message, it sends the execution thread pool of the message processor SendMessageProcessor SendMessageExecutor Thread pool to process sending message commands;
  2. Call the asyncProcessRequest method of SendMessageProcessor;

  1. Call Commitlog’s aysncPutMessage method to write the message;

    In this code, after commitLog executes appendMessage, it needs to executeDisk taskandsynchronous replicationtwo tasks.

    But these two tasks are not executed synchronously, but asynchronously.

  2. After the copy thread copies the message, wake up the future;

  3. Assemble the response command and return the response command to the client.

In order to facilitate the understanding of the threading model of this message sending process, the author made several buried points in the RocketMQ source code, modified the log configuration of Logback, sent a common message, and observed the server log.

From the logs, we can observe:

  1. The execution thread (red in the figure) that sends the message does not wait for the execution of the two tasks to complete after executing the creation of the flashing future and the synchronous copying future, but can process the sending message request after finishing the asyncProcessRequest method;
  2. After the flashing thread and the copying thread perform their respective tasks, they wake up the future, then assemble and store the results through the flashing thread, and finally return the response command to the client through the PutMessageExecutor thread pool (yellow in the figure).

The author has always believed that:Asynchrony is a way to use system resources in a more fine-grained mannerin the process of asynchronous message processing, through the artifact of CompletableFuture, each thread performs its own duties, which elegantly and efficiently improves the performance of RocketMQ.


If my article is helpful to you, please helpLike, Watch, RetweetYour support will motivate me to output higher quality articles, thank you very much!

#Read #source #code #RocketMQ #learn #major #artifacts #concurrent #programming

Leave a Comment

Your email address will not be published. Required fields are marked *