Skip to content

异步编程

LCGYL Framework 基于 Java 21 虚拟线程提供强大的异步编程支持,让你轻松编写高性能的并发代码。

虚拟线程

什么是虚拟线程?

虚拟线程是 Java 21 引入的轻量级线程,由 JVM 管理而非操作系统。

优势

  • 轻量:可以创建数百万个虚拟线程
  • 高效:阻塞操作不会阻塞平台线程
  • 简单:使用同步代码风格编写异步逻辑

创建虚拟线程

java
// 方式 1:直接创建
Thread.ofVirtual().start(() -> {
    System.out.println("在虚拟线程中运行");
});

// 方式 2:使用工厂
ThreadFactory factory = Thread.ofVirtual().factory();
Thread thread = factory.newThread(() -> {
    System.out.println("在虚拟线程中运行");
});
thread.start();

// 方式 3:使用执行器
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> {
        System.out.println("在虚拟线程中运行");
    });
}

异步服务

@Async 注解

java
@Component
public class EmailService {
    
    @Async
    public void sendEmail(String to, String subject, String body) {
        // 在虚拟线程中异步执行
        // 不阻塞调用者
        emailClient.send(to, subject, body);
    }
    
    @Async
    public CompletableFuture<Boolean> sendEmailAsync(String to, String subject, String body) {
        // 返回 Future,调用者可以等待结果
        boolean success = emailClient.send(to, subject, body);
        return CompletableFuture.completedFuture(success);
    }
}

使用异步服务

java
@Component
public class UserService {
    
    @Inject
    private EmailService emailService;
    
    public User createUser(String name, String email) {
        User user = new User(name, email);
        userRepository.save(user);
        
        // 异步发送邮件,不等待
        emailService.sendEmail(email, "欢迎", "欢迎加入!");
        
        return user;
    }
    
    public User createUserAndWait(String name, String email) {
        User user = new User(name, email);
        userRepository.save(user);
        
        // 异步发送邮件,等待结果
        CompletableFuture<Boolean> future = emailService.sendEmailAsync(email, "欢迎", "欢迎加入!");
        boolean sent = future.join();  // 等待完成
        
        return user;
    }
}

CompletableFuture

基本用法

java
@Component
public class DataService {
    
    public CompletableFuture<User> getUserAsync(Long id) {
        return CompletableFuture.supplyAsync(() -> {
            return userRepository.findById(id);
        });
    }
    
    public CompletableFuture<Order> getOrderAsync(Long id) {
        return CompletableFuture.supplyAsync(() -> {
            return orderRepository.findById(id);
        });
    }
}

链式调用

java
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> getUserById(1L))
    .thenApply(user -> user.getName())
    .thenApply(name -> "Hello, " + name)
    .exceptionally(ex -> "Error: " + ex.getMessage());

String result = future.join();

组合多个 Future

java
@Component
public class DashboardService {
    
    @Inject
    private UserService userService;
    
    @Inject
    private OrderService orderService;
    
    @Inject
    private StatisticsService statisticsService;
    
    public Dashboard getDashboard(Long userId) {
        // 并行获取数据
        CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
            () -> userService.findById(userId)
        );
        
        CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(
            () -> orderService.findByUserId(userId)
        );
        
        CompletableFuture<Statistics> statsFuture = CompletableFuture.supplyAsync(
            () -> statisticsService.getUserStats(userId)
        );
        
        // 等待所有完成
        CompletableFuture.allOf(userFuture, ordersFuture, statsFuture).join();
        
        // 组装结果
        return new Dashboard(
            userFuture.join(),
            ordersFuture.join(),
            statsFuture.join()
        );
    }
}

任意一个完成

java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "结果1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(500);
    return "结果2";
});

// 返回最先完成的结果
String result = CompletableFuture.anyOf(future1, future2)
    .thenApply(Object::toString)
    .join();
// result = "结果2"

异步事件

异步发布

java
@Component
public class OrderService {
    
    @Inject
    private EventBus eventBus;
    
    public Order createOrder(Order order) {
        orderRepository.save(order);
        
        // 异步发布事件
        eventBus.publishAsync(new OrderCreatedEvent(order));
        
        return order;
    }
}

异步监听

java
@Component
public class NotificationListener {
    
    @Subscribe(async = true)
    public void onOrderCreated(OrderCreatedEvent event) {
        // 在虚拟线程中异步处理
        sendNotification(event.getOrder());
    }
}

定时任务

@Scheduled 注解

java
@Component
public class ScheduledTasks {
    
    // 固定延迟
    @Scheduled(fixedDelay = 5000)
    public void taskWithFixedDelay() {
        // 每次执行完成后等待 5 秒再执行
    }
    
    // 固定频率
    @Scheduled(fixedRate = 5000)
    public void taskWithFixedRate() {
        // 每 5 秒执行一次
    }
    
    // Cron 表达式
    @Scheduled(cron = "0 0 * * * *")
    public void taskWithCron() {
        // 每小时执行一次
    }
    
    // 初始延迟
    @Scheduled(initialDelay = 10000, fixedRate = 5000)
    public void taskWithInitialDelay() {
        // 启动后延迟 10 秒,然后每 5 秒执行
    }
}

编程式调度

java
@Component
public class TaskScheduler {
    
    @Inject
    private ScheduledExecutorService scheduler;
    
    @PostConstruct
    public void init() {
        // 延迟执行
        scheduler.schedule(() -> {
            System.out.println("延迟执行");
        }, 5, TimeUnit.SECONDS);
        
        // 固定频率
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定频率");
        }, 0, 10, TimeUnit.SECONDS);
        
        // 固定延迟
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("固定延迟");
        }, 0, 10, TimeUnit.SECONDS);
    }
}

并行流

使用并行流

java
@Component
public class BatchProcessor {
    
    public List<Result> processBatch(List<Item> items) {
        return items.parallelStream()
            .map(this::processItem)
            .collect(Collectors.toList());
    }
    
    public long countMatching(List<Item> items, Predicate<Item> predicate) {
        return items.parallelStream()
            .filter(predicate)
            .count();
    }
}

自定义并行度

java
@Component
public class CustomParallelProcessor {
    
    private final ForkJoinPool customPool = new ForkJoinPool(4);
    
    public List<Result> process(List<Item> items) throws Exception {
        return customPool.submit(() ->
            items.parallelStream()
                .map(this::processItem)
                .collect(Collectors.toList())
        ).get();
    }
    
    @PreDestroy
    public void cleanup() {
        customPool.shutdown();
    }
}

结构化并发

使用 StructuredTaskScope

java
@Component
public class StructuredConcurrencyService {
    
    public Dashboard fetchDashboard(Long userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // 并行执行多个任务
            Subtask<User> userTask = scope.fork(() -> userService.findById(userId));
            Subtask<List<Order>> ordersTask = scope.fork(() -> orderService.findByUserId(userId));
            Subtask<Statistics> statsTask = scope.fork(() -> statsService.getUserStats(userId));
            
            // 等待所有任务完成
            scope.join();
            scope.throwIfFailed();
            
            // 获取结果
            return new Dashboard(
                userTask.get(),
                ordersTask.get(),
                statsTask.get()
            );
        }
    }
    
    public String fetchFirstAvailable(Long id) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            // 并行执行,返回第一个成功的结果
            scope.fork(() -> fetchFromService1(id));
            scope.fork(() -> fetchFromService2(id));
            scope.fork(() -> fetchFromService3(id));
            
            scope.join();
            return scope.result();
        }
    }
}

响应式编程

使用 Flow API

java
@Component
public class ReactiveService {
    
    public void processStream() {
        // 创建发布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        
        // 创建订阅者
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;
            
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }
            
            @Override
            public void onNext(String item) {
                System.out.println("收到: " + item);
                subscription.request(1);
            }
            
            @Override
            public void onError(Throwable throwable) {
                System.err.println("错误: " + throwable.getMessage());
            }
            
            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        };
        
        // 订阅
        publisher.subscribe(subscriber);
        
        // 发布数据
        publisher.submit("消息1");
        publisher.submit("消息2");
        publisher.submit("消息3");
        
        // 关闭
        publisher.close();
    }
}

异步 HTTP 客户端

使用 HttpClient

java
@Component
public class AsyncHttpService {
    
    private final HttpClient httpClient = HttpClient.newBuilder()
        .executor(Executors.newVirtualThreadPerTaskExecutor())
        .connectTimeout(Duration.ofSeconds(10))
        .build();
    
    public CompletableFuture<String> fetchAsync(String url) {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .build();
        
        return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::body);
    }
    
    public CompletableFuture<List<String>> fetchMultiple(List<String> urls) {
        List<CompletableFuture<String>> futures = urls.stream()
            .map(this::fetchAsync)
            .collect(Collectors.toList());
        
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}

错误处理

异步异常处理

java
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("随机错误");
        }
        return "成功";
    })
    .exceptionally(ex -> {
        logger.error("异步任务失败", ex);
        return "默认值";
    });

超时处理

java
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> longRunningTask())
    .orTimeout(5, TimeUnit.SECONDS)
    .exceptionally(ex -> {
        if (ex instanceof TimeoutException) {
            return "超时";
        }
        return "错误";
    });

重试机制

java
@Component
public class RetryService {
    
    public <T> CompletableFuture<T> withRetry(
            Supplier<T> task, 
            int maxRetries, 
            Duration delay) {
        
        return CompletableFuture.supplyAsync(task)
            .handle((result, ex) -> {
                if (ex == null) {
                    return CompletableFuture.completedFuture(result);
                }
                if (maxRetries <= 0) {
                    return CompletableFuture.<T>failedFuture(ex);
                }
                return CompletableFuture
                    .delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS)
                    .execute(() -> {});
                // 递归重试
                return withRetry(task, maxRetries - 1, delay);
            })
            .thenCompose(Function.identity());
    }
}

最佳实践

1. 优先使用虚拟线程

java
// ✅ 推荐:使用虚拟线程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> task());
}

// ❌ 不推荐:使用固定线程池处理 I/O 任务
ExecutorService executor = Executors.newFixedThreadPool(10);

2. 避免阻塞虚拟线程

java
// ✅ 推荐:使用异步 API
CompletableFuture<String> future = httpClient.sendAsync(request, BodyHandlers.ofString())
    .thenApply(HttpResponse::body);

// ❌ 不推荐:同步阻塞
String body = httpClient.send(request, BodyHandlers.ofString()).body();

3. 正确处理异常

java
// ✅ 推荐:处理异常
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> riskyTask())
    .exceptionally(ex -> {
        logger.error("任务失败", ex);
        return defaultValue;
    });

4. 设置超时

java
// ✅ 推荐:设置超时
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> task())
    .orTimeout(10, TimeUnit.SECONDS);

5. 使用结构化并发

java
// ✅ 推荐:使用结构化并发
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    var task1 = scope.fork(() -> task1());
    var task2 = scope.fork(() -> task2());
    scope.join();
    scope.throwIfFailed();
}

常见问题

Q: 虚拟线程和平台线程有什么区别?

A:

  • 平台线程:1:1 映射到操作系统线程,创建成本高
  • 虚拟线程:由 JVM 管理,可以创建数百万个,阻塞时不占用平台线程

Q: 什么时候使用虚拟线程?

A:

  • I/O 密集型任务(网络请求、数据库操作)
  • 需要大量并发的场景
  • 不适合 CPU 密集型任务

Q: CompletableFuture 和虚拟线程如何选择?

A:

  • 虚拟线程:简单的异步任务,同步代码风格
  • CompletableFuture:需要组合、转换、链式调用的场景

下一步

Released under the Apache License 2.0