SpringBoot + 异步任务结果持久化 + 查询接口:用户可随时查看长时间任务进度与结果
前言
你是否遇到过这样的场景:
- 用户上传一个 1GB 的 Excel 文件,需要 5 分钟才能处理完成
- 导出 10 万条数据到 Excel,需要等待 2 分钟
- 批量处理 1000 个订单,每个订单需要调用 3 个第三方接口
这些长时间运行的任务,如果让用户一直等待页面响应,体验极差。更糟糕的是,如果系统崩溃或重启,任务进度全部丢失,用户需要重新提交。
今天要介绍的「异步任务结果持久化 + 查询接口」方案,将彻底解决这个问题——任务进度实时可查,系统重启不丢失。
一、传统异步任务的痛点
场景重现
产品经理说:「用户需要导出 10 万条订单数据,这个功能要尽快上线。」
你很快写出了代码:
@RestController
@RequestMapping("/export")
public class ExportController {
@GetMapping("/orders")
public void exportOrders(HttpServletResponse response) {
List<Order> orders = orderService.findAll();
ExcelUtils.export(response, orders);
}
}
测试时发现:10 万条数据导出需要 2 分钟,用户页面一直转圈,体验极差。
于是你改成了异步:
@Async
public void exportOrders(String taskId) {
List<Order> orders = orderService.findAll();
ExcelUtils.export(orders);
}
问题来了:
| 问题 | 描述 |
|---|---|
| 进度不可见 | 用户不知道任务执行到哪一步了 |
| 结果无法获取 | 任务完成后,用户怎么下载文件? |
| 重启即丢失 | 系统重启后,任务进度全部丢失 |
| 无法重试 | 任务失败后,用户需要重新提交 |
传统方案的三大痛点
| 痛点 | 影响 |
|---|---|
| 用户体验差 | 长时间等待,不知道任务进度 |
| 数据不安全 | 系统崩溃或重启,任务全部丢失 |
| 运维困难 | 无法追踪任务状态,排查问题困难 |
更糟糕的是:有些系统甚至没有任务管理,用户提交任务后就石沉大海。
二、异步任务持久化:让任务「有迹可循」
核心思路
异步任务持久化的核心思想是:将任务信息存储到数据库,通过任务 ID 追踪任务状态和进度。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 用户提交 │────────▶│ 创建任务 │────────▶│ 异步执行 │
│ 任务请求 │ 返回 │ 记录到DB │ 更新 │ 更新进度 │
│ 获取任务ID │ 任务ID │ │ 状态 │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 轮询查询 │◀────────│ 查询任务 │◀────────│ 任务完成 │
│ 任务进度 │ 返回 │ 状态和进度 │ 通知 │ 保存结果 │
│ 下载结果 │ 状态 │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
技术选型
| 组件 | 作用 | 优势 |
|---|---|---|
| Spring Boot | 应用框架 | 生态完善、易于集成 |
| @Async | 异步执行 | 简单易用、性能优秀 |
| MySQL | 数据持久化 | 成熟稳定、支持事务 |
| Spring Data JPA | 数据访问 | 简化数据库操作 |
三、实现方案详解
1. 任务状态枚举
定义任务的生命周期状态:
public enum TaskStatus {
PENDING("待执行"),
RUNNING("执行中"),
SUCCESS("成功"),
FAILED("失败"),
CANCELLED("已取消");
private final String description;
TaskStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
2. 任务实体类
定义任务表结构:
@Entity
@Table(name = "async_task")
@Data
public class AsyncTask {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String taskId;
@Column(nullable = false)
private String taskType;
@Column(nullable = false)
private TaskStatus status;
private Integer progress;
private String message;
@Lob
private String result;
@Column(nullable = false)
private LocalDateTime createTime;
private LocalDateTime updateTime;
private LocalDateTime completeTime;
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
updateTime = LocalDateTime.now();
}
@PreUpdate
protected void onUpdate() {
updateTime = LocalDateTime.now();
}
}
3. 任务仓储接口
@Repository
public interface AsyncTaskRepository extends JpaRepository<AsyncTask, Long> {
Optional<AsyncTask> findByTaskId(String taskId);
List<AsyncTask> findByStatus(TaskStatus status);
}
4. 异步任务服务
核心服务类,管理任务的生命周期:
@Service
@Slf4j
public class AsyncTaskService {
@Autowired
private AsyncTaskRepository taskRepository;
@Autowired
private TaskExecutor taskExecutor;
public AsyncTask createTask(String taskType, Map<String, Object> params) {
AsyncTask task = new AsyncTask();
task.setTaskId(UUID.randomUUID().toString());
task.setTaskType(taskType);
task.setStatus(TaskStatus.PENDING);
task.setProgress(0);
AsyncTask savedTask = taskRepository.save(task);
log.info("创建任务: taskId={}, type={}", savedTask.getTaskId(), taskType);
return savedTask;
}
public void updateTaskProgress(String taskId, Integer progress, String message) {
AsyncTask task = taskRepository.findByTaskId(taskId)
.orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
task.setProgress(progress);
task.setMessage(message);
task.setStatus(TaskStatus.RUNNING);
taskRepository.save(task);
log.info("更新任务进度: taskId={}, progress={}, message={}", taskId, progress, message);
}
public void completeTask(String taskId, String result) {
AsyncTask task = taskRepository.findByTaskId(taskId)
.orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
task.setStatus(TaskStatus.SUCCESS);
task.setProgress(100);
task.setResult(result);
task.setCompleteTime(LocalDateTime.now());
taskRepository.save(task);
log.info("任务完成: taskId={}", taskId);
}
public void failTask(String taskId, String errorMessage) {
AsyncTask task = taskRepository.findByTaskId(taskId)
.orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
task.setStatus(TaskStatus.FAILED);
task.setMessage(errorMessage);
task.setCompleteTime(LocalDateTime.now());
taskRepository.save(task);
log.error("任务失败: taskId={}, error={}", taskId, errorMessage);
}
public AsyncTask getTask(String taskId) {
return taskRepository.findByTaskId(taskId)
.orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
}
}
5. 异步任务执行器
@Component
@Slf4j
public class AsyncTaskExecutor {
@Autowired
private AsyncTaskService taskService;
@Async("taskExecutor")
public void executeExportTask(String taskId, Map<String, Object> params) {
try {
taskService.updateTaskProgress(taskId, 0, "开始执行导出任务");
List<Order> orders = orderService.findAll();
int total = orders.size();
for (int i = 0; i < total; i++) {
Order order = orders.get(i);
processOrder(order);
int progress = (int) ((i + 1) * 100.0 / total);
taskService.updateTaskProgress(taskId, progress,
String.format("正在处理第 %d/%d 条数据", i + 1, total));
Thread.sleep(10);
}
String result = "导出完成,共处理 " + total + " 条数据";
taskService.completeTask(taskId, result);
} catch (Exception e) {
taskService.failTask(taskId, "任务执行失败: " + e.getMessage());
}
}
}
6. 控制器:任务提交和查询
@RestController
@RequestMapping("/api/tasks")
public class TaskController {
@Autowired
private AsyncTaskService taskService;
@Autowired
private AsyncTaskExecutor taskExecutor;
@PostMapping("/export")
public Map<String, Object> submitExportTask(@RequestBody Map<String, Object> params) {
AsyncTask task = taskService.createTask("EXPORT_ORDERS", params);
taskExecutor.executeExportTask(task.getTaskId(), params);
return Map.of(
"taskId", task.getTaskId(),
"status", task.getStatus(),
"message", "任务已提交,请使用 taskId 查询进度"
);
}
@GetMapping("/{taskId}")
public AsyncTask getTask(@PathVariable String taskId) {
return taskService.getTask(taskId);
}
@GetMapping("/{taskId}/result")
public Map<String, Object> getTaskResult(@PathVariable String taskId) {
AsyncTask task = taskService.getTask(taskId);
if (task.getStatus() != TaskStatus.SUCCESS) {
throw new RuntimeException("任务尚未完成");
}
return Map.of(
"taskId", taskId,
"result", task.getResult(),
"completeTime", task.getCompleteTime()
);
}
}
四、实战演示
场景一:导出订单数据
步骤 1:提交任务
POST /api/tasks/export
Content-Type: application/json
{
"startDate": "2024-01-01",
"endDate": "2024-12-31"
}
响应:
{
"taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"status": "PENDING",
"message": "任务已提交,请使用 taskId 查询进度"
}
步骤 2:查询进度
GET /api/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890
响应:
{
"taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"taskType": "EXPORT_ORDERS",
"status": "RUNNING",
"progress": 45,
"message": "正在处理第 4500/10000 条数据",
"createTime": "2024-01-15T10:00:00",
"updateTime": "2024-01-15T10:00:30"
}
步骤 3:获取结果
GET /api/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/result
响应:
{
"taskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"result": "导出完成,共处理 10000 条数据",
"completeTime": "2024-01-15T10:01:00"
}
场景二:批量处理订单
@Async("taskExecutor")
public void executeBatchProcessTask(String taskId, List<Long> orderIds) {
try {
taskService.updateTaskProgress(taskId, 0, "开始批量处理订单");
int total = orderIds.size();
for (int i = 0; i < total; i++) {
Long orderId = orderIds.get(i);
processOrder(orderId);
int progress = (int) ((i + 1) * 100.0 / total);
taskService.updateTaskProgress(taskId, progress,
String.format("正在处理订单 %d/%d", i + 1, total));
}
taskService.completeTask(taskId, "批量处理完成,共处理 " + total + " 个订单");
} catch (Exception e) {
taskService.failTask(taskId, "批量处理失败: " + e.getMessage());
}
}
五、进阶功能
1. 任务取消
支持用户主动取消任务:
public void cancelTask(String taskId) {
AsyncTask task = taskRepository.findByTaskId(taskId)
.orElseThrow(() -> new RuntimeException("任务不存在: " + taskId));
if (task.getStatus() == TaskStatus.RUNNING) {
task.setStatus(TaskStatus.CANCELLED);
task.setCompleteTime(LocalDateTime.now());
taskRepository.save(task);
log.info("任务已取消: taskId={}", taskId);
}
}
2. 任务重试
支持失败任务自动重试:
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
@Async("taskExecutor")
public void executeTaskWithRetry(String taskId, Map<String, Object> params) {
try {
executeTask(taskId, params);
} catch (Exception e) {
taskService.failTask(taskId, "任务执行失败: " + e.getMessage());
}
}
3. 任务超时控制
设置任务超时时间:
@Async("taskExecutor")
@Timeout(value = 5, unit = TimeUnit.MINUTES)
public void executeTaskWithTimeout(String taskId, Map<String, Object> params) {
try {
executeTask(taskId, params);
} catch (TimeoutException e) {
taskService.failTask(taskId, "任务执行超时");
}
}
4. 实时进度推送
使用 WebSocket 或 SSE 实时推送进度:
@Async("taskExecutor")
public void executeTaskWithProgress(String taskId, Map<String, Object> params) {
try {
for (int i = 0; i <= 100; i++) {
taskService.updateTaskProgress(taskId, i, "处理中...");
webSocketService.sendProgress(taskId, i);
Thread.sleep(100);
}
taskService.completeTask(taskId, "任务完成");
} catch (Exception e) {
taskService.failTask(taskId, e.getMessage());
}
}
六、最佳实践
1. 数据库设计
| 字段 | 类型 | 说明 |
|---|---|---|
| id | BIGINT | 主键 |
| task_id | VARCHAR(64) | 任务 ID,唯一索引 |
| task_type | VARCHAR(50) | 任务类型 |
| status | VARCHAR(20) | 任务状态 |
| progress | INT | 进度(0-100) |
| message | VARCHAR(500) | 任务消息 |
| result | TEXT | 任务结果 |
| create_time | DATETIME | 创建时间 |
| update_time | DATETIME | 更新时间 |
| complete_time | DATETIME | 完成时间 |
2. 进度更新策略
| 策略 | 适用场景 | 优缺点 |
|---|---|---|
| 实时更新 | 需要精确进度 | 优点:进度准确;缺点:数据库压力大 |
| 批量更新 | 大批量数据 | 优点:减少数据库操作;缺点:进度不够精确 |
| 分阶段更新 | 多阶段任务 | 优点:平衡性能和准确性;缺点:实现复杂 |
3. 异常处理
@Async("taskExecutor")
public void executeTask(String taskId, Map<String, Object> params) {
try {
doExecute(taskId, params);
} catch (BusinessException e) {
taskService.failTask(taskId, "业务异常: " + e.getMessage());
} catch (Exception e) {
log.error("任务执行异常: taskId={}", taskId, e);
taskService.failTask(taskId, "系统异常,请联系管理员");
}
}
4. 性能优化
| 优化点 | 方案 | 效果 |
|---|---|---|
| 批量更新 | 每 100 条更新一次进度 | 减少数据库操作 |
| 异步更新 | 使用消息队列异步更新进度 | 提升响应速度 |
| 缓存优化 | 使用 Redis 缓存任务状态 | 减少数据库查询 |
| 索引优化 | 为 task_id 创建唯一索引 | 提升查询性能 |
七、监控告警
1. 任务监控
@Component
@Slf4j
public class TaskMonitor {
@Autowired
private AsyncTaskRepository taskRepository;
@Scheduled(fixedRate = 60000)
public void monitorRunningTasks() {
List<AsyncTask> runningTasks = taskRepository.findByStatus(TaskStatus.RUNNING);
for (AsyncTask task : runningTasks) {
Duration duration = Duration.between(task.getUpdateTime(), LocalDateTime.now());
if (duration.toMinutes() > 30) {
log.warn("任务执行时间过长: taskId={}, duration={}分钟",
task.getTaskId(), duration.toMinutes());
}
}
}
}
2. 失败任务告警
@Scheduled(fixedRate = 300000)
public void monitorFailedTasks() {
List<AsyncTask> failedTasks = taskRepository.findByStatus(TaskStatus.FAILED);
if (!failedTasks.isEmpty()) {
log.error("发现 {} 个失败任务", failedTasks.size());
alertService.sendAlert("任务失败告警", failedTasks.size());
}
}
八、总结
异步任务结果持久化 + 查询接口的方案,彻底解决了传统异步任务的痛点:
✅ 进度可查:用户随时查看任务执行进度
✅ 结果可取:任务完成后可获取结果
✅ 重启不丢:任务信息持久化到数据库
✅ 状态可追:完整的任务生命周期记录
✅ 体验提升:用户无需长时间等待
让长时间任务变得透明、可控、可靠!
互动话题
你的项目中是如何处理长时间任务的?有没有遇到过任务丢失的问题?欢迎在评论区分享你的经验。
更多技术文章,欢迎关注公众号服务端技术精选,及时获取最新动态。
标题:SpringBoot + 异步任务结果持久化 + 查询接口:用户可随时查看长时间任务进度与结果
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/03/16/1773467095227.html
公众号:服务端技术精选
评论
0 评论