0%

Flowable 执行流程

Flowable 执行流程

一切的起始:基于命令模式的流程引擎

在 Flowable 中,每一个操作都对应着一个 Command,在一个执行周期中(也可以理解成事务周期),会顺序的执行任务直至到暂停节点或终点。

Main -> RuntimeService: trigger
RuntimeService -> CommandExecutor: execute(Cmd) \n 将trigger调用包装成命令 
CommandExecutor -> CommandInterceptor: execute(Cmd) \n 责任链执行拦截器 
CommandInterceptor -> CommandInterceptor: LogInterceptor \n 打印命令执行信息
CommandInterceptor -> CommandContextInterceptor: 装填上下文
CommandContextInterceptor -> CommandContext: create()
CommandContext -> CommandContext: 装填session、command信息
CommandContextInterceptor -> CommandContextInterceptor: 命令入栈管理
CommandInterceptor -> CommandInterceptor: TransactionContextInterceptor \n 创建DB事务上下文
CommandInterceptor -> CommandInterceptor: CommandInvoker \n 1.入队封装的Command \n 2.执行封装的Command
CommandInterceptor -> CommandExecutor: 返回结果
CommandInterceptor -> CommandInterceptor: TransactionContextInterceptor \n 移除DB事务上下文
CommandInterceptor -> CommandContextInterceptor: 关闭上下文
CommandContextInterceptor -> CommandContext: close()
CommandContext -> CommandContext: 移除上下文
CommandContext -> CommandContext: 调用Listener的close()
CommandContext -> CommandContext: 清空缓存 flushSession()
CommandContext -> CommandContext: 调用Listener的AfterSessionsFlush\n(包含TransactionContextInterceptor事务提交)
CommandContext -> CommandContext: 调用Listener的closed()
CommandContext -> CommandContext: 关闭Sessions closeSession()
CommandContextInterceptor -> CommandContext: 移除上下文

流程部署

代码示例

// 手动配置数据库
ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
            .setJdbcUrl("jdbc:mysql://127.0.0.1:3306/flowable?characterEncoding=UTF-8")
            .setJdbcUsername("root")
            .setJdbcPassword("123456")
            .setJdbcDriver("com.mysql.cj.jdbc.Driver")
            .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);

// 根据配置内容创建 ProcessEngine
ProcessEngine processEngine = cfg.buildProcessEngine();
// 获取 RepositoryService
RepositoryService repositoryService = processEngine.getRepositoryService();
// 通过 RepositoryService 发布流程单元
Deployment deployment = repositoryService.createDeployment()
  .addClasspathResource("holiday-request.bpmn20.xml") 
  // 需要注意的是,文件名必须以 bpmn 或 bpmn20.xml 结尾
  // 详细原因见,org.flowable.engine.impl.bpmn.deployer.ParsedDeploymentBuilder#build
  // 在 isBpmnResource(resource.getName()) 行做了判断跳过
  .name("请假流程")
  .category("请假")
  .tenantId("system")
  .deploy();

主要流程

  1. 封装为命令模式进行处理

    class DeployCmd<T> implements Command<Deployment>
  2. Deployment 部署前准备

    reference:org.flowable.engine.impl.cmd.DeployCmd#executeDeploy

    🌟 包含持久化 ACT_RE_DEPLOYMENT

  3. 委托给 EngineDeployer 的实现类进行处理

    🌟 BpmnDeployer 包含持久化 ACT_RE_PROCDEF

  4. 广播事件 ENTITY_INITIALIZED

TIP

org.flowable.engine.impl.persistence.entity.TableDataManagerImpl#entityToTableNameMap 记录了表和 PO 的映射

启动流程实例

代码示例

// 获取流程引擎对象
ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
// 启动流程实例通过 RuntimeService 对象
RuntimeService runtimeService = processEngine.getRuntimeService();
// 构建流程变量
Map<String,Object> variables = new HashMap<>();
variables.put("employee", "wang");// 谁申请请假
variables.put("nrOfHolidays", 1); // 请几天假
variables.put("description", "抽烟喝酒烫头"); // 请假的原因
// 启动流程实例,第一个参数是流程定义的id
ProcessInstance processInstance = runtimeService
  .startProcessInstanceById("holiday:1:4", variables);// 启动流程实例
// 输出相关的流程实例信息
System.out.println("流程定义的ID:" + processInstance.getProcessDefinitionId());
System.out.println("流程实例的ID:" + processInstance.getId());
System.out.println("当前活动的ID:" + processInstance.getActivityId());

主要流程

  1. 封装为命令链模式进行处理

    class StartProcessInstanceCmd<T> implements Command<ProcessInstance>
  2. 启动初始流程

    org.flowable.engine.impl.util.ProcessInstanceHelper#createAndStartProcessInstanceWithInitialFlowElement
  3. 准备 Execution 实例

    🌟 包含持久化 ACT_RU_EXECUTION

    🌟 包含广播事件 ENTITY_CREATED(ProcessInstanceExecution)

  4. 记录 History 数据

    🌟 包含持久化 ACT_HI_PROCINST

    🌟 包含广播事件 HISTORIC_PROCESS_INSTANCE_CREATED

  5. 广播事件 PROCESS_CREATED

  6. 装填 process_variables 和 transient_variables 变量

  7. 广播事件 ENTITY_INITIALIZED(ProcessInstance)

  8. 创建 ChildExecution

    🌟 包含持久化 ACT_RU_EXECUTION

    🌟 包含广播事件 ENTITY_CREATED(ChildExecution)ENTITY_INITIALIZED(ChildExecution)

  9. 开始 Process 处理

    protected void executeProcessStartExecutionListeners() {
           org.flowable.bpmn.model.Process process = ProcessDefinitionUtil.getProcess(execution.getProcessDefinitionId());
           executeExecutionListeners(process, execution.getParent(), ExecutionListener.EVENTNAME_START);
       }
  10. FlowNode Behavior 前处理

    🌟 包含持久化 ACT_HI_ACTINST(新增数据库条目,录入活动开始时间)

    🌟 包含广播事件 HISTORIC_ACTIVITY_INSTANCE_CREATED(HistoricActivityInstanceEntity)

    🌟 包含广播事件 ACTIVITY_STARTED

  11. 执行 FlowNode Behavior

    Reference: org.flowable.engine.impl.agenda.ContinueProcessOperation#executeActivityBehavior

    Behavior 由于有多实现,因此持久化 / 广播各不相同

  12. FlowNode Behavior 后处理

    🌟 包含持久化 ACT_HI_ACTINST(录入活动结束时间)

    🌟 包含广播事件 HISTORIC_ACTIVITY_INSTANCE_ENDED(HistoricActivityInstanceEntity)

    🌟 包含广播事件 ACTIVITY_COMPLETED

  13. 带动后续 SequenceFlow 执行

    Reference:

    org.flowable.engine.impl.agenda.TakeOutgoingSequenceFlowsOperation#handleFlowNode { leaveFlowNode }

    🌟 包含广播事件 SEQUENCEFLOW_TAKEN

  14. 带动后续 FlowNode 执行,返回步骤 11 递归

  15. 执行到需要暂停或是终止的节点,退出递归,提交事务资源

任务提交流程

代码示例

Scanner scanner = new Scanner(System.in);

TaskService taskService = processEngine.getTaskService();
List<Task> tasks = taskService.createTaskQuery().taskCandidateGroup("managers").list();
System.out.println("You have " + tasks.size() + " tasks:");
for (int i = 0; i < tasks.size(); i++) {
  System.out.println("----------------");
  System.out.println((i + 1) + ") " + tasks.get(i).getName());
  System.out.println("----------------");
}

System.out.println("Which task would you like to complete?");
int taskIndex = Integer.parseInt(scanner.nextLine());
Task task = tasks.get(taskIndex - 1);
Map<String, Object> processVariables = taskService.getVariables(task.getId());
System.out.println(processVariables.get("employee") + " wants " +
                   processVariables.get("nrOfHolidays") + " of holidays. Do you approve this?");

boolean approved = scanner.nextLine().toLowerCase().equals("y");
Map<String, Object> variables = new HashMap<>();
variables.put("approved", approved);
taskService.complete(task.getId(), variables);

主要流程

  1. 封装为命令链模式进行处理

    class CompleteTaskCmd extends NeedsActiveTaskCmd<Void>
  2. 广播事件 TASK_COMPLETED(TaskEntity)

  3. Task 关联项后处理

    1. 广播事件 TaskListener.EVENTNAME_DELETE

    2. 删除 SubTask

    3. 删除 IdentityLink

      🌟 包含广播事件 ENTITY_DELETED(IdentityLinkEntity)

      🌟 包含根据 TaskId 删除持久化 ACT_RU_IDENTITYLINK

    4. 删除 ByteArray

      🌟 包含广播事件 ENTITY_DELETED(ByteArrayEntity)

      🌟 包含根据 TaskId 删除持久化 ACT_GE_BYTEARRAY

    5. 删除 Variables

      🌟 包含广播事件 ENTITY_DELETED(VariableInstanceEntity)

      🌟 包含根据 TaskId 删除持久化 ACT_RU_VARIABLE

    6. 记录历史任务

      🌟 包含持久化 ACT_HI_TASKINST(录入任务结束时间)

  4. Task 后处理

    1. 删除 Task

      🌟 包含广播事件 ENTITY_DELETED

      🌟 包含删除持久化 ACT_RU_TASK

    2. 调起后续流程

      Reference:

      org.flowable.engine.FlowableEngineAgenda#planTriggerExecutionOperation