文章 555
评论 5
浏览 199932
SpringBoot + Canal 数据同步丢失补偿:MySQL binlog 位点跳跃导致 ES 数据不全?位点校验 + 自动追平

SpringBoot + Canal 数据同步丢失补偿:MySQL binlog 位点跳跃导致 ES 数据不全?位点校验 + 自动追平

一、问题背景:Canal 同步的"黑洞效应"

你是否遇到过这样的场景:

  1. 使用 Canal 监听 MySQL binlog 同步数据到 Elasticsearch
  2. 系统运行一段时间后,发现 ES 中的数据与 MySQL 不一致
  3. 某些时间段的数据完全丢失,却找不到任何错误日志

这就是典型的binlog 位点跳跃问题。Canal 在解析 binlog 时,可能因为网络抖动、服务重启、位点记录失败等原因,跳过部分 binlog 事件,导致数据同步丢失。

真实案例:某电商平台使用 Canal 同步商品数据到 ES,在一次服务重启后发现最近2小时的商品库存更新丢失,导致用户看到的库存与实际库存不一致,造成了严重的业务损失。


二、核心概念:Canal 位点管理机制

2.1 Canal 位点存储方式

存储方式描述优点缺点适用场景
MetaStore内存存储性能高重启丢失测试环境
ZookeeperZK 存储可靠依赖 ZK生产环境
MySQL数据库存储可靠依赖 DB生产环境
File文件存储简单单节点小规模

2.2 位点跳跃的常见原因

┌──────────────────────────────────────────────────────────────────┐
│                    位点跳跃原因分析                              │
├──────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. 网络问题                                                     │
│     ├── Canal 与 MySQL 连接中断                                  │
│     ├── Canal Server 与 Client 连接中断                          │
│     └── 位点提交失败                                             │
│                                                                 │
│  2. 服务异常                                                     │
│     ├── Canal Server 重启                                        │
│     ├── Canal Client 重启                                        │
│     └── MySQL 主从切换                                            │
│                                                                 │
│  3. 位点记录失败                                                 │
│     ├── Meta Store 写入失败                                      │
│     ├── 位点数据损坏                                             │
│     └── 位点回滚                                                 │
│                                                                 │
│  4. Binlog 问题                                                  │
│     ├── Binlog 文件损坏                                          │
│     ├── Binlog 被清理                                            │
│     └── GTID 模式位点混乱                                         │
│                                                                 │
└──────────────────────────────────────────────────────────────────┘

2.3 位点跳跃示意图

MySQL Binlog 序列:
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Event1 │ │ Event2 │ │ Event3 │ │ Event4 │ │ Event5 │ │ Event6 │
│ 1001   │ │ 1002   │ │ 1003   │ │ 1004   │ │ 1005   │ │ 1006   │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘
     │         │         │         │         │         │
     │         │         │         │         │         │
     ▼         ▼         ▼         ▼         ▼         ▼
Canal 读取位点:
┌────────┐ ┌────────┐         ┌────────┐ ┌────────┐
│ Event1 │ │ Event2 │  SKIP!  │ Event5 │ │ Event6 │
│ 1001   │ │ 1002   │         │ 1005   │ │ 1006   │
└────────┘ └────────┘         └────────┘ └────────┘

位点跳跃: 1002 → 1005,丢失 Event3、Event4

三、实现方案:位点校验 + 自动追平

3.1 方案架构设计

┌──────────────────────────────────────────────────────────────────┐
│                    数据同步补偿架构                              │
├──────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                     MySQL 主库                             │  │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐                   │  │
│  │  │ Binlog1 │  │ Binlog2 │  │ Binlog3 │                   │  │
│  │  └─────────┘  └─────────┘  └─────────┘                   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                  │
│                              ▼                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                     Canal Server                           │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │  │
│  │  │ Binlog解析   │  │ 位点管理     │  │ 事件推送     │    │  │
│  │  └──────────────┘  └──────────────┘  └──────────────┘    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                  │
│                              ▼                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                  SpringBoot Client                         │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │  │
│  │  │ 事件处理     │  │ 位点校验     │  │ 补偿机制     │    │  │
│  │  └──────────────┘  └──────────────┘  └──────────────┘    │  │
│  │  ┌──────────────┐  ┌──────────────┐                     │  │
│  │  │ 位点存储     │  │ 数据同步     │                     │  │
│  │  └──────────────┘  └──────────────┘                     │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                  │
│                              ▼                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   Elasticsearch                            │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │  │
│  │  │ Index 1      │  │ Index 2      │  │ Index 3      │    │  │
│  │  └──────────────┘  └──────────────┘  └──────────────┘    │  │
│  └───────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘

3.2 位点校验机制

@Component
@Slf4j
public class PositionValidator {
    
    @Autowired
    private PositionStore positionStore;
    
    @Autowired
    private CanalConnector canalConnector;
    
    @Autowired
    private ElasticsearchTemplate esTemplate;
    
    /**
     * 位点校验阈值(允许的最大位点跳跃)
     */
    private static final long POSITION_GAP_THRESHOLD = 100;
    
    /**
     * 校验位点是否正常
     */
    public boolean validatePosition(CanalEntry.Entry entry) {
        long currentPosition = entry.getHeader().getLogfileOffset();
        long previousPosition = positionStore.getLastPosition();
        
        // 第一次启动,无需校验
        if (previousPosition == 0) {
            return true;
        }
        
        // 计算位点差距
        long positionGap = currentPosition - previousPosition;
        
        // 正常情况:位点应该是连续或略有小跳跃
        if (positionGap <= POSITION_GAP_THRESHOLD) {
            return true;
        }
        
        // 位点跳跃过大,触发校验
        log.warn("Position gap detected: previous={}, current={}, gap={}",
            previousPosition, currentPosition, positionGap);
        
        // 执行数据一致性校验
        return performDataValidation(entry);
    }
    
    /**
     * 执行数据一致性校验
     */
    private boolean performDataValidation(CanalEntry.Entry entry) {
        try {
            String tableName = entry.getHeader().getTableName();
            String schemaName = entry.getHeader().getSchemaName();
            
            // 获取 MySQL 当前数据
            List<Map<String, Object>> mysqlData = fetchMySQLData(schemaName, tableName);
            
            // 获取 ES 当前数据
            List<Map<String, Object>> esData = fetchESData(schemaName, tableName);
            
            // 比较数据一致性
            boolean isConsistent = compareData(mysqlData, esData);
            
            if (!isConsistent) {
                log.error("Data inconsistency detected for table: {}", tableName);
                triggerCompensation(schemaName, tableName);
                return false;
            }
            
            return true;
        } catch (Exception e) {
            log.error("Data validation failed", e);
            return false;
        }
    }
    
    /**
     * 触发数据补偿
     */
    private void triggerCompensation(String schemaName, String tableName) {
        CompensationTask task = CompensationTask.builder()
            .schemaName(schemaName)
            .tableName(tableName)
            .type(CompensationType.FULL_SYNC)
            .createTime(LocalDateTime.now())
            .build();
        
        compensationService.submitTask(task);
    }
}

3.3 位点存储实现

@Component
@Slf4j
public class PositionStore {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 位点存储键前缀
     */
    private static final String POSITION_KEY_PREFIX = "canal:position:";
    
    /**
     * 记录位点
     */
    public void recordPosition(CanalEntry.Entry entry) {
        String key = POSITION_KEY_PREFIX + entry.getHeader().getSchemaName() 
            + ":" + entry.getHeader().getTableName();
        
        CanalPosition position = CanalPosition.builder()
            .logfileName(entry.getHeader().getLogfileName())
            .logfileOffset(entry.getHeader().getLogfileOffset())
            .timestamp(entry.getHeader().getExecuteTime())
            .serverId(entry.getHeader().getServerId())
            .build();
        
        redisTemplate.opsForValue().set(key, position);
        
        log.debug("Recorded position: key={}, offset={}", key, position.getLogfileOffset());
    }
    
    /**
     * 获取上次位点
     */
    public long getLastPosition() {
        // 从 Redis 获取全局位点
        CanalPosition position = (CanalPosition) redisTemplate
            .opsForValue().get(POSITION_KEY_PREFIX + "global");
        
        return position != null ? position.getLogfileOffset() : 0;
    }
    
    /**
     * 批量记录位点(用于事务提交)
     */
    public void batchRecordPosition(List<CanalEntry.Entry> entries) {
        Map<String, CanalPosition> positionMap = new HashMap<>();
        
        for (CanalEntry.Entry entry : entries) {
            String key = POSITION_KEY_PREFIX + entry.getHeader().getSchemaName() 
                + ":" + entry.getHeader().getTableName();
            
            CanalPosition position = CanalPosition.builder()
                .logfileName(entry.getHeader().getLogfileName())
                .logfileOffset(entry.getHeader().getLogfileOffset())
                .timestamp(entry.getHeader().getExecuteTime())
                .serverId(entry.getHeader().getServerId())
                .build();
            
            positionMap.put(key, position);
        }
        
        // 批量写入 Redis
        redisTemplate.opsForValue().multiSet(positionMap);
        
        log.debug("Batch recorded {} positions", entries.size());
    }
    
    /**
     * 位点回滚(用于异常处理)
     */
    public void rollbackPosition(String schemaName, String tableName) {
        String key = POSITION_KEY_PREFIX + schemaName + ":" + tableName;
        
        // 获取前一个位点
        CanalPosition previousPosition = getPreviousPosition(schemaName, tableName);
        
        if (previousPosition != null) {
            redisTemplate.opsForValue().set(key, previousPosition);
            log.info("Rolled back position for {}:{}", schemaName, tableName);
        }
    }
    
    private CanalPosition getPreviousPosition(String schemaName, String tableName) {
        String historyKey = POSITION_KEY_PREFIX + "history:" + schemaName + ":" + tableName;
        List<CanalPosition> history = (List<CanalPosition>) redisTemplate
            .opsForList().range(historyKey, 0, 10);
        
        return history != null && history.size() > 1 ? history.get(1) : null;
    }
}

3.4 补偿服务实现

@Service
@Slf4j
public class CompensationService {
    
    @Autowired
    private DataSource dataSource;
    
    @Autowired
    private ElasticsearchTemplate esTemplate;
    
    @Autowired
    private PositionStore positionStore;
    
    @Autowired
    private TaskQueue taskQueue;
    
    /**
     * 补偿线程池
     */
    private final ExecutorService compensationExecutor = Executors.newFixedThreadPool(4);
    
    /**
     * 提交补偿任务
     */
    public void submitTask(CompensationTask task) {
        taskQueue.add(task);
        log.info("Submitted compensation task: {}", task);
        
        // 异步执行
        compensationExecutor.submit(() -> executeCompensation(task));
    }
    
    /**
     * 执行补偿任务
     */
    private void executeCompensation(CompensationTask task) {
        try {
            log.info("Starting compensation task: {}", task);
            
            switch (task.getType()) {
                case FULL_SYNC:
                    executeFullSync(task);
                    break;
                case RANGE_SYNC:
                    executeRangeSync(task);
                    break;
                case SINGLE_RECORD:
                    executeSingleRecordSync(task);
                    break;
                default:
                    log.warn("Unknown compensation type: {}", task.getType());
            }
            
            // 更新任务状态
            task.setStatus(TaskStatus.COMPLETED);
            task.setCompleteTime(LocalDateTime.now());
            
            log.info("Compensation task completed: {}", task);
        } catch (Exception e) {
            log.error("Compensation task failed: {}", task, e);
            task.setStatus(TaskStatus.FAILED);
            task.setErrorMessage(e.getMessage());
        }
    }
    
    /**
     * 全量同步
     */
    private void executeFullSync(CompensationTask task) {
        String schemaName = task.getSchemaName();
        String tableName = task.getTableName();
        
        // 1. 清空 ES 索引
        esTemplate.deleteIndex(schemaName + "_" + tableName);
        esTemplate.createIndex(schemaName + "_" + tableName);
        
        // 2. 从 MySQL 全量查询
        long totalCount = countMySQLRecords(schemaName, tableName);
        int batchSize = 1000;
        int totalPages = (int) (totalCount / batchSize) + 1;
        
        for (int page = 0; page < totalPages; page++) {
            List<Map<String, Object>> records = fetchMySQLRecords(
                schemaName, tableName, page * batchSize, batchSize);
            
            // 3. 批量写入 ES
            bulkInsertToES(schemaName, tableName, records);
            
            log.info("Full sync progress: {} / {}", page + 1, totalPages);
        }
        
        // 4. 更新位点
        updatePositionAfterSync(schemaName, tableName);
    }
    
    /**
     * 范围同步
     */
    private void executeRangeSync(CompensationTask task) {
        String schemaName = task.getSchemaName();
        String tableName = task.getTableName();
        LocalDateTime startTime = task.getStartTime();
        LocalDateTime endTime = task.getEndTime();
        
        // 1. 查询时间范围内的数据
        List<Map<String, Object>> records = fetchMySQLRecordsByTime(
            schemaName, tableName, startTime, endTime);
        
        // 2. 批量更新 ES
        bulkInsertToES(schemaName, tableName, records);
        
        log.info("Range sync completed: {} records", records.size());
    }
    
    /**
     * 单条记录同步
     */
    private void executeSingleRecordSync(CompensationTask task) {
        String schemaName = task.getSchemaName();
        String tableName = task.getTableName();
        Object recordId = task.getRecordId();
        
        // 1. 查询单条记录
        Map<String, Object> record = fetchMySQLRecordById(schemaName, tableName, recordId);
        
        if (record != null) {
            // 2. 更新 ES
            esTemplate.save(schemaName + "_" + tableName, record);
            log.info("Single record sync completed: id={}", recordId);
        } else {
            // 3. 记录不存在,从 ES 删除
            esTemplate.delete(schemaName + "_" + tableName, String.valueOf(recordId));
            log.info("Single record deleted from ES: id={}", recordId);
        }
    }
    
    /**
     * 批量写入 ES
     */
    private void bulkInsertToES(String schemaName, String tableName, 
                                List<Map<String, Object>> records) {
        IndexRequest[] requests = records.stream()
            .map(record -> new IndexRequest(schemaName + "_" + tableName)
                .source(record))
            .toArray(IndexRequest[]::new);
        
        esTemplate.bulkIndex(requests);
    }
}

3.5 Canal 客户端监听器

@Component
@Slf4j
public class CanalClientListener {
    
    @Autowired
    private PositionValidator positionValidator;
    
    @Autowired
    private PositionStore positionStore;
    
    @Autowired
    private DataSyncService dataSyncService;
    
    @Autowired
    private CompensationService compensationService;
    
    @PostConstruct
    public void start() {
        // 启动 Canal 连接
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111),
            "example",
            "",
            ""
        );
        
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        
        while (true) {
            Message message = connector.getWithoutAck(100);
            long batchId = message.getId();
            int size = message.getEntries().size();
            
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    break;
                }
            } else {
                processEntries(message.getEntries());
            }
            
            // 提交 ack
            connector.ack(batchId);
        }
        
        connector.disconnect();
    }
    
    /**
     * 处理 binlog 事件
     */
    private void processEntries(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN 
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            
            // 位点校验
            if (!positionValidator.validatePosition(entry)) {
                log.warn("Position validation failed, triggering compensation");
                handlePositionGap(entry);
                continue;
            }
            
            // 处理数据变更
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                switch (rowChange.getEventType()) {
                    case INSERT:
                        dataSyncService.handleInsert(entry.getHeader(), rowData);
                        break;
                    case UPDATE:
                        dataSyncService.handleUpdate(entry.getHeader(), rowData);
                        break;
                    case DELETE:
                        dataSyncService.handleDelete(entry.getHeader(), rowData);
                        break;
                    default:
                        log.warn("Unknown event type: {}", rowChange.getEventType());
                }
            }
            
            // 记录位点
            positionStore.recordPosition(entry);
        }
    }
    
    /**
     * 处理位点跳跃
     */
    private void handlePositionGap(CanalEntry.Entry entry) {
        String schemaName = entry.getHeader().getSchemaName();
        String tableName = entry.getHeader().getTableName();
        
        // 计算丢失的时间范围
        LocalDateTime startTime = calculateLostStartTime(entry);
        LocalDateTime endTime = LocalDateTime.now();
        
        // 提交范围补偿任务
        CompensationTask task = CompensationTask.builder()
            .schemaName(schemaName)
            .tableName(tableName)
            .type(CompensationType.RANGE_SYNC)
            .startTime(startTime)
            .endTime(endTime)
            .createTime(LocalDateTime.now())
            .build();
        
        compensationService.submitTask(task);
    }
    
    private LocalDateTime calculateLostStartTime(CanalEntry.Entry entry) {
        // 根据上一个位点计算丢失的起始时间
        CanalPosition previousPosition = positionStore.getPreviousPosition(
            entry.getHeader().getSchemaName(),
            entry.getHeader().getTableName()
        );
        
        if (previousPosition != null) {
            return LocalDateTime.ofEpochSecond(previousPosition.getTimestamp() / 1000, 0, ZoneOffset.UTC);
        }
        
        // 默认补偿最近1小时
        return LocalDateTime.now().minusHours(1);
    }
}

3.6 数据同步服务

@Service
@Slf4j
public class DataSyncService {
    
    @Autowired
    private ElasticsearchTemplate esTemplate;
    
    /**
     * 处理插入事件
     */
    public void handleInsert(CanalEntry.Header header, CanalEntry.RowData rowData) {
        String indexName = header.getSchemaName() + "_" + header.getTableName();
        
        Map<String, Object> data = convertRowDataToMap(rowData.getAfterColumnsList());
        
        esTemplate.save(indexName, data);
        
        log.debug("Inserted document to ES: index={}, id={}", indexName, data.get("id"));
    }
    
    /**
     * 处理更新事件
     */
    public void handleUpdate(CanalEntry.Header header, CanalEntry.RowData rowData) {
        String indexName = header.getSchemaName() + "_" + header.getTableName();
        
        Map<String, Object> before = convertRowDataToMap(rowData.getBeforeColumnsList());
        Map<String, Object> after = convertRowDataToMap(rowData.getAfterColumnsList());
        
        // 获取主键
        String id = String.valueOf(before.get("id"));
        
        // 更新文档
        esTemplate.update(indexName, id, after);
        
        log.debug("Updated document in ES: index={}, id={}", indexName, id);
    }
    
    /**
     * 处理删除事件
     */
    public void handleDelete(CanalEntry.Header header, CanalEntry.RowData rowData) {
        String indexName = header.getSchemaName() + "_" + header.getTableName();
        
        Map<String, Object> before = convertRowDataToMap(rowData.getBeforeColumnsList());
        
        // 获取主键
        String id = String.valueOf(before.get("id"));
        
        // 删除文档
        esTemplate.delete(indexName, id);
        
        log.debug("Deleted document from ES: index={}, id={}", indexName, id);
    }
    
    /**
     * 转换 Canal 列数据为 Map
     */
    private Map<String, Object> convertRowDataToMap(List<CanalEntry.Column> columns) {
        Map<String, Object> map = new HashMap<>();
        
        for (CanalEntry.Column column : columns) {
            map.put(column.getName(), column.getValue());
        }
        
        return map;
    }
}

四、配置文件示例

server:
  port: 8080

spring:
  application:
    name: canal-sync-demo
  datasource:
    url: jdbc:mysql://localhost:3306/product_db?useSSL=false&serverTimezone=UTC
    username: root
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
  elasticsearch:
    rest:
      uris: http://localhost:9200

# Canal 配置
canal:
  server:
    host: 127.0.0.1
    port: 11111
    destination: example
  client:
    batch-size: 100
    fetch-interval: 1000

# 位点配置
position:
  store:
    type: redis
    redis:
      key-prefix: canal:position:
  validation:
    enabled: true
    gap-threshold: 100

# 补偿配置
compensation:
  enabled: true
  thread-pool-size: 4
  batch-size: 1000
  retry-count: 3

logging:
  level:
    com.example.canal: DEBUG

五、监控与告警

5.1 位点监控指标

@Component
public class CanalSyncMetrics {
    
    private final MeterRegistry meterRegistry;
    
    public CanalSyncMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        registerMetrics();
    }
    
    private void registerMetrics() {
        // 位点差距
        Gauge.builder("canal.position.gap", 
            () -> positionGapMonitor.getCurrentGap())
            .register(meterRegistry);
        
        // 事件处理计数
        Counter.builder("canal.event.processed")
            .tag("type", "insert")
            .register(meterRegistry);
        
        Counter.builder("canal.event.processed")
            .tag("type", "update")
            .register(meterRegistry);
        
        Counter.builder("canal.event.processed")
            .tag("type", "delete")
            .register(meterRegistry);
        
        // 补偿任务计数
        Counter.builder("canal.compensation.executed")
            .tag("type", "full_sync")
            .register(meterRegistry);
        
        Counter.builder("canal.compensation.executed")
            .tag("type", "range_sync")
            .register(meterRegistry);
        
        // 位点校验失败
        Counter.builder("canal.validation.failed")
            .register(meterRegistry);
    }
}

5.2 Prometheus 告警规则

groups:
- name: canal_sync_alerts
  rules:
  - alert: CanalPositionGapHigh
    expr: canal_position_gap > 1000
    for: 1m
    labels:
      severity: warning
    annotations:
      summary: "Canal 位点差距过大"
      description: "位点差距超过1000,可能存在数据丢失"
  
  - alert: CanalValidationFailed
    expr: rate(canal_validation_failed_total[5m]) > 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Canal 位点校验失败"
      description: "位点校验失败,已触发数据补偿"
  
  - alert: CanalCompensationRunning
    expr: canal_compensation_executed_total > 0
    for: 1m
    labels:
      severity: warning
    annotations:
      summary: "Canal 数据补偿正在执行"
      description: "系统正在执行数据补偿任务"
  
  - alert: CanalSyncLatencyHigh
    expr: canal_sync_latency_seconds > 30
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Canal 同步延迟过高"
      description: "同步延迟超过30秒"

六、最佳实践建议

6.1 位点管理建议

配置项建议值说明
batch-size100-500单批次处理事件数
gap-threshold100-500位点差距阈值
retry-count3-5补偿重试次数
thread-pool-size4-8补偿线程池大小

6.2 补偿策略选择

补偿类型适用场景性能影响数据范围
全量同步大范围丢失、索引重建全表
范围同步时间段丢失时间范围
单条同步单条记录丢失单条

6.3 注意事项

  1. 位点存储可靠性:使用 Redis 或 MySQL 存储,避免重启丢失
  2. 补偿幂等性:补偿任务需要支持重复执行
  3. 并行处理限制:避免过多并行补偿任务
  4. 监控告警:及时发现位点跳跃和补偿失败
  5. 数据一致性:补偿后验证 MySQL 和 ES 数据一致性

互动话题

您在使用 Canal 同步数据时遇到过数据丢失问题吗?您是如何解决的?欢迎在评论区分享您的经验!


标题:SpringBoot + Canal 数据同步丢失补偿:MySQL binlog 位点跳跃导致 ES 数据不全?位点校验 + 自动追平
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/06/30/1782637813233.html
公众号:服务端技术精选

服务端开发博客:后端架构、高并发、性能优化与微服务实战教程

取消