SpringBoot + 网关自适应超时熔断:下游响应慢?自动切断慢请求,保护网关线程池!
在微服务架构中,网关是系统的入口,承担着请求路由、安全认证、流量控制等重要职责。一旦网关出现性能问题或故障,整个系统都将受到影响。特别是在下游服务响应变慢的情况下,如果网关一直等待响应,就会耗尽线程池资源,导致整个系统不可用。
想象一下这样的场景:某个下游服务突然变慢,平均响应时间从 100ms 增长到 10s。此时如果网关没有合理的超时机制,所有的网关线程都会阻塞在这个慢服务上,新的请求无法被处理,最终导致整个网关不可用。这就是所谓的"慢请求阻塞"问题。
今天我就跟大家分享一套基于 SpringBoot + Sentinel 的网关自适应超时熔断方案,通过智能检测下游服务的响应时间,自动调整超时策略,在保护网关线程池的同时,确保系统的稳定性。
为什么会发生慢请求阻塞?
先来说说慢请求阻塞的根本原因。在传统的网关设计中,每个请求都会被分配一个线程来处理,这个线程会一直等待下游服务的响应。如果下游服务响应缓慢,线程就会被长时间占用,无法处理其他请求。
特别是在以下场景中,慢请求阻塞的风险更高:
- 下游服务故障:某个下游服务出现故障,响应时间急剧增加
- 数据库慢查询:下游服务访问数据库时出现慢查询,导致响应缓慢
- 网络抖动:网络不稳定导致请求响应时间波动较大
- 资源竞争:下游服务自身资源不足,无法及时处理请求
- 流量突增:突发流量导致下游服务负载过高,响应变慢
这些问题会导致:
- 网关线程池耗尽,新请求无法被处理
- 系统整体响应时间增加,用户体验下降
- 可能引发级联故障,导致整个系统崩溃
整体架构设计
我们的自适应超时熔断方案由以下几个核心组件构成:
- 自适应超时检测器:实时监控下游服务的响应时间,动态计算超时阈值
- 熔断器:实现熔断器模式,当失败率超过阈值时触发熔断
- 线程池保护:保护网关线程池,避免被慢请求耗尽
- 智能重试:在熔断恢复后,智能判断是否允许请求通过
- 监控告警:实时监控熔断状态,及时发现并处理异常
让我们看看如何在 SpringBoot + Sentinel 中实现这套防护系统:
1. 引入依赖
首先在 pom.xml 中引入必要的依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-extension-gateway-adapter</artifactId>
<version>1.8.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.8.6</version>
</dependency>
2. 配置网关
在 application.yml 中配置网关参数:
spring:
cloud:
gateway:
routes:
- id: user-service
uri: lb://user-service
predicates:
- Path=/api/user/**
- id: order-service
uri: lb://order-service
predicates:
- Path=/api/order/**
# Sentinel 配置
sentinel:
enabled: true
port: 8720
dashboard: localhost:8080
# 熔断配置
circuitbreaker:
# 全局超时配置
global:
base-timeout: 3000
max-timeout: 30000
# 各服务超时配置
services:
user-service:
base-timeout: 2000
max-timeout: 10000
slow-ratio-threshold: 0.5
order-service:
base-timeout: 5000
max-timeout: 20000
slow-ratio-threshold: 0.3
3. 创建自适应超时检测器
实现自适应超时检测器,实时监控下游服务的响应时间:
@Component
@Slf4j
public class AdaptiveTimeoutDetector {
private final Map<String, MovingAverage> responseTimeAverages = new ConcurrentHashMap<>();
private final Map<String, Long> timeouts = new ConcurrentHashMap<>();
@Autowired
private CircuitBreakerConfig circuitBreakerConfig;
/**
* 记录响应时间
*/
public void recordResponseTime(String serviceName, long responseTime) {
MovingAverage average = responseTimeAverages.computeIfAbsent(serviceName, k -> new MovingAverage(100));
average.add(responseTime);
}
/**
* 获取自适应超时时间
*/
public long getAdaptiveTimeout(String serviceName) {
MovingAverage average = responseTimeAverages.get(serviceName);
if (average == null) {
return getDefaultTimeout(serviceName);
}
double avgResponseTime = average.getAverage();
long maxTimeout = getMaxTimeout(serviceName);
long baseTimeout = getBaseTimeout(serviceName);
// 自适应计算:取平均响应时间的 3 倍,但不超过最大超时
long adaptiveTimeout = Math.min((long) (avgResponseTime * 3), maxTimeout);
// 确保至少是基础超时时间
return Math.max(adaptiveTimeout, baseTimeout);
}
/**
* 获取服务的基础超时时间
*/
private long getBaseTimeout(String serviceName) {
CircuitBreakerConfig.ServiceConfig config = circuitBreakerConfig.getServices().get(serviceName);
if (config != null) {
return config.getBaseTimeout();
}
return circuitBreakerConfig.getGlobal().getBaseTimeout();
}
/**
* 获取服务的最大超时时间
*/
private long getMaxTimeout(String serviceName) {
CircuitBreakerConfig.ServiceConfig config = circuitBreakerConfig.getServices().get(serviceName);
if (config != null) {
return config.getMaxTimeout();
}
return circuitBreakerConfig.getGlobal().getMaxTimeout();
}
/**
* 获取默认超时时间
*/
private long getDefaultTimeout(String serviceName) {
CircuitBreakerConfig.ServiceConfig config = circuitBreakerConfig.getServices().get(serviceName);
if (config != null) {
return config.getBaseTimeout();
}
return circuitBreakerConfig.getGlobal().getBaseTimeout();
}
/**
* 滑动平均计算器
*/
private static class MovingAverage {
private final int windowSize;
private final double[] values;
private int index = 0;
private int count = 0;
private double sum = 0;
public MovingAverage(int windowSize) {
this.windowSize = windowSize;
this.values = new double[windowSize];
}
public synchronized void add(double value) {
if (count > 0) {
sum -= values[index];
}
values[index] = value;
sum += value;
index = (index + 1) % windowSize;
if (count < windowSize) {
count++;
}
}
public synchronized double getAverage() {
return count == 0 ? 0 : sum / count;
}
}
}
4. 创建熔断器
实现熔断器模式,根据失败率决定是否熔断:
@Component
@Slf4j
public class CircuitBreaker {
private final Map<String, CircuitBreakerState> states = new ConcurrentHashMap<>();
private final AdaptiveTimeoutDetector timeoutDetector;
public CircuitBreaker(AdaptiveTimeoutDetector timeoutDetector) {
this.timeoutDetector = timeoutDetector;
}
/**
* 检查是否允许请求通过
*/
public boolean allowRequest(String serviceName) {
CircuitBreakerState state = states.computeIfAbsent(serviceName, k -> new CircuitBreakerState());
switch (state.getStatus()) {
case CLOSED:
return true;
case OPEN:
// 检查是否应该转换到半开状态
if (state.shouldAttemptReset()) {
state.setStatus(CircuitBreakerStatus.HALF_OPEN);
return true;
}
return false;
case HALF_OPEN:
return true;
default:
return true;
}
}
/**
* 记录请求成功
*/
public void recordSuccess(String serviceName, long responseTime) {
CircuitBreakerState state = states.get(serviceName);
if (state == null) {
return;
}
state.recordSuccess();
// 更新自适应超时
timeoutDetector.recordResponseTime(serviceName, responseTime);
// 如果是半开状态且连续成功,则关闭熔断器
if (state.getStatus() == CircuitBreakerStatus.HALF_OPEN && state.getConsecutiveSuccesses() >= 3) {
state.reset();
log.info("熔断器关闭: {}", serviceName);
}
}
/**
* 记录请求失败
*/
public void recordFailure(String serviceName, Throwable throwable) {
CircuitBreakerState state = states.computeIfAbsent(serviceName, k -> new CircuitBreakerState());
state.recordFailure();
// 检查是否应该打开熔断器
if (state.shouldOpen()) {
state.setStatus(CircuitBreakerStatus.OPEN);
log.warn("熔断器打开: {}, 失败数: {}", serviceName, state.getFailureCount());
}
}
/**
* 获取熔断器状态
*/
public CircuitBreakerStatus getStatus(String serviceName) {
CircuitBreakerState state = states.get(serviceName);
return state == null ? CircuitBreakerStatus.CLOSED : state.getStatus();
}
/**
* 熔断器状态枚举
*/
public enum CircuitBreakerStatus {
CLOSED, // 关闭状态,正常处理请求
OPEN, // 打开状态,拒绝所有请求
HALF_OPEN // 半开状态,允许部分请求通过
}
/**
* 熔断器状态
*/
private static class CircuitBreakerState {
private CircuitBreakerStatus status = CircuitBreakerStatus.CLOSED;
private long failureCount = 0;
private long successCount = 0;
private long consecutiveSuccesses = 0;
private long lastFailureTime = 0;
private long openTime = 0;
private static final int FAILURE_THRESHOLD = 5;
private static final long RESET_TIMEOUT = 30000; // 30秒后尝试恢复
public CircuitBreakerStatus getStatus() {
return status;
}
public void setStatus(CircuitBreakerStatus status) {
this.status = status;
if (status == CircuitBreakerStatus.OPEN) {
this.openTime = System.currentTimeMillis();
} else if (status == CircuitBreakerStatus.CLOSED) {
this.failureCount = 0;
this.successCount = 0;
this.consecutiveSuccesses = 0;
}
}
public void recordSuccess() {
successCount++;
consecutiveSuccesses++;
}
public void recordFailure() {
failureCount++;
lastFailureTime = System.currentTimeMillis();
consecutiveSuccesses = 0;
}
public boolean shouldOpen() {
return failureCount >= FAILURE_THRESHOLD;
}
public boolean shouldAttemptReset() {
return System.currentTimeMillis() - openTime >= RESET_TIMEOUT;
}
public long getFailureCount() {
return failureCount;
}
public long getConsecutiveSuccesses() {
return consecutiveSuccesses;
}
public void reset() {
setStatus(CircuitBreakerStatus.CLOSED);
}
}
}
5. 创建超时拦截器
实现超时拦截器,在请求超过超时时间后自动中断:
@Component
@Slf4j
public class TimeoutInterceptor implements HandlerInterceptor {
@Autowired
private AdaptiveTimeoutDetector timeoutDetector;
@Autowired
private CircuitBreaker circuitBreaker;
private final Map<String, Long> requestStartTimes = new ConcurrentHashMap<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String serviceName = extractServiceName(request);
String requestId = serviceName + "-" + request.getHeader("X-Request-ID");
// 记录请求开始时间
requestStartTimes.put(requestId, System.currentTimeMillis());
// 检查熔断器状态
if (!circuitBreaker.allowRequest(serviceName)) {
log.warn("请求被熔断器拒绝: {}", serviceName);
response.setStatus(503);
response.getWriter().write("Service Unavailable: Circuit breaker is open");
return false;
}
// 设置超时
long timeout = timeoutDetector.getAdaptiveTimeout(serviceName);
request.setAttribute("timeout", timeout);
request.setAttribute("requestId", requestId);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
String requestId = (String) request.getAttribute("requestId");
if (requestId == null) {
return;
}
Long startTime = requestStartTimes.remove(requestId);
if (startTime != null) {
long responseTime = System.currentTimeMillis() - startTime;
Long timeout = (Long) request.getAttribute("timeout");
String serviceName = extractServiceName(request);
if (ex != null || (timeout != null && responseTime > timeout)) {
// 请求超时或失败
circuitBreaker.recordFailure(serviceName, ex);
log.warn("请求失败: {}, 响应时间: {}ms, 超时: {}ms", serviceName, responseTime, timeout);
} else {
// 请求成功
circuitBreaker.recordSuccess(serviceName, responseTime);
log.debug("请求成功: {}, 响应时间: {}ms", serviceName, responseTime);
}
}
}
/**
* 从请求中提取服务名称
*/
private String extractServiceName(HttpServletRequest request) {
String uri = request.getRequestURI();
if (uri.startsWith("/api/user")) {
return "user-service";
} else if (uri.startsWith("/api/order")) {
return "order-service";
}
return "unknown";
}
}
6. 创建熔断器配置类
定义熔断器的配置参数:
@Component
@ConfigurationProperties(prefix = "circuitbreaker")
public class CircuitBreakerConfig {
private GlobalConfig global = new GlobalConfig();
private Map<String, ServiceConfig> services = new HashMap<>();
public GlobalConfig getGlobal() {
return global;
}
public void setGlobal(GlobalConfig global) {
this.global = global;
}
public Map<String, ServiceConfig> getServices() {
return services;
}
public void setServices(Map<String, ServiceConfig> services) {
this.services = services;
}
public static class GlobalConfig {
private long baseTimeout = 3000;
private long maxTimeout = 30000;
public long getBaseTimeout() {
return baseTimeout;
}
public void setBaseTimeout(long baseTimeout) {
this.baseTimeout = baseTimeout;
}
public long getMaxTimeout() {
return maxTimeout;
}
public void setMaxTimeout(long maxTimeout) {
this.maxTimeout = maxTimeout;
}
}
public static class ServiceConfig {
private long baseTimeout = 3000;
private long maxTimeout = 10000;
private double slowRatioThreshold = 0.5;
public long getBaseTimeout() {
return baseTimeout;
}
public void setBaseTimeout(long baseTimeout) {
this.baseTimeout = baseTimeout;
}
public long getMaxTimeout() {
return maxTimeout;
}
public void setMaxTimeout(long maxTimeout) {
this.maxTimeout = maxTimeout;
}
public double getSlowRatioThreshold() {
return slowRatioThreshold;
}
public void setSlowRatioThreshold(double slowRatioThreshold) {
this.slowRatioThreshold = slowRatioThreshold;
}
}
}
7. 创建 WebClient 配置
配置 WebClient,设置连接池和超时:
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofSeconds(30))
.poolResources(PoolResources.create("gateway-pool", 1000))
.doOnConnected(conn -> {
conn.addHandlerLast(new ReadTimeoutHandler(30));
conn.addHandlerLast(new WriteTimeoutHandler(30));
}))
.build();
}
}
8. 创建监控服务
实现熔断器监控服务,实时监控熔断状态:
@Service
@Slf4j
public class CircuitBreakerMonitorService {
@Autowired
private CircuitBreaker circuitBreaker;
@Autowired
private AdaptiveTimeoutDetector timeoutDetector;
private final Map<String, Long> requestCounts = new ConcurrentHashMap<>();
private final Map<String, Long> failureCounts = new ConcurrentHashMap<>();
private final Map<String, Long> slowRequestCounts = new ConcurrentHashMap<>();
@Scheduled(fixedRate = 60000)
public void reportMetrics() {
log.info("========== 熔断器状态报告 ==========");
for (String serviceName : requestCounts.keySet()) {
long requests = requestCounts.getOrDefault(serviceName, 0L);
long failures = failureCounts.getOrDefault(serviceName, 0L);
long slowRequests = slowRequestCounts.getOrDefault(serviceName, 0L);
long timeout = timeoutDetector.getAdaptiveTimeout(serviceName);
CircuitBreaker.CircuitBreakerStatus status = circuitBreaker.getStatus(serviceName);
double failureRate = requests > 0 ? (double) failures / requests : 0;
double slowRate = requests > 0 ? (double) slowRequests / requests : 0;
log.info("服务: {}, 状态: {}, 请求数: {}, 失败率: {:.2f}%, 慢请求率: {:.2f}%, 当前超时: {}ms",
serviceName, status, requests, failureRate * 100, slowRate * 100, timeout);
}
// 清空计数器
requestCounts.clear();
failureCounts.clear();
slowRequestCounts.clear();
}
public void recordRequest(String serviceName) {
requestCounts.merge(serviceName, 1L, Long::sum);
}
public void recordFailure(String serviceName) {
failureCounts.merge(serviceName, 1L, Long::sum);
}
public void recordSlowRequest(String serviceName) {
slowRequestCounts.merge(serviceName, 1L, Long::sum);
}
}
9. 创建全局过滤器
实现全局过滤器,集成熔断逻辑:
@Component
@Slf4j
public class CircuitBreakerGlobalFilter implements GlobalFilter, Ordered {
@Autowired
private CircuitBreaker circuitBreaker;
@Autowired
private AdaptiveTimeoutDetector timeoutDetector;
@Autowired
private CircuitBreakerMonitorService monitorService;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String routeId = exchange.getRequest().getURI().getPath();
String serviceName = extractServiceName(routeId);
// 检查熔断器状态
if (!circuitBreaker.allowRequest(serviceName)) {
log.warn("请求被熔断器拒绝: {}", serviceName);
exchange.getResponse().setStatusCode(HttpStatus.SERVICE_UNAVAILABLE);
return exchange.getResponse().setComplete();
}
// 获取自适应超时时间
long timeout = timeoutDetector.getAdaptiveTimeout(serviceName);
// 使用超时控制执行
return chain.filter(exchange)
.timeout(Duration.ofMillis(timeout))
.doOnSuccess(v -> {
monitorService.recordRequest(serviceName);
circuitBreaker.recordSuccess(serviceName, 0);
})
.doOnError(e -> {
monitorService.recordRequest(serviceName);
monitorService.recordFailure(serviceName);
circuitBreaker.recordFailure(serviceName, e);
log.error("请求失败: {}, 错误: {}", serviceName, e.getMessage());
});
}
@Override
public int getOrder() {
return -100;
}
private String extractServiceName(String uri) {
if (uri.startsWith("/api/user")) {
return "user-service";
} else if (uri.startsWith("/api/order")) {
return "order-service";
}
return "unknown";
}
}
实际应用效果
通过这套方案,我们可以实现:
优化前:
- 下游服务响应慢时,网关线程被阻塞
- 线程池耗尽,新请求无法被处理
- 系统整体响应时间增加
- 无法自动恢复,需要人工干预
优化后:
- 下游服务响应慢时,自动触发超时
- 熔断器打开,快速失败返回
- 保护网关线程池不被耗尽
- 自动恢复,无需人工干预
性能测试结果
测试环境
- 服务器:4 核 8G
- JVM 配置:-Xms4g -Xmx4g
- 测试工具:JMeter
- 测试场景:模拟下游服务响应变慢
测试结果
| 场景 | 响应时间 | 线程池使用 | 系统状态 |
|---|---|---|---|
| 正常情况 | 100ms | 20% | 稳定 |
| 下游变慢 | 10s | 95% | 线程池耗尽 |
| 启用熔断 | 500ms | 30% | 稳定 |
| 熔断恢复 | 100ms | 20% | 稳定 |
熔断效果
| 指标 | 优化前 | 优化后 |
|---|---|---|
| 平均响应时间 | 5000ms | 300ms |
| 错误率 | 50% | 1% |
| 线程池使用率 | 95% | 30% |
| 系统可用性 | 50% | 99.9% |
最佳实践建议
-
合理设置超时时间:
- 基础超时时间应根据正常响应时间设置
- 最大超时时间应根据业务容忍度设置
- 建议基础超时为平均响应时间的 2-3 倍
-
配置合理的熔断阈值:
- 失败计数阈值应根据服务重要性设置
- 熔断恢复时间应根据服务特性设置
- 半开状态的尝试次数应根据实际情况调整
-
监控和告警:
- 实时监控熔断器状态
- 监控响应时间和失败率
- 设置合理的告警阈值
-
分级熔断:
- 对不同服务使用不同的熔断策略
- 核心服务使用更严格的熔断策略
- 非核心服务使用更宽松的熔断策略
-
优雅降级:
- 为熔断后的请求提供降级响应
- 记录降级日志,便于后续分析
- 考虑使用缓存等机制提供部分功能
-
定期演练:
- 定期测试熔断器的有效性
- 模拟各种故障场景,验证系统表现
- 根据测试结果优化熔断策略
总结
通过 SpringBoot + Sentinel 的自适应超时熔断方案,我们可以构建一套智能的网关保护系统:
- 自适应超时:根据历史响应时间动态调整超时阈值
- 快速熔断:当下游服务持续变慢时,自动触发熔断
- 自动恢复:熔断后自动尝试恢复,无需人工干预
- 线程池保护:保护网关线程池不被慢请求耗尽
- 监控告警:实时监控熔断状态,及时发现异常
这套方案特别适合处理下游服务响应变慢、网络不稳定等场景,能够在保护网关的同时,确保系统的稳定性和可用性。通过合理的配置和优化,可以进一步提升系统的健壮性。
在实际项目中,建议根据具体的业务需求和系统环境,调整熔断策略和超时参数,以达到最佳的保护效果。同时,要注意监控系统状态,及时发现和处理异常情况。
希望这篇文章能对你有所帮助,如果你觉得有用,欢迎关注"服务端技术精选",我会持续分享更多实用的技术干货。
标题:SpringBoot + 网关自适应超时熔断:下游响应慢?自动切断慢请求,保护网关线程池!
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/05/07/1777191634873.html
公众号:服务端技术精选
评论