Appearance
executeJob
Flowable 7.1.0 摘要:立即执行指定的异步作业。
方法签名与说明
void executeJob(String jobId)
立即执行一个异步作业。通常用于手动触发失败的作业重试,或者加速异步作业的执行。
Parameters:
- jobId - 作业的唯一标识符,不能为null
Throws:
- FlowableObjectNotFoundException - 当指定ID的作业不存在时
- FlowableException - 当作业执行失败时
常见使用场景
1. 失败作业重试
当异步作业执行失败后,管理员可以手动触发重试,而不是等待自动重试机制。
2. 加速异步执行
某些异步任务需要立即执行,而不是等待作业执行器调度。
3. 作业监控与管理
在作业管理后台,提供手动执行作业的功能,方便运维人员操作。
4. 测试环境调试
在开发测试阶段,手动触发异步作业,观察执行效果。
Kotlin + Spring Boot 调用示例
示例1:重试失败的作业
kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service
@Service
class JobRetryService(
private val managementService: ManagementService
) {
/**
* 重试失败的作业
* 企业场景:定时任务执行失败后,管理员手动重试
*/
fun retryFailedJob(jobId: String): Boolean {
return try {
println("开始重试作业: $jobId")
// 执行作业
managementService.executeJob(jobId)
println("作业执行成功")
true
} catch (e: Exception) {
println("作业执行失败: ${e.message}")
false
}
}
}示例2:批量重试失败作业
kotlin
import org.flowable.engine.ManagementService
import org.flowable.job.api.Job
import org.springframework.stereotype.Service
data class JobRetryResult(
val jobId: String,
val success: Boolean,
val errorMessage: String?
)
@Service
class BatchJobRetryService(
private val managementService: ManagementService
) {
/**
* 批量重试失败的作业
* 企业场景:系统恢复后,批量重试之前失败的作业
*/
fun retryAllFailedJobs(processInstanceId: String? = null): List<JobRetryResult> {
// 查询死信作业(失败超过重试次数的作业)
val queryBuilder = managementService.createDeadLetterJobQuery()
processInstanceId?.let {
queryBuilder.processInstanceId(it)
}
val deadLetterJobs = queryBuilder.list()
if (deadLetterJobs.isEmpty()) {
println("没有失败的作业需要重试")
return emptyList()
}
println("找到 ${deadLetterJobs.size} 个失败作业,开始批量重试...")
return deadLetterJobs.map { job ->
try {
// 先将死信作业移回可执行作业队列
managementService.moveDeadLetterJobToExecutableJob(job.id, 3)
// 执行作业
managementService.executeJob(job.id)
JobRetryResult(
jobId = job.id,
success = true,
errorMessage = null
)
} catch (e: Exception) {
JobRetryResult(
jobId = job.id,
success = false,
errorMessage = e.message
)
}
}
}
}示例3:作业执行监控
kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service
@Service
class JobExecutionMonitor(
private val managementService: ManagementService
) {
/**
* 执行作业并监控结果
* 企业场景:作业管理后台,实时监控作业执行状态
*/
fun executeJobWithMonitoring(jobId: String): Map<String, Any> {
// 获取作业信息
val job = managementService.createJobQuery()
.jobId(jobId)
.singleResult()
?: throw IllegalArgumentException("作业不存在: $jobId")
val startTime = System.currentTimeMillis()
return try {
println("=== 开始执行作业 ===")
println("作业ID: ${job.id}")
println("流程实例ID: ${job.processInstanceId}")
println("执行配置: ${job.jobHandlerConfiguration}")
// 执行作业
managementService.executeJob(jobId)
val duration = System.currentTimeMillis() - startTime
mapOf(
"jobId" to jobId,
"status" to "成功",
"duration" to "${duration}ms",
"message" to "作业执行成功"
)
} catch (e: Exception) {
val duration = System.currentTimeMillis() - startTime
mapOf(
"jobId" to jobId,
"status" to "失败",
"duration" to "${duration}ms",
"error" to e.message,
"stackTrace" to e.stackTraceToString()
)
}
}
}示例4:定时作业手动触发
kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service
@Service
class TimerJobManualTrigger(
private val managementService: ManagementService
) {
/**
* 手动触发定时作业
* 企业场景:不等待定时器到期,立即触发执行
*/
fun triggerTimerJob(processInstanceId: String, activityId: String): String {
// 查询定时作业
val timerJobs = managementService.createTimerJobQuery()
.processInstanceId(processInstanceId)
.list()
val targetJob = timerJobs.find { job ->
job.jobHandlerConfiguration?.contains(activityId) == true
} ?: throw IllegalArgumentException("找不到对应的定时作业")
println("找到定时作业: ${targetJob.id}")
println("原定执行时间: ${targetJob.duedate}")
// 将定时作业移到可执行队列
managementService.moveTimerToExecutableJob(targetJob.id)
// 立即执行
managementService.executeJob(targetJob.id)
return "定时作业已手动触发执行"
}
}示例5:异步服务任务立即执行
kotlin
import org.flowable.engine.ManagementService
import org.flowable.engine.RuntimeService
import org.springframework.stereotype.Service
@Service
class AsyncTaskExecutor(
private val managementService: ManagementService,
private val runtimeService: RuntimeService
) {
/**
* 立即执行异步服务任务
* 企业场景:发送邮件、调用外部接口等异步任务需要立即执行
*/
fun executeAsyncTask(processInstanceId: String, taskName: String): Boolean {
// 查询该流程实例的所有作业
val jobs = managementService.createJobQuery()
.processInstanceId(processInstanceId)
.list()
if (jobs.isEmpty()) {
println("该流程实例没有待执行的异步作业")
return false
}
println("找到 ${jobs.size} 个异步作业")
// 执行所有作业
jobs.forEach { job ->
try {
println("执行作业: ${job.id}")
managementService.executeJob(job.id)
} catch (e: Exception) {
println("作业执行失败: ${e.message}")
throw e
}
}
return true
}
}示例6:作业管理REST API
kotlin
import org.flowable.engine.ManagementService
import org.springframework.web.bind.annotation.*
@RestController
@RequestMapping("/api/jobs")
class JobManagementController(
private val managementService: ManagementService
) {
/**
* 执行指定作业
*/
@PostMapping("/{jobId}/execute")
fun executeJob(@PathVariable jobId: String): Map<String, Any> {
return try {
managementService.executeJob(jobId)
mapOf(
"success" to true,
"message" to "作业执行成功",
"jobId" to jobId
)
} catch (e: Exception) {
mapOf(
"success" to false,
"message" to "作业执行失败: ${e.message}",
"jobId" to jobId
)
}
}
/**
* 批量执行失败作业
*/
@PostMapping("/retry-failed")
fun retryFailedJobs(): Map<String, Any> {
val deadLetterJobs = managementService.createDeadLetterJobQuery()
.list()
val results = deadLetterJobs.map { job ->
try {
managementService.moveDeadLetterJobToExecutableJob(job.id, 3)
managementService.executeJob(job.id)
mapOf("jobId" to job.id, "success" to true)
} catch (e: Exception) {
mapOf("jobId" to job.id, "success" to false, "error" to e.message)
}
}
val successCount = results.count { it["success"] == true }
return mapOf(
"total" to deadLetterJobs.size,
"success" to successCount,
"failed" to (deadLetterJobs.size - successCount),
"details" to results
)
}
/**
* 查询待执行的作业
*/
@GetMapping
fun listJobs(
@RequestParam(required = false) processInstanceId: String?,
@RequestParam(defaultValue = "1") page: Int,
@RequestParam(defaultValue = "10") size: Int
): Map<String, Any> {
val query = managementService.createJobQuery()
processInstanceId?.let {
query.processInstanceId(it)
}
val total = query.count()
val jobs = query.listPage((page - 1) * size, size)
return mapOf(
"data" to jobs.map { job ->
mapOf(
"id" to job.id,
"processInstanceId" to job.processInstanceId,
"executionId" to job.executionId,
"retries" to job.retries,
"duedate" to job.duedate
)
},
"total" to total,
"page" to page,
"size" to size
)
}
}示例7:作业执行日志记录
kotlin
import org.flowable.engine.ManagementService
import org.springframework.stereotype.Service
import java.time.LocalDateTime
data class JobExecutionLog(
val jobId: String,
val processInstanceId: String?,
val executionTime: LocalDateTime,
val success: Boolean,
val duration: Long,
val errorMessage: String?
)
@Service
class JobExecutionLogger(
private val managementService: ManagementService
) {
private val executionLogs = mutableListOf<JobExecutionLog>()
/**
* 执行作业并记录日志
* 企业场景:作业执行审计,记录所有手动执行的作业
*/
fun executeJobWithLogging(jobId: String): JobExecutionLog {
val job = managementService.createJobQuery()
.jobId(jobId)
.singleResult()
?: throw IllegalArgumentException("作业不存在")
val startTime = System.currentTimeMillis()
var success = false
var errorMessage: String? = null
try {
managementService.executeJob(jobId)
success = true
} catch (e: Exception) {
errorMessage = e.message
throw e
} finally {
val duration = System.currentTimeMillis() - startTime
val log = JobExecutionLog(
jobId = jobId,
processInstanceId = job.processInstanceId,
executionTime = LocalDateTime.now(),
success = success,
duration = duration,
errorMessage = errorMessage
)
executionLogs.add(log)
// 这里可以将日志保存到数据库
println("作业执行日志: $log")
}
return executionLogs.last()
}
/**
* 获取执行日志
*/
fun getExecutionLogs(): List<JobExecutionLog> {
return executionLogs.toList()
}
}注意事项
1. 作业状态检查
- 执行前应检查作业是否存在
- 确认作业不在执行中(避免重复执行)
- 对于死信作业,需先移回可执行队列
2. 异常处理
- 作业执行可能失败,务必捕获异常
- 记录失败原因,便于问题排查
- 考虑实现重试机制
3. 权限控制
- 手动执行作业是敏感操作,应限制权限
- 建议只允许管理员或运维人员执行
4. 作业类型区分
- 普通异步作业可直接执行
- 定时作业需先移到可执行队列
- 死信作业需先恢复重试次数
5. 性能影响
- 批量执行作业会占用系统资源
- 建议在业务低峰期进行批量操作
6. 事务一致性
- 作业执行在独立事务中
- 失败不影响主流程事务
相关 API
ManagementService.executeJob()- 执行作业ManagementService.createJobQuery()- 查询作业ManagementService.createDeadLetterJobQuery()- 查询失败作业ManagementService.moveDeadLetterJobToExecutableJob()- 恢复失败作业ManagementService.moveTimerToExecutableJob()- 触发定时作业ManagementService.setJobRetries()- 设置作业重试次数ManagementService.deleteJob()- 删除作业
最佳实践
1. 作业执行前验证
kotlin
fun safeExecuteJob(jobId: String): Boolean {
val job = managementService.createJobQuery()
.jobId(jobId)
.singleResult()
?: return false
if (job.retries == 0) {
// 恢复重试次数
managementService.setJobRetries(jobId, 3)
}
managementService.executeJob(jobId)
return true
}2. 异步执行作业
kotlin
@Async
fun executeJobAsync(jobId: String) {
try {
managementService.executeJob(jobId)
} catch (e: Exception) {
logger.error("异步执行作业失败", e)
}
}3. 作业执行超时控制
kotlin
fun executeJobWithTimeout(jobId: String, timeoutSeconds: Long): Boolean {
return try {
val future = CompletableFuture.supplyAsync {
managementService.executeJob(jobId)
}
future.get(timeoutSeconds, TimeUnit.SECONDS)
true
} catch (e: TimeoutException) {
println("作业执行超时")
false
}
}本文档说明
- 基于 Flowable 7.1.0 版本编写
- 所有示例均可直接在 Spring Boot + Kotlin 项目中使用
- 示例场景来自真实企业应用,具有实际参考价值