SpringBoot + 读写分离 + 动态数据源路由:主库写、从库读,自动故障切换
引言
大家好,我是服务端技术精选的作者。最近项目数据库压力越来越大,单台MySQL已经扛不住了。传统的垂直分库方案改动太大,后来发现SpringBoot的读写分离配合动态数据源路由,可以在不改变业务代码的情况下,轻松实现主库写、从库读的架构升级。
很多同学一提到数据库扩展就想到分库分表,但其实读写分离是最简单有效的第一步。今天就来聊聊如何用SpringBoot实现智能的读写分离,让系统既能写又能读,还能自动处理故障切换。
为什么需要读写分离?
单数据库的瓶颈
面对高并发场景,单数据库的问题:
写入瓶颈:
- 主库既要处理写请求,又要同步数据到从库
- 写入QPS达到几千就容易出现瓶颈
- 锁竞争严重,影响整体性能
读取瓶颈:
- 大量查询请求都打到主库
- 主库CPU和IO资源被读写混合消耗
- 查询复杂度高时影响写入性能
扩展困难:
- 垂直分库需要改造业务代码
- 水平分表复杂度高
- 数据一致性难以保证
核心架构设计
我们的读写分离架构:
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ 应用层 │───▶│ 动态数据源路由 │───▶│ 数据源选择 │
│ (SpringBoot) │ │ (AOP + 注解) │ │ (主/从库) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 主库写入 │ │ 从库读取 │ │ 故障检测 │
│ (Master) │ │ (Slave) │ │ (Health) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────┐
│ MySQL主从复制 │
└─────────────────────────────────────────────────────────┘
核心设计要点
1. 动态数据源路由
// 数据源上下文持有器
public class DataSourceContextHolder {
private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
public static void setDataSourceKey(String dataSourceKey) {
CONTEXT_HOLDER.set(dataSourceKey);
}
public static String getDataSourceKey() {
return CONTEXT_HOLDER.get();
}
public static void clearDataSourceKey() {
CONTEXT_HOLDER.remove();
}
}
// 动态数据源路由
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceKey();
}
}
2. 读写分离注解
// 读操作注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
String value() default "slave";
}
// 写操作注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface WriteOnly {
String value() default "master";
}
3. AOP切面实现
@Aspect
@Component
@Order(1) // 确保在事务之前执行
public class DataSourceRoutingAspect {
@Around("@annotation(readOnly)")
public Object proceedRead(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable {
try {
// 设置从库数据源
DataSourceContextHolder.setDataSourceKey(readOnly.value());
return joinPoint.proceed();
} finally {
DataSourceContextHolder.clearDataSourceKey();
}
}
@Around("@annotation(writeOnly)")
public Object proceedWrite(ProceedingJoinPoint joinPoint, WriteOnly writeOnly) throws Throwable {
try {
// 设置主库数据源
DataSourceContextHolder.setDataSourceKey(writeOnly.value());
return joinPoint.proceed();
} finally {
DataSourceContextHolder.clearDataSourceKey();
}
}
}
关键实现细节
1. 数据源配置
@Configuration
public class DataSourceConfig {
@Bean
@Primary
public DataSource dynamicDataSource() {
DynamicDataSource dynamicDataSource = new DynamicDataSource();
// 配置主从数据源
Map<Object, Object> dataSourceMap = new HashMap<>();
dataSourceMap.put("master", masterDataSource());
dataSourceMap.put("slave1", slaveDataSource1());
dataSourceMap.put("slave2", slaveDataSource2());
dynamicDataSource.setTargetDataSources(dataSourceMap);
dynamicDataSource.setDefaultTargetDataSource(masterDataSource());
return dynamicDataSource;
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave1")
public DataSource slaveDataSource1() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave2")
public DataSource slaveDataSource2() {
return DataSourceBuilder.create().build();
}
}
2. 负载均衡策略
@Component
public class SlaveLoadBalancer {
private final List<String> slaveKeys = Arrays.asList("slave1", "slave2");
private final AtomicInteger counter = new AtomicInteger(0);
public String getNextSlaveKey() {
int index = counter.getAndIncrement() % slaveKeys.size();
return slaveKeys.get(index);
}
public String getSlaveKeyByHash(String key) {
int hash = key.hashCode();
int index = Math.abs(hash) % slaveKeys.size();
return slaveKeys.get(index);
}
}
3. 故障检测与切换
@Component
@Slf4j
public class DataSourceHealthChecker {
private final Map<String, Boolean> dataSourceStatus = new ConcurrentHashMap<>();
private final Map<String, DataSource> dataSources;
@PostConstruct
public void init() {
// 初始化所有数据源状态为健康
dataSources.keySet().forEach(key -> dataSourceStatus.put(key, true));
// 启动健康检查任务
startHealthCheckTask();
}
@Scheduled(fixedRate = 30000) // 每30秒检查一次
public void checkDataSourceHealth() {
dataSources.forEach((key, dataSource) -> {
boolean isHealthy = testDataSourceConnection(dataSource);
boolean wasHealthy = dataSourceStatus.get(key);
if (isHealthy != wasHealthy) {
dataSourceStatus.put(key, isHealthy);
log.info("数据源状态变更: {} -> {}", key, isHealthy ? "健康" : "故障");
// 触发故障转移
if (!isHealthy) {
handleDataSourceFailure(key);
}
}
});
}
private boolean testDataSourceConnection(DataSource dataSource) {
try (Connection connection = dataSource.getConnection()) {
return connection.isValid(3); // 3秒超时
} catch (Exception e) {
log.warn("数据源连接测试失败", e);
return false;
}
}
private void handleDataSourceFailure(String failedKey) {
if ("master".equals(failedKey)) {
// 主库故障,切换到备用主库
switchToBackupMaster();
} else {
// 从库故障,从负载均衡中移除
removeFromLoadBalancer(failedKey);
}
}
}
4. 事务管理配置
@Configuration
@EnableTransactionManagement
public class TransactionConfig {
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
// 确保事务内的所有操作都使用同一个数据源
@Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
TransactionTemplate template = new TransactionTemplate(transactionManager);
template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return template;
}
}
业务场景应用
1. 用户服务读写分离
@Service
public class UserService {
@WriteOnly
@Transactional
public User createUser(CreateUserRequest request) {
// 写操作路由到主库
User user = new User();
user.setUsername(request.getUsername());
user.setEmail(request.getEmail());
return userRepository.save(user);
}
@ReadOnly
public User getUserById(Long userId) {
// 读操作路由到从库
return userRepository.findById(userId)
.orElseThrow(() -> new UserNotFoundException(userId));
}
@ReadOnly
public List<User> searchUsers(String keyword) {
// 复杂查询路由到从库
return userRepository.findByUsernameContaining(keyword);
}
@WriteOnly
@Transactional
public User updateUser(Long userId, UpdateUserRequest request) {
// 更新操作路由到主库
User user = getUserById(userId);
user.setUsername(request.getUsername());
user.setEmail(request.getEmail());
return userRepository.save(user);
}
}
2. 订单服务混合场景
@Service
public class OrderService {
@WriteOnly
@Transactional
public Order createOrder(CreateOrderRequest request) {
// 创建订单写入主库
Order order = new Order();
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.PENDING);
Order savedOrder = orderRepository.save(order);
// 同时更新用户积分(写操作)
userService.updateUserPoints(request.getUserId(), request.getAmount().intValue());
return savedOrder;
}
@ReadOnly
public Order getOrderWithDetails(Long orderId) {
// 读取订单详情从从库
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
// 读取用户信息从从库
User user = userService.getUserById(order.getUserId());
order.setUser(user);
return order;
}
@WriteOnly
@Transactional
public Order payOrder(Long orderId) {
// 支付订单需要读写混合操作
Order order = getOrderWithDetails(orderId); // 先读取(从库)
order.setStatus(OrderStatus.PAID); // 再更新(主库)
return orderRepository.save(order);
}
}
3. 报表统计服务
@Service
public class ReportService {
@ReadOnly
public UserStatisticsReport getUserStatistics(LocalDate date) {
// 大量统计查询路由到从库
UserStatisticsReport report = new UserStatisticsReport();
report.setTotalUsers(userRepository.count());
report.setActiveUsers(userRepository.countActiveUsers(date));
report.setNewUsers(userRepository.countNewUsers(date));
return report;
}
@ReadOnly
public OrderAnalysisReport getOrderAnalysis(LocalDate startDate, LocalDate endDate) {
// 复杂分析查询路由到从库
List<Order> orders = orderRepository.findByDateRange(startDate, endDate);
OrderAnalysisReport report = new OrderAnalysisReport();
report.setTotalOrders(orders.size());
report.setTotalAmount(orders.stream().mapToDouble(Order::getAmount).sum());
report.setAverageOrderValue(report.getTotalAmount() / report.getTotalOrders());
return report;
}
}
最佳实践建议
1. 数据一致性保障
@Component
@Slf4j
public class DataConsistencyManager {
// 强一致性读取(读主库)
@WriteOnly
public <T> T readFromMaster(Supplier<T> readOperation) {
return readOperation.get();
}
// 最终一致性读取(读从库)
@ReadOnly
public <T> T readFromSlave(Supplier<T> readOperation) {
return readOperation.get();
}
// 读写分离延迟处理
public void handleReplicationDelay(String operation, long delayMs) {
if (delayMs > 1000) { // 延迟超过1秒
log.warn("主从复制延迟较高: operation={}, delay={}ms", operation, delayMs);
// 可以触发告警或降级策略
}
}
}
2. 监控告警体系
@Component
public class DataSourceMonitor {
private final MeterRegistry meterRegistry;
private final Map<String, Timer> queryTimers = new ConcurrentHashMap<>();
@EventListener
public void handleQueryExecution(QueryExecutionEvent event) {
String dataSourceKey = DataSourceContextHolder.getDataSourceKey();
String operationType = event.isWriteOperation() ? "write" : "read";
// 记录查询性能指标
String timerName = "database.query." + operationType;
Timer timer = queryTimers.computeIfAbsent(timerName,
name -> Timer.builder(name)
.tag("datasource", dataSourceKey)
.register(meterRegistry));
timer.record(event.getExecutionTime(), TimeUnit.MILLISECONDS);
// 慢查询告警
if (event.getExecutionTime() > 1000) {
log.warn("慢查询告警: datasource={}, operation={}, time={}ms",
dataSourceKey, operationType, event.getExecutionTime());
}
}
@EventListener
public void handleDataSourceFailure(DataSourceFailureEvent event) {
// 数据源故障告警
log.error("数据源故障: key={}, error={}",
event.getDataSourceKey(), event.getError());
// 触发告警通知
alertService.sendAlert("数据源故障",
String.format("数据源 %s 发生故障: %s",
event.getDataSourceKey(), event.getError()));
}
}
3. 配置管理优化
@ConfigurationProperties(prefix = "datasource.routing")
@Data
public class DataSourceRoutingProperties {
private LoadBalanceStrategy loadBalance = LoadBalanceStrategy.ROUND_ROBIN;
private long healthCheckInterval = 30000; // 30秒
private int maxRetryAttempts = 3;
private long retryDelay = 1000; // 1秒
public enum LoadBalanceStrategy {
ROUND_ROBIN, // 轮询
RANDOM, // 随机
HASH, // 哈希
WEIGHTED // 加权
}
}
@Component
public class RoutingConfigurationManager {
@Autowired
private DataSourceRoutingProperties properties;
public String selectSlaveDataSource(String routingKey) {
switch (properties.getLoadBalance()) {
case ROUND_ROBIN:
return slaveLoadBalancer.getNextSlaveKey();
case RANDOM:
return slaveLoadBalancer.getRandomSlaveKey();
case HASH:
return slaveLoadBalancer.getSlaveKeyByHash(routingKey);
case WEIGHTED:
return slaveLoadBalancer.getWeightedSlaveKey();
default:
return slaveLoadBalancer.getNextSlaveKey();
}
}
}
预期效果
通过读写分离方案,我们可以实现:
- 性能提升:读写分离后,主库专注写入,从库处理查询,整体性能提升50-100%
- 扩展性强:可以轻松增加从库数量,线性提升读取能力
- 高可用性:自动故障检测和切换,保证服务连续性
- 成本优化:相比垂直分库,改造成本更低,风险更小
- 业务透明:对上层业务代码无侵入,平滑升级
这套方案是数据库扩展的经典路径,既解决了当前的性能瓶颈,又为未来的业务增长预留了空间。
欢迎关注公众号"服务端技术精选",获取更多技术干货!
欢迎大家加群沟通
标题:SpringBoot + 读写分离 + 动态数据源路由:主库写、从库读,自动故障切换
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/02/12/1770701724162.html
评论
0 评论