FIFO
模式运行的ForkJoinPool
,调度器可以通过设置启动参数调整,代码如下:
private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction<ForkJoinPool> pa = () -> {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
... //省略赋值操作
Thread.UncaughtExceptionHandler handler = (t, e) -> { };
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
};
return AccessController.doPrivileged(pa);
}
OS Thread
的关系:
VirtualThread
与Platform Thread
, OS Thread
的关系:
IO
操作或Blocking
操作时,会自动切换到其他虚拟线程执行,从而避免当前线程等待,可以高效通过少数线程去调度大量虚拟线程,最大化提升线程的执行效率。Thread.startVirtualThread()
创建
//创建一个新的并且已启动的虚拟线程
Thread thread = Thread.startVirtualThread(runnable);
Thread.ofVirtual()
创建
// 创建一个新的并且已启动的虚拟线程
Thread thread = Thread.ofVirtual().start(runnable);
ThreadFactory
创建
// 获取线程工厂类
ThreadFactory factory = Thread.ofVirtual().factory();
// 创建虚拟线程
Thread thread = factory.newThread(runnable);
// 启动线程
thread.start();
Executors.newVirtualThreadPerTaskExecutor()
创建
//创建executor
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
//通过executor提交任务,采用虚拟线程执行
executor.submit(runnable);
状态 | 转换说明 |
---|---|
NEW -> STARTED | Thread.start |
STARTED -> TERMINATED | failed to start |
STARTED -> RUNNING | first run |
RUNNING -> PARKING | Thread attempts to park |
PARKING -> PARKED | cont.yield successful, thread is parked |
PARKING -> PINNED | cont.yield failed, thread is pinned |
PARKED -> RUNNABLE | unpark or interrupted |
PINNED -> RUNNABLE | unpark or interrupted |
RUNNABLE -> RUNNING | continue execution |
RUNNING -> YIELDING | Thread.yield |
YIELDING -> RUNNABLE | yield successful |
YIELDING -> RUNNING | yield failed |
RUNNING -> TERMINATED | done |
以下说明都是基于JDK21环境示例,如果是JDK19,则需要开启预览配置
--enable-preview
- 示例源码参考:virtualthread-sample
- 在
SpringBoot
中使用虚拟线程处理请求 ```java @EnableAsync @Configuration @ConditionalOnProperty(value = “spring.executor”, havingValue = “virtual”) public class ThreadConfig {
//为每个异步任务提供虚拟线程执行Executor
@Bean
public AsyncTaskExecutor applicationTaskExecutor() {
return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
}
//为tomcat提供虚拟线程执行Executor
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
- 在`application.yml`中添加配置来启用虚拟线程
```yml
spring:
#配置virtual表示启用虚拟线程,非virtual表示不启用,可以通过环境变量SPRING_EXECUTOR指定
executor: ${SPRING_EXECUTOR:virtual}
添加测试入口进行虚拟线程测试 ```java @RestController @SpringBootApplication public class VirtualthreadSampleApplication {
public static void main(String[] args) { SpringApplication.run(VirtualthreadSampleApplication.class, args); }
@GetMapping(“/hello/{timeMillis}”) public Object hello2(@PathVariable long timeMillis) throws InterruptedException { Map<String, Object> map = new HashMap<>(); map.put(“time”, System.currentTimeMillis()); map.put(“msg”, “Hello World!”); //查看当时线程信息,识别是否是虚拟线程 map.put(“thread”, Thread.currentThread().toString()); //模拟耗时IO操作 Thread.sleep(timeMillis); return map; }
}
### 性能测试
#### 资源版本
- Spring Boot: 3.1.4
- JDK: graalvm-jdk-21
- Docker Engine: 24.0.5
- Docker Resource: 4C/8G
#### 压测源码&镜像
- 压测源码:https://github.com/guanyang/spring-project-samples/tree/main/virtualthread-sample
- 镜像资源
- Dockerfile: virtualthread-sample/src/main/docker/Dockerfile
- 已构建示例镜像: guanyangsunlight/spring-project-samples:virtualthread-sample-0.0.1-SNAPSHOT
- JMH测试代码: virtualthread-sample/src/test/java/org/gy/demo/virtualthread/ThreadTest.java
- http测试接口:${host}/hello/{timeMillis}, host为服务地址,timeMillis为模拟IO操作的时间,单位毫秒,响应示例如下:
```json
{
msg: "Hello World!",
time: 1695871679753,
thread: "VirtualThread[#59]/runnable@ForkJoinPool-1-worker-1"
}
OpenJDK
团队开发的一款基准测试工具,参考链接:https://github.com/openjdk/jmh-i:指定请求数量 -u:模拟并发数量 –duration:请求时长定义,例如:60s,1m -e url:指定环境变量url,用于实际场景替换
##### `simple-test.js`脚本说明
import http from ‘k6/http’; import { check } from ‘k6’;
export default function () {
const res = http.get(${__ENV.url}
);
check(res, {
‘is status 200’: (r) => r.status === 200
});
}
##### 压测docker实例
```shell
## 启用虚拟线程实例
docker run --name virtualthread-sample-vt -p 8081:8080 -e SPRING_EXECUTOR=virtual -d guanyangsunlight/spring-project-samples:virtualthread-sample-0.0.1-SNAPSHOT
## 启用普通线程实例
docker run --name virtualthread-sample -p 8082:8080 -e SPRING_EXECUTOR=none -d guanyangsunlight/spring-project-samples:virtualthread-sample-0.0.1-SNAPSHOT
Case | QPS | Avg Latency | P95 |
---|---|---|---|
Spring Boot虚拟线程,-u 200 | 1620.869685/s | 123.09ms | 149.42ms |
Spring Boot虚拟线程,-u 400 | 2202.121674/s | 180.84ms | 277.14ms |
Spring Boot虚拟线程,-u 600 | 3195.845398/s | 186.44ms | 256.03ms |
Spring Boot虚拟线程,-u 800 | 3780.654388/s | 210.28ms | 294.79ms |
Spring Boot虚拟线程,-u 1000 | 4250.384928/s | 234.17ms | 319.92ms |
Spring Boot虚拟线程,-u 1200 | 4479.450088/s | 266.15ms | 370.17ms |
Spring Boot普通线程,-u 200 | 1418.709029/s | 140.69ms | 218.24ms |
Spring Boot普通线程,-u 400 | 1888.860872/s | 210.91ms | 247.39ms |
Spring Boot普通线程,-u 600 | 1889.607486/s | 315.49ms | 373.9ms |
Spring Boot普通线程,-u 800 | 1954.985051/s | 405.99ms | 428.44ms |
Spring Boot普通线程,-u 1000 | 1917.568269/s | 516.33ms | 585.76ms |
以上实例都是在jvm默认参数及tomcat线程池默认200大小场景下进行,没有进行任何调优配置
- 采用虚拟线程模式,随着并发数的提高,性能提升比较明显,整体性能明显优于普通线程模式。
- 采用普通线程模式,由于tomcat默认线程池配置,增加并发数并不能明显提升QPS,由于阻塞等待导致耗时边长。
- 虚拟线程在执行到
IO
操作或Blocking
操作时,会自动切换到其他虚拟线程执行,从而避免当前线程等待,可以高效通过少数线程去调度大量虚拟线程,最大化提升线程的执行效率。
@BenchmarkMode({Mode.AverageTime}) //平均响应时间模式
@OutputTimeUnit(TimeUnit.MILLISECONDS) //输出单位:毫秒模式
@State(Scope.Benchmark) //作用域为本次JMH测试,线程共享
@Fork(value = 1) //fork出一个JVM进程
@Threads(4) //使用4个线程去执行测试方法
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) //预热迭代5次,每次一秒
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) //测试迭代5次,每次一秒
public class ThreadTest {
@Param({"500", "1000", "2000"}) //模拟任务调度次数,分别500次,1000次,2000次
private int loop;
@Param({"50", "100", "200"}) //模拟线程池大小,也是虚拟线程调度器大小
private int nThreads;
private ExecutorService executor;
private ExecutorService virtualExecutor;
@Setup //每个测试方法前初始化
public void setup() {
//普通线程方式
executor = Executors.newFixedThreadPool(nThreads);
//定义虚拟线程调度器大小,保持跟平台线程池大小一样
System.setProperty("jdk.virtualThreadScheduler.maxPoolSize", String.valueOf(nThreads));
virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
@TearDown //每个测试方法执行后销毁
public void tearDown() {
executor.close();
virtualExecutor.close();
}
//主函数启动测试
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder().include(ThreadTest.class.getSimpleName()).build();
new Runner(opt).run();
}
//普通线程测试用例
@Benchmark
public void platformThreadTest(Blackhole bh) {
//模拟多个任务调度测试,返回最终结果
List<Integer> result = execute(loop, executor, ThreadTest::sleepTime);
bh.consume(result);
}
//虚拟线程测试用例
@Benchmark
public void virtualThreadTest(Blackhole bh) {
//模拟多个任务调度测试,返回最终结果
List<Integer> result = execute(loop, virtualExecutor, ThreadTest::sleepTime);
bh.consume(result);
}
//模拟多个任务调度测试,返回最终结果
private static <T> List<T> execute(int loop, ExecutorService executor, Supplier<T> supplier) {
CompletableFuture<T>[] futures = new CompletableFuture[loop];
for (int i = 0; i < loop; i++) {
//模拟执行耗时任务
futures[i] = CompletableFuture.supplyAsync(supplier, executor);
}
CompletableFuture<Void> result = CompletableFuture.allOf(futures);
result.join();
return Stream.of(futures).map(f -> f.getNow(null)).filter(Objects::nonNull).collect(Collectors.toList());
}
//sleep方法,模拟耗时IO操作,目前暂定30ms
@SneakyThrows
private static int sleepTime() {
Thread.sleep(Duration.ofMillis(sleepTimeMillis));
return sleepTimeMillis;
}
...
}
Score表示平均响应时间(ms),越小越好,loop表示任务次数,nThreads表示线程数,也是虚拟线程调度器大小
Benchmark (loop) (nThreads) Mode Cnt Score Error Units ThreadTest.platformThreadTest 500 50 avgt 5 1090.077 ± 324.304 ms/op ThreadTest.platformThreadTest 500 100 avgt 5 568.331 ± 106.303 ms/op ThreadTest.platformThreadTest 500 200 avgt 5 294.539 ± 17.419 ms/op ThreadTest.platformThreadTest 1000 50 avgt 5 2118.651 ± 426.003 ms/op ThreadTest.platformThreadTest 1000 100 avgt 5 923.840 ± 226.815 ms/op ThreadTest.platformThreadTest 1000 200 avgt 5 534.198 ± 115.960 ms/op ThreadTest.platformThreadTest 2000 50 avgt 5 4013.412 ± 2046.025 ms/op ThreadTest.platformThreadTest 2000 100 avgt 5 1828.609 ± 413.867 ms/op ThreadTest.platformThreadTest 2000 200 avgt 5 938.532 ± 173.568 ms/op ThreadTest.virtualThreadTest 500 50 avgt 5 31.733 ± 0.380 ms/op ThreadTest.virtualThreadTest 500 100 avgt 5 31.747 ± 0.468 ms/op ThreadTest.virtualThreadTest 500 200 avgt 5 31.771 ± 0.236 ms/op ThreadTest.virtualThreadTest 1000 50 avgt 5 32.783 ± 1.654 ms/op ThreadTest.virtualThreadTest 1000 100 avgt 5 32.827 ± 0.959 ms/op ThreadTest.virtualThreadTest 1000 200 avgt 5 32.672 ± 0.894 ms/op ThreadTest.virtualThreadTest 2000 50 avgt 5 34.578 ± 1.554 ms/op ThreadTest.virtualThreadTest 2000 100 avgt 5 35.001 ± 1.889 ms/op ThreadTest.virtualThreadTest 2000 200 avgt 5 35.236 ± 1.127 ms/op
IO
操作或Blocking
操作时性能提升十分明显,有数量级的提升,非常适合IO
密集型的场景。虚拟线程是Java为了解决并发编程中的一些常见问题而引入的新特性,特别是在I/O操作方面。以下是虚拟线程优缺点及使用场景总结:
还需要注意的是,虽然虚拟线程对于某些场景非常有用,但并不是所有问题都适合使用虚拟线程来解决。你应该根据项目的具体需求和环境选择最合适的工具。