Redis/MySQL 中间件深度优化与生产选型一、场景痛点中间件性能瓶颈Redis 和 MySQL 是互联网应用中使用最广泛的中间件承担着缓存和存储的核心职责。在高并发场景下这两个中间件往往成为系统的性能瓶颈。Redis 的常见问题内存碎片化、bigkey 导致阻塞、hotkey 引发数据倾斜、持久化阻塞主线程。MySQL 的常见问题慢查询、锁竞争、连接池耗尽、主从延迟、索引失效。这些问题在低并发下可能完全不会显现但在流量高峰时会突然爆发导致服务雪崩。本文将深入探讨 Redis 和 MySQL 的深度优化策略和选型建议。二、底层机制与原理深度剖析2.1 Redis 内存管理与持久化机制flowchart TD A[客户端请求] -- B[命令解析] B -- C[内存分配] C -- D{data} subgraph 内存分配器 E[jemalloc] -- F[small class] E -- G[large class] E -- F[ huge class] end D -- E subgraph 持久化 H[RDB] -- I[BGSAVE] J[AOF] -- K[fsync] H -- L[子进程快照] end style E fill:#b8d4ff style H fill:#FFE4B5 style J fill:#FFE4B52.2 MySQL 查询优化器原理flowchart LR A[SQL 语句] -- B[解析器] B -- C[语法树] C -- D[查询优化器] D -- E{成本估算} E --|基于规则| F[RBO] E --|基于代价| G[CBO] F -- H[执行计划] G -- H三、生产级代码实现与最佳实践3.1 Redis 连接池与性能优化// Redis 连接池配置 Configuration public class RedisConfig { Bean public RedisConnectionFactory redisConnectionFactory() { GenericObjectPoolConfig poolConfig new GenericObjectPoolConfig(); // 连接池配置 poolConfig.setMaxTotal(200); // 最大连接数 poolConfig.setMaxIdle(50); // 最大空闲连接 poolConfig.setMinIdle(10); // 最小空闲连接 poolConfig.setMaxWait(Duration.ofMillis(3000)); // 空闲检测 poolConfig.setTestWhileIdle(true); poolConfig.setTestOnBorrow(true); // 借出检测 poolConfig.setTestOnReturn(false); poolConfig.setTestOnCreate(true); // 创建检测 // 空闲时检测 poolConfig.setMinEvictableIdleTime(Duration.ofMinutes(5)); poolConfig.setEvictableIdleTime(Duration.ofMinutes(10)); poolConfig.setNumTestsPerEvictionRun(10); // 连接耗尽策略 poolConfig.setBlockWhenExhausted(true); // Redis Cluster 配置 RedisClusterConfiguration clusterConfig new RedisClusterConfiguration(); clusterConfig.setClusterNodes(Arrays.asList( new RedisNode(10.0.0.1, 6379), new RedisNode(10.0.0.2, 6379), new RedisNode(10.0.0.3, 6379) )); clusterConfig.setMaxRedirects(3); clusterConfig.setPassword(RedisPassword.of(password)); return new LettuceConnectionFactory(clusterConfig, ClientConfiguration.builder() .useSsl() .connectTimeout(Duration.ofSeconds(5)) .readTimeout(Duration.ofSeconds(3)) .commandTimeout(Duration.ofSeconds(3)) .build()); } Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template new StringRedisTemplate(factory); // 开启事务 template.setExecutePipeline(true); return template; } } // Redis 管道批量操作 Component public class RedisBatchOperations { private final StringRedisTemplate redisTemplate; public RedisBatchOperations(StringRedisTemplate redisTemplate) { this.redisTemplate redisTemplate; } /** * 批量获取Pipeline * 减少网络往返次数 */ public ListString batchGet(ListString keys) { if (keys.isEmpty()) return Collections.emptyList(); return redisTemplate.executePipelined((RedisCallbackListString) connection - { for (String key : keys) { connection.stringCommands().get(key.getBytes()); } return null; }); } /** * 批量写入Pipeline */ public void batchSet(MapString, String keyValues) { if (keyValues.isEmpty()) return; redisTemplate.executePipelined((RedisCallbackObject) connection - { for (Map.EntryString, String entry : keyValues.entrySet()) { connection.stringCommands() .set(entry.getKey().getBytes(), entry.getValue().getBytes()); } return null; }); } /** * Lua 脚本保证原子性 */ public Long incrementIfNotExists(String key, long delta, Duration ttl) { String luaScript local current redis.call(GET, KEYS[1]) if current false then redis.call(SET, KEYS[1], ARGV[1], PX, ARGV[2]) return 1 else return 0 end ; return redisTemplate.execute( new DefaultRedisScript(luaScript, Long.class), List.of(key), String.valueOf(delta), String.valueOf(ttl.toMillis()) ); } }3.2 MySQL 连接池与查询优化# Druid 连接池配置 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/app?useUnicodetruecharacterEncodingutf8 username: root password: password # Druid 连接池配置 druid: # 初始连接数 initial-size: 10 # 最大连接数 max-active: 100 # 最小空闲连接 min-idle: 10 # 获取连接最大等待时间 max-wait: 60000 # 连接泄漏检测 remove-abandoned: true remove-abandoned-timeout: 60 # 连接有效性检测 validation-query: SELECT 1 test-while-idle: true test-on-borrow: false test-on-return: false # 检测间隔 time-between-eviction-runs-millis: 60000 # 最小保持空闲时间 min-evictable-idle-time-millis: 300000 # 监控配置 filter: stat: enabled: true log-slow-sql: true slow-sql-millis: 2000 wall: enabled: true config: multi-statement-allow: true-- 慢查询优化示例 -- 1. 检查慢查询 SELECT query, db, exec_count, total_latency, avg_latency, rows_sent, rows_scanned FROM performance_schema.events_statements_summary_by_digest WHERE schema app ORDER BY total_latency DESC LIMIT 10; -- 2. 分析执行计划 EXPLAIN ANALYZE SELECT u.*, o.* FROM users u LEFT JOIN orders o ON u.id o.user_id WHERE u.created_at 2024-01-01 AND o.status completed; -- 3. 创建合适索引 CREATE INDEX idx_users_created_at ON users(created_at); CREATE INDEX idx_orders_user_id_status ON orders(user_id, status); -- 4. 分页优化延迟关联 SELECT u.* FROM users u INNER JOIN ( SELECT id FROM users WHERE created_at 2024-01-01 ORDER BY created_at LIMIT 100 OFFSET 1000 ) t ON u.id t.id; -- 5. 覆盖索引避免回表 CREATE INDEX idx_user_covered ON users(id, name, email, created_at);3.3 MySQL 分库分表策略// ShardingSphere 分库分表配置 Configuration public class ShardingConfig { Bean public DataSource dataSource() { MapString, DataSource dataSourceMap new HashMap(); // 配置分库 dataSourceMap.put(ds_0, createDataSource(10.0.0.1:3306)); dataSourceMap.put(ds_1, createDataSource(10.0.0.2:3306)); // 分片规则配置 ShardingRuleConfiguration shardingRuleConfig new ShardingRuleConfiguration(); // 订单表分片规则 TableRuleConfiguration orderTableRule new TableRuleConfiguration(orders, ds_${0..1}.orders_${0..15}); orderTableRule.setTableShardingStrategyConfig( new StandardShardingStrategyConfiguration( user_id, new PreciseShardingAlgorithmLong() { Override public String doSharding( CollectionString availableTargetNames, PreciseShardingValueLong value) { long shardId value.getValue() % 16; int dbIndex (int) (shardId / 8); return ds_ dbIndex; } }, new RangeShardingAlgorithmLong() { Override public CollectionString doSharding( CollectionString availableTargetNames, RangeShardingValueLong value) { // 范围查询广播到所有库 return availableTargetNames; } } ) ); shardingRuleConfig.getTableRuleConfigs().add(orderTableRule); // 默认数据库策略 shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig( new InlineShardingStrategyConfiguration( user_id, ds_${user_id % 2} ) ); // 绑定表避免跨库 join shardingRuleConfig.getBindingTableGroups().add(orders,order_items); try { return ShardingDataSourceFactory.createDataSource( dataSourceMap, shardingRuleConfig, new Properties() ); } catch (Exception e) { throw new RuntimeException(e); } } }3.4 缓存与数据库一致性// Cache-Aside 模式实现 Service public class UserService { private final StringRedisTemplate redisTemplate; private final UserMapper userMapper; private static final String USER_CACHE_KEY user:; private static final Duration CACHE_TTL Duration.ofMinutes(30); /** * Cache-Aside 读流程 * 1. 先查缓存 * 2. 缓存未命中查数据库 * 3. 写入缓存 */ public User getUserById(Long userId) { String cacheKey USER_CACHE_KEY userId; // 1. 查缓存 User cached getCachedUser(cacheKey); if (cached ! null) { return cached; } // 2. 查数据库 User user userMapper.selectById(userId); if (user null) { return null; } // 3. 写缓存 cacheUser(cacheKey, user); return user; } /** * 更新时删除缓存而非更新 * 避免并发下的缓存不一致 */ Transactional public void updateUser(User user) { // 1. 更新数据库 userMapper.updateById(user); // 2. 删除缓存 String cacheKey USER_CACHE_KEY user.getId(); redisTemplate.delete(cacheKey); // 3. 延迟双删确保一致性 CompletableFuture.runAsync(() - { try { Thread.sleep(200); redisTemplate.delete(cacheKey); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } /** * 分布式锁保证缓存重建的原子性 */ public User getUserWithLock(Long userId) { String cacheKey USER_CACHE_KEY userId; // 1. 尝试从缓存获取 User cached getCachedUser(cacheKey); if (cached ! null) { return cached; } String lockKey lock: cacheKey; String lockValue UUID.randomUUID().toString(); // 2. 获取分布式锁 Boolean acquired redisTemplate.opsForValue() .setIfAbsent(lockKey, lockValue, Duration.ofSeconds(10)); if (Boolean.TRUE.equals(acquired)) { try { // Double Check cached getCachedUser(cacheKey); if (cached ! null) { return cached; } // 3. 查数据库 User user userMapper.selectById(userId); // 4. 写入缓存 if (user ! null) { cacheUser(cacheKey, user); } else { // 缓存空结果避免缓存穿透 cacheEmpty(cacheKey); } return user; } finally { // 5. 释放锁 redisTemplate.delete(lockKey); } } else { // 等待其他线程重建缓存 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return getUserById(userId); } } private User getCachedUser(String key) { String json redisTemplate.opsForValue().get(key); if (json null) { return null; } if ({}.equals(json)) { return null; // 空结果 } return JSON.parseObject(json, User.class); } private void cacheUser(String key, User user) { redisTemplate.opsForValue().set(key, JSON.toJSONString(user), CACHE_TTL); } private void cacheEmpty(String key) { // 短过期时间的空缓存防止缓存穿透 redisTemplate.opsForValue().set(key, {}, Duration.ofMinutes(5)); } }四、边界分析与 Trade-offs4.1 Redis vs Memcached维度RedisMemcached数据类型多种字符串持久化支持不支持集群原生 Cluster需客户端分片性能稍低稍高内存管理更复杂简单4.2 MySQL 存储引擎选择引擎适用场景特点InnoDBOLTP事务支持、行锁MyISAM读多写少表锁、空间节省Memory临时表内存存储TokuDB大数据压缩率高五、总结Redis 和 MySQL 的深度优化是后端工程师的必备技能连接池管理合理配置连接池参数避免连接耗尽索引优化基于 Explain 分析创建合适索引缓存策略合理使用缓存避免缓存穿透/击穿/雪崩分库分表根据业务增长规划数据分片一致性保证在性能和一致性间权衡中间件优化没有银弹需要基于业务场景和数据特点进行针对性调优。
Redis/MySQL 中间件深度优化与生产选型
Redis/MySQL 中间件深度优化与生产选型一、场景痛点中间件性能瓶颈Redis 和 MySQL 是互联网应用中使用最广泛的中间件承担着缓存和存储的核心职责。在高并发场景下这两个中间件往往成为系统的性能瓶颈。Redis 的常见问题内存碎片化、bigkey 导致阻塞、hotkey 引发数据倾斜、持久化阻塞主线程。MySQL 的常见问题慢查询、锁竞争、连接池耗尽、主从延迟、索引失效。这些问题在低并发下可能完全不会显现但在流量高峰时会突然爆发导致服务雪崩。本文将深入探讨 Redis 和 MySQL 的深度优化策略和选型建议。二、底层机制与原理深度剖析2.1 Redis 内存管理与持久化机制flowchart TD A[客户端请求] -- B[命令解析] B -- C[内存分配] C -- D{data} subgraph 内存分配器 E[jemalloc] -- F[small class] E -- G[large class] E -- F[ huge class] end D -- E subgraph 持久化 H[RDB] -- I[BGSAVE] J[AOF] -- K[fsync] H -- L[子进程快照] end style E fill:#b8d4ff style H fill:#FFE4B5 style J fill:#FFE4B52.2 MySQL 查询优化器原理flowchart LR A[SQL 语句] -- B[解析器] B -- C[语法树] C -- D[查询优化器] D -- E{成本估算} E --|基于规则| F[RBO] E --|基于代价| G[CBO] F -- H[执行计划] G -- H三、生产级代码实现与最佳实践3.1 Redis 连接池与性能优化// Redis 连接池配置 Configuration public class RedisConfig { Bean public RedisConnectionFactory redisConnectionFactory() { GenericObjectPoolConfig poolConfig new GenericObjectPoolConfig(); // 连接池配置 poolConfig.setMaxTotal(200); // 最大连接数 poolConfig.setMaxIdle(50); // 最大空闲连接 poolConfig.setMinIdle(10); // 最小空闲连接 poolConfig.setMaxWait(Duration.ofMillis(3000)); // 空闲检测 poolConfig.setTestWhileIdle(true); poolConfig.setTestOnBorrow(true); // 借出检测 poolConfig.setTestOnReturn(false); poolConfig.setTestOnCreate(true); // 创建检测 // 空闲时检测 poolConfig.setMinEvictableIdleTime(Duration.ofMinutes(5)); poolConfig.setEvictableIdleTime(Duration.ofMinutes(10)); poolConfig.setNumTestsPerEvictionRun(10); // 连接耗尽策略 poolConfig.setBlockWhenExhausted(true); // Redis Cluster 配置 RedisClusterConfiguration clusterConfig new RedisClusterConfiguration(); clusterConfig.setClusterNodes(Arrays.asList( new RedisNode(10.0.0.1, 6379), new RedisNode(10.0.0.2, 6379), new RedisNode(10.0.0.3, 6379) )); clusterConfig.setMaxRedirects(3); clusterConfig.setPassword(RedisPassword.of(password)); return new LettuceConnectionFactory(clusterConfig, ClientConfiguration.builder() .useSsl() .connectTimeout(Duration.ofSeconds(5)) .readTimeout(Duration.ofSeconds(3)) .commandTimeout(Duration.ofSeconds(3)) .build()); } Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template new StringRedisTemplate(factory); // 开启事务 template.setExecutePipeline(true); return template; } } // Redis 管道批量操作 Component public class RedisBatchOperations { private final StringRedisTemplate redisTemplate; public RedisBatchOperations(StringRedisTemplate redisTemplate) { this.redisTemplate redisTemplate; } /** * 批量获取Pipeline * 减少网络往返次数 */ public ListString batchGet(ListString keys) { if (keys.isEmpty()) return Collections.emptyList(); return redisTemplate.executePipelined((RedisCallbackListString) connection - { for (String key : keys) { connection.stringCommands().get(key.getBytes()); } return null; }); } /** * 批量写入Pipeline */ public void batchSet(MapString, String keyValues) { if (keyValues.isEmpty()) return; redisTemplate.executePipelined((RedisCallbackObject) connection - { for (Map.EntryString, String entry : keyValues.entrySet()) { connection.stringCommands() .set(entry.getKey().getBytes(), entry.getValue().getBytes()); } return null; }); } /** * Lua 脚本保证原子性 */ public Long incrementIfNotExists(String key, long delta, Duration ttl) { String luaScript local current redis.call(GET, KEYS[1]) if current false then redis.call(SET, KEYS[1], ARGV[1], PX, ARGV[2]) return 1 else return 0 end ; return redisTemplate.execute( new DefaultRedisScript(luaScript, Long.class), List.of(key), String.valueOf(delta), String.valueOf(ttl.toMillis()) ); } }3.2 MySQL 连接池与查询优化# Druid 连接池配置 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/app?useUnicodetruecharacterEncodingutf8 username: root password: password # Druid 连接池配置 druid: # 初始连接数 initial-size: 10 # 最大连接数 max-active: 100 # 最小空闲连接 min-idle: 10 # 获取连接最大等待时间 max-wait: 60000 # 连接泄漏检测 remove-abandoned: true remove-abandoned-timeout: 60 # 连接有效性检测 validation-query: SELECT 1 test-while-idle: true test-on-borrow: false test-on-return: false # 检测间隔 time-between-eviction-runs-millis: 60000 # 最小保持空闲时间 min-evictable-idle-time-millis: 300000 # 监控配置 filter: stat: enabled: true log-slow-sql: true slow-sql-millis: 2000 wall: enabled: true config: multi-statement-allow: true-- 慢查询优化示例 -- 1. 检查慢查询 SELECT query, db, exec_count, total_latency, avg_latency, rows_sent, rows_scanned FROM performance_schema.events_statements_summary_by_digest WHERE schema app ORDER BY total_latency DESC LIMIT 10; -- 2. 分析执行计划 EXPLAIN ANALYZE SELECT u.*, o.* FROM users u LEFT JOIN orders o ON u.id o.user_id WHERE u.created_at 2024-01-01 AND o.status completed; -- 3. 创建合适索引 CREATE INDEX idx_users_created_at ON users(created_at); CREATE INDEX idx_orders_user_id_status ON orders(user_id, status); -- 4. 分页优化延迟关联 SELECT u.* FROM users u INNER JOIN ( SELECT id FROM users WHERE created_at 2024-01-01 ORDER BY created_at LIMIT 100 OFFSET 1000 ) t ON u.id t.id; -- 5. 覆盖索引避免回表 CREATE INDEX idx_user_covered ON users(id, name, email, created_at);3.3 MySQL 分库分表策略// ShardingSphere 分库分表配置 Configuration public class ShardingConfig { Bean public DataSource dataSource() { MapString, DataSource dataSourceMap new HashMap(); // 配置分库 dataSourceMap.put(ds_0, createDataSource(10.0.0.1:3306)); dataSourceMap.put(ds_1, createDataSource(10.0.0.2:3306)); // 分片规则配置 ShardingRuleConfiguration shardingRuleConfig new ShardingRuleConfiguration(); // 订单表分片规则 TableRuleConfiguration orderTableRule new TableRuleConfiguration(orders, ds_${0..1}.orders_${0..15}); orderTableRule.setTableShardingStrategyConfig( new StandardShardingStrategyConfiguration( user_id, new PreciseShardingAlgorithmLong() { Override public String doSharding( CollectionString availableTargetNames, PreciseShardingValueLong value) { long shardId value.getValue() % 16; int dbIndex (int) (shardId / 8); return ds_ dbIndex; } }, new RangeShardingAlgorithmLong() { Override public CollectionString doSharding( CollectionString availableTargetNames, RangeShardingValueLong value) { // 范围查询广播到所有库 return availableTargetNames; } } ) ); shardingRuleConfig.getTableRuleConfigs().add(orderTableRule); // 默认数据库策略 shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig( new InlineShardingStrategyConfiguration( user_id, ds_${user_id % 2} ) ); // 绑定表避免跨库 join shardingRuleConfig.getBindingTableGroups().add(orders,order_items); try { return ShardingDataSourceFactory.createDataSource( dataSourceMap, shardingRuleConfig, new Properties() ); } catch (Exception e) { throw new RuntimeException(e); } } }3.4 缓存与数据库一致性// Cache-Aside 模式实现 Service public class UserService { private final StringRedisTemplate redisTemplate; private final UserMapper userMapper; private static final String USER_CACHE_KEY user:; private static final Duration CACHE_TTL Duration.ofMinutes(30); /** * Cache-Aside 读流程 * 1. 先查缓存 * 2. 缓存未命中查数据库 * 3. 写入缓存 */ public User getUserById(Long userId) { String cacheKey USER_CACHE_KEY userId; // 1. 查缓存 User cached getCachedUser(cacheKey); if (cached ! null) { return cached; } // 2. 查数据库 User user userMapper.selectById(userId); if (user null) { return null; } // 3. 写缓存 cacheUser(cacheKey, user); return user; } /** * 更新时删除缓存而非更新 * 避免并发下的缓存不一致 */ Transactional public void updateUser(User user) { // 1. 更新数据库 userMapper.updateById(user); // 2. 删除缓存 String cacheKey USER_CACHE_KEY user.getId(); redisTemplate.delete(cacheKey); // 3. 延迟双删确保一致性 CompletableFuture.runAsync(() - { try { Thread.sleep(200); redisTemplate.delete(cacheKey); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } /** * 分布式锁保证缓存重建的原子性 */ public User getUserWithLock(Long userId) { String cacheKey USER_CACHE_KEY userId; // 1. 尝试从缓存获取 User cached getCachedUser(cacheKey); if (cached ! null) { return cached; } String lockKey lock: cacheKey; String lockValue UUID.randomUUID().toString(); // 2. 获取分布式锁 Boolean acquired redisTemplate.opsForValue() .setIfAbsent(lockKey, lockValue, Duration.ofSeconds(10)); if (Boolean.TRUE.equals(acquired)) { try { // Double Check cached getCachedUser(cacheKey); if (cached ! null) { return cached; } // 3. 查数据库 User user userMapper.selectById(userId); // 4. 写入缓存 if (user ! null) { cacheUser(cacheKey, user); } else { // 缓存空结果避免缓存穿透 cacheEmpty(cacheKey); } return user; } finally { // 5. 释放锁 redisTemplate.delete(lockKey); } } else { // 等待其他线程重建缓存 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return getUserById(userId); } } private User getCachedUser(String key) { String json redisTemplate.opsForValue().get(key); if (json null) { return null; } if ({}.equals(json)) { return null; // 空结果 } return JSON.parseObject(json, User.class); } private void cacheUser(String key, User user) { redisTemplate.opsForValue().set(key, JSON.toJSONString(user), CACHE_TTL); } private void cacheEmpty(String key) { // 短过期时间的空缓存防止缓存穿透 redisTemplate.opsForValue().set(key, {}, Duration.ofMinutes(5)); } }四、边界分析与 Trade-offs4.1 Redis vs Memcached维度RedisMemcached数据类型多种字符串持久化支持不支持集群原生 Cluster需客户端分片性能稍低稍高内存管理更复杂简单4.2 MySQL 存储引擎选择引擎适用场景特点InnoDBOLTP事务支持、行锁MyISAM读多写少表锁、空间节省Memory临时表内存存储TokuDB大数据压缩率高五、总结Redis 和 MySQL 的深度优化是后端工程师的必备技能连接池管理合理配置连接池参数避免连接耗尽索引优化基于 Explain 分析创建合适索引缓存策略合理使用缓存避免缓存穿透/击穿/雪崩分库分表根据业务增长规划数据分片一致性保证在性能和一致性间权衡中间件优化没有银弹需要基于业务场景和数据特点进行针对性调优。