一、问题背景:Canal 同步的"黑洞效应"
你是否遇到过这样的场景:
- 使用 Canal 监听 MySQL binlog 同步数据到 Elasticsearch
- 系统运行一段时间后,发现 ES 中的数据与 MySQL 不一致
- 某些时间段的数据完全丢失,却找不到任何错误日志
这就是典型的binlog 位点跳跃问题。Canal 在解析 binlog 时,可能因为网络抖动、服务重启、位点记录失败等原因,跳过部分 binlog 事件,导致数据同步丢失。
真实案例:某电商平台使用 Canal 同步商品数据到 ES,在一次服务重启后发现最近2小时的商品库存更新丢失,导致用户看到的库存与实际库存不一致,造成了严重的业务损失。
二、核心概念:Canal 位点管理机制
2.1 Canal 位点存储方式
| 存储方式 | 描述 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| MetaStore | 内存存储 | 性能高 | 重启丢失 | 测试环境 |
| Zookeeper | ZK 存储 | 可靠 | 依赖 ZK | 生产环境 |
| MySQL | 数据库存储 | 可靠 | 依赖 DB | 生产环境 |
| File | 文件存储 | 简单 | 单节点 | 小规模 |
2.2 位点跳跃的常见原因
┌──────────────────────────────────────────────────────────────────┐
│ 位点跳跃原因分析 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 1. 网络问题 │
│ ├── Canal 与 MySQL 连接中断 │
│ ├── Canal Server 与 Client 连接中断 │
│ └── 位点提交失败 │
│ │
│ 2. 服务异常 │
│ ├── Canal Server 重启 │
│ ├── Canal Client 重启 │
│ └── MySQL 主从切换 │
│ │
│ 3. 位点记录失败 │
│ ├── Meta Store 写入失败 │
│ ├── 位点数据损坏 │
│ └── 位点回滚 │
│ │
│ 4. Binlog 问题 │
│ ├── Binlog 文件损坏 │
│ ├── Binlog 被清理 │
│ └── GTID 模式位点混乱 │
│ │
└──────────────────────────────────────────────────────────────────┘
2.3 位点跳跃示意图
MySQL Binlog 序列:
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Event1 │ │ Event2 │ │ Event3 │ │ Event4 │ │ Event5 │ │ Event6 │
│ 1001 │ │ 1002 │ │ 1003 │ │ 1004 │ │ 1005 │ │ 1006 │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘
│ │ │ │ │ │
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
Canal 读取位点:
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Event1 │ │ Event2 │ SKIP! │ Event5 │ │ Event6 │
│ 1001 │ │ 1002 │ │ 1005 │ │ 1006 │
└────────┘ └────────┘ └────────┘ └────────┘
位点跳跃: 1002 → 1005,丢失 Event3、Event4
三、实现方案:位点校验 + 自动追平
3.1 方案架构设计
┌──────────────────────────────────────────────────────────────────┐
│ 数据同步补偿架构 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ MySQL 主库 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Binlog1 │ │ Binlog2 │ │ Binlog3 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Canal Server │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Binlog解析 │ │ 位点管理 │ │ 事件推送 │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ SpringBoot Client │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ 事件处理 │ │ 位点校验 │ │ 补偿机制 │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ 位点存储 │ │ 数据同步 │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Elasticsearch │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Index 1 │ │ Index 2 │ │ Index 3 │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
3.2 位点校验机制
@Component
@Slf4j
public class PositionValidator {
@Autowired
private PositionStore positionStore;
@Autowired
private CanalConnector canalConnector;
@Autowired
private ElasticsearchTemplate esTemplate;
/**
* 位点校验阈值(允许的最大位点跳跃)
*/
private static final long POSITION_GAP_THRESHOLD = 100;
/**
* 校验位点是否正常
*/
public boolean validatePosition(CanalEntry.Entry entry) {
long currentPosition = entry.getHeader().getLogfileOffset();
long previousPosition = positionStore.getLastPosition();
// 第一次启动,无需校验
if (previousPosition == 0) {
return true;
}
// 计算位点差距
long positionGap = currentPosition - previousPosition;
// 正常情况:位点应该是连续或略有小跳跃
if (positionGap <= POSITION_GAP_THRESHOLD) {
return true;
}
// 位点跳跃过大,触发校验
log.warn("Position gap detected: previous={}, current={}, gap={}",
previousPosition, currentPosition, positionGap);
// 执行数据一致性校验
return performDataValidation(entry);
}
/**
* 执行数据一致性校验
*/
private boolean performDataValidation(CanalEntry.Entry entry) {
try {
String tableName = entry.getHeader().getTableName();
String schemaName = entry.getHeader().getSchemaName();
// 获取 MySQL 当前数据
List<Map<String, Object>> mysqlData = fetchMySQLData(schemaName, tableName);
// 获取 ES 当前数据
List<Map<String, Object>> esData = fetchESData(schemaName, tableName);
// 比较数据一致性
boolean isConsistent = compareData(mysqlData, esData);
if (!isConsistent) {
log.error("Data inconsistency detected for table: {}", tableName);
triggerCompensation(schemaName, tableName);
return false;
}
return true;
} catch (Exception e) {
log.error("Data validation failed", e);
return false;
}
}
/**
* 触发数据补偿
*/
private void triggerCompensation(String schemaName, String tableName) {
CompensationTask task = CompensationTask.builder()
.schemaName(schemaName)
.tableName(tableName)
.type(CompensationType.FULL_SYNC)
.createTime(LocalDateTime.now())
.build();
compensationService.submitTask(task);
}
}
3.3 位点存储实现
@Component
@Slf4j
public class PositionStore {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 位点存储键前缀
*/
private static final String POSITION_KEY_PREFIX = "canal:position:";
/**
* 记录位点
*/
public void recordPosition(CanalEntry.Entry entry) {
String key = POSITION_KEY_PREFIX + entry.getHeader().getSchemaName()
+ ":" + entry.getHeader().getTableName();
CanalPosition position = CanalPosition.builder()
.logfileName(entry.getHeader().getLogfileName())
.logfileOffset(entry.getHeader().getLogfileOffset())
.timestamp(entry.getHeader().getExecuteTime())
.serverId(entry.getHeader().getServerId())
.build();
redisTemplate.opsForValue().set(key, position);
log.debug("Recorded position: key={}, offset={}", key, position.getLogfileOffset());
}
/**
* 获取上次位点
*/
public long getLastPosition() {
// 从 Redis 获取全局位点
CanalPosition position = (CanalPosition) redisTemplate
.opsForValue().get(POSITION_KEY_PREFIX + "global");
return position != null ? position.getLogfileOffset() : 0;
}
/**
* 批量记录位点(用于事务提交)
*/
public void batchRecordPosition(List<CanalEntry.Entry> entries) {
Map<String, CanalPosition> positionMap = new HashMap<>();
for (CanalEntry.Entry entry : entries) {
String key = POSITION_KEY_PREFIX + entry.getHeader().getSchemaName()
+ ":" + entry.getHeader().getTableName();
CanalPosition position = CanalPosition.builder()
.logfileName(entry.getHeader().getLogfileName())
.logfileOffset(entry.getHeader().getLogfileOffset())
.timestamp(entry.getHeader().getExecuteTime())
.serverId(entry.getHeader().getServerId())
.build();
positionMap.put(key, position);
}
// 批量写入 Redis
redisTemplate.opsForValue().multiSet(positionMap);
log.debug("Batch recorded {} positions", entries.size());
}
/**
* 位点回滚(用于异常处理)
*/
public void rollbackPosition(String schemaName, String tableName) {
String key = POSITION_KEY_PREFIX + schemaName + ":" + tableName;
// 获取前一个位点
CanalPosition previousPosition = getPreviousPosition(schemaName, tableName);
if (previousPosition != null) {
redisTemplate.opsForValue().set(key, previousPosition);
log.info("Rolled back position for {}:{}", schemaName, tableName);
}
}
private CanalPosition getPreviousPosition(String schemaName, String tableName) {
String historyKey = POSITION_KEY_PREFIX + "history:" + schemaName + ":" + tableName;
List<CanalPosition> history = (List<CanalPosition>) redisTemplate
.opsForList().range(historyKey, 0, 10);
return history != null && history.size() > 1 ? history.get(1) : null;
}
}
3.4 补偿服务实现
@Service
@Slf4j
public class CompensationService {
@Autowired
private DataSource dataSource;
@Autowired
private ElasticsearchTemplate esTemplate;
@Autowired
private PositionStore positionStore;
@Autowired
private TaskQueue taskQueue;
/**
* 补偿线程池
*/
private final ExecutorService compensationExecutor = Executors.newFixedThreadPool(4);
/**
* 提交补偿任务
*/
public void submitTask(CompensationTask task) {
taskQueue.add(task);
log.info("Submitted compensation task: {}", task);
// 异步执行
compensationExecutor.submit(() -> executeCompensation(task));
}
/**
* 执行补偿任务
*/
private void executeCompensation(CompensationTask task) {
try {
log.info("Starting compensation task: {}", task);
switch (task.getType()) {
case FULL_SYNC:
executeFullSync(task);
break;
case RANGE_SYNC:
executeRangeSync(task);
break;
case SINGLE_RECORD:
executeSingleRecordSync(task);
break;
default:
log.warn("Unknown compensation type: {}", task.getType());
}
// 更新任务状态
task.setStatus(TaskStatus.COMPLETED);
task.setCompleteTime(LocalDateTime.now());
log.info("Compensation task completed: {}", task);
} catch (Exception e) {
log.error("Compensation task failed: {}", task, e);
task.setStatus(TaskStatus.FAILED);
task.setErrorMessage(e.getMessage());
}
}
/**
* 全量同步
*/
private void executeFullSync(CompensationTask task) {
String schemaName = task.getSchemaName();
String tableName = task.getTableName();
// 1. 清空 ES 索引
esTemplate.deleteIndex(schemaName + "_" + tableName);
esTemplate.createIndex(schemaName + "_" + tableName);
// 2. 从 MySQL 全量查询
long totalCount = countMySQLRecords(schemaName, tableName);
int batchSize = 1000;
int totalPages = (int) (totalCount / batchSize) + 1;
for (int page = 0; page < totalPages; page++) {
List<Map<String, Object>> records = fetchMySQLRecords(
schemaName, tableName, page * batchSize, batchSize);
// 3. 批量写入 ES
bulkInsertToES(schemaName, tableName, records);
log.info("Full sync progress: {} / {}", page + 1, totalPages);
}
// 4. 更新位点
updatePositionAfterSync(schemaName, tableName);
}
/**
* 范围同步
*/
private void executeRangeSync(CompensationTask task) {
String schemaName = task.getSchemaName();
String tableName = task.getTableName();
LocalDateTime startTime = task.getStartTime();
LocalDateTime endTime = task.getEndTime();
// 1. 查询时间范围内的数据
List<Map<String, Object>> records = fetchMySQLRecordsByTime(
schemaName, tableName, startTime, endTime);
// 2. 批量更新 ES
bulkInsertToES(schemaName, tableName, records);
log.info("Range sync completed: {} records", records.size());
}
/**
* 单条记录同步
*/
private void executeSingleRecordSync(CompensationTask task) {
String schemaName = task.getSchemaName();
String tableName = task.getTableName();
Object recordId = task.getRecordId();
// 1. 查询单条记录
Map<String, Object> record = fetchMySQLRecordById(schemaName, tableName, recordId);
if (record != null) {
// 2. 更新 ES
esTemplate.save(schemaName + "_" + tableName, record);
log.info("Single record sync completed: id={}", recordId);
} else {
// 3. 记录不存在,从 ES 删除
esTemplate.delete(schemaName + "_" + tableName, String.valueOf(recordId));
log.info("Single record deleted from ES: id={}", recordId);
}
}
/**
* 批量写入 ES
*/
private void bulkInsertToES(String schemaName, String tableName,
List<Map<String, Object>> records) {
IndexRequest[] requests = records.stream()
.map(record -> new IndexRequest(schemaName + "_" + tableName)
.source(record))
.toArray(IndexRequest[]::new);
esTemplate.bulkIndex(requests);
}
}
3.5 Canal 客户端监听器
@Component
@Slf4j
public class CanalClientListener {
@Autowired
private PositionValidator positionValidator;
@Autowired
private PositionStore positionStore;
@Autowired
private DataSyncService dataSyncService;
@Autowired
private CompensationService compensationService;
@PostConstruct
public void start() {
// 启动 Canal 连接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"",
""
);
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
} else {
processEntries(message.getEntries());
}
// 提交 ack
connector.ack(batchId);
}
connector.disconnect();
}
/**
* 处理 binlog 事件
*/
private void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// 位点校验
if (!positionValidator.validatePosition(entry)) {
log.warn("Position validation failed, triggering compensation");
handlePositionGap(entry);
continue;
}
// 处理数据变更
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
switch (rowChange.getEventType()) {
case INSERT:
dataSyncService.handleInsert(entry.getHeader(), rowData);
break;
case UPDATE:
dataSyncService.handleUpdate(entry.getHeader(), rowData);
break;
case DELETE:
dataSyncService.handleDelete(entry.getHeader(), rowData);
break;
default:
log.warn("Unknown event type: {}", rowChange.getEventType());
}
}
// 记录位点
positionStore.recordPosition(entry);
}
}
/**
* 处理位点跳跃
*/
private void handlePositionGap(CanalEntry.Entry entry) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
// 计算丢失的时间范围
LocalDateTime startTime = calculateLostStartTime(entry);
LocalDateTime endTime = LocalDateTime.now();
// 提交范围补偿任务
CompensationTask task = CompensationTask.builder()
.schemaName(schemaName)
.tableName(tableName)
.type(CompensationType.RANGE_SYNC)
.startTime(startTime)
.endTime(endTime)
.createTime(LocalDateTime.now())
.build();
compensationService.submitTask(task);
}
private LocalDateTime calculateLostStartTime(CanalEntry.Entry entry) {
// 根据上一个位点计算丢失的起始时间
CanalPosition previousPosition = positionStore.getPreviousPosition(
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName()
);
if (previousPosition != null) {
return LocalDateTime.ofEpochSecond(previousPosition.getTimestamp() / 1000, 0, ZoneOffset.UTC);
}
// 默认补偿最近1小时
return LocalDateTime.now().minusHours(1);
}
}
3.6 数据同步服务
@Service
@Slf4j
public class DataSyncService {
@Autowired
private ElasticsearchTemplate esTemplate;
/**
* 处理插入事件
*/
public void handleInsert(CanalEntry.Header header, CanalEntry.RowData rowData) {
String indexName = header.getSchemaName() + "_" + header.getTableName();
Map<String, Object> data = convertRowDataToMap(rowData.getAfterColumnsList());
esTemplate.save(indexName, data);
log.debug("Inserted document to ES: index={}, id={}", indexName, data.get("id"));
}
/**
* 处理更新事件
*/
public void handleUpdate(CanalEntry.Header header, CanalEntry.RowData rowData) {
String indexName = header.getSchemaName() + "_" + header.getTableName();
Map<String, Object> before = convertRowDataToMap(rowData.getBeforeColumnsList());
Map<String, Object> after = convertRowDataToMap(rowData.getAfterColumnsList());
// 获取主键
String id = String.valueOf(before.get("id"));
// 更新文档
esTemplate.update(indexName, id, after);
log.debug("Updated document in ES: index={}, id={}", indexName, id);
}
/**
* 处理删除事件
*/
public void handleDelete(CanalEntry.Header header, CanalEntry.RowData rowData) {
String indexName = header.getSchemaName() + "_" + header.getTableName();
Map<String, Object> before = convertRowDataToMap(rowData.getBeforeColumnsList());
// 获取主键
String id = String.valueOf(before.get("id"));
// 删除文档
esTemplate.delete(indexName, id);
log.debug("Deleted document from ES: index={}, id={}", indexName, id);
}
/**
* 转换 Canal 列数据为 Map
*/
private Map<String, Object> convertRowDataToMap(List<CanalEntry.Column> columns) {
Map<String, Object> map = new HashMap<>();
for (CanalEntry.Column column : columns) {
map.put(column.getName(), column.getValue());
}
return map;
}
}
四、配置文件示例
server:
port: 8080
spring:
application:
name: canal-sync-demo
datasource:
url: jdbc:mysql://localhost:3306/product_db?useSSL=false&serverTimezone=UTC
username: root
password: password
driver-class-name: com.mysql.cj.jdbc.Driver
elasticsearch:
rest:
uris: http://localhost:9200
# Canal 配置
canal:
server:
host: 127.0.0.1
port: 11111
destination: example
client:
batch-size: 100
fetch-interval: 1000
# 位点配置
position:
store:
type: redis
redis:
key-prefix: canal:position:
validation:
enabled: true
gap-threshold: 100
# 补偿配置
compensation:
enabled: true
thread-pool-size: 4
batch-size: 1000
retry-count: 3
logging:
level:
com.example.canal: DEBUG
五、监控与告警
5.1 位点监控指标
@Component
public class CanalSyncMetrics {
private final MeterRegistry meterRegistry;
public CanalSyncMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
registerMetrics();
}
private void registerMetrics() {
// 位点差距
Gauge.builder("canal.position.gap",
() -> positionGapMonitor.getCurrentGap())
.register(meterRegistry);
// 事件处理计数
Counter.builder("canal.event.processed")
.tag("type", "insert")
.register(meterRegistry);
Counter.builder("canal.event.processed")
.tag("type", "update")
.register(meterRegistry);
Counter.builder("canal.event.processed")
.tag("type", "delete")
.register(meterRegistry);
// 补偿任务计数
Counter.builder("canal.compensation.executed")
.tag("type", "full_sync")
.register(meterRegistry);
Counter.builder("canal.compensation.executed")
.tag("type", "range_sync")
.register(meterRegistry);
// 位点校验失败
Counter.builder("canal.validation.failed")
.register(meterRegistry);
}
}
5.2 Prometheus 告警规则
groups:
- name: canal_sync_alerts
rules:
- alert: CanalPositionGapHigh
expr: canal_position_gap > 1000
for: 1m
labels:
severity: warning
annotations:
summary: "Canal 位点差距过大"
description: "位点差距超过1000,可能存在数据丢失"
- alert: CanalValidationFailed
expr: rate(canal_validation_failed_total[5m]) > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Canal 位点校验失败"
description: "位点校验失败,已触发数据补偿"
- alert: CanalCompensationRunning
expr: canal_compensation_executed_total > 0
for: 1m
labels:
severity: warning
annotations:
summary: "Canal 数据补偿正在执行"
description: "系统正在执行数据补偿任务"
- alert: CanalSyncLatencyHigh
expr: canal_sync_latency_seconds > 30
for: 5m
labels:
severity: warning
annotations:
summary: "Canal 同步延迟过高"
description: "同步延迟超过30秒"
六、最佳实践建议
6.1 位点管理建议
| 配置项 | 建议值 | 说明 |
|---|---|---|
batch-size | 100-500 | 单批次处理事件数 |
gap-threshold | 100-500 | 位点差距阈值 |
retry-count | 3-5 | 补偿重试次数 |
thread-pool-size | 4-8 | 补偿线程池大小 |
6.2 补偿策略选择
| 补偿类型 | 适用场景 | 性能影响 | 数据范围 |
|---|---|---|---|
| 全量同步 | 大范围丢失、索引重建 | 高 | 全表 |
| 范围同步 | 时间段丢失 | 中 | 时间范围 |
| 单条同步 | 单条记录丢失 | 低 | 单条 |
6.3 注意事项
- 位点存储可靠性:使用 Redis 或 MySQL 存储,避免重启丢失
- 补偿幂等性:补偿任务需要支持重复执行
- 并行处理限制:避免过多并行补偿任务
- 监控告警:及时发现位点跳跃和补偿失败
- 数据一致性:补偿后验证 MySQL 和 ES 数据一致性
互动话题
您在使用 Canal 同步数据时遇到过数据丢失问题吗?您是如何解决的?欢迎在评论区分享您的经验!
