SpringBoot + Redis 大 Key 拆分 + 自动检测告警:单个 Key 超 10MB?自动拆分防阻塞
前言
在使用 Redis 时,大 Key 是一个常见的性能瓶颈。大 Key 指的是占用内存较大的键值对,通常指单个 Key 大小超过 10MB 的情况。大 Key 会带来一系列问题:
- 内存占用:大 Key 会占用大量内存,导致内存使用不均衡
- 网络传输:大 Key 会增加网络传输时间,影响系统响应速度
- 阻塞操作:对大 Key 进行操作时,会阻塞 Redis 服务器,影响其他操作
- 过期删除:大 Key 过期时,Redis 会进行同步删除,可能导致服务卡顿
想象一下这样的场景:你的应用在高峰期突然变得响应缓慢,甚至出现服务不可用的情况。通过监控发现,Redis 服务器 CPU 使用率突然飙升,内存使用异常。经过排查,发现是某个 Key 的大小超过了 10MB,导致 Redis 服务器在处理这个 Key 时被阻塞。
如何解决这个问题? 本文将详细介绍如何在 Spring Boot 中实现 Redis 大 Key 的自动检测、拆分和告警,帮助你避免大 Key 带来的性能问题。
一、核心概念
1.1 大 Key
大 Key 是指占用内存较大的键值对,通常有以下几种类型:
- String 类型:单个字符串值很大,如存储序列化后的大对象
- List 类型:列表中元素数量过多
- Hash 类型:哈希表中字段数量过多
- Set 类型:集合中元素数量过多
- ZSet 类型:有序集合中元素数量过多
1.2 大 Key 的危害
- 内存占用不均衡:大 Key 会导致 Redis 实例内存使用不均衡,影响内存管理
- 网络传输缓慢:大 Key 会增加网络传输时间,影响系统响应速度
- 阻塞操作:对大 Key 进行操作时,会阻塞 Redis 服务器,影响其他操作
- 过期删除卡顿:大 Key 过期时,Redis 会进行同步删除,可能导致服务卡顿
- 主从复制延迟:大 Key 会增加主从复制的时间,导致复制延迟
1.3 大 Key 拆分策略
针对不同类型的大 Key,有不同的拆分策略:
- String 类型:将大字符串拆分为多个小字符串,使用前缀 + 序号的方式存储
- List 类型:将大列表拆分为多个小列表,使用前缀 + 序号的方式存储
- Hash 类型:根据哈希字段的特性,将哈希表拆分为多个小哈希表
- Set 类型:将大集合拆分为多个小集合
- ZSet 类型:将大有序集合拆分为多个小有序集合
1.4 大 Key 检测方法
- Redis 命令:使用
DEBUG OBJECT命令查看 Key 的大小 - Redis 监控:使用 Redis 的监控工具,如 Redis Sentinel、Redis Cluster
- 第三方工具:使用 Redis RDB 分析工具,如 redis-rdb-tools
- 自定义脚本:编写脚本定期检测 Key 的大小
1.5 告警机制
- 阈值告警:当 Key 大小超过阈值时,触发告警
- 趋势告警:当 Key 大小增长过快时,触发告警
- 自动处理:当检测到大 Key 时,自动进行拆分处理
二、技术方案
2.1 架构设计
Redis 大 Key 拆分和自动检测告警的架构设计主要包括以下几个部分:
- 数据层:Redis 存储层,负责存储数据
- 服务层:Spring Boot 应用层,负责业务逻辑和大 Key 处理
- 监控层:监控 Redis Key 大小,检测大 Key
- 告警层:当检测到大 Key 时,触发告警
- 处理层:当检测到大 Key 时,自动进行拆分处理
2.2 技术选型
- Spring Boot:作为基础框架,提供依赖注入、配置管理等功能
- Spring Data Redis:用于操作 Redis
- Lettuce:Redis 客户端,提供异步操作能力
- Redisson:Redis 客户端,提供分布式锁、集合等高级功能
- ScheduledExecutorService:用于定期检测大 Key
- Spring Boot Actuator:用于暴露监控端点
- Prometheus:用于监控系统指标
- Grafana:用于可视化监控数据
- 企业微信/钉钉:用于发送告警通知
2.3 核心流程
- 数据存储:应用通过 Spring Data Redis 操作 Redis,存储数据
- 大 Key 检测:定期检测 Redis Key 的大小,发现大 Key
- 告警触发:当检测到大 Key 时,触发告警通知
- 自动拆分:当检测到大 Key 时,自动进行拆分处理
- 数据访问:应用通过封装的接口访问拆分后的数据
三、Spring Boot Redis 大 Key 拆分实现
3.1 依赖配置
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.7</version>
</dependency>
<!-- Spring Boot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Micrometer -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.2 配置文件
spring:
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 10000
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
# 大 Key 配置
redis:
big-key:
threshold: 10485760 # 10MB,单位字节
check-interval: 60000 # 检查间隔,单位毫秒
split-enabled: true # 是否启用自动拆分
alert-enabled: true # 是否启用告警
alert-threshold: 5242880 # 告警阈值,5MB
notify-url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key" # 企业微信告警地址
# 监控配置
management:
endpoints:
web:
exposure:
include: health,info,prometheus
3.3 大 Key 拆分工具类
@Slf4j
@Component
public class RedisBigKeySplitter {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 拆分 String 类型的大 Key
*/
public void splitStringKey(String key, byte[] value) {
log.info("开始拆分 String 类型的大 Key: {}", key);
// 计算拆分数量
int chunkSize = 1024 * 1024; // 1MB 每块
int totalChunks = (value.length + chunkSize - 1) / chunkSize;
// 存储拆分后的数据
for (int i = 0; i < totalChunks; i++) {
int start = i * chunkSize;
int end = Math.min(start + chunkSize, value.length);
byte[] chunk = Arrays.copyOfRange(value, start, end);
redisTemplate.opsForValue().set(key + ":chunk:" + i, new String(chunk, StandardCharsets.UTF_8));
}
// 存储元数据
redisTemplate.opsForHash().put(key + ":meta", "type", "string");
redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(totalChunks));
redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(value.length));
// 删除原始大 Key
redisTemplate.delete(key);
log.info("String 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, totalChunks);
}
/**
* 拆分 List 类型的大 Key
*/
public void splitListKey(String key, List<String> values) {
log.info("开始拆分 List 类型的大 Key: {}", key);
// 计算拆分数量
int chunkSize = 1000; // 每块 1000 个元素
int totalChunks = (values.size() + chunkSize - 1) / chunkSize;
// 存储拆分后的数据
for (int i = 0; i < totalChunks; i++) {
int start = i * chunkSize;
int end = Math.min(start + chunkSize, values.size());
List<String> chunk = values.subList(start, end);
for (String value : chunk) {
redisTemplate.opsForList().rightPush(key + ":chunk:" + i, value);
}
}
// 存储元数据
redisTemplate.opsForHash().put(key + ":meta", "type", "list");
redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(totalChunks));
redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));
// 删除原始大 Key
redisTemplate.delete(key);
log.info("List 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, totalChunks);
}
/**
* 拆分 Hash 类型的大 Key
*/
public void splitHashKey(String key, Map<String, String> values) {
log.info("开始拆分 Hash 类型的大 Key: {}", key);
// 计算拆分数量
int chunkSize = 1000; // 每块 1000 个字段
List<Map<String, String>> chunks = new ArrayList<>();
Map<String, String> currentChunk = new HashMap<>();
for (Map.Entry<String, String> entry : values.entrySet()) {
if (currentChunk.size() >= chunkSize) {
chunks.add(currentChunk);
currentChunk = new HashMap<>();
}
currentChunk.put(entry.getKey(), entry.getValue());
}
if (!currentChunk.isEmpty()) {
chunks.add(currentChunk);
}
// 存储拆分后的数据
for (int i = 0; i < chunks.size(); i++) {
Map<String, String> chunk = chunks.get(i);
redisTemplate.opsForHash().putAll(key + ":chunk:" + i, chunk);
}
// 存储元数据
redisTemplate.opsForHash().put(key + ":meta", "type", "hash");
redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(chunks.size()));
redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));
// 删除原始大 Key
redisTemplate.delete(key);
log.info("Hash 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, chunks.size());
}
/**
* 拆分 Set 类型的大 Key
*/
public void splitSetKey(String key, Set<String> values) {
log.info("开始拆分 Set 类型的大 Key: {}", key);
// 计算拆分数量
int chunkSize = 1000; // 每块 1000 个元素
List<Set<String>> chunks = new ArrayList<>();
Set<String> currentChunk = new HashSet<>();
for (String value : values) {
if (currentChunk.size() >= chunkSize) {
chunks.add(currentChunk);
currentChunk = new HashSet<>();
}
currentChunk.add(value);
}
if (!currentChunk.isEmpty()) {
chunks.add(currentChunk);
}
// 存储拆分后的数据
for (int i = 0; i < chunks.size(); i++) {
Set<String> chunk = chunks.get(i);
for (String value : chunk) {
redisTemplate.opsForSet().add(key + ":chunk:" + i, value);
}
}
// 存储元数据
redisTemplate.opsForHash().put(key + ":meta", "type", "set");
redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(chunks.size()));
redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));
// 删除原始大 Key
redisTemplate.delete(key);
log.info("Set 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, chunks.size());
}
/**
* 拆分 ZSet 类型的大 Key
*/
public void splitZSetKey(String key, Set<ZSetOperations.TypedTuple<String>> values) {
log.info("开始拆分 ZSet 类型的大 Key: {}", key);
// 计算拆分数量
int chunkSize = 1000; // 每块 1000 个元素
List<Set<ZSetOperations.TypedTuple<String>>> chunks = new ArrayList<>();
Set<ZSetOperations.TypedTuple<String>> currentChunk = new HashSet<>();
for (ZSetOperations.TypedTuple<String> value : values) {
if (currentChunk.size() >= chunkSize) {
chunks.add(currentChunk);
currentChunk = new HashSet<>();
}
currentChunk.add(value);
}
if (!currentChunk.isEmpty()) {
chunks.add(currentChunk);
}
// 存储拆分后的数据
for (int i = 0; i < chunks.size(); i++) {
Set<ZSetOperations.TypedTuple<String>> chunk = chunks.get(i);
redisTemplate.opsForZSet().add(key + ":chunk:" + i, chunk);
}
// 存储元数据
redisTemplate.opsForHash().put(key + ":meta", "type", "zset");
redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(chunks.size()));
redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(values.size()));
// 删除原始大 Key
redisTemplate.delete(key);
log.info("ZSet 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, chunks.size());
}
}
3.4 大 Key 检测服务
@Service
@Slf4j
public class RedisBigKeyDetector {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisBigKeySplitter redisBigKeySplitter;
@Value("${redis.big-key.threshold:10485760}")
private long threshold;
@Value("${redis.big-key.alert-threshold:5242880}")
private long alertThreshold;
@Value("${redis.big-key.split-enabled:true}")
private boolean splitEnabled;
@Value("${redis.big-key.alert-enabled:true}")
private boolean alertEnabled;
@Value("${redis.big-key.notify-url}")
private String notifyUrl;
/**
* 检测大 Key
*/
public void detectBigKeys() {
log.info("开始检测大 Key");
try {
// 获取所有 Key
Set<String> keys = redisTemplate.keys("*");
if (keys == null || keys.isEmpty()) {
log.info("没有找到 Key");
return;
}
for (String key : keys) {
// 跳过元数据 Key
if (key.endsWith(":meta")) {
continue;
}
// 跳过拆分后的 Key
if (key.contains(":chunk:")) {
continue;
}
// 获取 Key 的大小
long size = getKeySize(key);
log.info("Key: {}, 大小: {} bytes", key, size);
// 检测是否为大 Key
if (size > threshold) {
log.warn("检测到大 Key: {}, 大小: {} bytes", key, size);
// 触发告警
if (alertEnabled) {
sendAlert(key, size);
}
// 自动拆分
if (splitEnabled) {
splitBigKey(key, size);
}
} else if (size > alertThreshold) {
log.warn("Key 接近阈值: {}, 大小: {} bytes", key, size);
// 触发告警
if (alertEnabled) {
sendAlert(key, size);
}
}
}
} catch (Exception e) {
log.error("检测大 Key 失败", e);
}
log.info("大 Key 检测完成");
}
/**
* 获取 Key 的大小
*/
private long getKeySize(String key) {
try {
// 获取 Key 的类型
String type = redisTemplate.type(key).getCode();
switch (type) {
case "string":
return redisTemplate.opsForValue().get(key) != null ? redisTemplate.opsForValue().get(key).toString().getBytes(StandardCharsets.UTF_8).length : 0;
case "list":
return redisTemplate.opsForList().size(key) * 100; // 估算大小
case "hash":
return redisTemplate.opsForHash().size(key) * 100; // 估算大小
case "set":
return redisTemplate.opsForSet().size(key) * 100; // 估算大小
case "zset":
return redisTemplate.opsForZSet().size(key) * 100; // 估算大小
default:
return 0;
}
} catch (Exception e) {
log.error("获取 Key 大小失败: {}", key, e);
return 0;
}
}
/**
* 拆分大 Key
*/
private void splitBigKey(String key, long size) {
try {
// 获取 Key 的类型
String type = redisTemplate.type(key).getCode();
switch (type) {
case "string":
String value = (String) redisTemplate.opsForValue().get(key);
if (value != null) {
redisBigKeySplitter.splitStringKey(key, value.getBytes(StandardCharsets.UTF_8));
}
break;
case "list":
List<String> listValues = redisTemplate.opsForList().range(key, 0, -1);
if (listValues != null) {
redisBigKeySplitter.splitListKey(key, listValues);
}
break;
case "hash":
Map<Object, Object> hashValues = redisTemplate.opsForHash().entries(key);
if (hashValues != null) {
Map<String, String> stringHashValues = new HashMap<>();
for (Map.Entry<Object, Object> entry : hashValues.entrySet()) {
stringHashValues.put(entry.getKey().toString(), entry.getValue().toString());
}
redisBigKeySplitter.splitHashKey(key, stringHashValues);
}
break;
case "set":
Set<Object> setValues = redisTemplate.opsForSet().members(key);
if (setValues != null) {
Set<String> stringSetValues = new HashSet<>();
for (Object value1 : setValues) {
stringSetValues.add(value1.toString());
}
redisBigKeySplitter.splitSetKey(key, stringSetValues);
}
break;
case "zset":
Set<ZSetOperations.TypedTuple<Object>> zsetValues = redisTemplate.opsForZSet().rangeWithScores(key, 0, -1);
if (zsetValues != null) {
Set<ZSetOperations.TypedTuple<String>> stringZSetValues = new HashSet<>();
for (ZSetOperations.TypedTuple<Object> value1 : zsetValues) {
stringZSetValues.add(new DefaultTypedTuple<>(value1.getValue().toString(), value1.getScore()));
}
redisBigKeySplitter.splitZSetKey(key, stringZSetValues);
}
break;
default:
log.warn("不支持的 Key 类型: {}", type);
}
} catch (Exception e) {
log.error("拆分大 Key 失败: {}", key, e);
}
}
/**
* 发送告警
*/
private void sendAlert(String key, long size) {
try {
// 构建告警消息
String message = String.format("【Redis 大 Key 告警】\nKey: %s\n大小: %s bytes (%.2f MB)\n阈值: %s bytes (%.2f MB)",
key, size, size / (1024.0 * 1024.0), threshold, threshold / (1024.0 * 1024.0));
// 发送告警
if (StringUtils.isNotBlank(notifyUrl)) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(notifyUrl))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString("{\"msgtype\":\"text\",\"text\":{\"content\":\"" + message + "\"}}"))
.build();
client.send(request, HttpResponse.BodyHandlers.ofString());
log.info("告警发送成功: {}", key);
} else {
log.info("告警地址未配置,仅记录日志: {}", message);
}
} catch (Exception e) {
log.error("发送告警失败: {}", key, e);
}
}
}
3.5 定时任务
@Component
public class RedisBigKeyCheckTask {
@Autowired
private RedisBigKeyDetector redisBigKeyDetector;
@Value("${redis.big-key.check-interval:60000}")
private long checkInterval;
@PostConstruct
public void init() {
// 启动定时任务
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(() -> {
try {
redisBigKeyDetector.detectBigKeys();
} catch (Exception e) {
e.printStackTrace();
}
}, 0, checkInterval, TimeUnit.MILLISECONDS);
}
}
3.6 大 Key 访问封装
@Service
public class RedisBigKeyService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 获取 String 类型的值
*/
public String getStringValue(String key) {
// 检查是否为拆分后的 Key
if (redisTemplate.hasKey(key + ":meta")) {
// 获取元数据
String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
if ("string".equals(type)) {
// 获取拆分块数
String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
int chunks = Integer.parseInt(chunksStr);
// 拼接所有块
StringBuilder sb = new StringBuilder();
for (int i = 0; i < chunks; i++) {
String chunk = (String) redisTemplate.opsForValue().get(key + ":chunk:" + i);
if (chunk != null) {
sb.append(chunk);
}
}
return sb.toString();
}
}
// 原始 Key
return (String) redisTemplate.opsForValue().get(key);
}
/**
* 设置 String 类型的值
*/
public void setStringValue(String key, String value) {
// 检查值大小
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
if (bytes.length > 10485760) { // 10MB
// 自动拆分
RedisBigKeySplitter splitter = new RedisBigKeySplitter();
splitter.splitStringKey(key, bytes);
} else {
// 直接存储
redisTemplate.opsForValue().set(key, value);
}
}
/**
* 获取 List 类型的值
*/
public List<String> getListValue(String key) {
// 检查是否为拆分后的 Key
if (redisTemplate.hasKey(key + ":meta")) {
// 获取元数据
String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
if ("list".equals(type)) {
// 获取拆分块数
String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
int chunks = Integer.parseInt(chunksStr);
// 拼接所有块
List<String> result = new ArrayList<>();
for (int i = 0; i < chunks; i++) {
List<String> chunk = redisTemplate.opsForList().range(key + ":chunk:" + i, 0, -1);
if (chunk != null) {
result.addAll(chunk);
}
}
return result;
}
}
// 原始 Key
return redisTemplate.opsForList().range(key, 0, -1);
}
/**
* 添加 List 类型的值
*/
public void addListValue(String key, String value) {
// 检查是否为拆分后的 Key
if (redisTemplate.hasKey(key + ":meta")) {
// 获取元数据
String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
if ("list".equals(type)) {
// 获取拆分块数
String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
int chunks = Integer.parseInt(chunksStr);
// 添加到最后一个块
redisTemplate.opsForList().rightPush(key + ":chunk:" + (chunks - 1), value);
return;
}
}
// 原始 Key
redisTemplate.opsForList().rightPush(key, value);
}
// 其他类型的操作方法...
}
四、自动检测告警实现
4.1 监控配置
# 监控配置
management:
endpoints:
web:
exposure:
include: health,info,prometheus
# Prometheus 配置
micrometer:
prometheus:
enabled: true
4.2 自定义指标
@Component
public class RedisBigKeyMetrics {
@Autowired
private MeterRegistry meterRegistry;
private Counter bigKeyCounter;
@PostConstruct
public void init() {
// 初始化大 Key 计数器
bigKeyCounter = Counter.builder("redis.big.key.count")
.description("Redis 大 Key 数量")
.tag("type", "big_key")
.register(meterRegistry);
}
/**
* 增加大 Key 计数
*/
public void incrementBigKeyCount() {
bigKeyCounter.increment();
}
/**
* 记录大 Key 大小
*/
public void recordBigKeySize(String key, long size) {
Gauge.builder("redis.big.key.size", size)
.description("Redis 大 Key 大小")
.tag("key", key)
.register(meterRegistry);
}
}
4.3 告警配置
在 Prometheus 中配置告警规则:
groups:
- name: redis_big_key_alerts
rules:
- alert: RedisBigKeyDetected
expr: redis_big_key_count > 0
for: 1m
labels:
severity: warning
annotations:
summary: "Redis 大 Key 检测"
description: "检测到 Redis 大 Key,请及时处理"
- alert: RedisBigKeySizeExceeded
expr: redis_big_key_size > 10485760
for: 1m
labels:
severity: critical
annotations:
summary: "Redis 大 Key 大小超限"
description: "Redis Key {{ $labels.key }} 大小超过 10MB,请及时处理"
4.4 企业微信告警
@Component
public class WechatNotifier {
@Value("${redis.big-key.notify-url}")
private String notifyUrl;
/**
* 发送企业微信告警
*/
public void sendWechatNotification(String message) {
try {
if (StringUtils.isNotBlank(notifyUrl)) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(notifyUrl))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString("{\"msgtype\":\"text\",\"text\":{\"content\":\"" + message + "\"}}"))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
log.info("企业微信告警发送成功: {}", response.body());
}
} catch (Exception e) {
log.error("发送企业微信告警失败", e);
}
}
}
五、Spring Boot 完整实现
5.1 项目结构
redis-big-key-demo/
├── src/
│ ├── main/
│ │ ├── java/com/example/redis/ # 源代码
│ │ │ ├── config/ # 配置类
│ │ │ ├── service/ # 服务类
│ │ │ ├── util/ # 工具类
│ │ │ ├── task/ # 定时任务
│ │ │ └── RedisBigKeyDemoApplication.java # 应用入口
│ │ └── resources/ # 配置文件
│ └── test/ # 测试代码
└── pom.xml # Maven 依赖
5.2 核心配置
spring:
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 10000
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
# 大 Key 配置
redis:
big-key:
threshold: 10485760 # 10MB,单位字节
check-interval: 60000 # 检查间隔,单位毫秒
split-enabled: true # 是否启用自动拆分
alert-enabled: true # 是否启用告警
alert-threshold: 5242880 # 告警阈值,5MB
notify-url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key" # 企业微信告警地址
# 监控配置
management:
endpoints:
web:
exposure:
include: health,info,prometheus
# Prometheus 配置
micrometer:
prometheus:
enabled: true
5.3 核心代码
5.3.1 Redis 配置
@Configuration
@EnableRedisRepositories
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
template.afterPropertiesSet();
return template;
}
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
5.3.2 大 Key 拆分工具类
@Slf4j
@Component
public class RedisBigKeySplitter {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 拆分 String 类型的大 Key
*/
public void splitStringKey(String key, byte[] value) {
log.info("开始拆分 String 类型的大 Key: {}", key);
// 计算拆分数量
int chunkSize = 1024 * 1024; // 1MB 每块
int totalChunks = (value.length + chunkSize - 1) / chunkSize;
// 存储拆分后的数据
for (int i = 0; i < totalChunks; i++) {
int start = i * chunkSize;
int end = Math.min(start + chunkSize, value.length);
byte[] chunk = Arrays.copyOfRange(value, start, end);
redisTemplate.opsForValue().set(key + ":chunk:" + i, new String(chunk, StandardCharsets.UTF_8));
}
// 存储元数据
redisTemplate.opsForHash().put(key + ":meta", "type", "string");
redisTemplate.opsForHash().put(key + ":meta", "chunks", String.valueOf(totalChunks));
redisTemplate.opsForHash().put(key + ":meta", "originalSize", String.valueOf(value.length));
// 删除原始大 Key
redisTemplate.delete(key);
log.info("String 类型的大 Key 拆分完成: {}, 拆分为 {} 块", key, totalChunks);
}
// 其他类型的拆分方法...
}
5.3.3 大 Key 检测服务
@Service
@Slf4j
public class RedisBigKeyDetector {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisBigKeySplitter redisBigKeySplitter;
@Autowired
private RedisBigKeyMetrics redisBigKeyMetrics;
@Autowired
private WechatNotifier wechatNotifier;
@Value("${redis.big-key.threshold:10485760}")
private long threshold;
@Value("${redis.big-key.alert-threshold:5242880}")
private long alertThreshold;
@Value("${redis.big-key.split-enabled:true}")
private boolean splitEnabled;
@Value("${redis.big-key.alert-enabled:true}")
private boolean alertEnabled;
/**
* 检测大 Key
*/
public void detectBigKeys() {
log.info("开始检测大 Key");
try {
// 获取所有 Key
Set<String> keys = redisTemplate.keys("*");
if (keys == null || keys.isEmpty()) {
log.info("没有找到 Key");
return;
}
for (String key : keys) {
// 跳过元数据 Key
if (key.endsWith(":meta")) {
continue;
}
// 跳过拆分后的 Key
if (key.contains(":chunk:")) {
continue;
}
// 获取 Key 的大小
long size = getKeySize(key);
log.info("Key: {}, 大小: {} bytes", key, size);
// 检测是否为大 Key
if (size > threshold) {
log.warn("检测到大 Key: {}, 大小: {} bytes", key, size);
// 增加大 Key 计数
redisBigKeyMetrics.incrementBigKeyCount();
// 记录大 Key 大小
redisBigKeyMetrics.recordBigKeySize(key, size);
// 触发告警
if (alertEnabled) {
sendAlert(key, size);
}
// 自动拆分
if (splitEnabled) {
splitBigKey(key, size);
}
} else if (size > alertThreshold) {
log.warn("Key 接近阈值: {}, 大小: {} bytes", key, size);
// 触发告警
if (alertEnabled) {
sendAlert(key, size);
}
}
}
} catch (Exception e) {
log.error("检测大 Key 失败", e);
}
log.info("大 Key 检测完成");
}
// 其他方法...
/**
* 发送告警
*/
private void sendAlert(String key, long size) {
try {
// 构建告警消息
String message = String.format("【Redis 大 Key 告警】\nKey: %s\n大小: %s bytes (%.2f MB)\n阈值: %s bytes (%.2f MB)",
key, size, size / (1024.0 * 1024.0), threshold, threshold / (1024.0 * 1024.0));
// 发送企业微信告警
wechatNotifier.sendWechatNotification(message);
} catch (Exception e) {
log.error("发送告警失败: {}", key, e);
}
}
}
5.3.4 定时任务
@Component
public class RedisBigKeyCheckTask {
@Autowired
private RedisBigKeyDetector redisBigKeyDetector;
@Value("${redis.big-key.check-interval:60000}")
private long checkInterval;
@PostConstruct
public void init() {
// 启动定时任务
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(() -> {
try {
redisBigKeyDetector.detectBigKeys();
} catch (Exception e) {
e.printStackTrace();
}
}, 0, checkInterval, TimeUnit.MILLISECONDS);
}
}
5.3.5 大 Key 访问封装
@Service
public class RedisBigKeyService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 获取 String 类型的值
*/
public String getStringValue(String key) {
// 检查是否为拆分后的 Key
if (redisTemplate.hasKey(key + ":meta")) {
// 获取元数据
String type = (String) redisTemplate.opsForHash().get(key + ":meta", "type");
if ("string".equals(type)) {
// 获取拆分块数
String chunksStr = (String) redisTemplate.opsForHash().get(key + ":meta", "chunks");
int chunks = Integer.parseInt(chunksStr);
// 拼接所有块
StringBuilder sb = new StringBuilder();
for (int i = 0; i < chunks; i++) {
String chunk = (String) redisTemplate.opsForValue().get(key + ":chunk:" + i);
if (chunk != null) {
sb.append(chunk);
}
}
return sb.toString();
}
}
// 原始 Key
return (String) redisTemplate.opsForValue().get(key);
}
/**
* 设置 String 类型的值
*/
public void setStringValue(String key, String value) {
// 检查值大小
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
if (bytes.length > 10485760) { // 10MB
// 自动拆分
RedisBigKeySplitter splitter = new RedisBigKeySplitter();
splitter.splitStringKey(key, bytes);
} else {
// 直接存储
redisTemplate.opsForValue().set(key, value);
}
}
// 其他类型的操作方法...
}
六、最佳实践
6.1 大 Key 预防最佳实践
原则:
- 合理设计 Key:避免使用过大的 Key,合理设计 Key 的结构
- 数据分片:将大数据分片存储,避免单个 Key 过大
- 定期清理:定期清理过期数据,避免数据积累
- 监控预警:建立监控预警机制,及时发现大 Key
建议:
- 使用 Redis 集群,分散数据存储
- 对大型数据使用分片存储,如使用前缀 + 序号的方式
- 定期检查 Redis Key 的大小,发现大 Key 及时处理
- 使用 Redis 的过期机制,自动清理过期数据
6.2 大 Key 拆分最佳实践
原则:
- 类型适配:根据 Key 的类型选择合适的拆分策略
- 粒度适中:拆分粒度要适中,避免过多的小 Key
- 元数据管理:合理管理拆分后的元数据,便于数据访问
- 向后兼容:保持接口的向后兼容,避免影响现有代码
建议:
- String 类型:按 1MB 大小拆分
- List 类型:按 1000 个元素拆分
- Hash 类型:按 1000 个字段拆分
- Set 类型:按 1000 个元素拆分
- ZSet 类型:按 1000 个元素拆分
6.3 监控告警最佳实践
原则:
- 实时监控:实时监控 Redis Key 的大小
- 阈值合理:设置合理的告警阈值
- 多渠道告警:使用多种告警渠道,确保告警及时送达
- 自动处理:对于大 Key,实现自动拆分处理
建议:
- 使用 Prometheus + Grafana 监控 Redis Key 大小
- 设置两级告警阈值:预警阈值和处理阈值
- 使用企业微信、钉钉等渠道发送告警
- 实现大 Key 的自动拆分处理
6.4 性能优化最佳实践
原则:
- 减少网络传输:减少大 Key 的网络传输
- 避免阻塞操作:避免对大 Key 进行阻塞操作
- 优化存储结构:优化数据存储结构,减少内存占用
- 合理使用数据类型:根据业务场景选择合适的数据类型
建议:
- 使用 Redis Pipeline 批量操作,减少网络往返
- 对大 Key 的操作使用异步方式,避免阻塞主线程
- 使用压缩算法,减少数据大小
- 合理使用 Redis 数据类型,如使用 Hash 存储对象
七、总结
Redis 大 Key 是一个常见的性能瓶颈,会导致内存占用不均衡、网络传输缓慢、阻塞操作、过期删除卡顿等问题。通过本文的实现方案,开发者可以构建一个功能强大的 Redis 大 Key 管理系统,帮助团队更好地管理 Redis 数据,避免大 Key 带来的性能问题。
互动话题:
- 你在实际项目中遇到过大 Key 问题吗?是如何解决的?
- 你认为大 Key 拆分的最佳粒度是多少?
- 你有使用过类似的工具或方案吗?
欢迎在评论区留言讨论!更多技术文章,欢迎关注公众号:服务端技术精选
标题:SpringBoot + Redis 大 Key 拆分 + 自动检测告警:单个 Key 超 10MB?自动拆分防阻塞
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/04/11/1775830865099.html
公众号:服务端技术精选
- 前言
- 一、核心概念
- 1.1 大 Key
- 1.2 大 Key 的危害
- 1.3 大 Key 拆分策略
- 1.4 大 Key 检测方法
- 1.5 告警机制
- 二、技术方案
- 2.1 架构设计
- 2.2 技术选型
- 2.3 核心流程
- 三、Spring Boot Redis 大 Key 拆分实现
- 3.1 依赖配置
- 3.2 配置文件
- 3.3 大 Key 拆分工具类
- 3.4 大 Key 检测服务
- 3.5 定时任务
- 3.6 大 Key 访问封装
- 四、自动检测告警实现
- 4.1 监控配置
- 4.2 自定义指标
- 4.3 告警配置
- 4.4 企业微信告警
- 五、Spring Boot 完整实现
- 5.1 项目结构
- 5.2 核心配置
- 5.3 核心代码
- 5.3.1 Redis 配置
- 5.3.2 大 Key 拆分工具类
- 5.3.3 大 Key 检测服务
- 5.3.4 定时任务
- 5.3.5 大 Key 访问封装
- 六、最佳实践
- 6.1 大 Key 预防最佳实践
- 6.2 大 Key 拆分最佳实践
- 6.3 监控告警最佳实践
- 6.4 性能优化最佳实践
- 七、总结
评论
0 评论