Spring Cloud Gateway + 灰度发布流量录制 + 回放验证:新版本上线前用真实流量预演

前言

在现代应用开发中,版本发布是一个关键环节。每次新版本上线,都面临着各种风险,如功能异常、性能问题、兼容性问题等。为了降低这些风险,灰度发布成为了一种常用的发布策略。然而,即使采用了灰度发布,仍然可能会遇到一些问题,因为灰度发布只是将流量部分导入新版本,而无法完全覆盖所有可能的场景。

想象一下这样的场景:你的团队开发了一个新版本的服务,经过了严格的测试,然后通过灰度发布将10%的流量导入新版本。然而,上线后不久,你发现新版本在处理某些特定请求时出现了异常,导致用户体验受到影响。这是因为测试环境无法完全模拟生产环境的真实流量和场景,导致一些问题在测试阶段没有被发现。

流量录制和回放验证是解决这个问题的有效方案。通过录制生产环境的真实流量,然后在测试环境中回放这些流量来验证新版本的性能和稳定性,可以在正式上线前发现和解决潜在问题。本文将详细介绍如何在 Spring Cloud Gateway 中实现灰度发布、流量录制和回放验证功能。

一、核心概念

1.1 灰度发布

灰度发布是一种增量发布策略,通过将部分流量导入新版本,逐步验证新版本的稳定性和性能,然后根据验证结果决定是否扩大发布范围。灰度发布可以降低发布风险,确保系统的稳定性。

1.2 流量录制

流量录制是指在生产环境中记录真实的请求和响应数据,包括请求路径、请求参数、请求体、响应状态码、响应体等信息。这些录制的流量可以用于后续的回放验证。

1.3 流量回放验证

流量回放验证是指将录制的流量在测试环境中回放,验证新版本的服务在处理这些流量时的表现。通过对比回放结果和原始响应,可以发现新版本可能存在的问题。

1.4 为什么需要流量录制和回放验证

  • 真实场景模拟:录制的流量来自生产环境,能够真实反映用户的使用场景
  • 问题提前发现:在正式上线前发现和解决潜在问题
  • 性能评估:评估新版本的性能表现,确保不会出现性能退化
  • 回归测试:确保新版本不会破坏现有功能
  • 降低发布风险:通过充分的验证,降低灰度发布的风险

二、Spring Cloud Gateway 流量录制实现

2.1 依赖配置

<dependencies>
    <!-- Spring Cloud Gateway -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Micrometer -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>

    <!-- Spring Boot Configuration Processor -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.2 流量录制配置

2.2.1 配置文件

# 流量录制配置
traffic:
  recording:
    enabled: true
    path: "records"
    include-paths:
      - "/api/**"
    exclude-paths:
      - "/api/health"
      - "/api/metrics"

2.2.2 配置类

@Data
@ConfigurationProperties(prefix = "traffic.recording")
public class TrafficRecordingProperties {

    private boolean enabled = true;
    private String path = "records";
    private List<String> includePaths = new ArrayList<>();
    private List<String> excludePaths = new ArrayList<>();

}

2.3 流量录制过滤器

@Component
@Slf4j
public class TrafficRecordingGatewayFilterFactory extends AbstractGatewayFilterFactory<TrafficRecordingGatewayFilterFactory.Config> {

    @Autowired
    private TrafficRecordingProperties properties;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public TrafficRecordingGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            if (!properties.isEnabled()) {
                return chain.filter(exchange);
            }

            // 获取请求路径
            String path = exchange.getRequest().getPath().value();

            // 检查是否需要录制
            if (!shouldRecord(path)) {
                return chain.filter(exchange);
            }

            // 记录请求信息
            TrafficRecord record = new TrafficRecord();
            record.setTimestamp(System.currentTimeMillis());
            record.setMethod(exchange.getRequest().getMethod().name());
            record.setPath(path);
            record.setHeaders(extractHeaders(exchange.getRequest().getHeaders()));
            record.setQueryParams(extractQueryParams(exchange.getRequest().getQueryParams()));

            // 记录请求体
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        byte[] content = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(content);
                        DataBufferUtils.release(dataBuffer);
                        record.setRequestBody(new String(content, StandardCharsets.UTF_8));

                        // 包装响应以记录响应信息
                        ServerHttpResponse originalResponse = exchange.getResponse();
                        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
                            @Override
                            public Mono<ServerHttpResponse> writeWith(Publisher<? extends DataBuffer> body) {
                                if (body instanceof Flux) {
                                    Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
                                    return super.writeWith(fluxBody.map(dataBuffer -> {
                                        byte[] responseContent = new byte[dataBuffer.readableByteCount()];
                                        dataBuffer.read(responseContent);
                                        DataBufferUtils.release(dataBuffer);
                                        record.setResponseBody(new String(responseContent, StandardCharsets.UTF_8));
                                        record.setStatusCode(originalResponse.getStatusCode().value());
                                        
                                        // 保存记录
                                        saveRecord(record);
                                        
                                        return originalResponse.bufferFactory().wrap(responseContent);
                                    }));
                                }
                                return super.writeWith(body);
                            }
                        };

                        return chain.filter(exchange.mutate().response(decoratedResponse).build());
                    });
        };
    }

    private boolean shouldRecord(String path) {
        // 检查是否在排除路径中
        for (String excludePath : properties.getExcludePaths()) {
            if (path.matches(excludePath.replace("**", ".*"))) {
                return false;
            }
        }

        // 检查是否在包含路径中
        for (String includePath : properties.getIncludePaths()) {
            if (path.matches(includePath.replace("**", ".*"))) {
                return true;
            }
        }

        return false;
    }

    private Map<String, String> extractHeaders(HttpHeaders headers) {
        Map<String, String> headerMap = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            headerMap.put(entry.getKey(), String.join(", ", entry.getValue()));
        }
        return headerMap;
    }

    private Map<String, String> extractQueryParams(MultiValueMap<String, String> queryParams) {
        Map<String, String> queryParamMap = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
            queryParamMap.put(entry.getKey(), String.join(", ", entry.getValue()));
        }
        return queryParamMap;
    }

    private void saveRecord(TrafficRecord record) {
        try {
            // 创建录制目录
            File dir = new File(properties.getPath());
            if (!dir.exists()) {
                dir.mkdirs();
            }

            // 生成文件名
            String fileName = String.format("record-%d.json", record.getTimestamp());
            File file = new File(dir, fileName);

            // 写入文件
            objectMapper.writeValue(file, record);
            log.info("Traffic record saved: {}", file.getAbsolutePath());
        } catch (Exception e) {
            log.error("Failed to save traffic record", e);
        }
    }

    public static class Config {
        // 配置类
    }

    @Data
    private static class TrafficRecord {
        private long timestamp;
        private String method;
        private String path;
        private Map<String, String> headers;
        private Map<String, String> queryParams;
        private String requestBody;
        private int statusCode;
        private String responseBody;
    }

}

三、流量回放验证实现

3.1 流量回放配置

3.1.1 配置文件

# 流量回放配置
traffic:
  replay:
    enabled: true
    path: "records"
    target-uri: "http://localhost:8081"
    compare-responses: true

3.1.2 配置类

@Data
@ConfigurationProperties(prefix = "traffic.replay")
public class TrafficReplayProperties {

    private boolean enabled = true;
    private String path = "records";
    private String targetUri = "http://localhost:8081";
    private boolean compareResponses = true;

}

3.2 流量回放服务

@Service
@Slf4j
public class TrafficReplayService {

    @Autowired
    private TrafficReplayProperties properties;

    private final RestTemplate restTemplate = new RestTemplate();
    private final ObjectMapper objectMapper = new ObjectMapper();

    public void replayTraffic() {
        if (!properties.isEnabled()) {
            log.info("Traffic replay is disabled");
            return;
        }

        try {
            // 读取录制的流量
            File dir = new File(properties.getPath());
            if (!dir.exists()) {
                log.warn("No traffic records found at: {}", properties.getPath());
                return;
            }

            File[] files = dir.listFiles((dir1, name) -> name.endsWith(".json"));
            if (files == null || files.length == 0) {
                log.warn("No traffic records found at: {}", properties.getPath());
                return;
            }

            // 回放流量
            for (File file : files) {
                replayRecord(file);
            }
        } catch (Exception e) {
            log.error("Failed to replay traffic", e);
        }
    }

    private void replayRecord(File file) {
        try {
            // 读取记录
            TrafficRecord record = objectMapper.readValue(file, TrafficRecord.class);
            log.info("Replaying traffic record: {} {}", record.getMethod(), record.getPath());

            // 构建请求
            HttpHeaders headers = new HttpHeaders();
            if (record.getHeaders() != null) {
                for (Map.Entry<String, String> entry : record.getHeaders().entrySet()) {
                    headers.add(entry.getKey(), entry.getValue());
                }
            }

            HttpEntity<String> requestEntity = new HttpEntity<>(record.getRequestBody(), headers);
            URI uri = buildUri(record.getPath(), record.getQueryParams());

            // 发送请求
            ResponseEntity<String> responseEntity = restTemplate.exchange(
                    uri, 
                    HttpMethod.valueOf(record.getMethod()), 
                    requestEntity, 
                    String.class
            );

            // 比较响应
            if (properties.isCompareResponses()) {
                compareResponses(record, responseEntity);
            }

            log.info("Traffic record replayed successfully: {} {}", record.getMethod(), record.getPath());
        } catch (Exception e) {
            log.error("Failed to replay traffic record: {}", file.getName(), e);
        }
    }

    private URI buildUri(String path, Map<String, String> queryParams) throws URISyntaxException {
        UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(properties.getTargetUri() + path);
        if (queryParams != null) {
            for (Map.Entry<String, String> entry : queryParams.entrySet()) {
                builder.queryParam(entry.getKey(), entry.getValue());
            }
        }
        return builder.build().toUri();
    }

    private void compareResponses(TrafficRecord record, ResponseEntity<String> responseEntity) {
        // 比较状态码
        if (record.getStatusCode() != responseEntity.getStatusCodeValue()) {
            log.warn("Status code mismatch: expected {}, actual {}", record.getStatusCode(), responseEntity.getStatusCodeValue());
        }

        // 比较响应体
        if (!record.getResponseBody().equals(responseEntity.getBody())) {
            log.warn("Response body mismatch for {} {}", record.getMethod(), record.getPath());
            log.debug("Expected: {}", record.getResponseBody());
            log.debug("Actual: {}", responseEntity.getBody());
        }
    }

    @Data
    private static class TrafficRecord {
        private long timestamp;
        private String method;
        private String path;
        private Map<String, String> headers;
        private Map<String, String> queryParams;
        private String requestBody;
        private int statusCode;
        private String responseBody;
    }

}

四、灰度发布实现

4.1 灰度发布配置

4.1.1 配置文件

# 灰度发布配置
gray:
  release:
    enabled: true
    routes:
      - id: v1-route
        uri: http://localhost:8081
        predicates:
          - Path=/api/**
          - Header=X-User-Id, [1-100]
        filters:
          - StripPrefix=1
      - id: v2-route
        uri: http://localhost:8082
        predicates:
          - Path=/api/**
        filters:
          - StripPrefix=1

4.1.2 配置类

@Data
@ConfigurationProperties(prefix = "gray.release")
public class GrayReleaseProperties {

    private boolean enabled = true;
    private List<GrayRoute> routes = new ArrayList<>();

    @Data
    public static class GrayRoute {
        private String id;
        private String uri;
        private List<String> predicates;
        private List<String> filters;
    }

}

4.2 灰度发布过滤器

@Component
@Slf4j
public class GrayReleaseGatewayFilterFactory extends AbstractGatewayFilterFactory<GrayReleaseGatewayFilterFactory.Config> {

    @Autowired
    private GrayReleaseProperties properties;

    public GrayReleaseGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            if (!properties.isEnabled()) {
                return chain.filter(exchange);
            }

            // 获取请求路径
            String path = exchange.getRequest().getPath().value();

            // 查找匹配的灰度路由
            GrayReleaseProperties.GrayRoute grayRoute = findMatchingGrayRoute(path, exchange);

            if (grayRoute != null) {
                // 重定向到灰度版本
                URI uri = null;
                try {
                    uri = new URI(grayRoute.getUri() + path);
                } catch (URISyntaxException e) {
                    log.error("Invalid gray route URI: {}", grayRoute.getUri(), e);
                    return chain.filter(exchange);
                }

                ServerHttpRequest request = exchange.getRequest().mutate()
                        .uri(uri)
                        .build();

                log.info("Gray release route: {} -> {}", path, grayRoute.getUri());
                return chain.filter(exchange.mutate().request(request).build());
            }

            // 使用默认路由
            return chain.filter(exchange);
        };
    }

    private GrayReleaseProperties.GrayRoute findMatchingGrayRoute(String path, ServerWebExchange exchange) {
        for (GrayReleaseProperties.GrayRoute route : properties.getRoutes()) {
            if (matchesPredicates(route.getPredicates(), path, exchange)) {
                return route;
            }
        }
        return null;
    }

    private boolean matchesPredicates(List<String> predicates, String path, ServerWebExchange exchange) {
        for (String predicate : predicates) {
            if (predicate.startsWith("Path=")) {
                String pattern = predicate.substring(5);
                if (!path.matches(pattern.replace("**", ".*"))) {
                    return false;
                }
            } else if (predicate.startsWith("Header=")) {
                String[] parts = predicate.substring(7).split(",");
                if (parts.length != 2) {
                    continue;
                }
                String headerName = parts[0].trim();
                String headerPattern = parts[1].trim();
                String headerValue = exchange.getRequest().getHeaders().getFirst(headerName);
                if (headerValue == null || !headerValue.matches(headerPattern)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static class Config {
        // 配置类
    }

}

五、Spring Cloud Gateway 完整实现

5.1 项目依赖

<dependencies>
    <!-- Spring Cloud Gateway -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>

    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Micrometer -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>

    <!-- Spring Boot Configuration Processor -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5.2 配置文件

server:
  port: 8080

spring:
  application:
    name: gateway-gray-release-demo
  cloud:
    gateway:
      routes:
        - id: v1-route
          uri: http://localhost:8081
          predicates:
            - Path=/api/**
          filters:
            - TrafficRecording
            - GrayRelease

# 流量录制配置
traffic:
  recording:
    enabled: true
    path: "records"
    include-paths:
      - "/api/**"
    exclude-paths:
      - "/api/health"
      - "/api/metrics"

# 流量回放配置
traffic:
  replay:
    enabled: true
    path: "records"
    target-uri: "http://localhost:8082"
    compare-responses: true

# 灰度发布配置
gray:
  release:
    enabled: true
    routes:
      - id: v1-route
        uri: http://localhost:8081
        predicates:
          - Path=/api/**
          - Header=X-User-Id, [1-100]
        filters:
          - StripPrefix=1
      - id: v2-route
        uri: http://localhost:8082
        predicates:
          - Path=/api/**
        filters:
          - StripPrefix=1

# 监控配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus,traffic-replay

5.3 核心配置类

5.3.1 流量录制配置

@Data
@ConfigurationProperties(prefix = "traffic.recording")
public class TrafficRecordingProperties {

    private boolean enabled = true;
    private String path = "records";
    private List<String> includePaths = new ArrayList<>();
    private List<String> excludePaths = new ArrayList<>();

}

5.3.2 流量回放配置

@Data
@ConfigurationProperties(prefix = "traffic.replay")
public class TrafficReplayProperties {

    private boolean enabled = true;
    private String path = "records";
    private String targetUri = "http://localhost:8081";
    private boolean compareResponses = true;

}

5.3.3 灰度发布配置

@Data
@ConfigurationProperties(prefix = "gray.release")
public class GrayReleaseProperties {

    private boolean enabled = true;
    private List<GrayRoute> routes = new ArrayList<>();

    @Data
    public static class GrayRoute {
        private String id;
        private String uri;
        private List<String> predicates;
        private List<String> filters;
    }

}

5.3.4 应用配置

@Configuration
@EnableConfigurationProperties({TrafficRecordingProperties.class, TrafficReplayProperties.class, GrayReleaseProperties.class})
public class ApplicationConfig {

}

5.4 过滤器实现

5.4.1 流量录制过滤器

@Component
@Slf4j
public class TrafficRecordingGatewayFilterFactory extends AbstractGatewayFilterFactory<TrafficRecordingGatewayFilterFactory.Config> {

    @Autowired
    private TrafficRecordingProperties properties;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public TrafficRecordingGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            if (!properties.isEnabled()) {
                return chain.filter(exchange);
            }

            // 获取请求路径
            String path = exchange.getRequest().getPath().value();

            // 检查是否需要录制
            if (!shouldRecord(path)) {
                return chain.filter(exchange);
            }

            // 记录请求信息
            TrafficRecord record = new TrafficRecord();
            record.setTimestamp(System.currentTimeMillis());
            record.setMethod(exchange.getRequest().getMethod().name());
            record.setPath(path);
            record.setHeaders(extractHeaders(exchange.getRequest().getHeaders()));
            record.setQueryParams(extractQueryParams(exchange.getRequest().getQueryParams()));

            // 记录请求体
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        byte[] content = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(content);
                        DataBufferUtils.release(dataBuffer);
                        record.setRequestBody(new String(content, StandardCharsets.UTF_8));

                        // 包装响应以记录响应信息
                        ServerHttpResponse originalResponse = exchange.getResponse();
                        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
                            @Override
                            public Mono<ServerHttpResponse> writeWith(Publisher<? extends DataBuffer> body) {
                                if (body instanceof Flux) {
                                    Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
                                    return super.writeWith(fluxBody.map(dataBuffer -> {
                                        byte[] responseContent = new byte[dataBuffer.readableByteCount()];
                                        dataBuffer.read(responseContent);
                                        DataBufferUtils.release(dataBuffer);
                                        record.setResponseBody(new String(responseContent, StandardCharsets.UTF_8));
                                        record.setStatusCode(originalResponse.getStatusCode().value());
                                        
                                        // 保存记录
                                        saveRecord(record);
                                        
                                        return originalResponse.bufferFactory().wrap(responseContent);
                                    }));
                                }
                                return super.writeWith(body);
                            }
                        };

                        return chain.filter(exchange.mutate().response(decoratedResponse).build());
                    });
        };
    }

    private boolean shouldRecord(String path) {
        // 检查是否在排除路径中
        for (String excludePath : properties.getExcludePaths()) {
            if (path.matches(excludePath.replace("**", ".*"))) {
                return false;
            }
        }

        // 检查是否在包含路径中
        for (String includePath : properties.getIncludePaths()) {
            if (path.matches(includePath.replace("**", ".*"))) {
                return true;
            }
        }

        return false;
    }

    private Map<String, String> extractHeaders(HttpHeaders headers) {
        Map<String, String> headerMap = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            headerMap.put(entry.getKey(), String.join(", ", entry.getValue()));
        }
        return headerMap;
    }

    private Map<String, String> extractQueryParams(MultiValueMap<String, String> queryParams) {
        Map<String, String> queryParamMap = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
            queryParamMap.put(entry.getKey(), String.join(", ", entry.getValue()));
        }
        return queryParamMap;
    }

    private void saveRecord(TrafficRecord record) {
        try {
            // 创建录制目录
            File dir = new File(properties.getPath());
            if (!dir.exists()) {
                dir.mkdirs();
            }

            // 生成文件名
            String fileName = String.format("record-%d.json", record.getTimestamp());
            File file = new File(dir, fileName);

            // 写入文件
            objectMapper.writeValue(file, record);
            log.info("Traffic record saved: {}", file.getAbsolutePath());
        } catch (Exception e) {
            log.error("Failed to save traffic record", e);
        }
    }

    public static class Config {
        // 配置类
    }

    @Data
    private static class TrafficRecord {
        private long timestamp;
        private String method;
        private String path;
        private Map<String, String> headers;
        private Map<String, String> queryParams;
        private String requestBody;
        private int statusCode;
        private String responseBody;
    }

}

5.4.2 灰度发布过滤器

@Component
@Slf4j
public class GrayReleaseGatewayFilterFactory extends AbstractGatewayFilterFactory<GrayReleaseGatewayFilterFactory.Config> {

    @Autowired
    private GrayReleaseProperties properties;

    public GrayReleaseGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            if (!properties.isEnabled()) {
                return chain.filter(exchange);
            }

            // 获取请求路径
            String path = exchange.getRequest().getPath().value();

            // 查找匹配的灰度路由
            GrayReleaseProperties.GrayRoute grayRoute = findMatchingGrayRoute(path, exchange);

            if (grayRoute != null) {
                // 重定向到灰度版本
                URI uri = null;
                try {
                    uri = new URI(grayRoute.getUri() + path);
                } catch (URISyntaxException e) {
                    log.error("Invalid gray route URI: {}", grayRoute.getUri(), e);
                    return chain.filter(exchange);
                }

                ServerHttpRequest request = exchange.getRequest().mutate()
                        .uri(uri)
                        .build();

                log.info("Gray release route: {} -> {}", path, grayRoute.getUri());
                return chain.filter(exchange.mutate().request(request).build());
            }

            // 使用默认路由
            return chain.filter(exchange);
        };
    }

    private GrayReleaseProperties.GrayRoute findMatchingGrayRoute(String path, ServerWebExchange exchange) {
        for (GrayReleaseProperties.GrayRoute route : properties.getRoutes()) {
            if (matchesPredicates(route.getPredicates(), path, exchange)) {
                return route;
            }
        }
        return null;
    }

    private boolean matchesPredicates(List<String> predicates, String path, ServerWebExchange exchange) {
        for (String predicate : predicates) {
            if (predicate.startsWith("Path=")) {
                String pattern = predicate.substring(5);
                if (!path.matches(pattern.replace("**", ".*"))) {
                    return false;
                }
            } else if (predicate.startsWith("Header=")) {
                String[] parts = predicate.substring(7).split(",");
                if (parts.length != 2) {
                    continue;
                }
                String headerName = parts[0].trim();
                String headerPattern = parts[1].trim();
                String headerValue = exchange.getRequest().getHeaders().getFirst(headerName);
                if (headerValue == null || !headerValue.matches(headerPattern)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static class Config {
        // 配置类
    }

}

5.5 服务实现

5.5.1 流量回放服务

@Service
@Slf4j
public class TrafficReplayService {

    @Autowired
    private TrafficReplayProperties properties;

    private final RestTemplate restTemplate = new RestTemplate();
    private final ObjectMapper objectMapper = new ObjectMapper();

    public void replayTraffic() {
        if (!properties.isEnabled()) {
            log.info("Traffic replay is disabled");
            return;
        }

        try {
            // 读取录制的流量
            File dir = new File(properties.getPath());
            if (!dir.exists()) {
                log.warn("No traffic records found at: {}", properties.getPath());
                return;
            }

            File[] files = dir.listFiles((dir1, name) -> name.endsWith(".json"));
            if (files == null || files.length == 0) {
                log.warn("No traffic records found at: {}", properties.getPath());
                return;
            }

            // 回放流量
            for (File file : files) {
                replayRecord(file);
            }
        } catch (Exception e) {
            log.error("Failed to replay traffic", e);
        }
    }

    private void replayRecord(File file) {
        try {
            // 读取记录
            TrafficRecord record = objectMapper.readValue(file, TrafficRecord.class);
            log.info("Replaying traffic record: {} {}", record.getMethod(), record.getPath());

            // 构建请求
            HttpHeaders headers = new HttpHeaders();
            if (record.getHeaders() != null) {
                for (Map.Entry<String, String> entry : record.getHeaders().entrySet()) {
                    headers.add(entry.getKey(), entry.getValue());
                }
            }

            HttpEntity<String> requestEntity = new HttpEntity<>(record.getRequestBody(), headers);
            URI uri = buildUri(record.getPath(), record.getQueryParams());

            // 发送请求
            ResponseEntity<String> responseEntity = restTemplate.exchange(
                    uri, 
                    HttpMethod.valueOf(record.getMethod()), 
                    requestEntity, 
                    String.class
            );

            // 比较响应
            if (properties.isCompareResponses()) {
                compareResponses(record, responseEntity);
            }

            log.info("Traffic record replayed successfully: {} {}", record.getMethod(), record.getPath());
        } catch (Exception e) {
            log.error("Failed to replay traffic record: {}", file.getName(), e);
        }
    }

    private URI buildUri(String path, Map<String, String> queryParams) throws URISyntaxException {
        UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(properties.getTargetUri() + path);
        if (queryParams != null) {
            for (Map.Entry<String, String> entry : queryParams.entrySet()) {
                builder.queryParam(entry.getKey(), entry.getValue());
            }
        }
        return builder.build().toUri();
    }

    private void compareResponses(TrafficRecord record, ResponseEntity<String> responseEntity) {
        // 比较状态码
        if (record.getStatusCode() != responseEntity.getStatusCodeValue()) {
            log.warn("Status code mismatch: expected {}, actual {}", record.getStatusCode(), responseEntity.getStatusCodeValue());
        }

        // 比较响应体
        if (!record.getResponseBody().equals(responseEntity.getBody())) {
            log.warn("Response body mismatch for {} {}", record.getMethod(), record.getPath());
            log.debug("Expected: {}", record.getResponseBody());
            log.debug("Actual: {}", responseEntity.getBody());
        }
    }

    @Data
    private static class TrafficRecord {
        private long timestamp;
        private String method;
        private String path;
        private Map<String, String> headers;
        private Map<String, String> queryParams;
        private String requestBody;
        private int statusCode;
        private String responseBody;
    }

}

5.6 控制器

5.6.1 流量回放控制器

@RestController
@RequestMapping("/actuator/traffic-replay")
public class TrafficReplayController {

    @Autowired
    private TrafficReplayService replayService;

    @PostMapping("/replay")
    public ResponseEntity<String> replayTraffic() {
        replayService.replayTraffic();
        return ResponseEntity.ok("Traffic replay started");
    }

}

5.7 应用入口

@SpringBootApplication
public class GatewayGrayReleaseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(GatewayGrayReleaseDemoApplication.class, args);
    }

}

六、最佳实践

6.1 流量录制

原则

  • 选择性录制:只录制关键路径和核心接口的流量
  • 脱敏处理:对敏感信息进行脱敏处理,保护用户隐私
  • 定期清理:定期清理过期的录制数据,避免存储压力
  • 质量保证:确保录制的流量数据质量,避免无效数据

建议

  • 只录制核心接口和关键路径的流量
  • 对敏感信息(如密码、身份证号)进行脱敏处理
  • 设置录制数据的保留期限,定期清理过期数据
  • 对录制的流量进行质量检查,确保数据的完整性和有效性

6.2 流量回放验证

原则

  • 环境隔离:在隔离的测试环境中进行回放验证
  • 结果对比:对比回放结果和原始响应,发现潜在问题
  • 性能评估:评估新版本的性能表现,确保不会出现性能退化
  • 异常处理:对回放过程中的异常进行处理和记录

建议

  • 在隔离的测试环境中进行回放验证,避免影响生产环境
  • 对比回放结果和原始响应,发现潜在问题
  • 评估新版本的性能表现,确保不会出现性能退化
  • 对回放过程中的异常进行处理和记录,便于问题分析

6.3 灰度发布

原则

  • 渐进式发布:从少量流量开始,逐步扩大发布范围
  • 监控告警:对灰度版本进行实时监控,设置告警机制
  • 快速回滚:在发现问题时能够快速回滚到旧版本
  • 用户反馈:收集用户反馈,及时调整发布策略

建议

  • 从 1% 或 5% 的流量开始,逐步扩大发布范围
  • 对灰度版本进行实时监控,设置告警机制
  • 建立快速回滚机制,在发现问题时能够及时回滚
  • 收集用户反馈,及时调整发布策略

6.4 集成测试

原则

  • 自动化测试:将流量回放验证集成到自动化测试流程中
  • 持续集成:在 CI/CD 流程中加入流量回放验证
  • 测试覆盖:确保测试覆盖所有关键路径和核心功能
  • 测试报告:生成详细的测试报告,便于问题分析

建议

  • 将流量回放验证集成到自动化测试流程中
  • 在 CI/CD 流程中加入流量回放验证
  • 确保测试覆盖所有关键路径和核心功能
  • 生成详细的测试报告,便于问题分析

七、总结

流量录制和回放验证是确保新版本稳定性的重要手段。通过录制生产环境的真实流量,然后在测试环境中回放这些流量来验证新版本的性能和稳定性,可以在正式上线前发现和解决潜在问题,降低发布风险。在实际项目中,我们应该根据业务需求和系统特性,合理配置流量录制和回放验证功能,建立完善的灰度发布流程,确保新版本的稳定性和性能。通过流量录制和回放验证,可以有效降低发布风险,提高系统的可靠性和用户体验。

互动话题

  1. 你的项目中是如何进行版本发布的?
  2. 你认为流量录制和回放验证最大的挑战是什么?
  3. 你有使用过类似的灰度发布方案吗?

欢迎在评论区留言讨论!更多技术文章,欢迎关注公众号:服务端技术精选


标题:Spring Cloud Gateway + 灰度发布流量录制 + 回放验证:新版本上线前用真实流量预演
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/04/08/1775463148135.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消