Java并发编程实战线程池与并发工具类一、线程池原理与最佳实践线程池是Java并发编程的核心组件合理使用线程池可以显著提升系统性能。1.1 线程池核心参数ThreadPoolExecutor executor new ThreadPoolExecutor( 4, // corePoolSize: 核心线程数 8, // maximumPoolSize: 最大线程数 60L, // keepAliveTime: 空闲线程存活时间 TimeUnit.SECONDS, // unit: 时间单位 new ArrayBlockingQueue(100), // workQueue: 任务队列 Executors.defaultThreadFactory(),// threadFactory: 线程工厂 new CallerRunsPolicy() // handler: 拒绝策略 );1.2 拒绝策略选择策略行为适用场景CallerRunsPolicy调用者线程执行不重要的后台任务AbortPolicy抛出异常需要快速失败的场景DiscardPolicy静默丢弃日志收集等非关键任务DiscardOldestPolicy丢弃最老任务实时性要求高的场景1.3 线程池监控public class ThreadPoolMonitor { public static void monitor(ThreadPoolExecutor executor) { ScheduledExecutorService scheduler Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(() - { System.out.printf( 线程池状态: 活跃%d, 队列%d, 完成%d, 池大小%d%n, executor.getActiveCount(), executor.getQueue().size(), executor.getCompletedTaskCount(), executor.getPoolSize() ); }, 0, 10, TimeUnit.SECONDS); } }二、并发工具类深度解析2.1 CountDownLatch 同步屏障public class DataLoader { private final CountDownLatch latch; private ListDataResult results; public void loadData(ListString sources) throws InterruptedException { latch new CountDownLatch(sources.size()); results Collections.synchronizedList(new ArrayList()); ExecutorService executor Executors.newFixedThreadPool(4); for (String source : sources) { executor.submit(() - { try { DataResult result fetchData(source); results.add(result); } finally { latch.countDown(); } }); } latch.await(5, TimeUnit.MINUTES); executor.shutdown(); } }2.2 CyclicBarrier 循环屏障public class DataProcessor { private final CyclicBarrier barrier; public DataProcessor(int workerCount) { this.barrier new CyclicBarrier(workerCount, () - { System.out.println(所有阶段任务完成进入下一阶段); }); } public void process(ListDataChunk chunks) throws Exception { ExecutorService executor Executors.newFixedThreadPool(barrier.getParties()); for (DataChunk chunk : chunks) { executor.submit(() - { try { processChunk(chunk); barrier.await(); mergeResults(); barrier.await(); } catch (Exception e) { barrier.reset(); } }); } } }2.3 Semaphore 信号量控制public class ResourcePool { private final Semaphore semaphore; private final ListResource resources; public ResourcePool(int size) { this.semaphore new Semaphore(size); this.resources new ArrayList(); for (int i 0; i size; i) { resources.add(new Resource(i)); } } public Resource acquire() throws InterruptedException { semaphore.acquire(); return resources.remove(0); } public void release(Resource resource) { resources.add(resource); semaphore.release(); } }2.4 Phaser 阶段同步器public class MultiPhaseTask { private final Phaser phaser; public MultiPhaseTask(int parties) { this.phaser new Phaser(parties) { Override protected boolean onAdvance(int phase, int registeredParties) { System.out.printf(阶段 %d 完成参与方: %d%n, phase, registeredParties); return phase 3; } }; } public void execute() { for (int i 0; i 4; i) { new Thread(() - { doPhase1(); phaser.arriveAndAwaitAdvance(); doPhase2(); phaser.arriveAndAwaitAdvance(); doPhase3(); phaser.arriveAndDeregister(); }).start(); } } }三、原子类操作3.1 AtomicReference 原子引用public class NonBlockingCacheK, V { private static class NodeK, V { final K key; volatile V value; volatile NodeK, V next; Node(K key, V value) { this.key key; this.value value; } } private final AtomicReferenceNodeK, V head new AtomicReference(); public V get(K key) { NodeK, V current head.get(); while (current ! null) { if (key.equals(current.key)) { return current.value; } current current.next; } return null; } public void put(K key, V value) { NodeK, V newNode new Node(key, value); while (!head.compareAndSet(head.get(), newNode)) { newNode.next head.get(); } } }3.2 LongAdder 高性能计数器public class PerformanceCounter { private final LongAdder counter new LongAdder(); private final LongAdder totalTime new LongAdder(); public void record(long duration) { counter.increment(); totalTime.add(duration); } public double getAverage() { long count counter.sum(); return count 0 ? (double) totalTime.sum() / count : 0; } }四、并发容器使用技巧4.1 ConcurrentHashMap 分段锁机制public class ConcurrentCache { private final ConcurrentHashMapString, CacheEntry cache new ConcurrentHashMap(); public Object get(String key) { CacheEntry entry cache.get(key); if (entry null) { return null; } if (entry.isExpired()) { cache.remove(key, entry); return null; } return entry.getValue(); } public void put(String key, Object value, long ttl) { cache.put(key, new CacheEntry(value, ttl)); } }4.2 CopyOnWriteArrayList 读写分离public class EventListenerManager { private final CopyOnWriteArrayListEventListener listeners new CopyOnWriteArrayList(); public void register(EventListener listener) { listeners.addIfAbsent(listener); } public void unregister(EventListener listener) { listeners.remove(listener); } public void fireEvent(Event event) { for (EventListener listener : listeners) { listener.onEvent(event); } } }五、CompletableFuture 异步编程5.1 链式调用public class AsyncService { public CompletableFutureUser getUserById(Long id) { return CompletableFuture.supplyAsync(() - userRepository.findById(id)) .thenApply(user - { if (user.isPresent()) { return user.get(); } throw new UserNotFoundException(User not found: id); }); } public CompletableFutureListOrder getOrdersByUserId(Long userId) { return CompletableFuture.supplyAsync(() - orderRepository.findByUserId(userId)); } public CompletableFutureUserProfile getUserProfile(Long userId) { CompletableFutureUser userFuture getUserById(userId); CompletableFutureListOrder ordersFuture getOrdersByUserId(userId); return CompletableFuture.allOf(userFuture, ordersFuture) .thenApply(v - { User user userFuture.join(); ListOrder orders ordersFuture.join(); return UserProfile.builder() .user(user) .orders(orders) .build(); }); } }5.2 异常处理public CompletableFutureResult processWithFallback() { return CompletableFuture.supplyAsync(this::doRiskyOperation) .exceptionally(ex - { log.error(操作失败: {}, ex.getMessage()); return Result.failure(降级处理); }) .thenApply(result - { if (result.isSuccess()) { return enhanceResult(result); } return result; }); }六、并发编程常见问题6.1 线程安全问题// 错误示例 public class UnsafeCounter { private int count 0; public void increment() { count; // 非原子操作 } } // 正确示例 public class SafeCounter { private final AtomicInteger count new AtomicInteger(0); public void increment() { count.incrementAndGet(); } }6.2 死锁问题// 死锁示例 public class DeadlockExample { private final Object lock1 new Object(); private final Object lock2 new Object(); public void method1() { synchronized (lock1) { synchronized (lock2) { // 业务逻辑 } } } public void method2() { synchronized (lock2) { // 锁顺序不一致 synchronized (lock1) { // 业务逻辑 } } } }6.3 资源泄漏// 错误示例 - 忘记关闭资源 ExecutorService executor Executors.newFixedThreadPool(4); executor.submit(() - processData()); // 正确示例 ExecutorService executor Executors.newFixedThreadPool(4); try { executor.submit(() - processData()); } finally { executor.shutdown(); executor.awaitTermination(1, TimeUnit.MINUTES); }七、性能优化建议7.1 线程池大小计算公式线程池大小 CPU核心数 × (1 I/O耗时 / CPU耗时)7.2 避免线程饥饿// 使用不同的线程池处理不同类型的任务 ExecutorService cpuExecutor Executors.newFixedThreadPool(4); ExecutorService ioExecutor Executors.newFixedThreadPool(16); ExecutorService priorityExecutor Executors.newFixedThreadPool(2);7.3 使用 ThreadLocal 减少锁竞争public class RequestContext { private static final ThreadLocalRequestInfo context ThreadLocal.withInitial(RequestInfo::new); public static RequestInfo get() { return context.get(); } public static void set(RequestInfo info) { context.set(info); } public static void clear() { context.remove(); } }八、总结Java并发编程需要掌握以下核心要点线程池合理配置参数选择合适的拒绝策略同步工具根据场景选择CountDownLatch、CyclicBarrier等原子操作使用原子类保证线程安全并发容器选择适合的并发数据结构异步编程使用CompletableFuture简化异步流程问题排查警惕死锁、资源泄漏等常见问题通过系统化学习和实践可以构建高效、稳定的并发系统。
Java并发编程实战:线程池与并发工具类
Java并发编程实战线程池与并发工具类一、线程池原理与最佳实践线程池是Java并发编程的核心组件合理使用线程池可以显著提升系统性能。1.1 线程池核心参数ThreadPoolExecutor executor new ThreadPoolExecutor( 4, // corePoolSize: 核心线程数 8, // maximumPoolSize: 最大线程数 60L, // keepAliveTime: 空闲线程存活时间 TimeUnit.SECONDS, // unit: 时间单位 new ArrayBlockingQueue(100), // workQueue: 任务队列 Executors.defaultThreadFactory(),// threadFactory: 线程工厂 new CallerRunsPolicy() // handler: 拒绝策略 );1.2 拒绝策略选择策略行为适用场景CallerRunsPolicy调用者线程执行不重要的后台任务AbortPolicy抛出异常需要快速失败的场景DiscardPolicy静默丢弃日志收集等非关键任务DiscardOldestPolicy丢弃最老任务实时性要求高的场景1.3 线程池监控public class ThreadPoolMonitor { public static void monitor(ThreadPoolExecutor executor) { ScheduledExecutorService scheduler Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(() - { System.out.printf( 线程池状态: 活跃%d, 队列%d, 完成%d, 池大小%d%n, executor.getActiveCount(), executor.getQueue().size(), executor.getCompletedTaskCount(), executor.getPoolSize() ); }, 0, 10, TimeUnit.SECONDS); } }二、并发工具类深度解析2.1 CountDownLatch 同步屏障public class DataLoader { private final CountDownLatch latch; private ListDataResult results; public void loadData(ListString sources) throws InterruptedException { latch new CountDownLatch(sources.size()); results Collections.synchronizedList(new ArrayList()); ExecutorService executor Executors.newFixedThreadPool(4); for (String source : sources) { executor.submit(() - { try { DataResult result fetchData(source); results.add(result); } finally { latch.countDown(); } }); } latch.await(5, TimeUnit.MINUTES); executor.shutdown(); } }2.2 CyclicBarrier 循环屏障public class DataProcessor { private final CyclicBarrier barrier; public DataProcessor(int workerCount) { this.barrier new CyclicBarrier(workerCount, () - { System.out.println(所有阶段任务完成进入下一阶段); }); } public void process(ListDataChunk chunks) throws Exception { ExecutorService executor Executors.newFixedThreadPool(barrier.getParties()); for (DataChunk chunk : chunks) { executor.submit(() - { try { processChunk(chunk); barrier.await(); mergeResults(); barrier.await(); } catch (Exception e) { barrier.reset(); } }); } } }2.3 Semaphore 信号量控制public class ResourcePool { private final Semaphore semaphore; private final ListResource resources; public ResourcePool(int size) { this.semaphore new Semaphore(size); this.resources new ArrayList(); for (int i 0; i size; i) { resources.add(new Resource(i)); } } public Resource acquire() throws InterruptedException { semaphore.acquire(); return resources.remove(0); } public void release(Resource resource) { resources.add(resource); semaphore.release(); } }2.4 Phaser 阶段同步器public class MultiPhaseTask { private final Phaser phaser; public MultiPhaseTask(int parties) { this.phaser new Phaser(parties) { Override protected boolean onAdvance(int phase, int registeredParties) { System.out.printf(阶段 %d 完成参与方: %d%n, phase, registeredParties); return phase 3; } }; } public void execute() { for (int i 0; i 4; i) { new Thread(() - { doPhase1(); phaser.arriveAndAwaitAdvance(); doPhase2(); phaser.arriveAndAwaitAdvance(); doPhase3(); phaser.arriveAndDeregister(); }).start(); } } }三、原子类操作3.1 AtomicReference 原子引用public class NonBlockingCacheK, V { private static class NodeK, V { final K key; volatile V value; volatile NodeK, V next; Node(K key, V value) { this.key key; this.value value; } } private final AtomicReferenceNodeK, V head new AtomicReference(); public V get(K key) { NodeK, V current head.get(); while (current ! null) { if (key.equals(current.key)) { return current.value; } current current.next; } return null; } public void put(K key, V value) { NodeK, V newNode new Node(key, value); while (!head.compareAndSet(head.get(), newNode)) { newNode.next head.get(); } } }3.2 LongAdder 高性能计数器public class PerformanceCounter { private final LongAdder counter new LongAdder(); private final LongAdder totalTime new LongAdder(); public void record(long duration) { counter.increment(); totalTime.add(duration); } public double getAverage() { long count counter.sum(); return count 0 ? (double) totalTime.sum() / count : 0; } }四、并发容器使用技巧4.1 ConcurrentHashMap 分段锁机制public class ConcurrentCache { private final ConcurrentHashMapString, CacheEntry cache new ConcurrentHashMap(); public Object get(String key) { CacheEntry entry cache.get(key); if (entry null) { return null; } if (entry.isExpired()) { cache.remove(key, entry); return null; } return entry.getValue(); } public void put(String key, Object value, long ttl) { cache.put(key, new CacheEntry(value, ttl)); } }4.2 CopyOnWriteArrayList 读写分离public class EventListenerManager { private final CopyOnWriteArrayListEventListener listeners new CopyOnWriteArrayList(); public void register(EventListener listener) { listeners.addIfAbsent(listener); } public void unregister(EventListener listener) { listeners.remove(listener); } public void fireEvent(Event event) { for (EventListener listener : listeners) { listener.onEvent(event); } } }五、CompletableFuture 异步编程5.1 链式调用public class AsyncService { public CompletableFutureUser getUserById(Long id) { return CompletableFuture.supplyAsync(() - userRepository.findById(id)) .thenApply(user - { if (user.isPresent()) { return user.get(); } throw new UserNotFoundException(User not found: id); }); } public CompletableFutureListOrder getOrdersByUserId(Long userId) { return CompletableFuture.supplyAsync(() - orderRepository.findByUserId(userId)); } public CompletableFutureUserProfile getUserProfile(Long userId) { CompletableFutureUser userFuture getUserById(userId); CompletableFutureListOrder ordersFuture getOrdersByUserId(userId); return CompletableFuture.allOf(userFuture, ordersFuture) .thenApply(v - { User user userFuture.join(); ListOrder orders ordersFuture.join(); return UserProfile.builder() .user(user) .orders(orders) .build(); }); } }5.2 异常处理public CompletableFutureResult processWithFallback() { return CompletableFuture.supplyAsync(this::doRiskyOperation) .exceptionally(ex - { log.error(操作失败: {}, ex.getMessage()); return Result.failure(降级处理); }) .thenApply(result - { if (result.isSuccess()) { return enhanceResult(result); } return result; }); }六、并发编程常见问题6.1 线程安全问题// 错误示例 public class UnsafeCounter { private int count 0; public void increment() { count; // 非原子操作 } } // 正确示例 public class SafeCounter { private final AtomicInteger count new AtomicInteger(0); public void increment() { count.incrementAndGet(); } }6.2 死锁问题// 死锁示例 public class DeadlockExample { private final Object lock1 new Object(); private final Object lock2 new Object(); public void method1() { synchronized (lock1) { synchronized (lock2) { // 业务逻辑 } } } public void method2() { synchronized (lock2) { // 锁顺序不一致 synchronized (lock1) { // 业务逻辑 } } } }6.3 资源泄漏// 错误示例 - 忘记关闭资源 ExecutorService executor Executors.newFixedThreadPool(4); executor.submit(() - processData()); // 正确示例 ExecutorService executor Executors.newFixedThreadPool(4); try { executor.submit(() - processData()); } finally { executor.shutdown(); executor.awaitTermination(1, TimeUnit.MINUTES); }七、性能优化建议7.1 线程池大小计算公式线程池大小 CPU核心数 × (1 I/O耗时 / CPU耗时)7.2 避免线程饥饿// 使用不同的线程池处理不同类型的任务 ExecutorService cpuExecutor Executors.newFixedThreadPool(4); ExecutorService ioExecutor Executors.newFixedThreadPool(16); ExecutorService priorityExecutor Executors.newFixedThreadPool(2);7.3 使用 ThreadLocal 减少锁竞争public class RequestContext { private static final ThreadLocalRequestInfo context ThreadLocal.withInitial(RequestInfo::new); public static RequestInfo get() { return context.get(); } public static void set(RequestInfo info) { context.set(info); } public static void clear() { context.remove(); } }八、总结Java并发编程需要掌握以下核心要点线程池合理配置参数选择合适的拒绝策略同步工具根据场景选择CountDownLatch、CyclicBarrier等原子操作使用原子类保证线程安全并发容器选择适合的并发数据结构异步编程使用CompletableFuture简化异步流程问题排查警惕死锁、资源泄漏等常见问题通过系统化学习和实践可以构建高效、稳定的并发系统。