SpringBoot + 读写分离 + 动态数据源路由:主库写、从库读,自动故障切换

引言

大家好,我是服务端技术精选的作者。最近项目数据库压力越来越大,单台MySQL已经扛不住了。传统的垂直分库方案改动太大,后来发现SpringBoot的读写分离配合动态数据源路由,可以在不改变业务代码的情况下,轻松实现主库写、从库读的架构升级。

很多同学一提到数据库扩展就想到分库分表,但其实读写分离是最简单有效的第一步。今天就来聊聊如何用SpringBoot实现智能的读写分离,让系统既能写又能读,还能自动处理故障切换。

为什么需要读写分离?

单数据库的瓶颈

面对高并发场景,单数据库的问题:

写入瓶颈

  • 主库既要处理写请求,又要同步数据到从库
  • 写入QPS达到几千就容易出现瓶颈
  • 锁竞争严重,影响整体性能

读取瓶颈

  • 大量查询请求都打到主库
  • 主库CPU和IO资源被读写混合消耗
  • 查询复杂度高时影响写入性能

扩展困难

  • 垂直分库需要改造业务代码
  • 水平分表复杂度高
  • 数据一致性难以保证

核心架构设计

我们的读写分离架构:

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   应用层        │───▶│  动态数据源路由   │───▶│   数据源选择     │
│  (SpringBoot)   │    │  (AOP + 注解)    │    │  (主/从库)      │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        ▼                     ▼                     ▼
┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   主库写入   │      │   从库读取   │      │  故障检测    │
│  (Master)   │      │  (Slave)    │      │  (Health)   │
└─────────────┘      └─────────────┘      └─────────────┘
        │                     │                     │
        ▼                     ▼                     ▼
┌─────────────────────────────────────────────────────────┐
│                    MySQL主从复制                         │
└─────────────────────────────────────────────────────────┘

核心设计要点

1. 动态数据源路由

// 数据源上下文持有器
public class DataSourceContextHolder {
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
    
    public static void setDataSourceKey(String dataSourceKey) {
        CONTEXT_HOLDER.set(dataSourceKey);
    }
    
    public static String getDataSourceKey() {
        return CONTEXT_HOLDER.get();
    }
    
    public static void clearDataSourceKey() {
        CONTEXT_HOLDER.remove();
    }
}

// 动态数据源路由
public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceKey();
    }
}

2. 读写分离注解

// 读操作注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
    String value() default "slave";
}

// 写操作注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface WriteOnly {
    String value() default "master";
}

3. AOP切面实现

@Aspect
@Component
@Order(1)  // 确保在事务之前执行
public class DataSourceRoutingAspect {
    
    @Around("@annotation(readOnly)")
    public Object proceedRead(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable {
        try {
            // 设置从库数据源
            DataSourceContextHolder.setDataSourceKey(readOnly.value());
            return joinPoint.proceed();
        } finally {
            DataSourceContextHolder.clearDataSourceKey();
        }
    }
    
    @Around("@annotation(writeOnly)")
    public Object proceedWrite(ProceedingJoinPoint joinPoint, WriteOnly writeOnly) throws Throwable {
        try {
            // 设置主库数据源
            DataSourceContextHolder.setDataSourceKey(writeOnly.value());
            return joinPoint.proceed();
        } finally {
            DataSourceContextHolder.clearDataSourceKey();
        }
    }
}

关键实现细节

1. 数据源配置

@Configuration
public class DataSourceConfig {
    
    @Bean
    @Primary
    public DataSource dynamicDataSource() {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        
        // 配置主从数据源
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put("master", masterDataSource());
        dataSourceMap.put("slave1", slaveDataSource1());
        dataSourceMap.put("slave2", slaveDataSource2());
        
        dynamicDataSource.setTargetDataSources(dataSourceMap);
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource());
        
        return dynamicDataSource;
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave1")
    public DataSource slaveDataSource1() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave2")
    public DataSource slaveDataSource2() {
        return DataSourceBuilder.create().build();
    }
}

2. 负载均衡策略

@Component
public class SlaveLoadBalancer {
    private final List<String> slaveKeys = Arrays.asList("slave1", "slave2");
    private final AtomicInteger counter = new AtomicInteger(0);
    
    public String getNextSlaveKey() {
        int index = counter.getAndIncrement() % slaveKeys.size();
        return slaveKeys.get(index);
    }
    
    public String getSlaveKeyByHash(String key) {
        int hash = key.hashCode();
        int index = Math.abs(hash) % slaveKeys.size();
        return slaveKeys.get(index);
    }
}

3. 故障检测与切换

@Component
@Slf4j
public class DataSourceHealthChecker {
    
    private final Map<String, Boolean> dataSourceStatus = new ConcurrentHashMap<>();
    private final Map<String, DataSource> dataSources;
    
    @PostConstruct
    public void init() {
        // 初始化所有数据源状态为健康
        dataSources.keySet().forEach(key -> dataSourceStatus.put(key, true));
        // 启动健康检查任务
        startHealthCheckTask();
    }
    
    @Scheduled(fixedRate = 30000)  // 每30秒检查一次
    public void checkDataSourceHealth() {
        dataSources.forEach((key, dataSource) -> {
            boolean isHealthy = testDataSourceConnection(dataSource);
            boolean wasHealthy = dataSourceStatus.get(key);
            
            if (isHealthy != wasHealthy) {
                dataSourceStatus.put(key, isHealthy);
                log.info("数据源状态变更: {} -> {}", key, isHealthy ? "健康" : "故障");
                
                // 触发故障转移
                if (!isHealthy) {
                    handleDataSourceFailure(key);
                }
            }
        });
    }
    
    private boolean testDataSourceConnection(DataSource dataSource) {
        try (Connection connection = dataSource.getConnection()) {
            return connection.isValid(3);  // 3秒超时
        } catch (Exception e) {
            log.warn("数据源连接测试失败", e);
            return false;
        }
    }
    
    private void handleDataSourceFailure(String failedKey) {
        if ("master".equals(failedKey)) {
            // 主库故障,切换到备用主库
            switchToBackupMaster();
        } else {
            // 从库故障,从负载均衡中移除
            removeFromLoadBalancer(failedKey);
        }
    }
}

4. 事务管理配置

@Configuration
@EnableTransactionManagement
public class TransactionConfig {
    
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    
    // 确保事务内的所有操作都使用同一个数据源
    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        TransactionTemplate template = new TransactionTemplate(transactionManager);
        template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        return template;
    }
}

业务场景应用

1. 用户服务读写分离

@Service
public class UserService {
    
    @WriteOnly
    @Transactional
    public User createUser(CreateUserRequest request) {
        // 写操作路由到主库
        User user = new User();
        user.setUsername(request.getUsername());
        user.setEmail(request.getEmail());
        return userRepository.save(user);
    }
    
    @ReadOnly
    public User getUserById(Long userId) {
        // 读操作路由到从库
        return userRepository.findById(userId)
            .orElseThrow(() -> new UserNotFoundException(userId));
    }
    
    @ReadOnly
    public List<User> searchUsers(String keyword) {
        // 复杂查询路由到从库
        return userRepository.findByUsernameContaining(keyword);
    }
    
    @WriteOnly
    @Transactional
    public User updateUser(Long userId, UpdateUserRequest request) {
        // 更新操作路由到主库
        User user = getUserById(userId);
        user.setUsername(request.getUsername());
        user.setEmail(request.getEmail());
        return userRepository.save(user);
    }
}

2. 订单服务混合场景

@Service
public class OrderService {
    
    @WriteOnly
    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        // 创建订单写入主库
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.PENDING);
        
        Order savedOrder = orderRepository.save(order);
        
        // 同时更新用户积分(写操作)
        userService.updateUserPoints(request.getUserId(), request.getAmount().intValue());
        
        return savedOrder;
    }
    
    @ReadOnly
    public Order getOrderWithDetails(Long orderId) {
        // 读取订单详情从从库
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
            
        // 读取用户信息从从库
        User user = userService.getUserById(order.getUserId());
        order.setUser(user);
        
        return order;
    }
    
    @WriteOnly
    @Transactional
    public Order payOrder(Long orderId) {
        // 支付订单需要读写混合操作
        Order order = getOrderWithDetails(orderId);  // 先读取(从库)
        order.setStatus(OrderStatus.PAID);           // 再更新(主库)
        return orderRepository.save(order);
    }
}

3. 报表统计服务

@Service
public class ReportService {
    
    @ReadOnly
    public UserStatisticsReport getUserStatistics(LocalDate date) {
        // 大量统计查询路由到从库
        UserStatisticsReport report = new UserStatisticsReport();
        
        report.setTotalUsers(userRepository.count());
        report.setActiveUsers(userRepository.countActiveUsers(date));
        report.setNewUsers(userRepository.countNewUsers(date));
        
        return report;
    }
    
    @ReadOnly
    public OrderAnalysisReport getOrderAnalysis(LocalDate startDate, LocalDate endDate) {
        // 复杂分析查询路由到从库
        List<Order> orders = orderRepository.findByDateRange(startDate, endDate);
        
        OrderAnalysisReport report = new OrderAnalysisReport();
        report.setTotalOrders(orders.size());
        report.setTotalAmount(orders.stream().mapToDouble(Order::getAmount).sum());
        report.setAverageOrderValue(report.getTotalAmount() / report.getTotalOrders());
        
        return report;
    }
}

最佳实践建议

1. 数据一致性保障

@Component
@Slf4j
public class DataConsistencyManager {
    
    // 强一致性读取(读主库)
    @WriteOnly
    public <T> T readFromMaster(Supplier<T> readOperation) {
        return readOperation.get();
    }
    
    // 最终一致性读取(读从库)
    @ReadOnly
    public <T> T readFromSlave(Supplier<T> readOperation) {
        return readOperation.get();
    }
    
    // 读写分离延迟处理
    public void handleReplicationDelay(String operation, long delayMs) {
        if (delayMs > 1000) {  // 延迟超过1秒
            log.warn("主从复制延迟较高: operation={}, delay={}ms", operation, delayMs);
            // 可以触发告警或降级策略
        }
    }
}

2. 监控告警体系

@Component
public class DataSourceMonitor {
    
    private final MeterRegistry meterRegistry;
    private final Map<String, Timer> queryTimers = new ConcurrentHashMap<>();
    
    @EventListener
    public void handleQueryExecution(QueryExecutionEvent event) {
        String dataSourceKey = DataSourceContextHolder.getDataSourceKey();
        String operationType = event.isWriteOperation() ? "write" : "read";
        
        // 记录查询性能指标
        String timerName = "database.query." + operationType;
        Timer timer = queryTimers.computeIfAbsent(timerName, 
            name -> Timer.builder(name)
                .tag("datasource", dataSourceKey)
                .register(meterRegistry));
        
        timer.record(event.getExecutionTime(), TimeUnit.MILLISECONDS);
        
        // 慢查询告警
        if (event.getExecutionTime() > 1000) {
            log.warn("慢查询告警: datasource={}, operation={}, time={}ms", 
                dataSourceKey, operationType, event.getExecutionTime());
        }
    }
    
    @EventListener
    public void handleDataSourceFailure(DataSourceFailureEvent event) {
        // 数据源故障告警
        log.error("数据源故障: key={}, error={}", 
            event.getDataSourceKey(), event.getError());
            
        // 触发告警通知
        alertService.sendAlert("数据源故障", 
            String.format("数据源 %s 发生故障: %s", 
                event.getDataSourceKey(), event.getError()));
    }
}

3. 配置管理优化

@ConfigurationProperties(prefix = "datasource.routing")
@Data
public class DataSourceRoutingProperties {
    private LoadBalanceStrategy loadBalance = LoadBalanceStrategy.ROUND_ROBIN;
    private long healthCheckInterval = 30000;  // 30秒
    private int maxRetryAttempts = 3;
    private long retryDelay = 1000;  // 1秒
    
    public enum LoadBalanceStrategy {
        ROUND_ROBIN,    // 轮询
        RANDOM,         // 随机
        HASH,          // 哈希
        WEIGHTED       // 加权
    }
}

@Component
public class RoutingConfigurationManager {
    
    @Autowired
    private DataSourceRoutingProperties properties;
    
    public String selectSlaveDataSource(String routingKey) {
        switch (properties.getLoadBalance()) {
            case ROUND_ROBIN:
                return slaveLoadBalancer.getNextSlaveKey();
            case RANDOM:
                return slaveLoadBalancer.getRandomSlaveKey();
            case HASH:
                return slaveLoadBalancer.getSlaveKeyByHash(routingKey);
            case WEIGHTED:
                return slaveLoadBalancer.getWeightedSlaveKey();
            default:
                return slaveLoadBalancer.getNextSlaveKey();
        }
    }
}

预期效果

通过读写分离方案,我们可以实现:

  • 性能提升:读写分离后,主库专注写入,从库处理查询,整体性能提升50-100%
  • 扩展性强:可以轻松增加从库数量,线性提升读取能力
  • 高可用性:自动故障检测和切换,保证服务连续性
  • 成本优化:相比垂直分库,改造成本更低,风险更小
  • 业务透明:对上层业务代码无侵入,平滑升级

这套方案是数据库扩展的经典路径,既解决了当前的性能瓶颈,又为未来的业务增长预留了空间。


欢迎关注公众号"服务端技术精选",获取更多技术干货!
欢迎大家加群沟通


标题:SpringBoot + 读写分离 + 动态数据源路由:主库写、从库读,自动故障切换
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/02/12/1770701724162.html

    评论
    0 评论
avatar

取消