Skip to content

流程引擎

钟勋 edited this page Dec 6, 2020 · 21 revisions

简介

流程引擎可以对复杂的业务流程进行编排,让繁琐的业务逻辑变得清晰明了易维护。同时流程引擎可作为分布式事务解决方案saga模式的一种实现。

在我开发流程引擎时并不了解saga模式,只是觉得需要有一个框架来简化繁杂的业务流程代码,保证分布式环境下事务的最终一致性。后来了解到saga模式,才发现它的思想和我的设计一拍即合无甚差别。如果你在实践saga模式,不妨使用流程引擎试试。同时流程引擎并不局限于使用在saga模式场景下,你可以把它作为普通的流程编排框架用来简化你的业务代码。

1. 引入流程引擎依赖

<dependency>
    <groupId>org.bekit</groupId>
    <artifactId>flow</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

2. 配置

  • 如果是spring-boot项目则不需要进行任何配置。
  • 如果是非spring-boot项目则需要手动引入流程引擎配置类FlowEngineConfiguration。比如:
@Configuration
@Import(org.bekit.flow.boot.FlowEngineConfiguration.class)
public class MyImport {
}

3. 使用流程引擎

使用流程引擎步骤:1、定义流程编排,2、定义处理器,3、定义特定流程映射器(可选),4、定义特定流程加锁器(可选),5、定义特定流程监听器(可选)

以下通过一个转账交易流程进行演示,流程图如下:

3.1 定义流程编排

流程编排的职责就是定义一个流程内所有的节点,并把这些节点之间的流转表示出来。

// 转账交易流程
@Flow
public class TransferFlow {

    // 付款人下账节点  @StartNode是开始节点,一个流程中必须存在一个开始节点
    @StartNode(processor = "downPayerProcessor")    // processor属性表示本节点对应的处理器,不填表示本节点没有处理器
    public String downPayer(ResultStatus resultStatus) {    // 处理器的返回结果会以入参形式传进来
        // 本方法的作用是:根据处理器的返回结果,选择下一个流程节点名称
        switch (resultStatus) {
            case SUCCESS:
                return "upPayee";
            case FAIL:
                return "fail";
            case PROCESSING:
                return null;    // 当返回null时,流程引擎会中断流程的执行
            default:
                throw new RuntimeException("处理器返回结果不合法");
        }
    }

    // 收款人上账节点  @PhaseNode是阶段节点,是最常用的节点类型,流程是一个阶段一个阶段的执行
    @PhaseNode(processor = "upPayeeProcessor")
    public String upPayee(ResultStatus resultStatus) {
        switch (resultStatus) {
            case SUCCESS:
                return "sendSuccessMessage";
            case FAIL:
                return "restorePayer";
            case PROCESSING:
                return null;    // 和上面情况一样需要返回null,因为处理器返回结果是处理中,需要暂停执行,直到得到明确结果后才能跳转到下个节点
            default:
                throw new RuntimeException("处理器返回结果不合法");
        }
    }

    // 恢复付款人资金节点,付款人下账成功,但是收款人上账失败了,也就是说这笔转账交易失败,
    // 接下来需要把付款人的钱恢复回去,并且必须得保证恢复成功
    @PhaseNode(processor = "restorePayerProcessor")
    public String restorePayer(ResultStatus resultStatus) {
        switch (resultStatus) {
            case SUCCESS:
                return "fail";
            case FAIL:
                return "waitingRestorePayer";
            case PROCESSING:
                return null;
            default:
                throw new RuntimeException("处理器返回结果不合法");
        }
    }

    // 等待恢复付款人资金节点   @PauseNode是暂停节点,流程执行到这个节点会暂停执行,只有这个节点是第一个被执行的节点时,流程才会继续执行
    @PauseNode
    public String waitingRestorePayer() {
        return "restorePayer";
    }

    // 发送转账成功通知   @TransientNode是瞬态节点,它是没有独立状态的,在它执行前不会对事务做任何处理(不会提交事务,也不会新建事务)
    @TransientNode(processor = "sendSuccessMessageProcessor")
    public String sendSuccessMessage() {
        return "success";
    }

    // 成功节点   @EndNode是结束节点,流程执行到这个节点会结束执行
    @EndNode
    public void success() {
    }

    // 失败节点
    @EndNode
    public void fail() {
    }
}

流程通过@Flow注解标识,流程中的节点类型分为开始节点(@StartNode)、阶段节点(@PhaseNode)、瞬态节点(@TransientNode)、暂停节点(@PauseNode)、结束节点(@EndNode)这几种类型,其中瞬态节点是非状态节点,其他类型都是状态节点。

  1. 开始节点(@StartNode):每个流程都必须有一个唯一的开始节点,开始节点是一个流程最开始执行的节点。
  2. 阶段节点(@PhaseNode):阶段节点是最常用的节点类型,可以把流程想象成是通过一个个阶段来执行的。
  3. 瞬态节点(@TransientNode):瞬态节点是一种单纯的处理节点,是依附在其他节点上的节点。它是没有独立状态的,在它执行前不会对事务做任何处理(不会提交事务,也不会新建事务)。
  4. 暂停节点(@PauseNode):流程执行到暂停节点会暂停执行,只有这个节点是第一个被执行的节点时,流程才会继续执行。
  5. 结束节点(@EndNode):结束节点是流程结束的标志,当流程跳转到结束节点时,流程会自动结束(结束节点的方法体不会被执行)。

3.2 定义处理器

处理器是每个节点执行的具体业务代码,同一处理器可以被多个节点共同使用。

// 付款方下账处理器
@Processor
public class DownPayerProcessor {
    // 处理器执行方法,本方法的返回值会作为整个处理器的返回值
    @ProcessorExecute
    public ResultStatus execute(FlowContext<Transfer> context) throws TimeoutException {
        Transfer transfer = context.getTarget();
        // 具体业务逻辑
        return ResultStatus.SUCCESS;
    }
}

处理器通过@Processor进行标注,处理器执行方法必须只能有一个入参且类型必须是FlowContext。FlowContext是目标上下文,可以通过它获取你传给流程引擎的目标对象(context.getTarget())。

3.3 定义特定流程映射器(可选)

// 转账交易流程映射器
@TheFlowMapper(flow = "transferFlow")
public class TransferFlowMapper {
    // 映射出节点,根据目标对象的状态等信息映射到一个流程节点,流程引擎将从这个节点进行执行
    @MappingNode
    public String mappingNode(FlowContext<Transfer> context) {    // 入参就是流程上下文
        TransferStatus status = context.getTarget().getStatus();
        // bekit专门提供了EnumUtils工具类,用于在节点名称与状态枚举之间进行转换
        return EnumUtils.getCamelCaseName(status);
        // 上一行代码等同于下面代码
        // switch (status){
        //    case DOWN_PAYER:
        //        return "downPayer";
        //    case UP_PAYEE:
        //        return "upPayee";
        //    case RESTORE_PAYER:
        //        return "restorePayer";
        //    case WAITING_RESTORE_PAYER:
        //        return "waitingRestorePayer";
        //    case SUCCESS:
        //        return "success";
        //    case FAIL:
        //        return "fail";
    }
}

3.4 定义特定流程加锁器(可选)

// 转账交易流程加锁器
// 在并发情况下需要用锁来控制并发,你需要在这里实现具体锁住目标对象的代码
@TheFlowLocker(flow = "transferFlow")
public class TransferFlowLocker {
    @Autowired
    private TransferDao transferDao;

    // 加流程锁,流程执行前会调用本方法(可选)
    @FlowLock
    public Transfer flowLock(FlowContext<Transfer> context) {
        log.info("对流程[transfer]加流程锁(仅用于演示。实际场景可用分布式锁实现)");
        return context.getTarget();
    }

    // 加状态锁,每个状态节点执行前会调用本方法(可选)
    @StateLock
    public Transfer stateLock(FlowContext<Transfer> context) {
        log.info("对流程[transfer]加状态锁(利用数据库的行级悲观锁实现)");
        // 这里采用数据库行级悲观锁的形式锁住目标对象
        Transfer transfer = context.getTarget();
        transfer = transferDao.findLockByBizNo(transfer.getBizNo());
        return transfer;
    }

    // 解状态锁,下一个状态节点执行前会调用本方法(可选)
    @StateUnlock
    public void stateUnlock(FlowContext<Transfer> context) {
        log.info("对流程[transfer]解状态锁(仅用于演示。由于事务提交后,数据库的行级悲观锁会自动释放,所以无需手动解锁)");
    }

    // 解流程锁,流程执行后会调用本方法(可选)
    @FlowUnlock
    public void flowUnlock(FlowContext<Transfer> context) {
        log.info("对流程[transfer]解流程锁(仅用于演示)");
    }
}

3.5 定义特定流程监听器(可选)

// 转账交易流程监听器
@TheFlowListener(flow = "transferFlow")
public class TransferFlowListener {
    @Autowired
    private TransferDao transferDao;

    @ListenFlowStart
    public void listenFlowStart(FlowContext<Transfer> context) {
        // 仅用于演示,可根据实际场景选择是否监听本事件
        log.info("流程[transfer]开始执行");
    }

    @ListenExecutingNode
    public void listenExecutingNode(String node, FlowContext<Transfer> context) {
        // 仅用于演示,可根据实际场景选择是否监听本事件
        log.info("流程[transfer]即将执行节点[{}]", node);
    }

    @ListenDecidedNode
    public void listenDecidedNode(String node, FlowContext<Transfer> context) {
        // 仅用于演示,可根据实际场景选择是否监听本事件
        log.info("流程[transfer]选择节点[{}]作为下一个要执行的节点", node);
    }

    // 监听状态节点选择事件(主要作用就是用来修改目标对象的状态)
    // 入参node表示被选择的状态节点,context是流程上下文
    @ListenDecidedStateNode
    public void listenDecidedStateNode(String node, FlowContext<Transfer> context) {
        // bekit专门提供了EnumUtils工具类,用于在节点名称与状态枚举之间进行转换
        TransferStatus status = EnumUtils.getEnum(TransferStatus.class, node);
        log.info("节点[{}]是状态节点,更新交易记录状态为[{}]", node, status);
        // 根据被选择的节点修改目标对象到对应的状态,
        Transfer transfer = context.getTarget();
        transfer.setStatus(status);
        transferDao.save(transfer);
    }

    // 监听流程异常事件,当流程发生任何异常时都会发送这个事件
    @ListenFlowException
    public void listenFlowException(Throwable throwable, FlowContext<Transfer> context) {
        // 仅用于演示,可根据实际场景选择是否监听本事件
        log.info("流程[transfer]执行过程中发生异常:{}", throwable.getMessage());
    }

    @ListenFlowEnd
    public void listenFlowEnd(FlowContext<Transfer> context) {
        // 仅用于演示,可根据实际场景选择是否监听本事件
        log.info("流程[transfer]执行结束");
    }
}

3.6 执行流程

在需要执行流程的地方注入FlowEngine

@Autowired
private FlowEngine flowEngine;

执行流程

// 创建交易
Transfer transfer = new Transfer();
transfer.setPayerAccountId("123");
transfer.setPayeeAccountId("456");
transfer.setAmount(100);
transfer.setStatus(TransferStatus.DOWN_PAYER);
transferDao.save(transfer);   // 保存到数据库
// 执行流程
transfer = flowEngine.execute("transferFlow", transfer);
log.info("转账交易执行结果:{}", transfer.getStatus());