SpringBoot + 事务日志归档 + 冷存储迁移:历史事务日志自动归档,保障主库轻量化
前言
在企业应用中,事务日志是非常重要的组成部分,它记录了所有数据库操作的详细信息,对于审计、故障排查和数据恢复都有着至关重要的作用。然而,随着业务的增长,事务日志会不断累积,占用大量的存储空间,导致主数据库性能下降,备份时间变长,维护成本增加。
想象一下这样的场景:你的系统已经运行了几年,事务日志已经占用了数TB的存储空间,每次备份都需要数小时,数据库查询性能也因为大量的历史日志而受到影响。此时,你需要一种方法来管理这些历史日志,将其从主数据库中移除,同时又能在需要时快速访问这些日志。
事务日志归档和冷存储迁移正是为了解决这个问题而设计的。通过定期将历史事务日志归档,并迁移到低成本的冷存储中,可以保持主数据库的轻量化,提高系统性能,降低存储成本。本文将详细介绍如何在 Spring Boot 中实现事务日志的归档和冷存储迁移功能。
一、核心概念
1.1 事务日志
事务日志是数据库系统用来记录所有事务操作的日志文件,包括插入、更新、删除等操作。事务日志的主要作用是:
- 保证数据一致性和完整性
- 支持事务回滚
- 用于数据库恢复
- 提供审计和故障排查的依据
1.2 日志归档
日志归档是将历史事务日志从主数据库中移动到专门的归档存储中的过程。归档的目的是:
- 减少主数据库的存储空间占用
- 提高主数据库的性能
- 简化备份和恢复操作
- 满足合规性要求
1.3 冷存储
冷存储是一种低成本、高容量的存储解决方案,适用于不经常访问的数据。冷存储的特点是:
- 存储成本低
- 访问速度相对较慢
- 适合长期存储
- 通常用于归档数据
1.4 热存储 vs 冷存储
| 特性 | 热存储 | 冷存储 |
|---|---|---|
| 访问速度 | 快 | 慢 |
| 存储成本 | 高 | 低 |
| 适用场景 | 频繁访问的数据 | 不经常访问的归档数据 |
| 存储介质 | SSD、内存 | HDD、云存储 |
1.5 为什么需要事务日志归档和冷存储迁移
- 性能优化:减少主数据库的存储空间占用,提高查询和写入性能
- 成本降低:将不经常访问的历史日志迁移到低成本的冷存储中
- 合规性:满足数据保留政策和法规要求
- 灾备保障:确保历史数据的安全存储和可恢复性
- 管理简化:简化数据库管理和备份操作
二、技术方案
2.1 架构设计
事务日志归档和冷存储迁移的架构设计主要包括以下几个部分:
- 事务日志收集:收集主数据库产生的事务日志
- 日志归档:将历史日志从主数据库中移动到归档存储
- 冷存储迁移:将归档日志迁移到冷存储中
- 日志检索:提供对归档和冷存储中日志的检索能力
- 监控和告警:监控归档和迁移过程,及时发现和处理问题
2.2 技术选型
- Spring Boot:作为基础框架,提供依赖注入、配置管理等功能
- Spring Data JPA:用于数据库操作
- Spring Scheduler:用于定时执行归档和迁移任务
- MinIO:作为对象存储,用于存储归档的事务日志
- Quartz:用于复杂的调度任务
- Spring Actuator:用于监控和管理
2.3 核心流程
- 日志收集:通过数据库触发器或应用程序拦截器收集事务日志
- 日志存储:将日志存储在主数据库的事务日志表中
- 日志归档:定期将超过一定时间的日志归档到归档表或对象存储
- 冷存储迁移:将归档日志迁移到冷存储中
- 日志检索:提供API用于检索归档和冷存储中的日志
- 监控和告警:监控归档和迁移过程,及时发现和处理问题
三、Spring Boot 事务日志归档实现
3.1 依赖配置
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- MinIO -->
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.5.2</version>
</dependency>
<!-- Quartz -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- Spring Boot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Micrometer -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Spring Boot Configuration Processor -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.2 数据库设计
3.2.1 事务日志表
CREATE TABLE `transaction_log` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`transaction_id` VARCHAR(64) NOT NULL,
`user_id` BIGINT(20) NOT NULL,
`operation` VARCHAR(50) NOT NULL,
`table_name` VARCHAR(100) NOT NULL,
`old_value` JSON DEFAULT NULL,
`new_value` JSON DEFAULT NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
INDEX `idx_transaction_id` (`transaction_id`),
INDEX `idx_user_id` (`user_id`),
INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3.2.2 归档日志表
CREATE TABLE `archived_transaction_log` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`transaction_id` VARCHAR(64) NOT NULL,
`user_id` BIGINT(20) NOT NULL,
`operation` VARCHAR(50) NOT NULL,
`table_name` VARCHAR(100) NOT NULL,
`old_value` JSON DEFAULT NULL,
`new_value` JSON DEFAULT NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`archived_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
INDEX `idx_transaction_id` (`transaction_id`),
INDEX `idx_user_id` (`user_id`),
INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3.3 实体类
3.3.1 事务日志实体
@Entity
@Table(name = "transaction_log")
@Data
public class TransactionLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "transaction_id", nullable = false)
private String transactionId;
@Column(name = "user_id", nullable = false)
private Long userId;
@Column(name = "operation", nullable = false)
private String operation;
@Column(name = "table_name", nullable = false)
private String tableName;
@Column(name = "old_value", columnDefinition = "json")
private String oldValue;
@Column(name = "new_value", columnDefinition = "json")
private String newValue;
@Column(name = "created_at", nullable = false, updatable = false)
private LocalDateTime createdAt;
}
3.3.2 归档日志实体
@Entity
@Table(name = "archived_transaction_log")
@Data
public class ArchivedTransactionLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "transaction_id", nullable = false)
private String transactionId;
@Column(name = "user_id", nullable = false)
private Long userId;
@Column(name = "operation", nullable = false)
private String operation;
@Column(name = "table_name", nullable = false)
private String tableName;
@Column(name = "old_value", columnDefinition = "json")
private String oldValue;
@Column(name = "new_value", columnDefinition = "json")
private String newValue;
@Column(name = "created_at", nullable = false, updatable = false)
private LocalDateTime createdAt;
@Column(name = "archived_at", nullable = false, updatable = false)
private LocalDateTime archivedAt;
}
3.4 仓库接口
3.4.1 事务日志仓库
public interface TransactionLogRepository extends JpaRepository<TransactionLog, Long> {
List<TransactionLog> findByCreatedAtBefore(LocalDateTime dateTime);
void deleteByCreatedAtBefore(LocalDateTime dateTime);
}
3.4.2 归档日志仓库
public interface ArchivedTransactionLogRepository extends JpaRepository<ArchivedTransactionLog, Long> {
List<ArchivedTransactionLog> findByCreatedAtBetween(LocalDateTime startDateTime, LocalDateTime endDateTime);
List<ArchivedTransactionLog> findByUserIdAndCreatedAtBetween(Long userId, LocalDateTime startDateTime, LocalDateTime endDateTime);
}
3.5 归档服务
@Service
@Slf4j
public class TransactionLogArchiveService {
@Autowired
private TransactionLogRepository transactionLogRepository;
@Autowired
private ArchivedTransactionLogRepository archivedTransactionLogRepository;
@Autowired
private TransactionLogColdStorageService coldStorageService;
@Value("${transaction.log.archive.days:30}")
private int archiveDays;
@Value("${transaction.log.cold.storage.days:90}")
private int coldStorageDays;
/**
* 执行日志归档
*/
@Transactional
public void archiveLogs() {
log.info("开始执行事务日志归档");
// 计算归档日期
LocalDateTime archiveDate = LocalDateTime.now().minusDays(archiveDays);
// 查询需要归档的日志
List<TransactionLog> logs = transactionLogRepository.findByCreatedAtBefore(archiveDate);
log.info("找到 {} 条需要归档的日志", logs.size());
if (!logs.isEmpty()) {
// 转换为归档日志
List<ArchivedTransactionLog> archivedLogs = logs.stream()
.map(this::convertToArchivedLog)
.collect(Collectors.toList());
// 保存归档日志
archivedTransactionLogRepository.saveAll(archivedLogs);
log.info("已保存 {} 条归档日志", archivedLogs.size());
// 删除原日志
transactionLogRepository.deleteByCreatedAtBefore(archiveDate);
log.info("已删除 {} 条原日志", logs.size());
}
log.info("事务日志归档完成");
}
/**
* 执行冷存储迁移
*/
@Transactional
public void migrateToColdStorage() {
log.info("开始执行事务日志冷存储迁移");
// 计算冷存储迁移日期
LocalDateTime coldStorageDate = LocalDateTime.now().minusDays(coldStorageDays);
// 查询需要迁移到冷存储的归档日志
List<ArchivedTransactionLog> archivedLogs = archivedTransactionLogRepository.findByCreatedAtBefore(coldStorageDate);
log.info("找到 {} 条需要迁移到冷存储的归档日志", archivedLogs.size());
if (!archivedLogs.isEmpty()) {
// 迁移到冷存储
coldStorageService.migrateToColdStorage(archivedLogs);
log.info("已迁移 {} 条归档日志到冷存储", archivedLogs.size());
// 删除已迁移的归档日志
archivedLogs.forEach(log -> archivedTransactionLogRepository.delete(log));
log.info("已删除 {} 条已迁移的归档日志", archivedLogs.size());
}
log.info("事务日志冷存储迁移完成");
}
/**
* 将事务日志转换为归档日志
*/
private ArchivedTransactionLog convertToArchivedLog(TransactionLog log) {
ArchivedTransactionLog archivedLog = new ArchivedTransactionLog();
archivedLog.setTransactionId(log.getTransactionId());
archivedLog.setUserId(log.getUserId());
archivedLog.setOperation(log.getOperation());
archivedLog.setTableName(log.getTableName());
archivedLog.setOldValue(log.getOldValue());
archivedLog.setNewValue(log.getNewValue());
archivedLog.setCreatedAt(log.getCreatedAt());
archivedLog.setArchivedAt(LocalDateTime.now());
return archivedLog;
}
}
3.6 定时任务
@Configuration
public class SchedulerConfig {
@Autowired
private TransactionLogArchiveService archiveService;
@Bean
public JobDetail archiveJobDetail() {
return JobBuilder.newJob(ArchiveJob.class)
.withIdentity("archiveJob")
.storeDurably()
.build();
}
@Bean
public Trigger archiveJobTrigger() {
return TriggerBuilder.newTrigger()
.forJob(archiveJobDetail())
.withIdentity("archiveJobTrigger")
.withSchedule(CronScheduleBuilder.dailyAtHourAndMinute(0, 0)) // 每天凌晨执行
.build();
}
@Bean
public JobDetail coldStorageJobDetail() {
return JobBuilder.newJob(ColdStorageJob.class)
.withIdentity("coldStorageJob")
.storeDurably()
.build();
}
@Bean
public Trigger coldStorageJobTrigger() {
return TriggerBuilder.newTrigger()
.forJob(coldStorageJobDetail())
.withIdentity("coldStorageJobTrigger")
.withSchedule(CronScheduleBuilder.weeklyOnDayAndHourAndMinute(1, 1, 0)) // 每周一凌晨执行
.build();
}
public static class ArchiveJob implements Job {
@Autowired
private TransactionLogArchiveService archiveService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
archiveService.archiveLogs();
}
}
public static class ColdStorageJob implements Job {
@Autowired
private TransactionLogArchiveService archiveService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
archiveService.migrateToColdStorage();
}
}
}
四、冷存储迁移实现
4.1 MinIO 配置
# MinIO 配置
minio:
url: http://localhost:9000
access-key: minioadmin
secret-key: minioadmin
bucket: transaction-logs
4.2 MinIO 配置类
@Data
@ConfigurationProperties(prefix = "minio")
public class MinioProperties {
private String url;
private String accessKey;
private String secretKey;
private String bucket;
}
4.3 MinIO 客户端配置
@Configuration
public class MinioConfig {
@Autowired
private MinioProperties properties;
@Bean
public MinioClient minioClient() throws Exception {
MinioClient client = MinioClient.builder()
.endpoint(properties.getUrl())
.credentials(properties.getAccessKey(), properties.getSecretKey())
.build();
// 创建桶(如果不存在)
boolean bucketExists = client.bucketExists(BucketExistsArgs.builder().bucket(properties.getBucket()).build());
if (!bucketExists) {
client.makeBucket(MakeBucketArgs.builder().bucket(properties.getBucket()).build());
}
return client;
}
}
4.4 冷存储服务
@Service
@Slf4j
public class TransactionLogColdStorageService {
@Autowired
private MinioClient minioClient;
@Autowired
private MinioProperties properties;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 迁移归档日志到冷存储
*/
public void migrateToColdStorage(List<ArchivedTransactionLog> logs) {
try {
// 按日期分组
Map<String, List<ArchivedTransactionLog>> logsByDate = logs.stream()
.collect(Collectors.groupingBy(log -> log.getCreatedAt().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))));
// 批量上传
for (Map.Entry<String, List<ArchivedTransactionLog>> entry : logsByDate.entrySet()) {
String date = entry.getKey();
List<ArchivedTransactionLog> dateLogs = entry.getValue();
// 生成文件名
String fileName = String.format("transaction-logs-%s.json", date);
String objectName = String.format("archived/%s", fileName);
// 转换为JSON
String json = objectMapper.writeValueAsString(dateLogs);
// 上传到MinIO
minioClient.putObject(
PutObjectArgs.builder()
.bucket(properties.getBucket())
.object(objectName)
.contentType("application/json")
.stream(new ByteArrayInputStream(json.getBytes()), json.length(), -1)
.build()
);
log.info("已上传 {} 条日志到冷存储: {}", dateLogs.size(), objectName);
}
} catch (Exception e) {
log.error("迁移到冷存储失败", e);
throw new RuntimeException("迁移到冷存储失败", e);
}
}
/**
* 从冷存储中检索日志
*/
public List<ArchivedTransactionLog> retrieveFromColdStorage(String date) {
try {
// 生成文件名
String fileName = String.format("transaction-logs-%s.json", date);
String objectName = String.format("archived/%s", fileName);
// 从MinIO下载
GetObjectResponse response = minioClient.getObject(
GetObjectArgs.builder()
.bucket(properties.getBucket())
.object(objectName)
.build()
);
// 解析JSON
List<ArchivedTransactionLog> logs = objectMapper.readValue(response, new TypeReference<List<ArchivedTransactionLog>>() {});
log.info("从冷存储中检索到 {} 条日志: {}", logs.size(), objectName);
return logs;
} catch (Exception e) {
log.error("从冷存储中检索失败", e);
return Collections.emptyList();
}
}
}
五、日志检索实现
5.1 控制器
@RestController
@RequestMapping("/api/transaction-logs")
public class TransactionLogController {
@Autowired
private TransactionLogRepository transactionLogRepository;
@Autowired
private ArchivedTransactionLogRepository archivedTransactionLogRepository;
@Autowired
private TransactionLogColdStorageService coldStorageService;
/**
* 查询当前日志
*/
@GetMapping("/current")
public ResponseEntity<List<TransactionLog>> getCurrentLogs(
@RequestParam(required = false) Long userId,
@RequestParam(required = false) String operation,
@RequestParam(required = false) String tableName,
@RequestParam(required = false) LocalDateTime startDate,
@RequestParam(required = false) LocalDateTime endDate) {
// 构建查询
Specification<TransactionLog> spec = (root, query, cb) -> {
List<Predicate> predicates = new ArrayList<>();
if (userId != null) {
predicates.add(cb.equal(root.get("userId"), userId));
}
if (operation != null) {
predicates.add(cb.equal(root.get("operation"), operation));
}
if (tableName != null) {
predicates.add(cb.equal(root.get("tableName"), tableName));
}
if (startDate != null) {
predicates.add(cb.greaterThanOrEqualTo(root.get("createdAt"), startDate));
}
if (endDate != null) {
predicates.add(cb.lessThanOrEqualTo(root.get("createdAt"), endDate));
}
return cb.and(predicates.toArray(new Predicate[0]));
};
List<TransactionLog> logs = transactionLogRepository.findAll(spec);
return ResponseEntity.ok(logs);
}
/**
* 查询归档日志
*/
@GetMapping("/archived")
public ResponseEntity<List<ArchivedTransactionLog>> getArchivedLogs(
@RequestParam(required = false) Long userId,
@RequestParam(required = false) String operation,
@RequestParam(required = false) String tableName,
@RequestParam(required = false) LocalDateTime startDate,
@RequestParam(required = false) LocalDateTime endDate) {
// 构建查询
Specification<ArchivedTransactionLog> spec = (root, query, cb) -> {
List<Predicate> predicates = new ArrayList<>();
if (userId != null) {
predicates.add(cb.equal(root.get("userId"), userId));
}
if (operation != null) {
predicates.add(cb.equal(root.get("operation"), operation));
}
if (tableName != null) {
predicates.add(cb.equal(root.get("tableName"), tableName));
}
if (startDate != null) {
predicates.add(cb.greaterThanOrEqualTo(root.get("createdAt"), startDate));
}
if (endDate != null) {
predicates.add(cb.lessThanOrEqualTo(root.get("createdAt"), endDate));
}
return cb.and(predicates.toArray(new Predicate[0]));
};
List<ArchivedTransactionLog> logs = archivedTransactionLogRepository.findAll(spec);
return ResponseEntity.ok(logs);
}
/**
* 查询冷存储日志
*/
@GetMapping("/cold-storage")
public ResponseEntity<List<ArchivedTransactionLog>> getColdStorageLogs(
@RequestParam String date) {
List<ArchivedTransactionLog> logs = coldStorageService.retrieveFromColdStorage(date);
return ResponseEntity.ok(logs);
}
}
六、Spring Boot 完整实现
6.1 项目依赖
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- MinIO -->
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.5.2</version>
</dependency>
<!-- Quartz -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- Spring Boot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Micrometer -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Spring Boot Configuration Processor -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
6.2 配置文件
server:
port: 8080
spring:
application:
name: transaction-log-archive-demo
datasource:
url: jdbc:mysql://localhost:3306/transaction_log_demo?useSSL=false&serverTimezone=UTC
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update
show-sql: true
quartz:
job-store-type: jdbc
# MinIO 配置
minio:
url: http://localhost:9000
access-key: minioadmin
secret-key: minioadmin
bucket: transaction-logs
# 事务日志配置
transaction:
log:
archive:
days: 30
cold-storage:
days: 90
# 监控配置
management:
endpoints:
web:
exposure:
include: health,info,prometheus
6.3 核心配置类
6.3.1 实体类
@Entity
@Table(name = "transaction_log")
@Data
public class TransactionLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "transaction_id", nullable = false)
private String transactionId;
@Column(name = "user_id", nullable = false)
private Long userId;
@Column(name = "operation", nullable = false)
private String operation;
@Column(name = "table_name", nullable = false)
private String tableName;
@Column(name = "old_value", columnDefinition = "json")
private String oldValue;
@Column(name = "new_value", columnDefinition = "json")
private String newValue;
@Column(name = "created_at", nullable = false, updatable = false)
private LocalDateTime createdAt;
}
@Entity
@Table(name = "archived_transaction_log")
@Data
public class ArchivedTransactionLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "transaction_id", nullable = false)
private String transactionId;
@Column(name = "user_id", nullable = false)
private Long userId;
@Column(name = "operation", nullable = false)
private String operation;
@Column(name = "table_name", nullable = false)
private String tableName;
@Column(name = "old_value", columnDefinition = "json")
private String oldValue;
@Column(name = "new_value", columnDefinition = "json")
private String newValue;
@Column(name = "created_at", nullable = false, updatable = false)
private LocalDateTime createdAt;
@Column(name = "archived_at", nullable = false, updatable = false)
private LocalDateTime archivedAt;
}
6.3.2 仓库接口
public interface TransactionLogRepository extends JpaRepository<TransactionLog, Long> {
List<TransactionLog> findByCreatedAtBefore(LocalDateTime dateTime);
void deleteByCreatedAtBefore(LocalDateTime dateTime);
}
public interface ArchivedTransactionLogRepository extends JpaRepository<ArchivedTransactionLog, Long> {
List<ArchivedTransactionLog> findByCreatedAtBefore(LocalDateTime dateTime);
List<ArchivedTransactionLog> findByCreatedAtBetween(LocalDateTime startDateTime, LocalDateTime endDateTime);
List<ArchivedTransactionLog> findByUserIdAndCreatedAtBetween(Long userId, LocalDateTime startDateTime, LocalDateTime endDateTime);
}
6.3.3 服务实现
@Service
@Slf4j
public class TransactionLogArchiveService {
@Autowired
private TransactionLogRepository transactionLogRepository;
@Autowired
private ArchivedTransactionLogRepository archivedTransactionLogRepository;
@Autowired
private TransactionLogColdStorageService coldStorageService;
@Value("${transaction.log.archive.days:30}")
private int archiveDays;
@Value("${transaction.log.cold.storage.days:90}")
private int coldStorageDays;
/**
* 执行日志归档
*/
@Transactional
public void archiveLogs() {
log.info("开始执行事务日志归档");
// 计算归档日期
LocalDateTime archiveDate = LocalDateTime.now().minusDays(archiveDays);
// 查询需要归档的日志
List<TransactionLog> logs = transactionLogRepository.findByCreatedAtBefore(archiveDate);
log.info("找到 {} 条需要归档的日志", logs.size());
if (!logs.isEmpty()) {
// 转换为归档日志
List<ArchivedTransactionLog> archivedLogs = logs.stream()
.map(this::convertToArchivedLog)
.collect(Collectors.toList());
// 保存归档日志
archivedTransactionLogRepository.saveAll(archivedLogs);
log.info("已保存 {} 条归档日志", archivedLogs.size());
// 删除原日志
transactionLogRepository.deleteByCreatedAtBefore(archiveDate);
log.info("已删除 {} 条原日志", logs.size());
}
log.info("事务日志归档完成");
}
/**
* 执行冷存储迁移
*/
@Transactional
public void migrateToColdStorage() {
log.info("开始执行事务日志冷存储迁移");
// 计算冷存储迁移日期
LocalDateTime coldStorageDate = LocalDateTime.now().minusDays(coldStorageDays);
// 查询需要迁移到冷存储的归档日志
List<ArchivedTransactionLog> archivedLogs = archivedTransactionLogRepository.findByCreatedAtBefore(coldStorageDate);
log.info("找到 {} 条需要迁移到冷存储的归档日志", archivedLogs.size());
if (!archivedLogs.isEmpty()) {
// 迁移到冷存储
coldStorageService.migrateToColdStorage(archivedLogs);
log.info("已迁移 {} 条归档日志到冷存储", archivedLogs.size());
// 删除已迁移的归档日志
archivedLogs.forEach(log -> archivedTransactionLogRepository.delete(log));
log.info("已删除 {} 条已迁移的归档日志", archivedLogs.size());
}
log.info("事务日志冷存储迁移完成");
}
/**
* 将事务日志转换为归档日志
*/
private ArchivedTransactionLog convertToArchivedLog(TransactionLog log) {
ArchivedTransactionLog archivedLog = new ArchivedTransactionLog();
archivedLog.setTransactionId(log.getTransactionId());
archivedLog.setUserId(log.getUserId());
archivedLog.setOperation(log.getOperation());
archivedLog.setTableName(log.getTableName());
archivedLog.setOldValue(log.getOldValue());
archivedLog.setNewValue(log.getNewValue());
archivedLog.setCreatedAt(log.getCreatedAt());
archivedLog.setArchivedAt(LocalDateTime.now());
return archivedLog;
}
}
@Service
@Slf4j
public class TransactionLogColdStorageService {
@Autowired
private MinioClient minioClient;
@Autowired
private MinioProperties properties;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 迁移归档日志到冷存储
*/
public void migrateToColdStorage(List<ArchivedTransactionLog> logs) {
try {
// 按日期分组
Map<String, List<ArchivedTransactionLog>> logsByDate = logs.stream()
.collect(Collectors.groupingBy(log -> log.getCreatedAt().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))));
// 批量上传
for (Map.Entry<String, List<ArchivedTransactionLog>> entry : logsByDate.entrySet()) {
String date = entry.getKey();
List<ArchivedTransactionLog> dateLogs = entry.getValue();
// 生成文件名
String fileName = String.format("transaction-logs-%s.json", date);
String objectName = String.format("archived/%s", fileName);
// 转换为JSON
String json = objectMapper.writeValueAsString(dateLogs);
// 上传到MinIO
minioClient.putObject(
PutObjectArgs.builder()
.bucket(properties.getBucket())
.object(objectName)
.contentType("application/json")
.stream(new ByteArrayInputStream(json.getBytes()), json.length(), -1)
.build()
);
log.info("已上传 {} 条日志到冷存储: {}", dateLogs.size(), objectName);
}
} catch (Exception e) {
log.error("迁移到冷存储失败", e);
throw new RuntimeException("迁移到冷存储失败", e);
}
}
/**
* 从冷存储中检索日志
*/
public List<ArchivedTransactionLog> retrieveFromColdStorage(String date) {
try {
// 生成文件名
String fileName = String.format("transaction-logs-%s.json", date);
String objectName = String.format("archived/%s", fileName);
// 从MinIO下载
GetObjectResponse response = minioClient.getObject(
GetObjectArgs.builder()
.bucket(properties.getBucket())
.object(objectName)
.build()
);
// 解析JSON
List<ArchivedTransactionLog> logs = objectMapper.readValue(response, new TypeReference<List<ArchivedTransactionLog>>() {});
log.info("从冷存储中检索到 {} 条日志: {}", logs.size(), objectName);
return logs;
} catch (Exception e) {
log.error("从冷存储中检索失败", e);
return Collections.emptyList();
}
}
}
6.3.4 定时任务
@Configuration
public class SchedulerConfig {
@Autowired
private TransactionLogArchiveService archiveService;
@Bean
public JobDetail archiveJobDetail() {
return JobBuilder.newJob(ArchiveJob.class)
.withIdentity("archiveJob")
.storeDurably()
.build();
}
@Bean
public Trigger archiveJobTrigger() {
return TriggerBuilder.newTrigger()
.forJob(archiveJobDetail())
.withIdentity("archiveJobTrigger")
.withSchedule(CronScheduleBuilder.dailyAtHourAndMinute(0, 0)) // 每天凌晨执行
.build();
}
@Bean
public JobDetail coldStorageJobDetail() {
return JobBuilder.newJob(ColdStorageJob.class)
.withIdentity("coldStorageJob")
.storeDurably()
.build();
}
@Bean
public Trigger coldStorageJobTrigger() {
return TriggerBuilder.newTrigger()
.forJob(coldStorageJobDetail())
.withIdentity("coldStorageJobTrigger")
.withSchedule(CronScheduleBuilder.weeklyOnDayAndHourAndMinute(1, 1, 0)) // 每周一凌晨执行
.build();
}
public static class ArchiveJob implements Job {
@Autowired
private TransactionLogArchiveService archiveService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
archiveService.archiveLogs();
}
}
public static class ColdStorageJob implements Job {
@Autowired
private TransactionLogArchiveService archiveService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
archiveService.migrateToColdStorage();
}
}
}
6.3.5 控制器
@RestController
@RequestMapping("/api/transaction-logs")
public class TransactionLogController {
@Autowired
private TransactionLogRepository transactionLogRepository;
@Autowired
private ArchivedTransactionLogRepository archivedTransactionLogRepository;
@Autowired
private TransactionLogColdStorageService coldStorageService;
/**
* 查询当前日志
*/
@GetMapping("/current")
public ResponseEntity<List<TransactionLog>> getCurrentLogs(
@RequestParam(required = false) Long userId,
@RequestParam(required = false) String operation,
@RequestParam(required = false) String tableName,
@RequestParam(required = false) LocalDateTime startDate,
@RequestParam(required = false) LocalDateTime endDate) {
// 构建查询
Specification<TransactionLog> spec = (root, query, cb) -> {
List<Predicate> predicates = new ArrayList<>();
if (userId != null) {
predicates.add(cb.equal(root.get("userId"), userId));
}
if (operation != null) {
predicates.add(cb.equal(root.get("operation"), operation));
}
if (tableName != null) {
predicates.add(cb.equal(root.get("tableName"), tableName));
}
if (startDate != null) {
predicates.add(cb.greaterThanOrEqualTo(root.get("createdAt"), startDate));
}
if (endDate != null) {
predicates.add(cb.lessThanOrEqualTo(root.get("createdAt"), endDate));
}
return cb.and(predicates.toArray(new Predicate[0]));
};
List<TransactionLog> logs = transactionLogRepository.findAll(spec);
return ResponseEntity.ok(logs);
}
/**
* 查询归档日志
*/
@GetMapping("/archived")
public ResponseEntity<List<ArchivedTransactionLog>> getArchivedLogs(
@RequestParam(required = false) Long userId,
@RequestParam(required = false) String operation,
@RequestParam(required = false) String tableName,
@RequestParam(required = false) LocalDateTime startDate,
@RequestParam(required = false) LocalDateTime endDate) {
// 构建查询
Specification<ArchivedTransactionLog> spec = (root, query, cb) -> {
List<Predicate> predicates = new ArrayList<>();
if (userId != null) {
predicates.add(cb.equal(root.get("userId"), userId));
}
if (operation != null) {
predicates.add(cb.equal(root.get("operation"), operation));
}
if (tableName != null) {
predicates.add(cb.equal(root.get("tableName"), tableName));
}
if (startDate != null) {
predicates.add(cb.greaterThanOrEqualTo(root.get("createdAt"), startDate));
}
if (endDate != null) {
predicates.add(cb.lessThanOrEqualTo(root.get("createdAt"), endDate));
}
return cb.and(predicates.toArray(new Predicate[0]));
};
List<ArchivedTransactionLog> logs = archivedTransactionLogRepository.findAll(spec);
return ResponseEntity.ok(logs);
}
/**
* 查询冷存储日志
*/
@GetMapping("/cold-storage")
public ResponseEntity<List<ArchivedTransactionLog>> getColdStorageLogs(
@RequestParam String date) {
List<ArchivedTransactionLog> logs = coldStorageService.retrieveFromColdStorage(date);
return ResponseEntity.ok(logs);
}
}
6.4 应用入口
@SpringBootApplication
public class TransactionLogArchiveDemoApplication {
public static void main(String[] args) {
SpringApplication.run(TransactionLogArchiveDemoApplication.class, args);
}
}
七、最佳实践
7.1 归档策略
原则:
- 定期归档:根据业务需求和数据量,定期执行归档操作
- 增量归档:每次只归档新增的历史数据,避免重复处理
- 事务保障:归档操作应该在事务中执行,确保数据一致性
- 错误处理:妥善处理归档过程中的错误,避免数据丢失
建议:
- 对于高频操作的系统,建议每天执行一次归档
- 对于低频操作的系统,建议每周执行一次归档
- 归档操作应该在系统负载较低的时间段执行,如凌晨
- 归档后应该验证归档数据的完整性和一致性
7.2 冷存储策略
原则:
- 分层存储:根据数据访问频率,将数据存储在不同层级的存储中
- 成本优化:将不经常访问的数据迁移到低成本的冷存储中
- 数据压缩:对冷存储中的数据进行压缩,减少存储空间
- 数据加密:对冷存储中的敏感数据进行加密,保障数据安全
建议:
- 对于30天以内的日志,存储在主数据库中
- 对于30-90天的日志,存储在归档表中
- 对于90天以上的日志,迁移到冷存储中
- 冷存储可以选择对象存储服务,如MinIO、AWS S3等
7.3 性能优化
原则:
- 批量处理:批量处理日志数据,减少数据库操作次数
- 索引优化:为查询频繁的字段创建索引
- 并行处理:对于大量数据的归档和迁移,使用并行处理提高效率
- 监控优化:监控归档和迁移过程,及时发现和解决性能问题
建议:
- 批量处理日志时,每次处理的记录数应该根据系统性能进行调整
- 为transaction_id、user_id、created_at等字段创建索引
- 对于大量数据的迁移,可以使用多线程并行处理
- 使用Spring Boot Actuator监控归档和迁移过程的性能指标
7.4 监控和告警
原则:
- 实时监控:实时监控归档和迁移过程的状态
- 异常告警:当归档或迁移过程出现异常时,及时告警
- 性能监控:监控归档和迁移过程的性能指标
- 趋势分析:分析归档和迁移的趋势,优化策略
建议:
- 使用Spring Boot Actuator和Micrometer监控系统指标
- 设置告警阈值,当归档或迁移失败时及时通知运维人员
- 定期分析归档和迁移的性能数据,优化处理策略
- 建立监控面板,直观展示归档和迁移的状态和趋势
互动话题:
- 你在实际项目中是如何管理事务日志的?
- 你认为事务日志归档和冷存储迁移最大的挑战是什么?
- 你有使用过类似的日志管理方案吗?
欢迎在评论区留言讨论!更多技术文章,欢迎关注公众号:服务端技术精选
标题:SpringBoot + 事务日志归档 + 冷存储迁移:历史事务日志自动归档,保障主库轻量化
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/04/09/1775466541433.html
公众号:服务端技术精选
- 前言
- 一、核心概念
- 1.1 事务日志
- 1.2 日志归档
- 1.3 冷存储
- 1.4 热存储 vs 冷存储
- 1.5 为什么需要事务日志归档和冷存储迁移
- 二、技术方案
- 2.1 架构设计
- 2.2 技术选型
- 2.3 核心流程
- 三、Spring Boot 事务日志归档实现
- 3.1 依赖配置
- 3.2 数据库设计
- 3.2.1 事务日志表
- 3.2.2 归档日志表
- 3.3 实体类
- 3.3.1 事务日志实体
- 3.3.2 归档日志实体
- 3.4 仓库接口
- 3.4.1 事务日志仓库
- 3.4.2 归档日志仓库
- 3.5 归档服务
- 3.6 定时任务
- 四、冷存储迁移实现
- 4.1 MinIO 配置
- 4.2 MinIO 配置类
- 4.3 MinIO 客户端配置
- 4.4 冷存储服务
- 五、日志检索实现
- 5.1 控制器
- 六、Spring Boot 完整实现
- 6.1 项目依赖
- 6.2 配置文件
- 6.3 核心配置类
- 6.3.1 实体类
- 6.3.2 仓库接口
- 6.3.3 服务实现
- 6.3.4 定时任务
- 6.3.5 控制器
- 6.4 应用入口
- 七、最佳实践
- 7.1 归档策略
- 7.2 冷存储策略
- 7.3 性能优化
- 7.4 监控和告警
评论
0 评论