SpringBoot + 视频转码状态回调 + 失败重试:FFmpeg 崩溃后自动恢复,保障处理成功率

背景:视频转码的挑战

在视频类应用中,视频转码是一个核心功能,但也是一个充满挑战的功能:

  • 处理时间长:视频转码通常需要几分钟甚至更长时间
  • 资源消耗大:CPU、内存占用率高
  • FFmpeg 不稳定:可能因为各种原因崩溃
  • 状态跟踪难:转码过程中状态变化频繁
  • 失败率高:网络、存储、FFmpeg本身都可能导致失败

这些问题导致视频转码的成功率难以保证,用户体验大打折扣。本文将介绍如何使用 SpringBoot 实现视频转码状态回调 + 失败重试机制,确保 FFmpeg 崩溃后自动恢复,保障处理成功率。

核心概念

1. 视频转码状态

视频转码过程中,状态会不断变化:

状态说明处理动作
PENDING等待转码加入转码队列
PROCESSING正在转码监控转码进度
COMPLETED转码完成通知用户、清理资源
FAILED转码失败记录日志、触发重试
CANCELLED已取消清理资源

2. 状态回调机制

状态回调是指转码过程中,系统主动将状态变化通知给业务系统:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  业务系统    │     │  转码系统    │     │  FFmpeg     │
└─────────────┘     └─────────────┘     └─────────────┘
        │                     │                     │
        │  1. 提交转码任务     │                     │
        │ ─────────────────>  │                     │
        │                     │  2. 调用 FFmpeg      │
        │                     │ ─────────────────>  │
        │                     │  3. 转码进行中         │
        │  4. 状态回调(PROCESSING)│               │
        │ <─────────────────  │                     │
        │                     │  4. 转码完成/失败      │
        │  5. 状态回调(COMPLETED/FAILED)│           │
        │ <─────────────────  │                     │

3. 失败重试机制

失败重试是指在转码失败后,自动重新发起转码请求,直到成功或达到最大重试次数:

  • 指数退避:重试间隔呈指数增长
  • 固定间隔:固定时间间隔重试
  • 随机退避:随机时间间隔重试,避免雪崩

技术实现

1. 核心实体类

// 转码任务实体
@Data
@Entity
@Table(name = "transcode_task")
public class TranscodeTask {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "task_id", unique = true, nullable = false, length = 64)
    private String taskId; // 任务ID
    
    @Column(name = "video_id", nullable = false, length = 64)
    private String videoId; // 视频ID
    
    @Column(name = "input_path", nullable = false, length = 512)
    private String inputPath; // 输入文件路径
    
    @Column(name = "output_path", nullable = false, length = 512)
    private String outputPath; // 输出文件路径
    
    @Column(name = "status", nullable = false, length = 32)
    private String status; // 状态:PENDING/PROCESSING/COMPLETED/FAILED/CANCELLED
    
    @Column(name = "progress", nullable = false)
    private Integer progress = 0; // 转码进度 0-100
    
    @Column(name = "retry_count", nullable = false)
    private Integer retryCount = 0; // 重试次数
    
    @Column(name = "max_retry", nullable = false)
    private Integer maxRetry = 3; // 最大重试次数
    
    @Column(name = "error_message", length = 1024)
    private String errorMessage; // 错误信息
    
    @Column(name = "callback_url", length = 512)
    private String callbackUrl; // 回调URL
    
    @CreationTimestamp
    @Column(name = "create_time", nullable = false)
    private Date createTime;
    
    @UpdateTimestamp
    @Column(name = "update_time", nullable = false)
    private Date updateTime;
    
    @Column(name = "start_time")
    private Date startTime; // 开始时间
    
    @Column(name = "end_time")
    private Date endTime; // 结束时间
    
    @Column(name = "duration")
    private Long duration; // 耗时(毫秒)
}

// 转码配置实体
@Data
@Entity
@Table(name = "transcode_config")
public class TranscodeConfig {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "name", unique = true, nullable = false, length = 128)
    private String name; // 配置名称
    
    @Column(name = "video_codec", length = 32)
    private String videoCodec; // 视频编码:h264/h265
    
    @Column(name = "audio_codec", length = 32)
    private String audioCodec; // 音频编码:aac
    
    @Column(name = "resolution", length = 32)
    private String resolution; // 分辨率:720p/1080p
    
    @Column(name = "bitrate")
    private Integer bitrate; // 码率
    
    @Column(name = "frame_rate")
    private Integer frameRate; // 帧率
    
    @Column(name = "ffmpeg_params", columnDefinition = "TEXT")
    private String ffmpegParams; // FFmpeg参数
    
    @CreationTimestamp
    @Column(name = "create_time", nullable = false)
    private Date createTime;
}

2. 视频转码服务

@Service
@Slf4j
public class VideoTranscodeService {
    
    @Autowired
    private TranscodeTaskRepository taskRepository;
    
    @Autowired
    private TranscodeConfigRepository configRepository;
    
    @Autowired
    private TranscodeCallbackService callbackService;
    
    @Autowired
    private TranscodeRetryService retryService;
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Value("${transcode.ffmpeg.path:ffmpeg}")
    private String ffmpegPath;
    
    @Value("${transcode.output.dir:/tmp/transcode}")
    private String outputDir;
    
    /**
     * 提交转码任务
     */
    public TranscodeTask submitTask(TranscodeRequest request) {
        // 1. 创建转码任务
        TranscodeTask task = new TranscodeTask();
        task.setTaskId(UUID.randomUUID().toString());
        task.setVideoId(request.getVideoId());
        task.setInputPath(request.getInputPath());
        task.setOutputPath(generateOutputPath(request.getVideoId()));
        task.setStatus("PENDING");
        task.setRetryCount(0);
        task.setMaxRetry(request.getMaxRetry() != null ? request.getMaxRetry() : 3);
        task.setCallbackUrl(request.getCallbackUrl());
        task.setProgress(0);
        
        taskRepository.save(task);
        
        log.info("Transcode task created: {}", task.getTaskId());
        
        // 2. 发送到转码队列
        sendToQueue(task);
        
        return task;
    }
    
    /**
     * 处理转码任务
     */
    @Async
    public void processTask(TranscodeTask task) {
        try {
            // 1. 更新任务状态为处理中
            updateTaskStatus(task.getTaskId(), "PROCESSING", null);
            
            // 2. 获取转码配置
            TranscodeConfig config = configRepository.findByName(task.getConfigName()).orElse(null);
            if (config == null) {
                throw new TranscodeException("转码配置不存在");
            }
            
            // 3. 构建FFmpeg命令
            List<String> command = buildFFmpegCommand(task, config);
            
            // 4. 执行转码
            ProcessBuilder processBuilder = new ProcessBuilder(command);
            processBuilder.redirectErrorStream(true);
            
            Process process = processBuilder.start();
            
            // 5. 监控转码进度
            monitorProgress(process, task);
            
            // 6. 等待转码完成
            int exitCode = process.waitFor();
            
            if (exitCode == 0) {
                // 转码成功
                updateTaskStatus(task.getTaskId(), "COMPLETED", null);
                
                // 回调通知
                callbackService.notifySuccess(task);
            } else {
                // 转码失败
                String error = readProcessError(process);
                handleTranscodeFailure(task, error);
            }
            
        } catch (Exception e) {
            log.error("Transcode task failed: {}", task.getTaskId(), e);
            handleTranscodeFailure(task, e.getMessage());
        }
    }
    
    /**
     * 构建FFmpeg命令
     */
    private List<String> buildFFmpegCommand(TranscodeTask task, TranscodeConfig config) {
        List<String> command = new ArrayList<>();
        command.add(ffmpegPath);
        command.add("-i");
        command.add(task.getInputPath());
        
        // 视频编码
        if (StringUtils.hasText(config.getVideoCodec())) {
            command.add("-c:v");
            command.add(config.getVideoCodec());
        }
        
        // 音频编码
        if (StringUtils.hasText(config.getAudioCodec())) {
            command.add("-c:a");
            command.add(config.getAudioCodec());
        }
        
        // 分辨率
        if (StringUtils.hasText(config.getResolution())) {
            command.add("-s");
            command.add(config.getResolution());
        }
        
        // 码率
        if (config.getBitrate() != null) {
            command.add("-b:v");
            command.add(config.getBitrate() + "k");
        }
        
        // 帧率
        if (config.getFrameRate() != null) {
            command.add("-r");
            command.add(config.getFrameRate().toString());
        }
        
        // 自定义参数
        if (StringUtils.hasText(config.getFfmpegParams())) {
            String[] params = config.getFfmpegParams().split("\\s+");
            Collections.addAll(command, params);
        }
        
        // 输出路径
        command.add("-y"); // 覆盖输出文件
        command.add(task.getOutputPath());
        
        return command;
    }
    
    /**
     * 监控转码进度
     */
    private void monitorProgress(Process process, TranscodeTask task) {
        new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // 解析FFmpeg输出,提取进度信息
                    Integer progress = parseProgress(line);
                    if (progress != null && progress > task.getProgress()) {
                        task.setProgress(progress);
                        taskRepository.save(task);
                        
                        // 回调进度更新
                        callbackService.notifyProgress(task, progress);
                        
                        // 更新Redis缓存
                        updateProgressCache(task.getTaskId(), progress);
                    }
                }
            } catch (IOException e) {
                log.error("Error reading FFmpeg output", e);
            }
        }).start();
    }
    
    /**
     * 解析转码进度
     */
    private Integer parseProgress(String line) {
        // FFmpeg输出格式示例:frame= 123 fps= 25 q=28.0 size= 123456 time=00:00:05.00 bitrate=1234.5kbits/s speed=1.23x
        // 可以通过解析time字段计算进度
        
        Pattern pattern = Pattern.compile("time=(\\d{2}):(\\d{2}):(\\d{2})\\.(\\d{2})");
        Matcher matcher = pattern.matcher(line);
        
        if (matcher.find()) {
            int hours = Integer.parseInt(matcher.group(1));
            int minutes = Integer.parseInt(matcher.group(2));
            int seconds = Integer.parseInt(matcher.group(3));
            int totalSeconds = hours * 3600 + minutes * 60 + seconds;
            
            // 假设视频总时长为60秒(实际应该从视频元数据获取)
            int progress = (int) (totalSeconds * 100.0 / 60.0);
            return Math.min(progress, 100);
        }
        
        return null;
    }
    
    /**
     * 处理转码失败
     */
    private void handleTranscodeFailure(TranscodeTask task, String errorMessage) {
        // 1. 更新任务状态为失败
        updateTaskStatus(task.getTaskId(), "FAILED", errorMessage);
        
        // 2. 回调失败通知
        callbackService.notifyFailure(task, errorMessage);
        
        // 3. 判断是否需要重试
        if (task.getRetryCount() < task.getMaxRetry()) {
            // 加入重试队列
            retryService.addRetryTask(task);
        }
    }
    
    /**
     * 更新任务状态
     */
    private void updateTaskStatus(String taskId, String status, String errorMessage) {
        TranscodeTask task = taskRepository.findByTaskId(taskId);
        if (task != null) {
            task.setStatus(status);
            task.setErrorMessage(errorMessage);
            
            if ("PROCESSING".equals(status)) {
                task.setStartTime(new Date());
            } else if ("COMPLETED".equals(status) || "FAILED".equals(status)) {
                task.setEndTime(new Date());
                if (task.getStartTime() != null) {
                    task.setDuration(task.getEndTime().getTime() - task.getStartTime().getTime());
                }
            }
            
            taskRepository.save(task);
        }
    }
    
    /**
     * 生成输出文件路径
     */
    private String generateOutputPath(String videoId) {
        return outputDir + "/" + videoId + "_transcoded.mp4";
    }
    
    /**
     * 发送到转码队列
     */
    private void sendToQueue(TranscodeTask task) {
        String queueKey = "transcode:queue";
        redisTemplate.opsForList().rightPush(queueKey, task.getTaskId());
    }
    
    /**
     * 更新进度缓存
     */
    private void updateProgressCache(String taskId, Integer progress) {
        String cacheKey = "transcode:progress:" + taskId;
        redisTemplate.opsForValue().set(cacheKey, progress.toString(), 1, TimeUnit.HOURS);
    }
    
    /**
     * 读取进程错误
     */
    private String readProcessError(Process process) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getErrorStream()))) {
            StringBuilder error = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                error.append(line).append("\n");
            }
            return error.toString();
        } catch (IOException e) {
            return "读取错误信息失败: " + e.getMessage();
        }
    }
}

3. 状态回调服务

@Service
@Slf4j
public class TranscodeCallbackService {
    
    @Autowired
    private RestTemplate restTemplate;
    
    /**
     * 回调通知:转码成功
     */
    public void notifySuccess(TranscodeTask task) {
        if (!StringUtils.hasText(task.getCallbackUrl())) {
            return;
        }
        
        CallbackRequest request = CallbackRequest.builder()
                .taskId(task.getTaskId())
                .videoId(task.getVideoId())
                .status("COMPLETED")
                .progress(100)
                .outputPath(task.getOutputPath())
                .duration(task.getDuration())
                .build();
        
        sendCallback(task.getCallbackUrl(), request);
    }
    
    /**
     * 回调通知:转码失败
     */
    public void notifyFailure(TranscodeTask task, String errorMessage) {
        if (!StringUtils.hasText(task.getCallbackUrl())) {
            return;
        }
        
        CallbackRequest request = CallbackRequest.builder()
                .taskId(task.getTaskId())
                .videoId(task.getVideoId())
                .status("FAILED")
                .progress(task.getProgress())
                .errorMessage(errorMessage)
                .retryCount(task.getRetryCount())
                .build();
        
        sendCallback(task.getCallbackUrl(), request);
    }
    
    /**
     * 回调通知:进度更新
     */
    public void notifyProgress(TranscodeTask task, Integer progress) {
        if (!StringUtils.hasText(task.getCallbackUrl())) {
            return;
        }
        
        CallbackRequest request = CallbackRequest.builder()
                .taskId(task.getTaskId())
                .videoId(task.getVideoId())
                .status("PROCESSING")
                .progress(progress)
                .build();
        
        sendCallback(task.getCallbackUrl(), request);
    }
    
    /**
     * 发送回调请求
     */
    private void sendCallback(String callbackUrl, CallbackRequest request) {
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            
            HttpEntity<CallbackRequest> entity = new HttpEntity<>(request, headers);
            
            ResponseEntity<String> response = restTemplate.postForEntity(
                    callbackUrl, 
                    entity, 
                    String.class
            );
            
            if (response.getStatusCode().is2xxSuccessful()) {
                log.info("Callback sent successfully: {}", request.getTaskId());
            } else {
                log.warn("Callback failed with status: {}", response.getStatusCode());
            }
            
        } catch (Exception e) {
            log.error("Failed to send callback: {}", request.getTaskId(), e);
        }
    }
}

4. 失败重试服务

@Service
@Slf4j
public class TranscodeRetryService {
    
    @Autowired
    private TranscodeTaskRepository taskRepository;
    
    @Autowired
    private VideoTranscodeService transcodeService;
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Value("${transcode.retry.strategy:EXPONENTIAL}")
    private String retryStrategy;
    
    @Value("${transcode.retry.initial-delay:60000}")
    private Long initialDelay; // 初始延迟(毫秒)
    
    /**
     * 添加重试任务
     */
    public void addRetryTask(TranscodeTask task) {
        // 1. 计算重试延迟
        long delay = calculateRetryDelay(task.getRetryCount());
        
        // 2. 设置定时任务
        String retryKey = "transcode:retry:" + task.getTaskId();
        redisTemplate.opsForValue().set(retryKey, task.getTaskId(), delay, TimeUnit.MILLISECONDS);
        
        log.info("Retry task scheduled: {}, delay: {}ms", task.getTaskId(), delay);
    }
    
    /**
     * 计算重试延迟
     */
    private long calculateRetryDelay(int retryCount) {
        switch (retryStrategy) {
            case "EXPONENTIAL":
                return (long) (initialDelay * Math.pow(2, retryCount));
            case "FIXED":
                return initialDelay;
            case "RANDOM":
                long minDelay = initialDelay;
                long maxDelay = initialDelay * 2;
                return minDelay + (long) (Math.random() * (maxDelay - minDelay));
            default:
                return initialDelay;
        }
    }
    
    /**
     * 处理重试任务
     */
    @Scheduled(fixedRate = 10000) // 每10秒检查一次
    public void processRetryTasks() {
        Set<String> keys = redisTemplate.keys("transcode:retry:*");
        if (CollectionUtils.isEmpty(keys)) {
            return;
        }
        
        for (String key : keys) {
            String taskId = redisTemplate.opsForValue().get(key);
            if (taskId != null) {
                try {
                    // 1. 获取任务
                    TranscodeTask task = taskRepository.findByTaskId(taskId);
                    if (task == null || !"FAILED".equals(task.getStatus())) {
                        redisTemplate.delete(key);
                        continue;
                    }
                    
                    // 2. 增加重试次数
                    task.setRetryCount(task.getRetryCount() + 1);
                    task.setStatus("PENDING");
                    task.setErrorMessage(null);
                    taskRepository.save(task);
                    
                    // 3. 重新提交转码任务
                    transcodeService.processTask(task);
                    
                    // 4. 删除重试标记
                    redisTemplate.delete(key);
                    
                    log.info("Retry task processed: {}, retry count: {}", taskId, task.getRetryCount());
                    
                } catch (Exception e) {
                    log.error("Failed to process retry task: {}", taskId, e);
                }
            }
        }
    }
}

5. 转码控制器

@RestController
@RequestMapping("/api/transcode")
@Slf4j
public class TranscodeController {
    
    @Autowired
    private VideoTranscodeService transcodeService;
    
    @Autowired
    private TranscodeTaskRepository taskRepository;
    
    /**
     * 提交转码任务
     */
    @PostMapping("/submit")
    public Result<TranscodeTask> submitTask(@RequestBody @Validated TranscodeRequest request) {
        TranscodeTask task = transcodeService.submitTask(request);
        return Result.success(task);
    }
    
    /**
     * 查询任务状态
     */
    @GetMapping("/task/{taskId}")
    public Result<TranscodeTask> getTask(@PathVariable String taskId) {
        TranscodeTask task = taskRepository.findByTaskId(taskId);
        if (task == null) {
            return Result.error("任务不存在");
        }
        return Result.success(task);
    }
    
    /**
     * 取消转码任务
     */
    @PostMapping("/task/{taskId}/cancel")
    public Result<String> cancelTask(@PathVariable String taskId) {
        TranscodeTask task = taskRepository.findByTaskId(taskId);
        if (task == null) {
            return Result.error("任务不存在");
        }
        
        if ("COMPLETED".equals(task.getStatus()) || "FAILED".equals(task.getStatus())) {
            return Result.error("任务已完成或已失败,无法取消");
        }
        
        task.setStatus("CANCELLED");
        taskRepository.save(task);
        
        return Result.success("任务已取消");
    }
    
    /**
     * 手动重试任务
     */
    @PostMapping("/task/{taskId}/retry")
    public Result<String> retryTask(@PathVariable String taskId) {
        TranscodeTask task = taskRepository.findByTaskId(taskId);
        if (task == null) {
            return Result.error("任务不存在");
        }
        
        if (!"FAILED".equals(task.getStatus())) {
            return Result.error("只能重试失败的任务");
        }
        
        if (task.getRetryCount() >= task.getMaxRetry()) {
            return Result.error("已达到最大重试次数");
        }
        
        task.setRetryCount(task.getRetryCount() + 1);
        task.setStatus("PENDING");
        task.setErrorMessage(null);
        taskRepository.save(task);
        
        transcodeService.processTask(task);
        
        return Result.success("重试任务已提交");
    }
}

核心流程

1. 转码提交流程

  1. 接收转码请求:客户端提交转码任务
  2. 创建转码任务:保存任务信息到数据库
  3. 发送到队列:将任务加入转码队列
  4. 处理转码任务:从队列中取出任务进行处理
  5. 调用 FFmpeg:执行视频转码命令
  6. 监控转码进度:实时监控转码进度
  7. 更新任务状态:根据转码结果更新任务状态
  8. 回调通知:将转码结果通知给业务系统

2. 失败重试流程

  1. 检测转码失败:FFmpeg 退出码非 0
  2. 更新任务状态:将任务状态更新为 FAILED
  3. 判断重试条件:检查是否达到最大重试次数
  4. 计算重试延迟:根据重试策略计算延迟时间
  5. 设置定时任务:将任务加入重试队列
  6. 执行重试:定时任务触发后重新执行转码

技术要点

1. FFmpeg 进程管理

  • 进程监控:监控 FFmpeg 进程的运行状态
  • 资源限制:限制 FFmpeg 的 CPU 和内存使用
  • 超时控制:设置转码超时时间,防止进程挂起
  • 异常处理:捕获 FFmpeg 崩溃异常,及时处理

2. 进度监控

  • 实时解析:实时解析 FFmpeg 输出,提取进度信息
  • 缓存更新:将进度信息更新到 Redis 缓存
  • 回调通知:将进度变化通知给业务系统
  • 性能优化:避免频繁的数据库更新

3. 失败重试策略

  • 指数退避:重试间隔呈指数增长,避免雪崩
  • 最大重试次数:限制最大重试次数,避免无限重试
  • 错误分类:根据错误类型决定是否重试
  • 人工介入:超过最大重试次数后,通知人工处理

4. 状态回调

  • 异步回调:使用异步方式发送回调请求
  • 重试机制:回调失败时自动重试
  • 签名验证:验证回调请求的合法性
  • 超时控制:设置回调超时时间

最佳实践

1. 转码配置管理

  • 预设配置:提供常用的转码配置模板
  • 自定义配置:支持用户自定义转码参数
  • 配置验证:验证 FFmpeg 参数的合法性
  • 性能优化:根据视频特点选择最优配置

2. 资源管理

  • 并发控制:限制同时转码的任务数量
  • 资源隔离:使用独立的转码服务器
  • 资源监控:监控 CPU、内存、磁盘使用情况
  • 自动扩容:根据负载自动扩容转码服务器

3. 监控告警

  • 任务监控:监控转码任务的成功率、平均耗时
  • 资源监控:监控转码服务器的资源使用情况
  • 异常告警:转码失败率超过阈值时告警
  • 性能分析:分析转码性能瓶颈,优化转码参数

4. 日志记录

  • 详细日志:记录转码过程的详细信息
  • 错误日志:记录转码失败的详细原因
  • 性能日志:记录转码耗时、资源使用情况
  • 日志分析:定期分析日志,发现问题

常见问题

1. FFmpeg 崩溃

问题:FFmpeg 进程意外崩溃,转码任务失败

解决方案

  • 捕获 FFmpeg 崩溃异常
  • 记录崩溃原因和堆栈信息
  • 自动触发重试机制
  • 分析崩溃原因,优化 FFmpeg 参数

2. 转码进度不准确

问题:解析的转码进度与实际进度不符

解决方案

  • 从视频元数据获取视频总时长
  • 使用更精确的进度解析算法
  • 定期校准进度信息
  • 提供进度刷新机制

3. 回调失败

问题:回调请求发送失败,业务系统无法收到通知

解决方案

  • 实现回调重试机制
  • 记录回调失败日志
  • 提供回调重试接口
  • 支持轮询查询任务状态

4. 资源耗尽

问题:转码任务过多,服务器资源耗尽

解决方案

  • 实现任务队列,限制并发数
  • 使用资源隔离,避免影响其他服务
  • 实现自动扩容,根据负载增加资源
  • 优化转码参数,降低资源消耗

代码优化建议

1. 进度解析优化

/**
 * 使用更精确的进度解析算法
 */
private Integer parseProgress(String line) {
    // 解析frame、fps、time等字段
    Pattern pattern = Pattern.compile(
        "frame=\\s*(\\d+)\\s+fps=\\s*(\\d+\\.\\d+)\\s+q=\\s*(\\d+\\.\\d+)\\s+size=\\s*(\\d+)\\s+time=\\s*(\\d{2}):(\\d{2}):(\\d{2})\\.(\\d{2})"
    );
    
    Matcher matcher = pattern.matcher(line);
    if (matcher.find()) {
        int frame = Integer.parseInt(matcher.group(1));
        double fps = Double.parseDouble(matcher.group(2));
        
        // 从视频元数据获取总帧数
        int totalFrames = getTotalFrames(task.getVideoId());
        
        if (totalFrames > 0) {
            int progress = (int) (frame * 100.0 / totalFrames);
            return Math.min(progress, 100);
        }
    }
    
    return null;
}

2. 回调重试机制

/**
 * 实现回调重试机制
 */
private void sendCallbackWithRetry(String callbackUrl, CallbackRequest request) {
    int maxRetries = 3;
    int retryCount = 0;
    
    while (retryCount < maxRetries) {
        try {
            sendCallback(callbackUrl, request);
            return;
        } catch (Exception e) {
            retryCount++;
            log.warn("Callback failed, retry {}/{}: {}", retryCount, maxRetries, e.getMessage());
            
            if (retryCount < maxRetries) {
                try {
                    Thread.sleep(1000 * retryCount); // 指数退避
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    
    log.error("Callback failed after {} retries: {}", maxRetries, request.getTaskId());
}

3. 资源监控

/**
 * 监控转码服务器资源使用情况
 */
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void monitorResources() {
    // 获取CPU使用率
    double cpuUsage = getCpuUsage();
    
    // 获取内存使用率
    double memoryUsage = getMemoryUsage();
    
    // 获取磁盘使用率
    double diskUsage = getDiskUsage();
    
    // 检查是否超过阈值
    if (cpuUsage > 80 || memoryUsage > 80 || diskUsage > 90) {
        log.warn("Resource usage high: CPU={}, Memory={}, Disk={}", cpuUsage, memoryUsage, diskUsage);
        
        // 发送告警通知
        sendAlert("资源使用率过高");
        
        // 限制新任务提交
        limitNewTasks();
    }
}

总结

本文介绍了如何使用 SpringBoot 实现视频转码状态回调 + 失败重试机制,确保 FFmpeg 崩溃后自动恢复,保障处理成功率。核心要点:

  1. 状态管理:完整的转码状态管理,实时跟踪转码进度
  2. 回调机制:异步回调通知,及时告知业务系统转码结果
  3. 失败重试:智能重试策略,自动恢复失败的转码任务
  4. 异常处理:完善的异常处理机制,确保系统稳定性
  5. 监控告警:全面的监控和告警,及时发现和处理问题

通过这些措施,可以大大提高视频转码的成功率和系统稳定性,为用户提供更好的体验。

互动话题

  1. 你在实际项目中遇到过哪些视频转码的问题?是如何解决的?
  2. 对于 FFmpeg 崩溃,你有什么更好的处理方案?
  3. 在高并发场景下,你是如何优化视频转码性能的?

欢迎在评论区交流讨论!


欢迎关注公众号:服务端技术精选,获取更多技术分享和经验。


标题:SpringBoot + 视频转码状态回调 + 失败重试:FFmpeg 崩溃后自动恢复,保障处理成功率
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/03/19/1773639462693.html
公众号:服务端技术精选
    评论
    0 评论
avatar

取消