异步编程
LCGYL Framework 基于 Java 21 虚拟线程提供强大的异步编程支持,让你轻松编写高性能的并发代码。
虚拟线程
什么是虚拟线程?
虚拟线程是 Java 21 引入的轻量级线程,由 JVM 管理而非操作系统。
优势:
- 轻量:可以创建数百万个虚拟线程
- 高效:阻塞操作不会阻塞平台线程
- 简单:使用同步代码风格编写异步逻辑
创建虚拟线程
java
// 方式 1:直接创建
Thread.ofVirtual().start(() -> {
System.out.println("在虚拟线程中运行");
});
// 方式 2:使用工厂
ThreadFactory factory = Thread.ofVirtual().factory();
Thread thread = factory.newThread(() -> {
System.out.println("在虚拟线程中运行");
});
thread.start();
// 方式 3:使用执行器
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
System.out.println("在虚拟线程中运行");
});
}异步服务
@Async 注解
java
@Component
public class EmailService {
@Async
public void sendEmail(String to, String subject, String body) {
// 在虚拟线程中异步执行
// 不阻塞调用者
emailClient.send(to, subject, body);
}
@Async
public CompletableFuture<Boolean> sendEmailAsync(String to, String subject, String body) {
// 返回 Future,调用者可以等待结果
boolean success = emailClient.send(to, subject, body);
return CompletableFuture.completedFuture(success);
}
}使用异步服务
java
@Component
public class UserService {
@Inject
private EmailService emailService;
public User createUser(String name, String email) {
User user = new User(name, email);
userRepository.save(user);
// 异步发送邮件,不等待
emailService.sendEmail(email, "欢迎", "欢迎加入!");
return user;
}
public User createUserAndWait(String name, String email) {
User user = new User(name, email);
userRepository.save(user);
// 异步发送邮件,等待结果
CompletableFuture<Boolean> future = emailService.sendEmailAsync(email, "欢迎", "欢迎加入!");
boolean sent = future.join(); // 等待完成
return user;
}
}CompletableFuture
基本用法
java
@Component
public class DataService {
public CompletableFuture<User> getUserAsync(Long id) {
return CompletableFuture.supplyAsync(() -> {
return userRepository.findById(id);
});
}
public CompletableFuture<Order> getOrderAsync(Long id) {
return CompletableFuture.supplyAsync(() -> {
return orderRepository.findById(id);
});
}
}链式调用
java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> getUserById(1L))
.thenApply(user -> user.getName())
.thenApply(name -> "Hello, " + name)
.exceptionally(ex -> "Error: " + ex.getMessage());
String result = future.join();组合多个 Future
java
@Component
public class DashboardService {
@Inject
private UserService userService;
@Inject
private OrderService orderService;
@Inject
private StatisticsService statisticsService;
public Dashboard getDashboard(Long userId) {
// 并行获取数据
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
() -> userService.findById(userId)
);
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
() -> orderService.findByUserId(userId)
);
CompletableFuture<Statistics> statsFuture = CompletableFuture.supplyAsync(
() -> statisticsService.getUserStats(userId)
);
// 等待所有完成
CompletableFuture.allOf(userFuture, ordersFuture, statsFuture).join();
// 组装结果
return new Dashboard(
userFuture.join(),
ordersFuture.join(),
statsFuture.join()
);
}
}任意一个完成
java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "结果1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "结果2";
});
// 返回最先完成的结果
String result = CompletableFuture.anyOf(future1, future2)
.thenApply(Object::toString)
.join();
// result = "结果2"异步事件
异步发布
java
@Component
public class OrderService {
@Inject
private EventBus eventBus;
public Order createOrder(Order order) {
orderRepository.save(order);
// 异步发布事件
eventBus.publishAsync(new OrderCreatedEvent(order));
return order;
}
}异步监听
java
@Component
public class NotificationListener {
@Subscribe(async = true)
public void onOrderCreated(OrderCreatedEvent event) {
// 在虚拟线程中异步处理
sendNotification(event.getOrder());
}
}定时任务
@Scheduled 注解
java
@Component
public class ScheduledTasks {
// 固定延迟
@Scheduled(fixedDelay = 5000)
public void taskWithFixedDelay() {
// 每次执行完成后等待 5 秒再执行
}
// 固定频率
@Scheduled(fixedRate = 5000)
public void taskWithFixedRate() {
// 每 5 秒执行一次
}
// Cron 表达式
@Scheduled(cron = "0 0 * * * *")
public void taskWithCron() {
// 每小时执行一次
}
// 初始延迟
@Scheduled(initialDelay = 10000, fixedRate = 5000)
public void taskWithInitialDelay() {
// 启动后延迟 10 秒,然后每 5 秒执行
}
}编程式调度
java
@Component
public class TaskScheduler {
@Inject
private ScheduledExecutorService scheduler;
@PostConstruct
public void init() {
// 延迟执行
scheduler.schedule(() -> {
System.out.println("延迟执行");
}, 5, TimeUnit.SECONDS);
// 固定频率
scheduler.scheduleAtFixedRate(() -> {
System.out.println("固定频率");
}, 0, 10, TimeUnit.SECONDS);
// 固定延迟
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟");
}, 0, 10, TimeUnit.SECONDS);
}
}并行流
使用并行流
java
@Component
public class BatchProcessor {
public List<Result> processBatch(List<Item> items) {
return items.parallelStream()
.map(this::processItem)
.collect(Collectors.toList());
}
public long countMatching(List<Item> items, Predicate<Item> predicate) {
return items.parallelStream()
.filter(predicate)
.count();
}
}自定义并行度
java
@Component
public class CustomParallelProcessor {
private final ForkJoinPool customPool = new ForkJoinPool(4);
public List<Result> process(List<Item> items) throws Exception {
return customPool.submit(() ->
items.parallelStream()
.map(this::processItem)
.collect(Collectors.toList())
).get();
}
@PreDestroy
public void cleanup() {
customPool.shutdown();
}
}结构化并发
使用 StructuredTaskScope
java
@Component
public class StructuredConcurrencyService {
public Dashboard fetchDashboard(Long userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 并行执行多个任务
Subtask<User> userTask = scope.fork(() -> userService.findById(userId));
Subtask<List<Order>> ordersTask = scope.fork(() -> orderService.findByUserId(userId));
Subtask<Statistics> statsTask = scope.fork(() -> statsService.getUserStats(userId));
// 等待所有任务完成
scope.join();
scope.throwIfFailed();
// 获取结果
return new Dashboard(
userTask.get(),
ordersTask.get(),
statsTask.get()
);
}
}
public String fetchFirstAvailable(Long id) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// 并行执行,返回第一个成功的结果
scope.fork(() -> fetchFromService1(id));
scope.fork(() -> fetchFromService2(id));
scope.fork(() -> fetchFromService3(id));
scope.join();
return scope.result();
}
}
}响应式编程
使用 Flow API
java
@Component
public class ReactiveService {
public void processStream() {
// 创建发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 创建订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("收到: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("错误: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("完成");
}
};
// 订阅
publisher.subscribe(subscriber);
// 发布数据
publisher.submit("消息1");
publisher.submit("消息2");
publisher.submit("消息3");
// 关闭
publisher.close();
}
}异步 HTTP 客户端
使用 HttpClient
java
@Component
public class AsyncHttpService {
private final HttpClient httpClient = HttpClient.newBuilder()
.executor(Executors.newVirtualThreadPerTaskExecutor())
.connectTimeout(Duration.ofSeconds(10))
.build();
public CompletableFuture<String> fetchAsync(String url) {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body);
}
public CompletableFuture<List<String>> fetchMultiple(List<String> urls) {
List<CompletableFuture<String>> futures = urls.stream()
.map(this::fetchAsync)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
}错误处理
异步异常处理
java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机错误");
}
return "成功";
})
.exceptionally(ex -> {
logger.error("异步任务失败", ex);
return "默认值";
});超时处理
java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> longRunningTask())
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
return "超时";
}
return "错误";
});重试机制
java
@Component
public class RetryService {
public <T> CompletableFuture<T> withRetry(
Supplier<T> task,
int maxRetries,
Duration delay) {
return CompletableFuture.supplyAsync(task)
.handle((result, ex) -> {
if (ex == null) {
return CompletableFuture.completedFuture(result);
}
if (maxRetries <= 0) {
return CompletableFuture.<T>failedFuture(ex);
}
return CompletableFuture
.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS)
.execute(() -> {});
// 递归重试
return withRetry(task, maxRetries - 1, delay);
})
.thenCompose(Function.identity());
}
}最佳实践
1. 优先使用虚拟线程
java
// ✅ 推荐:使用虚拟线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> task());
}
// ❌ 不推荐:使用固定线程池处理 I/O 任务
ExecutorService executor = Executors.newFixedThreadPool(10);2. 避免阻塞虚拟线程
java
// ✅ 推荐:使用异步 API
CompletableFuture<String> future = httpClient.sendAsync(request, BodyHandlers.ofString())
.thenApply(HttpResponse::body);
// ❌ 不推荐:同步阻塞
String body = httpClient.send(request, BodyHandlers.ofString()).body();3. 正确处理异常
java
// ✅ 推荐:处理异常
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> riskyTask())
.exceptionally(ex -> {
logger.error("任务失败", ex);
return defaultValue;
});4. 设置超时
java
// ✅ 推荐:设置超时
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> task())
.orTimeout(10, TimeUnit.SECONDS);5. 使用结构化并发
java
// ✅ 推荐:使用结构化并发
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> task1());
var task2 = scope.fork(() -> task2());
scope.join();
scope.throwIfFailed();
}常见问题
Q: 虚拟线程和平台线程有什么区别?
A:
- 平台线程:1:1 映射到操作系统线程,创建成本高
- 虚拟线程:由 JVM 管理,可以创建数百万个,阻塞时不占用平台线程
Q: 什么时候使用虚拟线程?
A:
- I/O 密集型任务(网络请求、数据库操作)
- 需要大量并发的场景
- 不适合 CPU 密集型任务
Q: CompletableFuture 和虚拟线程如何选择?
A:
- 虚拟线程:简单的异步任务,同步代码风格
- CompletableFuture:需要组合、转换、链式调用的场景