Foreword
Hello everyone, and Happy New Year! It's a pleasure to meet you again. May all your wishes come true, and may the year bring you peace, safety, and good luck.
Gobrs-Async has brought you many useful, efficient, and practical features. In the new year, welcome a bright new future with a fresh way of thinking about development.
Dear technical friends, fasten your seat belts and get ready. Let's go!
Timeout task
As the name implies, a timeout period can be set on a single task to decide whether that task may continue to execute. This is useful when developing IO-bound and CPU-bound tasks.
How to use
Configure it with the timeoutInMilliseconds attribute of the @Task annotation.
- timeoutInMilliseconds: the timeout as a fixed number of milliseconds
package com.gobrs.async.test.task.timeout;

import com.gobrs.async.core.TaskSupport;
import com.gobrs.async.core.anno.Task;
import com.gobrs.async.core.task.AsyncTask;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Task(timeoutInMilliseconds = 300)
public class CaseTimeoutTaskA extends AsyncTask {
    int i = 10000;

    @Override
    public void prepare(Object o) {
        log.info(this.getName() + " using thread---" + Thread.currentThread().getName());
    }

    @SneakyThrows
    @Override
    public String task(Object o, TaskSupport support) {
        System.out.println("CaseTimeoutTaskA Begin");
        // Sleeps longer than the 300 ms timeout configured above
        Thread.sleep(400);
        for (int i1 = 0; i1 < i; i1++) {
            i1 += i1;
        }
        System.out.println("CaseTimeoutTaskA Finish");
        return "result";
    }
}
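To make the idea concrete, here is a minimal sketch in plain JDK Java of bounding a task with a timeout. It is not how Gobrs-Async implements @Task(timeoutInMilliseconds = ...); the class TimeoutSketch and the method runWithTimeout are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {
    /** Runs the task on a dedicated thread and gives up after timeoutMs. */
    static String runWithTimeout(Callable<String> task, long timeoutMs) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<String> future = worker.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            return "error";
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Same shape as CaseTimeoutTaskA: 400 ms of work against a 300 ms limit
        String slow = runWithTimeout(() -> { Thread.sleep(400); return "result"; }, 300);
        String fast = runWithTimeout(() -> "result", 300);
        System.out.println(slow + " / " + fast);
    }
}
```

The worker thread here is created per call, so interrupting it cannot affect any other task.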
Timeout listener thread pool configuration
This setting is the number of core threads in the thread pool that monitors tasks for timeouts. For why it needs to be configured, see the special notes below.
gobrs:
  async:
    config:
      rules:
        - name: "chain1"
          content: "taskA->taskB->taskC"
      timeout-core-size: 200 # number of core threads
Special notes
- Timeout tasks do not support thread reuse, because the timeout is enforced by controlling the thread that runs the task. If thread reuse were supported, another task running on the reused thread could be interrupted.
- The circuit-breaking and degradation principle follows the Hystrix approach. If you are not familiar with Hystrix, you can read an analysis of the Hystrix circuit-breaking and degradation principle.
- Due to thread scheduling, the timeout may be off by up to 10ms, which can be ignored.
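The notes above can be illustrated with a small sketch of a timer-based timeout listener, in the general style of Hystrix: a separate pool watches deadlines and interrupts the worker thread when one expires. This is an assumption-laden illustration in plain JDK Java, not Gobrs-Async's code; TimeoutListenerSketch, runGuarded, and sleepQuietly are hypothetical names, and the pool size of 2 only stands in for the role of timeout-core-size.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutListenerSketch {
    // Listener pool that only watches deadlines (hypothetical stand-in for timeout-core-size)
    static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2, r -> {
        Thread t = new Thread(r, "timeout-listener");
        t.setDaemon(true);
        return t;
    });

    /** Returns true if body finished in time, false if the listener timed it out. */
    static boolean runGuarded(Runnable body, long timeoutMs) {
        Thread worker = Thread.currentThread();
        AtomicBoolean settled = new AtomicBoolean(false);
        ScheduledFuture<?> listener = TIMER.schedule(() -> {
            // The interrupt targets the thread, not the task it happens to be running
            if (settled.compareAndSet(false, true)) worker.interrupt();
        }, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            body.run();
            return settled.compareAndSet(false, true);
        } finally {
            listener.cancel(false);
            Thread.interrupted(); // best-effort clearing of a leftover interrupt flag
        }
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // stopped by the listener
        }
    }

    public static void main(String[] args) {
        System.out.println(runGuarded(() -> sleepQuietly(400), 300));
        System.out.println(runGuarded(() -> sleepQuietly(10), 300));
    }
}
```

Because the listener interrupts a thread rather than a task, an interrupt that arrives late would hit whatever that thread runs next. That is the hazard the first note describes when threads are reused.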
Thread reuse
A small question
Consider the task flow below. Under ideal circumstances, what is the minimum number of threads needed to complete it?
taskA->taskB,TaskC->taskD->taskE,taskF
Is it 6 threads, or 5, 4, 3, or 2? You may have guessed it: 2 threads (in addition to the main thread) are enough to complete this multi-threaded flow. Why? Because at most two tasks run concurrently at any moment (TaskB and TaskC, or TaskE and TaskF). The number of threads needed is determined by how many tasks execute at the same time. Now let's look at how many threads Gobrs-Async actually uses.
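The reasoning can be checked with a few lines of Java. The sketch below assumes a simplified reading of the rule string, where "->" separates stages and "," separates tasks that run in parallel within a stage. It is not the Gobrs-Async rule parser, and MinThreadsSketch and maxParallelWidth are hypothetical names.

```java
public class MinThreadsSketch {
    /** Largest number of comma-separated tasks in any "->"-separated stage. */
    static int maxParallelWidth(String rule) {
        int width = 0;
        for (String stage : rule.split("->")) {
            width = Math.max(width, stage.split(",").length);
        }
        return width;
    }

    public static void main(String[] args) {
        // Stages: [taskA] [taskB, TaskC] [taskD] [taskE, taskF]
        System.out.println(maxParallelWidth("taskA->taskB,TaskC->taskD->taskE,taskF")); // prints 2
    }
}
```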
Test case
Address
Run result
Main thread using main
Using main
TaskA
2022-12-11 13:58:22.581 INFO 8039 --- [ main] com.gobrs.async.core.task.AsyncTask : <11780902254572224> [taskA] execution
Using main
TaskC
Using pool-2-thread-1
TaskB
2022-12-11 13:58:22.694 INFO 8039 --- [ main] com.gobrs.async.core.task.AsyncTask : <11780902254572224> [TaskC] execution
2022-12-11 13:58:22.694 INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask : <11780902254572224> [taskB] execution
Using pool-2-thread-1
TaskD
2022-12-11 13:58:22.804 INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask : <11780902254572224> [taskD] execution
Using pool-2-thread-1
Using pool-2-thread-2
TaskE
2022-12-11 13:58:22.909 INFO 8039 --- [pool-2-thread-2] com.gobrs.async.core.task.AsyncTask : <11780902254572224> [taskE] execution
TaskF
2022-12-11 13:58:22.910 INFO 8039 --- [pool-2-thread-1] com.gobrs.async.core.task.AsyncTask : <11780902254572224> [taskF] execution
2022-12-11 13:58:22.913 INFO 8039 --- [ main] com.gobrs.async.core.TaskLoader : 【ProcessTrace】Total cost: 440ms | traceId = 11780902254572224 | 【task】taskA cost :103ms【state】:success; ->【task】taskB cost :103ms【state】:success; ->【task】TaskC cost :103ms【state】:success; ->【task】taskD cost :106ms【state】:success; ->【task】taskE cost :101ms【state】:success; ->【task】taskF cost :101ms【state】:success;
Elapsed 462
Conclusion
The log shows that TaskC reused TaskA's thread (main). Because TaskB and TaskC run in parallel, a new thread (pool-2-thread-1) had to be opened for TaskB. After TaskB completed, TaskD continued on TaskB's thread, pool-2-thread-1. When TaskC completed, it found that its downstream task had already been executed by the thread TaskB released, so it did not need to use its own thread for it. The rest of the flow proceeds the same way. In total, 3 threads (including the main thread) were used.
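One common way to get this kind of reuse is to let whichever upstream task finishes last run the downstream task on its own thread. The sketch below shows that idea with a pending-parent counter. It is an illustrative assumption, not the scheduling code of Gobrs-Async, and ThreadReuseSketch and run are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadReuseSketch {
    /**
     * Runs "taskB" and "taskC" in parallel; the one that finishes last runs
     * "taskD" on the same thread. Returns {threadOfB, threadOfC, threadOfD}.
     */
    static String[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger pendingParents = new AtomicInteger(2);
        String[] threads = new String[3];
        CountDownLatch finished = new CountDownLatch(1);
        for (int i = 0; i < 2; i++) {
            final int idx = i;
            pool.execute(() -> {
                threads[idx] = Thread.currentThread().getName(); // taskB or taskC
                if (pendingParents.decrementAndGet() == 0) {
                    // Last parent to finish: continue with taskD, no new thread needed
                    threads[2] = Thread.currentThread().getName();
                    finished.countDown();
                }
            });
        }
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.println("taskB=" + t[0] + " taskC=" + t[1] + " taskD=" + t[2]);
    }
}
```

The parent that finishes first simply returns its thread to the pool, which matches the observation that a finished task does not need its own thread for a downstream task that another thread picks up.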
Thread pool isolation
Introduction
Gobrs-Async provides a thread pool isolation mechanism. Different rules can use different thread pools to process their tasks, which prevents a thread pool bottleneck in one rule from affecting the flows of other rules. If a rule has no thread pool configuration of its own, the unified thread pool configuration is used. If there is no unified thread pool configuration either, the SDK uses Executors.newCachedThreadPool() as the default thread pool.
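The lookup order described here (rule pool, then unified pool, then a cached pool) can be sketched as follows. This is a plain JDK illustration under that stated order, not the GobrsAsyncThreadPoolFactory implementation; PoolIsolationSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolIsolationSketch {
    private final Map<String, ExecutorService> rulePools = new ConcurrentHashMap<>();
    private volatile ExecutorService unifiedPool; // may be absent
    private final ExecutorService fallback = Executors.newCachedThreadPool();

    void setRulePool(String ruleName, ExecutorService pool) {
        rulePools.put(ruleName, pool);
    }

    void setUnifiedPool(ExecutorService pool) {
        unifiedPool = pool;
    }

    /** Rule-specific pool first, then the unified pool, then the cached fallback. */
    ExecutorService poolFor(String ruleName) {
        ExecutorService own = rulePools.get(ruleName);
        if (own != null) {
            return own;
        }
        ExecutorService unified = unifiedPool;
        return unified != null ? unified : fallback;
    }
}
```

With this shape, a saturated pool registered for one rule has no effect on rules that resolve to a different pool.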
Custom fixed thread pool (API method)
By default, Gobrs-Async uses the Executors.newCachedThreadPool() thread pool. To supply a custom thread pool that meets your own needs, extend GobrsThreadPoolConfiguration and override the doInitialize method, as follows:
@Configuration
public class ThreadPoolConfig extends GobrsThreadPoolConfiguration {
    @Override
    protected void doInitialize(GobrsAsyncThreadPoolFactory factory) {
        /**
         * Custom thread pool
         */
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(300, 500, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        // ExecutorService executorService = Executors.newCachedThreadPool();
        factory.setDefaultThreadPoolExecutor(threadPoolExecutor);
        // factory.setThreadPoolExecutor("ruleName", threadPoolExecutor); // Sets the isolated thread pool for a rule; ruleName is the rule name configured in the yml
    }
}
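Configuration method (YML)
Default thread pool configuration
gobrs:
  async:
    config:
      ## If a rule does not specify a thread pool, the unified thread pool configuration is used. If the thread pool is updated dynamically through the API, the dynamic update replaces the thread pool configured in this file. See: ThreadPoolConfig
      thread-pool:
        core-pool-size: 1000
        max-pool-size: 2000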
Custom rule configuration
If a thread pool is configured separately for a rule, that rule's own configuration takes precedence. In the example below, the caseOne flow uses the thread pool configured with corePoolSize: 10 and maxPoolSize: 20.
gobrs:
  async:
    config:
      ## If a rule does not specify a thread pool, the unified thread pool configuration is used. If the thread pool is updated dynamically through the API, the dynamic update replaces the thread pool configured in this file. See: ThreadPoolConfig
      thread-pool:
        core-pool-size: 1000
        max-pool-size: 2000
      rules:
        - name: "caseOne"
          content: "caseOneTaskA->caseOneTaskB,caseOneTaskC,caseOneTaskD"
          threadPool:
            corePoolSize: 10
            maxPoolSize: 20
        # Official scenario two https://async.sizegang.cn/pages/2f844b/#%E5%9C%BA%E6%99%AF%E4%BA%8C
        - name: "caseTwo"
          content: "caseTwoTaskA->caseTwoTaskB->caseTwoTaskC,caseTwoTaskD"
          threadPool:
            corePoolSize: 30
            maxPoolSize: 40
Hot update of thread pool configuration
Developers may run into this problem: the thread pool is loaded from application.yml when the project initializes, and once the program is running, the thread pool in use cannot be modified. If your company has a distributed configuration center that can update the application's in-memory settings in real time, Gobrs-Async provides an entry point for it.
Our company has its own hot-update component, which can be used as follows:
Configuration center configuration
{
corePoolSize: 210,
maxPoolSize: 600,
keepAliveTime: 30,
capacity: 10000,
threadNamePrefix: "m-detail",
rejectedExecutionHandler: "CallerRunsPolicy"
}
@Slf4j
@Configuration
public class ThreadPoolConfig {
    @Autowired
    private GobrsAsyncThreadPoolFactory factory;

    @Resource
    private DUCCConfigService duccConfigService;

    @PostConstruct
    public void gobrsThreadPoolExecutor() {
        // Fetch the thread pool configuration from the configuration center.
        // DuccConstant.GOBRS_ASYNC_THREAD_POOL is the key of that configuration in the configuration center
        String config = duccConfigService.getString(DuccConstant.GOBRS_ASYNC_THREAD_POOL);
        ThreadPool threadPool = JSONObject.parseObject(config, ThreadPool.class);
        // Build the thread pool with the builder provided by gobrs-async
        ThreadPoolExecutor executor = ThreadPoolBuilder.buildByThreadPool(threadPool);
        factory.setDefaultThreadPoolExecutor(executor); // Set the default thread pool
        // factory.setThreadPoolExecutor("ruleName", executor); // Set the isolated thread pool for a rule
        listenerDucc();
    }

    // Listen for thread pool changes in the configuration center
    private void listenerDucc() {
        duccConfigService.addListener(new DuccListener(DuccConstant.GOBRS_ASYNC_THREAD_POOL, property -> {
            log.warn("Detected a DUCC change to the GobrsAsync thread pool configuration, property:{}", JSON.toJSONString(property.getValue()));
            ThreadPool threadPool = JSONObject.parseObject(property.getValue().toString(), ThreadPool.class);
            ThreadPoolExecutor executor = ThreadPoolBuilder.buildByThreadPool(threadPool);
            factory.setThreadPoolExecutor(executor);
            // Thread pool updated successfully
            log.warn("GobrsAsync thread pool update success");
        }));
    }
}
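For readers without a configuration center at hand, the swap itself can be sketched with the JDK alone: build a new executor from the changed values, publish it atomically, and retire the old one. This is only an illustration of the idea. HotSwapPoolSketch and onConfigChange are hypothetical names, the parameters mirror some of the fields in the configuration above, and nothing here claims to match what ThreadPoolBuilder.buildByThreadPool does internally.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class HotSwapPoolSketch {
    private final AtomicReference<ThreadPoolExecutor> current;

    HotSwapPoolSketch(ThreadPoolExecutor initial) {
        current = new AtomicReference<>(initial);
    }

    /** Callers fetch the pool on every submission so they always see the latest one. */
    ThreadPoolExecutor pool() {
        return current.get();
    }

    /** Builds a pool from the changed values, swaps it in, and retires the old one. */
    void onConfigChange(int corePoolSize, int maxPoolSize, long keepAliveSeconds, int capacity) {
        ThreadPoolExecutor next = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(capacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        ThreadPoolExecutor old = current.getAndSet(next);
        old.shutdown(); // already queued tasks still run; the old pool accepts nothing new
    }
}
```

A caller that fetched the old pool just before the swap may still see its submission rejected after shutdown(), so a production version would need to handle that case.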
Configuration priority
Real-time update configuration > API configuration > configuration file (yml, yaml, properties)
Plug-in mechanism
Gobrs-Async uses the SPI mechanism to build its plug-in system. Users only need to add the plug-in dependencies provided by the SDK to enable a plug-in. The following two plug-ins are currently supported, and more will be added in the future.
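For readers new to SPI, the JDK's ServiceLoader is the standard way to discover implementations that a dependency contributes. The sketch below shows the general pattern only. TracePlugin is a hypothetical interface invented for this example and is not a Gobrs-Async type, and the sketch does not claim to reflect how the SDK loads its plug-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class SpiSketch {
    /** Hypothetical plug-in contract, for illustration only. */
    public interface TracePlugin {
        String name();
    }

    /** Collects the names of all providers registered under META-INF/services. */
    static List<String> discover() {
        List<String> names = new ArrayList<>();
        for (TracePlugin plugin : ServiceLoader.load(TracePlugin.class)) {
            names.add(plugin.name());
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> found = discover();
        System.out.println(found.isEmpty()
                ? "no plug-in on the classpath, using defaults"
                : "loaded " + found);
    }
}
```

A jar becomes a provider by shipping a file named after the interface's binary name under META-INF/services/ that lists its implementation classes, which is why adding a dependency is enough to activate a plug-in built this way.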
Monitoring series
SkyWalking plugin
SkyWalking is a full-link monitoring platform. Because SkyWalking does not carry the traceId across threads, Gobrs-Async provides a SkyWalking plugin.
pom.xml
<dependency>
<groupId>io.github.memorydoc</groupId>
<artifactId>gobrs-async-skywalking-plugin</artifactId>
<version>1.2.9-RELEASE</version>
</dependency>
Adding the dependency is all that is needed to adapt to SkyWalking. Isn't it amazing!
Log series
Full-link traceId plugin
As most developers already know, a full-link traceId is a serial number printed in the logs to make link tracing easier. With it, you can track down online problems easily, and it is simple to use.
pom.xml
<dependency>
<groupId>io.github.memorydoc</groupId>
<artifactId>gobrs-async-trace-plugin</artifactId>
<version>1.2.9-RELEASE</version>
</dependency>
Static injection
Add the following to the Spring Boot startup class:
static {
GobrsLogger.logger();
}
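Why trace IDs need help in multi-threaded code can be seen in a small JDK-only sketch: a value stored in a ThreadLocal on the calling thread is not visible on a pool thread unless it is captured and restored there. The code below illustrates that general problem and one common remedy. It does not describe how the Gobrs-Async plug-ins work internally, and TracePropagationSketch, withTrace, and demo are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TracePropagationSketch {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's trace ID now and restores it inside the worker thread. */
    static <T> Callable<T> withTrace(Callable<T> task) {
        String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                TRACE_ID.set(previous); // leave the pooled thread as it was
            }
        };
    }

    /** Returns {idSeenWithoutWrapper, idSeenWithWrapper} as observed on a pool thread. */
    static String[] demo(String traceId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TRACE_ID.set(traceId);
        Callable<String> readId = TRACE_ID::get;
        try {
            String plain = pool.submit(readId).get();
            String wrapped = pool.submit(withTrace(readId)).get();
            return new String[] { plain, wrapped };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = demo("11780902254572224");
        System.out.println("without wrapper: " + seen[0] + ", with wrapper: " + seen[1]);
    }
}
```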
Summary
Gobrs-Async provides high-performance multi-thread management and multi-threaded concurrent orchestration, and will go on to add visual management of multi-threaded tasks (thread pool monitoring, log analysis, dynamic tasks, and so on). If you want to dive into the world of multi-threaded programming, follow Gobrs-Async, and remember to give the project a star ❤️
Your star is my biggest motivation for open source!
Links
For more about quick start and the performance stress test report, please visit the official website.
Official website address
Gitee
GitHub