李晶(大晶)
一、背景
业务系统在发展的过程中,业务的逻辑越来越复杂,简单的逻辑步骤拆分已经不能快速适应业务的变化,大部分时候都是在主流程上打补丁的方式进行业务支持。而且大量的业务逻辑被隐藏在实现细节中,无法从全局看到整体的业务流程。所以在此背景下,需要一个业务编排框架帮助我们将代码逻辑以组件化的方式组织起来,达到快速配置和快速了解业务全貌的作用。
二、实践简述
2.1 代码结构
在流程编排后的代码结构如下,目前编排后的代码架构,已经达到快速配置和快速了解业务全貌的作用
编排前:复杂的业务逻辑通常使用AO封装,简单的业务逻辑使用Service封装,随着业务需求越来越复杂,AO和service的依赖关系逐渐变成网状,业务逻辑嵌套很深,无法快速了解整个业务处理的核心节点。
编排后:复杂的业务逻辑使用组件封装,组件只会使用各种service进行处理,每个业务流程配置所需要的组件,组件可以通过出入参适配进行组件复用。
2.2 业务编排
一个业务编排应有的元素如下图,也通常是我们进行业务编排核心需要考虑到的点。
首先:需要一个贯穿整个流程编排的上下文数据流,每个组件都处理整个上下文数据流
其次:使用编排框架,根据上下文数据流路由到对应的业务编排中,由框架中的执行引擎根据编排执行组件
三、业务编排框架
3.1 技术选型
目前使用的框架是公司内部封装产品,该产品框架在技术选型上,主要是基于Spring 的schema 扩展实现自定义标签,因为我们的项目都是基于Spring 框架的,所以结合起来会比较方便。
3.2 功能介绍
目前框架已实现的功能如下:
流程的配置化: 将分散的业务流程抽象成核心节点,这些节点称之为组件,编排配置这些组件,就可以顺序执行。
条件控制:除了顺序执行组件,也具备不同条件下的分支路由
组件之间的适配:相似业务逻辑的组件,由于不同的出入参可以由适配组件进行适配转换,增加组件的复用
业务组件对流程的成功和失败监听:组件执行中的异常捕捉处理,异常组件的日志打印,或者某一组件失败其他组件需要进行回滚操作,都可以支持
组件的并发执行:为了提高流程的整体性能, 可以将没有依赖关系的组件设置为并发执行
3.3 框架实现
3.3.1 整体架构
3.3.2 框架说明
框架整体分为3部分;
流程解析:是用来解析自定义的Spring 标签,解析后的标签会生成不同的流程组件实例
组件定义:封装业务逻辑的组件和控制流程流转的组件
引擎执行部分:为不同的业务定义不同的流程,同时提供根据不同业务进行流程路由的能力
3.3.3 组件说明
从上面的图中可以看出,组件被分为了两类组件,结构组件和业务组件
业务组件:是定义的一组业务接口由使用方去实现
结构组件:是框架层面实现的,它将不同的业务组件以不同的规则组织起来。
比如pipeline组件是包含了一组的业务组件;ChooseValve实现了条件选择控制,但是具体的业务逻辑选择是由业务方实现Condition或者ChooseCondition接口来决定的;RefValve是对普通Spring Bean 的包装。
3.4 调用流程
下面就是一个业务流程执行时的过程,可以清楚的看到,结构组件在其中的作用,它其实就是具有某些特殊功能的组件。
四、上下文对象
公司的业务编排框架并未对上下文对象做定义,而是支持使用方自主定义,因此这里也简单介绍下目前使用的上下文对象的定义,如下图:
4.1 类图说明
Context
MediatorExt
MediatorList
MediatorDTO
类设计采用中继者模式,利用泛型,达到快速存储扩展信息的功能,在上下文数据传递中使用非常高效。结合现有的具体业务,上下文对象共有3中应用场景;出参为空、出参为单个对象、出参为LIst对象
4.2 示例说明
出参为空示例
public class CartInfoQueryComponent extends AbstractCartPutItemComponent {
private static final Logger LOG = LoggerFactory.getLogger(CartInfoQueryComponent.class);
@Autowired
private CartStrategyContext cartStrategyContext;
@Override
public void handler(CartPutItemDTO cartPutItemDTO, Context<Void> result) {
//存储扩展对象
result.setExtDomain(MediatorExt.CART_INFO_RESULT, cartItemDTOs);
}
}
出参为单对象示例
public class GeneralAddressComponent extends AbstractPlaceOrderComponent {
private static final Logger LOGGER = LoggerFactory.getLogger(GeneralAddressComponent.class);
@Autowired
private AddressService addressService;
@Autowired
private ActivityShopAddressAdapter activityShopAddressAdapter;
@Override
public void handler(PlaceOrderDTO placeOrder, Context<PlaceOrderResult> context) {
//获取扩展信息
Buyer buyer = context.getExtDomain(MediatorExt.BUYER);
}
出参为List对象示例
public class ItemBaseLimitComponent extends AbstractOrderComponent {
@Autowired
private ItemLimitService limitService;
@Override
public void handler(PlaceOrderConfirmDTO param, Context<MediatorList<ItemConfirmDTO>> result) {
List<MediatorDTO<ItemConfirmDTO>> mediatorList = result.getDomain().getMediatorList();
for (MediatorDTO<ItemConfirmDTO> mediatorDTO : mediatorList) {
ItemConfirmDTO itemConfirmDTO = mediatorDTO.getDomain();
itemConfirmDTO.setId(1L)
mediatorDTO.setExtDomain(MediatorExt.LIMIT_INFO, itemLimitDTO);
}
}
五、应用实践
5.1 普通场景
5.1.1 示例
下面以一个实践应用场景来看下是如何配置流程的,使用执行引擎的实例,根据业务key路由到对应的流程编排中。普通的示例中使用到了 chooseValve选择组件、adapter适配组件,每个业务组件ref 引用的都是一个Spring Bean的实例。两个不同的场景共用了大部分的业务组件,只针对各自的场景做了少量个性化业务组件,通过这种配置化,实现了业务逻辑的快速组装,而且也能很方便的插入新的组件。
Context<MediatorList> result = new Context<>();
try {
// 根据query路由到对应的engineExecutor实例,再调用execute方法
tradeListEngineExecutor.getEngineExecutor(query).execute(tradeFormForQuery, result);
} catch (Exception buyException) {
// 流程的中断是通过抛出异常为中断的,所以需要做好异常捕获,可以通过异常的类型来区分是系统异常还是业务异常
LOGGER.error("query TradeList exception ,appOrderQuery={},e={}", query, buyException);
RestResult<List<BaseTradeListTO>> restResult = RestResult.fail(buyException.getMessage());
return restResult;
}
<!--///////////////////////////////////////////流程编排定义/////////////////////////////////////////////////////// -->
<bean id="tradeListEngineExecutor" class="com.yt.buy.biz.engine.EngineExecutorManager">
<property name="engineExecutorMap">
<map>
<entry value-ref="appQueryTradeListEngine" key="APP_QUERY_PROGRAM"/>
<entry value-ref="omoQueryTradeListEngine" key="OMO_WX_QUERY_PROGRAM"/>
</map>
</property>
<property name="selectorName" value="trade-list"/>
</bean>
<!--定义具体的业务编排和异常处理器-->
<bean id="appQueryTradeListEngine" class="com.yangt.engine.choreography.executor.EngineExecutorImpl">
<!--指定业务编排-->
<property name="pipeline" ref="appQueryTradeListPipeline"/>
<!--指定异常处理器-->
<property name="exceptionHandler" ref="tradeListEngineExceptionHandler"/>
</bean>
<!--具体的业务编排-->
<hipac:pipeline label="appQueryPipeline" id="appQueryTradeListPipeline">
<hipac:choosevalve refcondition="queryTradeListChooseCondition">
<hipac:when resultcompare="30">
<hipac:refvalve ref="tradeListWaitRateInfoComponent"/>
</hipac:when>
<hipac:otherwise>
<hipac:adapter refAdapter="tradeListQueryParamAdapter">
<hipac:refvalve ref="shopTradeListInitComponent"/>
</hipac:adapter>
</hipac:otherwise>
</hipac:choosevalve>
<hipac:refvalve ref="tradeListItemSnapshotComponent"/>
<hipac:adapter refAdapter="tradeListOrderComponentAdapter">
<hipac:refvalve ref="tradeListMultiRefundInfoComponent"/>
</hipac:adapter>
<hipac:refvalve ref="tradeListRefundComponent"/>
<hipac:refvalve ref="tradeListItemInfoComponent" label="查询商品基本信息" />
<hipac:refvalve ref="tradeListItemMaterialComponent"/>
<hipac:refvalve ref="tradeListRateStatusInfoComponent"/>
<hipac:refvalve ref="tradeListProfitComponent"/>
<hipac:refvalve ref="tradeListBatchSpecInfoComponent"/>
<hipac:refvalve ref="tradeListStoreInfoComponent"/>
<hipac:refvalve ref="tradeListImInfoComponent"/>
<hipac:refvalve ref="tradeListQualificationComponent" label="查询资质文件" />
</hipac:pipeline>
<!--不同场景下组件复用和个性化定义-->
<bean id="omoQueryTradeListEngine" class="com.yangt.engine.choreography.executor.EngineExecutorImpl">
<!--指定业务编排-->
<property name="pipeline" ref="omoQueryTradeListPipeline"/>
<!--指定异常处理器-->
<property name="exceptionHandler" ref="tradeListEngineExceptionHandler"/>
</bean>
<!--具体的业务编排-->
<hipac:pipeline label="omoQueryPipeline" id="omoQueryTradeListPipeline">
<hipac:refvalve ref="shopTradeListInitComponent" label="查询交易列表"/>
<hipac:refvalve ref="tradeListItemSnapshotComponent" label="查询商品快照信息"/>
<hipac:refvalve ref="omoTradeIdTransitionComponent" label="omo商城交易订单号转换处理器"/>
<hipac:refvalve ref="tradeLogisticsInfoComponent" label="查询订单物流信息组件"/>
<hipac:refvalve ref="tradeListRefundComponent" label="查询退款信息"/>
<hipac:refvalve ref="tradeListBatchSpecInfoComponent" label="查询批次信息"/>
<hipac:refvalve ref="tradeListStoreInfoComponent" label="填充供货商信息"/>
</hipac:pipeline>
5.2 并发组件
5.2.1 示例
并发组件配置使用了concurrent并发组件,需要指定超时时间和线程池。 这里值得一说的是并发组件的设计。
<bean id="omoQueryTradeListEngine" class="com.yangt.engine.choreography.executor.EngineExecutorImpl">
<!--指定业务编排-->
<property name="pipeline" ref="omoQueryTradeListPipeline"/>
<!--指定异常处理器-->
<property name="exceptionHandler" ref="tradeListEngineExceptionHandler"/>
</bean>
<!--具体的业务编排-->
<hipac:pipeline label="omoQueryPipeline" id="omoQueryTradeListPipeline">
<hipac:refvalve ref="shopTradeListInitComponent" label="查询交易列表"/>
<hipac:refvalve ref="tradeListItemSnapshotComponent" label="查询商品快照信息"/>
<hipac:refvalve ref="omoTradeIdTransitionComponent" label="omo商城交易订单号转换处理器"/>
<!--并发组件设置-->
<hipac:concurrent timeout="4000" executor="tradeListQueryThreadPool">
<hipac:refvalve ref="tradeLogisticsInfoComponent" label="查询订单物流信息组件"/>
<hipac:refvalve ref="tradeListRefundComponent" label="查询退款信息"/>
<hipac:refvalve ref="tradeListBatchSpecInfoComponent" label="查询批次信息"/>
<hipac:refvalve ref="tradeListStoreInfoComponent" label="填充供货商信息"/>
</hipac:concurrent>
</hipac:pipeline>
5.2.2 设计说明
背景:在实际的业务流程中,我们会把代码按照功能或者领域划分为一个个独立的方法,再按照业务需要的顺序把它们组织起来,使用流程编排时,也还是一样的,但是我们经常会发现有些组件之间其实并没有依赖关系,
但是它们可能都是比较耗时的操作,这个在对RT比较敏感的业务中会希望通过并发执行提升整体的效率。基于此进行了并发组件的设计。
concurrent并发组件它持有3个主要的变量,线程池对象、一个组件的集合、超时时间,在具体执行时,会通过CompletableFuture 将pipelines 执行的结果包含起来。
部分代码示例:
@Override
public void invoke(PipelineContext pipelineContext) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(" concurrent valve start {}", getBeanName());
}
List<CompletableFuture<Void>> futureList = new ArrayList<>(pipelines.length);
for (Pipeline pipeline : pipelines) {
CompletableFuture<Void> future;
if (executor == null) {
future = CompletableFuture.runAsync(RunnableWrapper.of(new AsyncRunnable(pipelineContext, pipeline)));
} else {
future = CompletableFuture.runAsync(RunnableWrapper.of(new AsyncRunnable(pipelineContext, pipeline)), executor);
}
futureList.add(future);
}
// 将上面所有的异步执行组件整合起来
CompletableFuture<Void> future = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
if (timeout > 0) {
future.get(timeout, TimeUnit.MILLISECONDS);
} else {
future.get();
}
pipelineContext.invokeNext();
}
从实现上看还是挺简单的。但目前的设计是有一定限制,从上面的示例也可以看出,------------需要业务去调整自己的流程配置以适应并发组件的能力,并发组件只能使用在相邻几个组件没有依赖且需要异步的场景下。
其实在设计并发组件时,还有另一种方案,是在配置组件时,直接指定当前组件是否为异步组件,同时每个组件需要配置好自己的上游依赖组件,这样就不用为了并发执行而调整组件顺序, 但这样一来,最终流程的执行顺序可能就和配置的顺序不一致,流程不再是看起来的串行,而是一个树状结构。如下图:
基于这种思路【流程如下图】,在组件设计上, 需要实现一个异步组件和依赖组件,在业务组件执行前先获取它的依赖组件是否存在,如果存在,再看依赖组件是否执行完毕。这样组件的执行顺序将完全由用户配置依赖顺序决定,但是也带来了更多的复杂性,因为从流程配置上来看,没那么直观了。这两种异步的方式都有团队在使用,这个取决于当前的业务实际情况,最终我们使用更易理解的配置方式设计。
5.3 回滚场景
5.3.1 示例
在实际业务中,存在整个流程失败对应的写操作需要回滚,现有的框架也支持回滚,需要回滚的组价需要继承RollBack<OrderForm, ResultData>类,并写入回滚的具体的动作,当整个编排发生异常是,就会一次执行组件的回滚方法
public class ReduceStockComponentV2 extends ProcessorSupport<OrderForm, ResultData> implements RollBack<OrderForm, ResultData> {
@Override
public void handler(OrderForm orderForm, ResultData resultData, ComponentInnerContext context, Map<String, Object> attribute) {
//组件正常执行方法
}
@Override
public void rollBack(PipelineContext pipelineContext) {
//todo 回滚动作
}
5.4 个人思考
从个人的实践来看,在实际应用流程编排前,首先需要的是梳理业务流程,将当前的业务流程抽象成一个个相对独立的业务单元,再将这些业务单元组装成相应的业务组件,这样才能更好的达到组件共用的目标,
同时也能提高组件的高内聚能力。如果只是简单的把之前过程式的代码迁移到流程编排中,可能很难达到理想的目标,而且还会因为一个逻辑被分散到了多个组件中,造成理解困难。
六、未来发展
流程的可视化
对于复杂业务的流程编排来说,流程的可视化还是非常重要的,对于想要快速了解整体业务逻辑来说,有个图形化的流程, 不用再去翻看流程代码配置。
事务组件支持
对于多个组件之间的事务需求,目前是没有支持的,实际应用时是通过将需要进行事务包裹的业务逻辑统一到一个组件中去实现的,但是如果框架层面能支持是更好的,这样每个组件可以保持职责单一原则。
参考资料:
https://github.com/hejiehui/xUnit
https://github.com/apache/camel