1.为什么使用多线程Java 多线程技术是 Java 并发编程的核心从基础的线程创建到高级的并发工具包JUC再到 JVM 底层的内存模型是一个庞大而精密的体系。简单来说Java 多线程的核心目的只有两个1. 充分利用硬件资源特别是 CPU2. 提升系统的响应速度和吞吐量。如果没有多线程现代互联网高并发系统如淘宝、微信、抖音将完全无法运行。以下从核心痛点、具体场景和直观对比三个维度详细拆解一、核心痛点单线程的致命缺陷在单线程模型中程序是顺序执行的。如果前一个任务没做完后一个任务必须等着。这会带来两个致命问题1. CPU 资源的极大浪费针对 CPU 密集型现状现在的服务器通常是多核 CPU比如 8 核、16 核甚至更多。单线程问题单线程程序同一时刻只能利用1 个 CPU 核心。剩下的 7/15 个核心都在“围观”处于空闲状态。后果花了几万块买的服务器实际只发挥了 1/8 的性能。多线程解决开启多个线程让每个核心都跑起来算力最大化。2. 阻塞导致的系统“假死”针对 IO 密集型现状现代应用大部分时间花在IO 操作上读数据库、调远程接口、读写文件、网络请求。速度差异CPU 计算速度纳秒级极快。磁盘/网络 IO 速度毫秒级极慢相差百万倍。单线程问题线程 A 发起一个数据库查询耗时 100ms。在这 100ms 内CPU 无事可做只能傻等阻塞。此时如果有用户 B 访问系统必须排队等待用户 A 的 100ms 结束。如果有 1000 个用户每人等 100ms最后一个人要等 100 秒系统直接卡死。多线程解决线程 A 发起查询后挂起等待不占用 CPU。CPU 立即切换去执行线程 B、线程 C 的任务。等线程 A 的数据库结果回来了CPU 再切回来处理。效果CPU 永远在忙没有空转系统吞吐量成倍提升。什么是线程线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中是进程中的实际运作单位。Java 天生支持多线程。线程的状态Java 线程状态定义在Thread.State枚举中NEW: 新建未启动 (start())RUNNABLE: 运行中包含就绪态和运行态由 OS 调度BLOCKED: 阻塞等待获取 synchronized 锁WAITING: 无限期等待等待其他线程通知 (wait(),join(),LockSupport.park())TIMED_WAITING: 限期等待 (sleep(ms),wait(ms))TERMINATED: 终止执行结束2.怎么使用多线程2.1创建多线程的四种方式方式描述优点缺点推荐度继承 Thread 类class MyThread extends Thread简单直接单继承限制无法继承其他类⭐⭐实现 Runnable 接口class MyTask implements Runnable解耦可共享资源可继承其他类无返回值⭐⭐⭐⭐实现 Callable 接口class MyTask implements CallableT有返回值可抛出异常需配合 FutureTask 使用⭐⭐⭐⭐⭐线程池执行ExecutorService.submit()资源复用可控高性能配置稍复杂⭐⭐⭐⭐⭐ (生产环境唯一推荐)2.1.1 继承Thread类我们以经典的火车站卖票为例子public class UnsafeThreadTicket extends Thread{ //静台变量所有线程共享 private static int totalTickets 100; public UnsafeThreadTicket(String name) { super(name); } Override public void run(){ while(true){ //判断是否有票 危险区域没有 synchronized没有 lock if (totalTickets 0){ try { Thread.sleep(10); // 这里最容易发生线程切换 }catch (InterruptedException e){ e.printStackTrace(); } //2.买票 System.out.println(Thread.currentThread().getName()卖出了第totalTickets张票); totalTickets--; } else { break; } } System.out.println(Thread.currentThread().getName()下班了剩余票数totalTickets); } }调用过程如下public class Demo { public static void main(String[] args) { UnsafeThreadTicket test1 new UnsafeThreadTicket(窗口A); UnsafeThreadTicket test2 new UnsafeThreadTicket(窗口B); UnsafeThreadTicket test3 new UnsafeThreadTicket(窗口C); test1.start(); test2.start(); test3.start(); } }运行结果如下我们可以看到对于共享资源不加锁简直是灾难的接下来我们来看看加锁版本的public class UnsafeThreadTicket extends Thread{ //静台变量所有线程共享 private static int totalTickets 100; public UnsafeThreadTicket(String name) { super(name); } // 锁对象也可以使用 static Object lock new Object(); // 这里直接用类对象锁或者实例锁均可因为 totalTickets 是 static 的建议锁 class 对象或静态变量 private static final Object lock new Object(); Override public void run(){ while(true) { //判断是否有票 加锁 // 锁住 lock 对象保证同一时刻只有一个线程能进入这块代码 synchronized (lock) { if (totalTickets 0) { break; // 票卖完了退出循环 } try { Thread.sleep(10); // 这里最容易发生线程切换 } catch (InterruptedException e) { e.printStackTrace(); } //2.买票 System.out.println(Thread.currentThread().getName() 卖出了第 totalTickets 张票); totalTickets--; } } System.out.println(Thread.currentThread().getName()下班了剩余票数totalTickets); } }执行如下业务正常多线程访问共享资源一定要加锁2.1.2实现Runnable类先不加锁为例子public class UnsafeRunnableTask implements Runnable { private int totalTickets 100; Override public void run() { while (true) { if (totalTickets 0) { try { Thread.sleep(5);//制造竞争条件 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() 卖出了第 totalTickets 张票); totalTickets--; } else { break; } } System.out.println(Thread.currentThread().getName() 下班了,剩余 totalTickets); } }main函数代码如下public class Demo { public static void main(String[] args) { UnsafeRunnableTask task new UnsafeRunnableTask(); Thread t1 new Thread(task,窗口A); Thread t2 new Thread(task,窗口B); Thread t3 new Thread(task,窗口C); t1.start(); t2.start(); t3.start(); } }执行结果如下同样的会有严重的超卖问题接下来我们改善一下加锁版本如下public class UnsafeRunnableTask implements Runnable { private int totalTickets 100; // 显式锁 private final ReentrantLock lock new ReentrantLock(); Override public void run() { while (true) { lock.lock(); try { if (totalTickets 0) { break; } try { Thread.sleep(5);//制造竞争条件 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() 卖出了第 totalTickets 张票); totalTickets--; }finally { // 【关键】必须在 finally 中释放锁否则一旦发生异常锁永远不释放系统死锁 lock.unlock(); } } System.out.println(Thread.currentThread().getName() 下班了,剩余 totalTickets); } }测试如下2.1.3实现 Callable 接口public class TicketCallable implements CallableString { //Callable通常用于独立任务若要共享状态需要外部传入共享对象 //为了演示我们让外部传入一个共享的计数器对象 private final TicketCounter counter; private final String windowName; private int soldCount 0; //记录本窗口卖出的数量 public TicketCallable(TicketCounter counter, String windowName) { this.counter counter; this.windowName windowName; } Override public String call() throws Exception { while(true){ //调用共享对象中的加锁方法 int ticketNo counter.sellTicket(); if (ticketNo -1){ break; //无票 } soldCount; System.out.println(windowName卖出了第ticketNo张票); Thread.sleep(10); } return windowName 共卖出soldCount张票; } } class TicketCounter { private int totalTickets 100; public int sellTicket() { synchronized (this) { if (totalTickets 0) { return -1; } return totalTickets--; } } }main函数调用public class Demo { public static void main(String[] args) throws Exception{ TicketCounter counter new TicketCounter(); TicketCallable c1 new TicketCallable(counter, 窗口A); TicketCallable c2 new TicketCallable(counter, 窗口B); TicketCallable c3 new TicketCallable(counter, 窗口C); //配合FutureTask FutureTaskString f1 new FutureTask(c1); FutureTaskString f2 new FutureTask(c2); FutureTaskString f3 new FutureTask(c3); new Thread(f1,窗口A现成).start(); new Thread(f2,窗口A现成).start(); new Thread(f3,窗口A现成).start(); System.out.println(结果Af1.get()); System.out.println(结果Bf2.get()); System.out.println(结果Cf3.get()); } }测试结果如下2.4线程池public class DemoThreadPoolFixed { // 共享计数器 private static final AtomicInteger count new AtomicInteger(100); public static void main(String[] args) { // 1. 创建线程池 ThreadPoolExecutor pool new ThreadPoolExecutor( 3, 3, 50L, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable(10), new ThreadPoolExecutor.CallerRunsPolicy() ); try { // 2. 提交任务 for (int i 1; i 3; i) { final int windowId i; pool.submit(() - { String name 窗口- windowId; while (true) { // decrementAndGet: 先减 1再返回新值 int currentVal count.decrementAndGet(); // 如果减完之后 0说明减之前 0即没票了 // 注意如果有多个线程同时抢到最后一张只有一个能拿到 0其他拿到 -1, -2... if (currentVal 0) { break; // 没票了退出循环 } // 能走到这里说明 currentVal 0代表卖出了第 (currentVal 1) 张 System.out.println(name 卖出了第 (currentVal 1) 张票); // 模拟业务耗时 try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 break; } } System.out.println(name 下班了); }); } } finally { // 3. 【关键修复】关闭线程池否则主线程结束后程序不会退出 pool.shutdown(); // 可选等待所有任务执行完毕后再彻底关闭防止任务没跑完程序就退了 try { if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); Thread.currentThread().interrupt(); } } } }现在有这样一个需求我有几千万的商品api,有六千多个商品池三级分类id,包含商品id,商品信息价格都需要调用单独的接口需要快速导入到我们数据库里这时候肯定要使用多线程示例代码如下Component Slf4j public class SpuSyncTask { /** * 注入持久层对象 */ Resource private JdSkuDao jdSkuDao; Resource private JdSpuDao jdSpuDao; Resource Qualifier(jdSpuCategoryDao) private JdSpuCategoryDao jdSpuCategoryDao; Resource private SkuDao skuDao; Resource private SpuDao spuDao; Resource private SpuCategoryDao spuCategoryDao; Resource private CategoryDao categoryDao; Resource private StringRedisTemplate template; /** * syncSkuData 专用线程池4 核 8G 服务器I/O 为主按 CPU*2 官方推荐设为 8 线程 */ private static final int SKU_SYNC_THREADS 8; private static final int SKU_SYNC_QUEUE_CAPACITY 200; private final ExecutorService skuSyncExecutor new ThreadPoolExecutor( SKU_SYNC_THREADS, SKU_SYNC_THREADS, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(SKU_SYNC_QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy() ); // Scheduled(cron 30 50 16 07 12 ?) public R syncSkuData() throws Exception { ListGetSkuPoolInfoItemGoodsResp skuPoolList ReadExcel(/data/project/backend/employeebenefits/jdvop/商品池分类.xlsx); log.info(syncSkuData 定时任务开始商品池数量: {}, skuPoolList.size()); String token template.opsForValue().get(Constants.JdVopRedisKey); JdClient jdClient JDClientSingleton.getInstance(Constants.SERVER_URL, token, Constants.APP_KEY, Constants.APP_SECRET); processSkuPoolBatch(skuPoolList, jdClient); log.info(syncSkuData 定时任务完成); return R.ok(); } /** 获取当前 JdClient每次从 Redis 取 token */ private JdClient getJdClient() { String token template.opsForValue().get(Constants.JdVopRedisKey); return JDClientSingleton.getInstance(Constants.SERVER_URL, token, Constants.APP_KEY, Constants.APP_SECRET); } /** Token 过期时同步刷新避免多线程重复刷新 */ private JdClient refreshTokenAndGetClient() { synchronized (JdAccess.class) { try { JdAccess.getToken(); } catch (Exception e) { log.error(syncSkuData 刷新 token 失败, e); } } return getJdClient(); } /** * 处理单个 SKU可在线程中执行查详情、查价格、落库 */ private void processSingleSku(Long skuId, JdClient client) { try { // 3. 查询商品详情 VopGoodsGetSkuDetailInfoRequest getGoodsRequest new VopGoodsGetSkuDetailInfoRequest(); getGoodsRequest.setSkuId(skuId); VopGoodsGetSkuDetailInfoResponse getGoodsResponse client.execute(getGoodsRequest); String resultCode getGoodsResponse.getCode(); if (18.equals(resultCode) || 19.equals(resultCode)) { client refreshTokenAndGetClient(); getGoodsResponse client.execute(getGoodsRequest); } GetSkuPoolInfoGoodsResp result getGoodsResponse.getOpenRpcResult().getResult(); if (result null) { return; } // 4. 查询商品价格 VopGoodsGetSellPriceRequest goodsPriceReq new VopGoodsGetSellPriceRequest(); goodsPriceReq.setSkuId(String.valueOf(skuId)); VopGoodsGetSellPriceResponse goodsPriceResp client.execute(goodsPriceReq); resultCode goodsPriceResp.getCode(); if (18.equals(resultCode) || 19.equals(resultCode)) { client refreshTokenAndGetClient(); goodsPriceResp client.execute(goodsPriceReq); } if (goodsPriceResp.getOpenRpcResult().getResult() null) { return; } JdSku sku new JdSku(); sku.setSkuId(result.getSkuId()); sku.setSkuName(result.getSkuName()); sku.setSkuStatus(result.getSkuState()); sku.setSpuId(result.getSpuId()); sku.setImagePath(result.getImagePath()); sku.setSelfSellType(result.getSelfSellType()); sku.setSalePrice(goodsPriceResp.getOpenRpcResult().getResult().get(0).getSalePrice()); sku.setJdPrice(goodsPriceResp.getOpenRpcResult().getResult().get(0).getJdPrice()); sku.setLowestBuy(result.getLowestBuy() null ? 1 : result.getLowestBuy()); sku.setSaleUnit(result.getSaleUnit()); JdSpu spu new JdSpu(); spu.setSpuId(result.getSpuId()); spu.setName(result.getSpuName()); spu.setBrandName(result.getBrandName()); spu.setImagePath(result.getImagePath()); spu.setLogisticsType(result.getLogisticsType()); spu.setSalePrice(goodsPriceResp.getOpenRpcResult().getResult().get(0).getSalePrice()); jdSpuDao.upsertSpuData(spu); jdSkuDao.upsertSkuData(sku); log.debug(添加商品 id: {}, result.getSpuId()); // 同步商品分类 ListJdSpuCategory categories new ArrayList(); String categoryIdsStr result.getCategory(); if (categoryIdsStr ! null !categoryIdsStr.isEmpty()) { for (String categoryId : categoryIdsStr.split(;)) { int categoryIdInt Integer.parseInt(categoryId.trim()); JdSpuCategory category new JdSpuCategory(); category.setCategoryId(categoryIdInt); category.setSpuId(result.getSpuId()); categories.add(category); } if (!categories.isEmpty()) { jdSpuCategoryDao.batchInsert(categories); } } } catch (Exception e) { log.warn(processSingleSku 失败 skuId{}, skuId, e); } } //插入全部商品 private void processSkuPoolBatch(ListGetSkuPoolInfoItemGoodsResp skuPoolBatch, JdClient jdClient) { for (GetSkuPoolInfoItemGoodsResp item : skuPoolBatch) { long offset 0; while (true) { VopGoodsQuerySkuByPageRequest vopGoodsRequest new VopGoodsQuerySkuByPageRequest(); vopGoodsRequest.setBizPoolId(item.getBizPoolId()); vopGoodsRequest.setOffset(offset); vopGoodsRequest.setPageSize(10); try { VopGoodsQuerySkuByPageResponse vopGoodsResponse jdClient.execute(vopGoodsRequest); String resultCode vopGoodsResponse.getCode(); log.debug(分页查询状态: {}, resultCode); if (18.equals(resultCode) || 19.equals(resultCode)) { jdClient refreshTokenAndGetClient(); vopGoodsResponse jdClient.execute(vopGoodsRequest); } if (vopGoodsResponse.getOpenRpcResult().getResult() null) { break; } ListLong skus vopGoodsResponse.getOpenRpcResult().getResult().getSkus(); if (skus.isEmpty()) { break; } // 多线程并行处理当前页的 SKU ListCompletableFutureVoid futures skus.stream() .map(skuId - CompletableFuture.runAsync( () - processSingleSku(skuId, getJdClient()), skuSyncExecutor)) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); offset skus.get(skus.size() - 1); if (vopGoodsResponse.getOpenRpcResult().getResult().getRemainPage() 0) { break; } } catch (Exception e) { log.error(processSkuPoolBatch 分页异常 bizPoolId{}, offset{}, item.getBizPoolId(), offset, e); break; } } } } }关于线程池的七个参数public ThreadPoolExecutor( int corePoolSize, // 1. 核心线程数 int maximumPoolSize, // 2. 最大线程数 long keepAliveTime, // 3. 非核心线程存活时间 TimeUnit unit, // 4. 时间单位 BlockingQueueRunnable workQueue, // 5. 任务队列 ThreadFactory threadFactory, // 6. 线程工厂 (可选) RejectedExecutionHandler handler // 7. 拒绝策略 )想象一个银行大厅corePoolSize(核心线程数)正式柜员数量。即使没有客户这些柜员也一直在岗除非设置了allowCoreThreadTimeOut。任务来了优先交给他们。workQueue(任务队列)等候区的椅子数量。当正式柜员都忙不过来时新来的客户先在椅子上排队等待。如果椅子坐满了就要叫临时工了。maximumPoolSize(最大线程数)正式柜员 临时柜员的总上限。当等候区队列满了银行会开启临时窗口非核心线程。一旦临时窗口也开到了上限再来的客户就会被拒绝。注意只有当队列满了之后才会创建超过 corePoolSize 的线程。keepAliveTimeunit(存活时间)临时工裁员时限。如果临时柜员非核心线程在这么长时间内没活干就会被辞退线程销毁只保留正式柜员。threadFactory(线程工厂)招聘专员。用来创建新线程的。通常用来给线程起有意义的名字如 Order-Thread-1方便出问题时排查。handler(拒绝策略)满员后的处理方式。当正式工忙、临时工满、椅子也坐满了新来的客户怎么办当队列满且线程达到最大值时触发拒绝策略AbortPolicy(默认)行为直接抛出RejectedExecutionException异常。后果如果不捕获程序会崩溃。适用不允许丢失任务且希望快速感知系统过载的场景。CallerRunsPolicy(推荐用于重要业务)行为谁提交的任務谁自己去执行调用者线程执行。后果提交任务的主线程或线程池中的线程会被阻塞不再提交新任务从而降低提交速度给后端处理争取时间。适用不允许丢失任务且希望系统自动降速保护的场景如你的卖票系统。DiscardPolicy行为直接丢弃任务不抛异常。适用任务丢失也没关系的场景如日志记录、非关键监控数据。DiscardOldestPolicy行为丢弃队列中最老的一个任务然后尝试重新提交当前任务。适用希望尽量处理最新数据的场景。《阿里巴巴 Java 开发手册》中强制规定线程池不允许使用Executors去创建而是通过ThreadPoolExecutor的方式。// 1. FixedThreadPool: 允许请求队列长度为 Integer.MAX_VALUE // 风险堆积大量请求导致 OOM (内存溢出) ExecutorService executor Executors.newFixedThreadPool(10); // 2. CachedThreadPool: 允许创建线程数量为 Integer.MAX_VALUE // 风险创建大量线程导致 CPU 100% 或 OOM ExecutorService executor Executors.newCachedThreadPool(); // 3. SingleThreadExecutor: 允许请求队列长度为 Integer.MAX_VALUE // 风险同上OOM ExecutorService executor Executors.newSingleThreadExecutor();原因Executors创建的线程池其队列长度或最大线程数默认为Integer.MAX_VALUE。在高并发场景下如果任务处理速度慢于提交速度内存会瞬间爆满导致服务器崩溃。2.2多线程使用问题2.2.1线程安全多线程最大的问题是线程安全主要源于原子性、可见性、有序性。1. 三大问题详解原子性操作要么全部成功要么全部失败不可中断。反例i不是原子操作读-改-写。可见性一个线程修改了变量其他线程能立即看到。原因CPU 缓存不一致。有序性指令重排序导致执行顺序与代码顺序不一致。原因编译器和处理器优化。解决办法加锁
深入理解java多线程技术
1.为什么使用多线程Java 多线程技术是 Java 并发编程的核心从基础的线程创建到高级的并发工具包JUC再到 JVM 底层的内存模型是一个庞大而精密的体系。简单来说Java 多线程的核心目的只有两个1. 充分利用硬件资源特别是 CPU2. 提升系统的响应速度和吞吐量。如果没有多线程现代互联网高并发系统如淘宝、微信、抖音将完全无法运行。以下从核心痛点、具体场景和直观对比三个维度详细拆解一、核心痛点单线程的致命缺陷在单线程模型中程序是顺序执行的。如果前一个任务没做完后一个任务必须等着。这会带来两个致命问题1. CPU 资源的极大浪费针对 CPU 密集型现状现在的服务器通常是多核 CPU比如 8 核、16 核甚至更多。单线程问题单线程程序同一时刻只能利用1 个 CPU 核心。剩下的 7/15 个核心都在“围观”处于空闲状态。后果花了几万块买的服务器实际只发挥了 1/8 的性能。多线程解决开启多个线程让每个核心都跑起来算力最大化。2. 阻塞导致的系统“假死”针对 IO 密集型现状现代应用大部分时间花在IO 操作上读数据库、调远程接口、读写文件、网络请求。速度差异CPU 计算速度纳秒级极快。磁盘/网络 IO 速度毫秒级极慢相差百万倍。单线程问题线程 A 发起一个数据库查询耗时 100ms。在这 100ms 内CPU 无事可做只能傻等阻塞。此时如果有用户 B 访问系统必须排队等待用户 A 的 100ms 结束。如果有 1000 个用户每人等 100ms最后一个人要等 100 秒系统直接卡死。多线程解决线程 A 发起查询后挂起等待不占用 CPU。CPU 立即切换去执行线程 B、线程 C 的任务。等线程 A 的数据库结果回来了CPU 再切回来处理。效果CPU 永远在忙没有空转系统吞吐量成倍提升。什么是线程线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中是进程中的实际运作单位。Java 天生支持多线程。线程的状态Java 线程状态定义在Thread.State枚举中NEW: 新建未启动 (start())RUNNABLE: 运行中包含就绪态和运行态由 OS 调度BLOCKED: 阻塞等待获取 synchronized 锁WAITING: 无限期等待等待其他线程通知 (wait(),join(),LockSupport.park())TIMED_WAITING: 限期等待 (sleep(ms),wait(ms))TERMINATED: 终止执行结束2.怎么使用多线程2.1创建多线程的四种方式方式描述优点缺点推荐度继承 Thread 类class MyThread extends Thread简单直接单继承限制无法继承其他类⭐⭐实现 Runnable 接口class MyTask implements Runnable解耦可共享资源可继承其他类无返回值⭐⭐⭐⭐实现 Callable 接口class MyTask implements CallableT有返回值可抛出异常需配合 FutureTask 使用⭐⭐⭐⭐⭐线程池执行ExecutorService.submit()资源复用可控高性能配置稍复杂⭐⭐⭐⭐⭐ (生产环境唯一推荐)2.1.1 继承Thread类我们以经典的火车站卖票为例子public class UnsafeThreadTicket extends Thread{ //静台变量所有线程共享 private static int totalTickets 100; public UnsafeThreadTicket(String name) { super(name); } Override public void run(){ while(true){ //判断是否有票 危险区域没有 synchronized没有 lock if (totalTickets 0){ try { Thread.sleep(10); // 这里最容易发生线程切换 }catch (InterruptedException e){ e.printStackTrace(); } //2.买票 System.out.println(Thread.currentThread().getName()卖出了第totalTickets张票); totalTickets--; } else { break; } } System.out.println(Thread.currentThread().getName()下班了剩余票数totalTickets); } }调用过程如下public class Demo { public static void main(String[] args) { UnsafeThreadTicket test1 new UnsafeThreadTicket(窗口A); UnsafeThreadTicket test2 new UnsafeThreadTicket(窗口B); UnsafeThreadTicket test3 new UnsafeThreadTicket(窗口C); test1.start(); test2.start(); test3.start(); } }运行结果如下我们可以看到对于共享资源不加锁简直是灾难的接下来我们来看看加锁版本的public class UnsafeThreadTicket extends Thread{ //静台变量所有线程共享 private static int totalTickets 100; public UnsafeThreadTicket(String name) { super(name); } // 锁对象也可以使用 static Object lock new Object(); // 这里直接用类对象锁或者实例锁均可因为 totalTickets 是 static 的建议锁 class 对象或静态变量 private static final Object lock new Object(); Override public void run(){ while(true) { //判断是否有票 加锁 // 锁住 lock 对象保证同一时刻只有一个线程能进入这块代码 synchronized (lock) { if (totalTickets 0) { break; // 票卖完了退出循环 } try { Thread.sleep(10); // 这里最容易发生线程切换 } catch (InterruptedException e) { e.printStackTrace(); } //2.买票 System.out.println(Thread.currentThread().getName() 卖出了第 totalTickets 张票); totalTickets--; } } System.out.println(Thread.currentThread().getName()下班了剩余票数totalTickets); } }执行如下业务正常多线程访问共享资源一定要加锁2.1.2实现Runnable类先不加锁为例子public class UnsafeRunnableTask implements Runnable { private int totalTickets 100; Override public void run() { while (true) { if (totalTickets 0) { try { Thread.sleep(5);//制造竞争条件 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() 卖出了第 totalTickets 张票); totalTickets--; } else { break; } } System.out.println(Thread.currentThread().getName() 下班了,剩余 totalTickets); } }main函数代码如下public class Demo { public static void main(String[] args) { UnsafeRunnableTask task new UnsafeRunnableTask(); Thread t1 new Thread(task,窗口A); Thread t2 new Thread(task,窗口B); Thread t3 new Thread(task,窗口C); t1.start(); t2.start(); t3.start(); } }执行结果如下同样的会有严重的超卖问题接下来我们改善一下加锁版本如下public class UnsafeRunnableTask implements Runnable { private int totalTickets 100; // 显式锁 private final ReentrantLock lock new ReentrantLock(); Override public void run() { while (true) { lock.lock(); try { if (totalTickets 0) { break; } try { Thread.sleep(5);//制造竞争条件 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() 卖出了第 totalTickets 张票); totalTickets--; }finally { // 【关键】必须在 finally 中释放锁否则一旦发生异常锁永远不释放系统死锁 lock.unlock(); } } System.out.println(Thread.currentThread().getName() 下班了,剩余 totalTickets); } }测试如下2.1.3实现 Callable 接口public class TicketCallable implements CallableString { //Callable通常用于独立任务若要共享状态需要外部传入共享对象 //为了演示我们让外部传入一个共享的计数器对象 private final TicketCounter counter; private final String windowName; private int soldCount 0; //记录本窗口卖出的数量 public TicketCallable(TicketCounter counter, String windowName) { this.counter counter; this.windowName windowName; } Override public String call() throws Exception { while(true){ //调用共享对象中的加锁方法 int ticketNo counter.sellTicket(); if (ticketNo -1){ break; //无票 } soldCount; System.out.println(windowName卖出了第ticketNo张票); Thread.sleep(10); } return windowName 共卖出soldCount张票; } } class TicketCounter { private int totalTickets 100; public int sellTicket() { synchronized (this) { if (totalTickets 0) { return -1; } return totalTickets--; } } }main函数调用public class Demo { public static void main(String[] args) throws Exception{ TicketCounter counter new TicketCounter(); TicketCallable c1 new TicketCallable(counter, 窗口A); TicketCallable c2 new TicketCallable(counter, 窗口B); TicketCallable c3 new TicketCallable(counter, 窗口C); //配合FutureTask FutureTaskString f1 new FutureTask(c1); FutureTaskString f2 new FutureTask(c2); FutureTaskString f3 new FutureTask(c3); new Thread(f1,窗口A现成).start(); new Thread(f2,窗口A现成).start(); new Thread(f3,窗口A现成).start(); System.out.println(结果Af1.get()); System.out.println(结果Bf2.get()); System.out.println(结果Cf3.get()); } }测试结果如下2.4线程池public class DemoThreadPoolFixed { // 共享计数器 private static final AtomicInteger count new AtomicInteger(100); public static void main(String[] args) { // 1. 创建线程池 ThreadPoolExecutor pool new ThreadPoolExecutor( 3, 3, 50L, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable(10), new ThreadPoolExecutor.CallerRunsPolicy() ); try { // 2. 提交任务 for (int i 1; i 3; i) { final int windowId i; pool.submit(() - { String name 窗口- windowId; while (true) { // decrementAndGet: 先减 1再返回新值 int currentVal count.decrementAndGet(); // 如果减完之后 0说明减之前 0即没票了 // 注意如果有多个线程同时抢到最后一张只有一个能拿到 0其他拿到 -1, -2... if (currentVal 0) { break; // 没票了退出循环 } // 能走到这里说明 currentVal 0代表卖出了第 (currentVal 1) 张 System.out.println(name 卖出了第 (currentVal 1) 张票); // 模拟业务耗时 try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 break; } } System.out.println(name 下班了); }); } } finally { // 3. 【关键修复】关闭线程池否则主线程结束后程序不会退出 pool.shutdown(); // 可选等待所有任务执行完毕后再彻底关闭防止任务没跑完程序就退了 try { if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); Thread.currentThread().interrupt(); } } } }现在有这样一个需求我有几千万的商品api,有六千多个商品池三级分类id,包含商品id,商品信息价格都需要调用单独的接口需要快速导入到我们数据库里这时候肯定要使用多线程示例代码如下Component Slf4j public class SpuSyncTask { /** * 注入持久层对象 */ Resource private JdSkuDao jdSkuDao; Resource private JdSpuDao jdSpuDao; Resource Qualifier(jdSpuCategoryDao) private JdSpuCategoryDao jdSpuCategoryDao; Resource private SkuDao skuDao; Resource private SpuDao spuDao; Resource private SpuCategoryDao spuCategoryDao; Resource private CategoryDao categoryDao; Resource private StringRedisTemplate template; /** * syncSkuData 专用线程池4 核 8G 服务器I/O 为主按 CPU*2 官方推荐设为 8 线程 */ private static final int SKU_SYNC_THREADS 8; private static final int SKU_SYNC_QUEUE_CAPACITY 200; private final ExecutorService skuSyncExecutor new ThreadPoolExecutor( SKU_SYNC_THREADS, SKU_SYNC_THREADS, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(SKU_SYNC_QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy() ); // Scheduled(cron 30 50 16 07 12 ?) public R syncSkuData() throws Exception { ListGetSkuPoolInfoItemGoodsResp skuPoolList ReadExcel(/data/project/backend/employeebenefits/jdvop/商品池分类.xlsx); log.info(syncSkuData 定时任务开始商品池数量: {}, skuPoolList.size()); String token template.opsForValue().get(Constants.JdVopRedisKey); JdClient jdClient JDClientSingleton.getInstance(Constants.SERVER_URL, token, Constants.APP_KEY, Constants.APP_SECRET); processSkuPoolBatch(skuPoolList, jdClient); log.info(syncSkuData 定时任务完成); return R.ok(); } /** 获取当前 JdClient每次从 Redis 取 token */ private JdClient getJdClient() { String token template.opsForValue().get(Constants.JdVopRedisKey); return JDClientSingleton.getInstance(Constants.SERVER_URL, token, Constants.APP_KEY, Constants.APP_SECRET); } /** Token 过期时同步刷新避免多线程重复刷新 */ private JdClient refreshTokenAndGetClient() { synchronized (JdAccess.class) { try { JdAccess.getToken(); } catch (Exception e) { log.error(syncSkuData 刷新 token 失败, e); } } return getJdClient(); } /** * 处理单个 SKU可在线程中执行查详情、查价格、落库 */ private void processSingleSku(Long skuId, JdClient client) { try { // 3. 查询商品详情 VopGoodsGetSkuDetailInfoRequest getGoodsRequest new VopGoodsGetSkuDetailInfoRequest(); getGoodsRequest.setSkuId(skuId); VopGoodsGetSkuDetailInfoResponse getGoodsResponse client.execute(getGoodsRequest); String resultCode getGoodsResponse.getCode(); if (18.equals(resultCode) || 19.equals(resultCode)) { client refreshTokenAndGetClient(); getGoodsResponse client.execute(getGoodsRequest); } GetSkuPoolInfoGoodsResp result getGoodsResponse.getOpenRpcResult().getResult(); if (result null) { return; } // 4. 查询商品价格 VopGoodsGetSellPriceRequest goodsPriceReq new VopGoodsGetSellPriceRequest(); goodsPriceReq.setSkuId(String.valueOf(skuId)); VopGoodsGetSellPriceResponse goodsPriceResp client.execute(goodsPriceReq); resultCode goodsPriceResp.getCode(); if (18.equals(resultCode) || 19.equals(resultCode)) { client refreshTokenAndGetClient(); goodsPriceResp client.execute(goodsPriceReq); } if (goodsPriceResp.getOpenRpcResult().getResult() null) { return; } JdSku sku new JdSku(); sku.setSkuId(result.getSkuId()); sku.setSkuName(result.getSkuName()); sku.setSkuStatus(result.getSkuState()); sku.setSpuId(result.getSpuId()); sku.setImagePath(result.getImagePath()); sku.setSelfSellType(result.getSelfSellType()); sku.setSalePrice(goodsPriceResp.getOpenRpcResult().getResult().get(0).getSalePrice()); sku.setJdPrice(goodsPriceResp.getOpenRpcResult().getResult().get(0).getJdPrice()); sku.setLowestBuy(result.getLowestBuy() null ? 1 : result.getLowestBuy()); sku.setSaleUnit(result.getSaleUnit()); JdSpu spu new JdSpu(); spu.setSpuId(result.getSpuId()); spu.setName(result.getSpuName()); spu.setBrandName(result.getBrandName()); spu.setImagePath(result.getImagePath()); spu.setLogisticsType(result.getLogisticsType()); spu.setSalePrice(goodsPriceResp.getOpenRpcResult().getResult().get(0).getSalePrice()); jdSpuDao.upsertSpuData(spu); jdSkuDao.upsertSkuData(sku); log.debug(添加商品 id: {}, result.getSpuId()); // 同步商品分类 ListJdSpuCategory categories new ArrayList(); String categoryIdsStr result.getCategory(); if (categoryIdsStr ! null !categoryIdsStr.isEmpty()) { for (String categoryId : categoryIdsStr.split(;)) { int categoryIdInt Integer.parseInt(categoryId.trim()); JdSpuCategory category new JdSpuCategory(); category.setCategoryId(categoryIdInt); category.setSpuId(result.getSpuId()); categories.add(category); } if (!categories.isEmpty()) { jdSpuCategoryDao.batchInsert(categories); } } } catch (Exception e) { log.warn(processSingleSku 失败 skuId{}, skuId, e); } } //插入全部商品 private void processSkuPoolBatch(ListGetSkuPoolInfoItemGoodsResp skuPoolBatch, JdClient jdClient) { for (GetSkuPoolInfoItemGoodsResp item : skuPoolBatch) { long offset 0; while (true) { VopGoodsQuerySkuByPageRequest vopGoodsRequest new VopGoodsQuerySkuByPageRequest(); vopGoodsRequest.setBizPoolId(item.getBizPoolId()); vopGoodsRequest.setOffset(offset); vopGoodsRequest.setPageSize(10); try { VopGoodsQuerySkuByPageResponse vopGoodsResponse jdClient.execute(vopGoodsRequest); String resultCode vopGoodsResponse.getCode(); log.debug(分页查询状态: {}, resultCode); if (18.equals(resultCode) || 19.equals(resultCode)) { jdClient refreshTokenAndGetClient(); vopGoodsResponse jdClient.execute(vopGoodsRequest); } if (vopGoodsResponse.getOpenRpcResult().getResult() null) { break; } ListLong skus vopGoodsResponse.getOpenRpcResult().getResult().getSkus(); if (skus.isEmpty()) { break; } // 多线程并行处理当前页的 SKU ListCompletableFutureVoid futures skus.stream() .map(skuId - CompletableFuture.runAsync( () - processSingleSku(skuId, getJdClient()), skuSyncExecutor)) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); offset skus.get(skus.size() - 1); if (vopGoodsResponse.getOpenRpcResult().getResult().getRemainPage() 0) { break; } } catch (Exception e) { log.error(processSkuPoolBatch 分页异常 bizPoolId{}, offset{}, item.getBizPoolId(), offset, e); break; } } } } }关于线程池的七个参数public ThreadPoolExecutor( int corePoolSize, // 1. 核心线程数 int maximumPoolSize, // 2. 最大线程数 long keepAliveTime, // 3. 非核心线程存活时间 TimeUnit unit, // 4. 时间单位 BlockingQueueRunnable workQueue, // 5. 任务队列 ThreadFactory threadFactory, // 6. 线程工厂 (可选) RejectedExecutionHandler handler // 7. 拒绝策略 )想象一个银行大厅corePoolSize(核心线程数)正式柜员数量。即使没有客户这些柜员也一直在岗除非设置了allowCoreThreadTimeOut。任务来了优先交给他们。workQueue(任务队列)等候区的椅子数量。当正式柜员都忙不过来时新来的客户先在椅子上排队等待。如果椅子坐满了就要叫临时工了。maximumPoolSize(最大线程数)正式柜员 临时柜员的总上限。当等候区队列满了银行会开启临时窗口非核心线程。一旦临时窗口也开到了上限再来的客户就会被拒绝。注意只有当队列满了之后才会创建超过 corePoolSize 的线程。keepAliveTimeunit(存活时间)临时工裁员时限。如果临时柜员非核心线程在这么长时间内没活干就会被辞退线程销毁只保留正式柜员。threadFactory(线程工厂)招聘专员。用来创建新线程的。通常用来给线程起有意义的名字如 Order-Thread-1方便出问题时排查。handler(拒绝策略)满员后的处理方式。当正式工忙、临时工满、椅子也坐满了新来的客户怎么办当队列满且线程达到最大值时触发拒绝策略AbortPolicy(默认)行为直接抛出RejectedExecutionException异常。后果如果不捕获程序会崩溃。适用不允许丢失任务且希望快速感知系统过载的场景。CallerRunsPolicy(推荐用于重要业务)行为谁提交的任務谁自己去执行调用者线程执行。后果提交任务的主线程或线程池中的线程会被阻塞不再提交新任务从而降低提交速度给后端处理争取时间。适用不允许丢失任务且希望系统自动降速保护的场景如你的卖票系统。DiscardPolicy行为直接丢弃任务不抛异常。适用任务丢失也没关系的场景如日志记录、非关键监控数据。DiscardOldestPolicy行为丢弃队列中最老的一个任务然后尝试重新提交当前任务。适用希望尽量处理最新数据的场景。《阿里巴巴 Java 开发手册》中强制规定线程池不允许使用Executors去创建而是通过ThreadPoolExecutor的方式。// 1. FixedThreadPool: 允许请求队列长度为 Integer.MAX_VALUE // 风险堆积大量请求导致 OOM (内存溢出) ExecutorService executor Executors.newFixedThreadPool(10); // 2. CachedThreadPool: 允许创建线程数量为 Integer.MAX_VALUE // 风险创建大量线程导致 CPU 100% 或 OOM ExecutorService executor Executors.newCachedThreadPool(); // 3. SingleThreadExecutor: 允许请求队列长度为 Integer.MAX_VALUE // 风险同上OOM ExecutorService executor Executors.newSingleThreadExecutor();原因Executors创建的线程池其队列长度或最大线程数默认为Integer.MAX_VALUE。在高并发场景下如果任务处理速度慢于提交速度内存会瞬间爆满导致服务器崩溃。2.2多线程使用问题2.2.1线程安全多线程最大的问题是线程安全主要源于原子性、可见性、有序性。1. 三大问题详解原子性操作要么全部成功要么全部失败不可中断。反例i不是原子操作读-改-写。可见性一个线程修改了变量其他线程能立即看到。原因CPU 缓存不一致。有序性指令重排序导致执行顺序与代码顺序不一致。原因编译器和处理器优化。解决办法加锁