Skip to content

executeJob

Flowable 7.1.0 摘要:立即执行指定的异步作业。

方法签名与说明

void executeJob(String jobId)

立即执行一个异步作业。通常用于手动触发失败的作业重试,或者加速异步作业的执行。

Parameters:

  • jobId - 作业的唯一标识符,不能为null

Throws:

  • FlowableObjectNotFoundException - 当指定ID的作业不存在时
  • FlowableException - 当作业执行失败时

常见使用场景

1. 失败作业重试

当异步作业执行失败后,管理员可以手动触发重试,而不是等待自动重试机制。

2. 加速异步执行

某些异步任务需要立即执行,而不是等待作业执行器调度。

3. 作业监控与管理

在作业管理后台,提供手动执行作业的功能,方便运维人员操作。

4. 测试环境调试

在开发测试阶段,手动触发异步作业,观察执行效果。

Kotlin + Spring Boot 调用示例

示例1:重试失败的作业

kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service

@Service
class JobRetryService(
    private val managementService: ManagementService
) {
    
    /**
     * 重试失败的作业
     * 企业场景:定时任务执行失败后,管理员手动重试
     */
    fun retryFailedJob(jobId: String): Boolean {
        return try {
            println("开始重试作业: $jobId")
            
            // 执行作业
            managementService.executeJob(jobId)
            
            println("作业执行成功")
            true
        } catch (e: Exception) {
            println("作业执行失败: ${e.message}")
            false
        }
    }
}

示例2:批量重试失败作业

kotlin
import org.flowable.engine.ManagementService
import org.flowable.job.api.Job
import org.springframework.stereotype.Service

data class JobRetryResult(
    val jobId: String,
    val success: Boolean,
    val errorMessage: String?
)

@Service
class BatchJobRetryService(
    private val managementService: ManagementService
) {
    
    /**
     * 批量重试失败的作业
     * 企业场景:系统恢复后,批量重试之前失败的作业
     */
    fun retryAllFailedJobs(processInstanceId: String? = null): List<JobRetryResult> {
        // 查询死信作业(失败超过重试次数的作业)
        val queryBuilder = managementService.createDeadLetterJobQuery()
        
        processInstanceId?.let {
            queryBuilder.processInstanceId(it)
        }
        
        val deadLetterJobs = queryBuilder.list()
        
        if (deadLetterJobs.isEmpty()) {
            println("没有失败的作业需要重试")
            return emptyList()
        }
        
        println("找到 ${deadLetterJobs.size} 个失败作业,开始批量重试...")
        
        return deadLetterJobs.map { job ->
            try {
                // 先将死信作业移回可执行作业队列
                managementService.moveDeadLetterJobToExecutableJob(job.id, 3)
                
                // 执行作业
                managementService.executeJob(job.id)
                
                JobRetryResult(
                    jobId = job.id,
                    success = true,
                    errorMessage = null
                )
            } catch (e: Exception) {
                JobRetryResult(
                    jobId = job.id,
                    success = false,
                    errorMessage = e.message
                )
            }
        }
    }
}

示例3:作业执行监控

kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service

@Service
class JobExecutionMonitor(
    private val managementService: ManagementService
) {
    
    /**
     * 执行作业并监控结果
     * 企业场景:作业管理后台,实时监控作业执行状态
     */
    fun executeJobWithMonitoring(jobId: String): Map<String, Any> {
        // 获取作业信息
        val job = managementService.createJobQuery()
            .jobId(jobId)
            .singleResult()
            ?: throw IllegalArgumentException("作业不存在: $jobId")
        
        val startTime = System.currentTimeMillis()
        
        return try {
            println("=== 开始执行作业 ===")
            println("作业ID: ${job.id}")
            println("流程实例ID: ${job.processInstanceId}")
            println("执行配置: ${job.jobHandlerConfiguration}")
            
            // 执行作业
            managementService.executeJob(jobId)
            
            val duration = System.currentTimeMillis() - startTime
            
            mapOf(
                "jobId" to jobId,
                "status" to "成功",
                "duration" to "${duration}ms",
                "message" to "作业执行成功"
            )
        } catch (e: Exception) {
            val duration = System.currentTimeMillis() - startTime
            
            mapOf(
                "jobId" to jobId,
                "status" to "失败",
                "duration" to "${duration}ms",
                "error" to e.message,
                "stackTrace" to e.stackTraceToString()
            )
        }
    }
}

示例4:定时作业手动触发

kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service

@Service
class TimerJobManualTrigger(
    private val managementService: ManagementService
) {
    
    /**
     * 手动触发定时作业
     * 企业场景:不等待定时器到期,立即触发执行
     */
    fun triggerTimerJob(processInstanceId: String, activityId: String): String {
        // 查询定时作业
        val timerJobs = managementService.createTimerJobQuery()
            .processInstanceId(processInstanceId)
            .list()
        
        val targetJob = timerJobs.find { job ->
            job.jobHandlerConfiguration?.contains(activityId) == true
        } ?: throw IllegalArgumentException("找不到对应的定时作业")
        
        println("找到定时作业: ${targetJob.id}")
        println("原定执行时间: ${targetJob.duedate}")
        
        // 将定时作业移到可执行队列
        managementService.moveTimerToExecutableJob(targetJob.id)
        
        // 立即执行
        managementService.executeJob(targetJob.id)
        
        return "定时作业已手动触发执行"
    }
}

示例5:异步服务任务立即执行

kotlin
import org.flowable.engine.ManagementService
import org.flowable.engine.RuntimeService
import org.springframework.stereotype.Service

@Service
class AsyncTaskExecutor(
    private val managementService: ManagementService,
    private val runtimeService: RuntimeService
) {
    
    /**
     * 立即执行异步服务任务
     * 企业场景:发送邮件、调用外部接口等异步任务需要立即执行
     */
    fun executeAsyncTask(processInstanceId: String, taskName: String): Boolean {
        // 查询该流程实例的所有作业
        val jobs = managementService.createJobQuery()
            .processInstanceId(processInstanceId)
            .list()
        
        if (jobs.isEmpty()) {
            println("该流程实例没有待执行的异步作业")
            return false
        }
        
        println("找到 ${jobs.size} 个异步作业")
        
        // 执行所有作业
        jobs.forEach { job ->
            try {
                println("执行作业: ${job.id}")
                managementService.executeJob(job.id)
            } catch (e: Exception) {
                println("作业执行失败: ${e.message}")
                throw e
            }
        }
        
        return true
    }
}

示例6:作业管理REST API

kotlin
import org.flowable.engine.ManagementService
import org.springframework.web.bind.annotation.*

@RestController
@RequestMapping("/api/jobs")
class JobManagementController(
    private val managementService: ManagementService
) {
    
    /**
     * 执行指定作业
     */
    @PostMapping("/{jobId}/execute")
    fun executeJob(@PathVariable jobId: String): Map<String, Any> {
        return try {
            managementService.executeJob(jobId)
            
            mapOf(
                "success" to true,
                "message" to "作业执行成功",
                "jobId" to jobId
            )
        } catch (e: Exception) {
            mapOf(
                "success" to false,
                "message" to "作业执行失败: ${e.message}",
                "jobId" to jobId
            )
        }
    }
    
    /**
     * 批量执行失败作业
     */
    @PostMapping("/retry-failed")
    fun retryFailedJobs(): Map<String, Any> {
        val deadLetterJobs = managementService.createDeadLetterJobQuery()
            .list()
        
        val results = deadLetterJobs.map { job ->
            try {
                managementService.moveDeadLetterJobToExecutableJob(job.id, 3)
                managementService.executeJob(job.id)
                mapOf("jobId" to job.id, "success" to true)
            } catch (e: Exception) {
                mapOf("jobId" to job.id, "success" to false, "error" to e.message)
            }
        }
        
        val successCount = results.count { it["success"] == true }
        
        return mapOf(
            "total" to deadLetterJobs.size,
            "success" to successCount,
            "failed" to (deadLetterJobs.size - successCount),
            "details" to results
        )
    }
    
    /**
     * 查询待执行的作业
     */
    @GetMapping
    fun listJobs(
        @RequestParam(required = false) processInstanceId: String?,
        @RequestParam(defaultValue = "1") page: Int,
        @RequestParam(defaultValue = "10") size: Int
    ): Map<String, Any> {
        
        val query = managementService.createJobQuery()
        
        processInstanceId?.let {
            query.processInstanceId(it)
        }
        
        val total = query.count()
        val jobs = query.listPage((page - 1) * size, size)
        
        return mapOf(
            "data" to jobs.map { job ->
                mapOf(
                    "id" to job.id,
                    "processInstanceId" to job.processInstanceId,
                    "executionId" to job.executionId,
                    "retries" to job.retries,
                    "duedate" to job.duedate
                )
            },
            "total" to total,
            "page" to page,
            "size" to size
        )
    }
}

示例7:作业执行日志记录

kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service
import java.time.LocalDateTime

data class JobExecutionLog(
    val jobId: String,
    val processInstanceId: String?,
    val executionTime: LocalDateTime,
    val success: Boolean,
    val duration: Long,
    val errorMessage: String?
)

@Service
class JobExecutionLogger(
    private val managementService: ManagementService
) {
    
    private val executionLogs = mutableListOf<JobExecutionLog>()
    
    /**
     * 执行作业并记录日志
     * 企业场景:作业执行审计,记录所有手动执行的作业
     */
    fun executeJobWithLogging(jobId: String): JobExecutionLog {
        val job = managementService.createJobQuery()
            .jobId(jobId)
            .singleResult()
            ?: throw IllegalArgumentException("作业不存在")
        
        val startTime = System.currentTimeMillis()
        var success = false
        var errorMessage: String? = null
        
        try {
            managementService.executeJob(jobId)
            success = true
        } catch (e: Exception) {
            errorMessage = e.message
            throw e
        } finally {
            val duration = System.currentTimeMillis() - startTime
            
            val log = JobExecutionLog(
                jobId = jobId,
                processInstanceId = job.processInstanceId,
                executionTime = LocalDateTime.now(),
                success = success,
                duration = duration,
                errorMessage = errorMessage
            )
            
            executionLogs.add(log)
            
            // 这里可以将日志保存到数据库
            println("作业执行日志: $log")
        }
        
        return executionLogs.last()
    }
    
    /**
     * 获取执行日志
     */
    fun getExecutionLogs(): List<JobExecutionLog> {
        return executionLogs.toList()
    }
}

注意事项

1. 作业状态检查

  • 执行前应检查作业是否存在
  • 确认作业不在执行中(避免重复执行)
  • 对于死信作业,需先移回可执行队列

2. 异常处理

  • 作业执行可能失败,务必捕获异常
  • 记录失败原因,便于问题排查
  • 考虑实现重试机制

3. 权限控制

  • 手动执行作业是敏感操作,应限制权限
  • 建议只允许管理员或运维人员执行

4. 作业类型区分

  • 普通异步作业可直接执行
  • 定时作业需先移到可执行队列
  • 死信作业需先恢复重试次数

5. 性能影响

  • 批量执行作业会占用系统资源
  • 建议在业务低峰期进行批量操作

6. 事务一致性

  • 作业执行在独立事务中
  • 失败不影响主流程事务

相关 API

  • ManagementService.executeJob() - 执行作业
  • ManagementService.createJobQuery() - 查询作业
  • ManagementService.createDeadLetterJobQuery() - 查询失败作业
  • ManagementService.moveDeadLetterJobToExecutableJob() - 恢复失败作业
  • ManagementService.moveTimerToExecutableJob() - 触发定时作业
  • ManagementService.setJobRetries() - 设置作业重试次数
  • ManagementService.deleteJob() - 删除作业

最佳实践

1. 作业执行前验证

kotlin
fun safeExecuteJob(jobId: String): Boolean {
    val job = managementService.createJobQuery()
        .jobId(jobId)
        .singleResult()
        ?: return false
    
    if (job.retries == 0) {
        // 恢复重试次数
        managementService.setJobRetries(jobId, 3)
    }
    
    managementService.executeJob(jobId)
    return true
}

2. 异步执行作业

kotlin
@Async
fun executeJobAsync(jobId: String) {
    try {
        managementService.executeJob(jobId)
    } catch (e: Exception) {
        logger.error("异步执行作业失败", e)
    }
}

3. 作业执行超时控制

kotlin
fun executeJobWithTimeout(jobId: String, timeoutSeconds: Long): Boolean {
    return try {
        val future = CompletableFuture.supplyAsync {
            managementService.executeJob(jobId)
        }
        future.get(timeoutSeconds, TimeUnit.SECONDS)
        true
    } catch (e: TimeoutException) {
        println("作业执行超时")
        false
    }
}

本文档说明

  • 基于 Flowable 7.1.0 版本编写
  • 所有示例均可直接在 Spring Boot + Kotlin 项目中使用
  • 示例场景来自真实企业应用,具有实际参考价值