javascript
文件表单带数据一起提交spring_基于 Spring 实现管道模式的最佳实践
管道模式(Pipeline Pattern) 是 責任鏈模式(Chain of Responsibility Pattern) 的常用變體之一。在管道模式中,管道扮演著流水線的角色,將數據傳遞到一個加工處理序列中,數據在每個步驟中被加工處理后,傳遞到下一個步驟進行加工處理,直到全部步驟處理完畢。
PS:純的責任鏈模式在鏈上只會有一個處理器用于處理數據,而管道模式上多個處理器都會處理數據。
何時使用管道模式
任務代碼較為復雜,需要拆分為多個子步驟時,尤其是后續可能在任意位置添加新的子步驟、刪除舊的子步驟、交換子步驟順序,可以考慮使用管道模式。
愉快地使用管道模式
? 背景回放
最開始做模型平臺的時候,創建模型實例的功能,包括: “輸入數據校驗 -> 根據輸入創建模型實例 -> 保存模型實 例到相關 DB 表” 總共三個步驟,也不算復雜,所以當時的代碼大概是這樣的:
public class ModelServiceImpl implements ModelService {/*** 提交模型(構建模型實例)*/public CommonReponse<Long> buildModelInstance(InstanceBuildRequest request) {// 輸入數據校驗validateInput(request);// 根據輸入創建模型實例ModelInstance instance = createModelInstance(request);// 保存實例到相關 DB 表saveInstance(instance);} }然而沒有過多久,我們發現表單輸入數據的格式并不完全符合模型的輸入要求,于是我們要加入 “表單數據的預處理”。這功能還沒動手呢,又有業務方提出自己也存在需要對數據進行處理的情況(比如根據商家的表單輸入,生成一些其他業務數據作為模型輸入)。
所以在 “輸入數據校驗” 之后,還需要加入 “表單輸入輸出預處理” 和 “業務方自定義數據處理(可選)”。這個時候我就面臨一個選擇:是否繼續通過在 buildModelInstance 中加入新的方法來實現這些新的處理步驟?好處就是可以當下偷懶,但是壞處呢:
所以,為了不給以后的自己挖坑,我覺得要思考一個萬全的方案。這個時候,我小腦袋花開始飛轉,突然閃過了 Netty 中的 ChannelPipeline —— 對哦, 管道模式 ,不就正是我需要的嘛!
管道模式的實現方式也是多種多樣,接下來基于前面的背景,我分享一下我目前基于 Spring 實現管道模式的 “最佳套路”(如果你有更好的套路,歡迎賜教,一起討論哦)。
? 定義管道處理的上下文
/*** 傳遞到管道的上下文*/ @Getter @Setter public class PipelineContext {/*** 處理開始時間*/private LocalDateTime startTime;/*** 處理結束時間*/private LocalDateTime endTime;/*** 獲取數據名稱*/public String getName() {return this.getClass().getSimpleName();} }? 定義上下文處理器
/*** 管道中的上下文處理器*/ public interface ContextHandler<T extends PipelineContext> {/*** 處理輸入的上下文數據** @param context 處理時的上下文數據* @return 返回 true 則表示由下一個 ContextHandler 繼續處理,返回 false 則表示處理結束*/boolean handle(T context); }為了方便說明,我們現在先定義出最早版 【 提交模型邏輯】 的上下文和相關處理器:
/*** 模型實例構建的上下文*/ @Getter @Setter public class InstanceBuildContext extends PipelineContext {/*** 模型 id*/private Long modelId;/*** 用戶 id*/private long userId;/*** 表單輸入*/private Map<String, Object> formInput;/*** 保存模型實例完成后,記錄下 id*/private Long instanceId;/*** 模型創建出錯時的錯誤信息*/private String errorMsg;// 其他參數@Overridepublic String getName() {return "模型實例構建上下文";} }處理器 - 輸入數據校驗:
@Component public class InputDataPreChecker implements ContextHandler<InstanceBuildContext> {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean handle(InstanceBuildContext context) {logger.info("--輸入數據校驗--");Map<String, Object> formInput = context.getFormInput();if (MapUtils.isEmpty(formInput)) {context.setErrorMsg("表單輸入數據不能為空");return false;}String instanceName = (String) formInput.get("instanceName");if (StringUtils.isBlank(instanceName)) {context.setErrorMsg("表單輸入數據必須包含實例名稱");return false;}return true;} }處理器 - 根據輸入創建模型實例:
@Component public class ModelInstanceCreator implements ContextHandler<InstanceBuildContext> {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean handle(InstanceBuildContext context) {logger.info("--根據輸入數據創建模型實例--");// 假裝創建模型實例return true;} }處理器 - 保存模型實例到相關DB表:
@Component public class ModelInstanceSaver implements ContextHandler<InstanceBuildContext> {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean handle(InstanceBuildContext context) {logger.info("--保存模型實例到相關DB表--");// 假裝保存模型實例return true;} }到這里,有個問題就出現了:應該使用什么樣的方式,將同一種 Context 的 ContextHandler 串聯為管道呢?思考一下:
? 構建管道路由表
基于 Spring 的 Java Bean 配置,我們可以很方便的構建管道的路由表:
/*** 管道路由的配置*/ @Configuration public class PipelineRouteConfig implements ApplicationContextAware {/*** 數據類型->管道中處理器類型列表 的路由*/private static finalMap<Class<? extends PipelineContext>,List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);/** 在這里配置各種上下文類型對應的處理管道:鍵為上下文類型,值為處理器類型的列表*/static {PIPELINE_ROUTE_MAP.put(InstanceBuildContext.class,Arrays.asList(InputDataPreChecker.class,ModelInstanceCreator.class,ModelInstanceSaver.class));// 將來其他 Context 的管道配置}/*** 在 Spring 啟動時,根據路由表生成對應的管道映射關系*/@Bean("pipelineRouteMap")public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() {return PIPELINE_ROUTE_MAP.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));}/*** 根據給定的管道中 ContextHandler 的類型的列表,構建管道*/private List<? extends ContextHandler<? extends PipelineContext>> toPipeline(Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) {return entry.getValue().stream().map(appContext::getBean).collect(Collectors.toList());}private ApplicationContext appContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {appContext = applicationContext;} }? 定義管道執行器
最后一步,定義管道執行器。管道執行器 根據傳入的上下文數據的類型,找到其對應的管道,然后將上下文數據放入管道中去進行處理。
/*** 管道執行器*/ @Component public class PipelineExecutor {private final Logger logger = LoggerFactory.getLogger(this.getClass());/*** 引用 PipelineRouteConfig 中的 pipelineRouteMap*/@Resourceprivate Map<Class<? extends PipelineContext>,List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;/*** 同步處理輸入的上下文數據<br/>* 如果處理時上下文數據流通到最后一個處理器且最后一個處理器返回 true,則返回 true,否則返回 false** @param context 輸入的上下文數據* @return 處理過程中管道是否暢通,暢通返回 true,不暢通返回 false*/public boolean acceptSync(PipelineContext context) {Objects.requireNonNull(context, "上下文數據不能為 null");// 拿到數據類型Class<? extends PipelineContext> dataType = context.getClass();// 獲取數據處理管道List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);if (CollectionUtils.isEmpty(pipeline)) {logger.error("{} 的管道為空", dataType.getSimpleName());return false;}// 管道是否暢通boolean lastSuccess = true;for (ContextHandler<? super PipelineContext> handler : pipeline) {try {// 當前處理器處理數據,并返回是否繼續向下處理lastSuccess = handler.handle(context);} catch (Throwable ex) {lastSuccess = false;logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);}// 不再向下處理if (!lastSuccess) { break; }}return lastSuccess;} }? 使用管道模式
此時,我們可以將最開始的 buildModelInstance 修改為:
public CommonResponse<Long> buildModelInstance(InstanceBuildRequest request) {InstanceBuildContext data = createPipelineData(request);boolean success = pipelineExecutor.acceptSync(data);// 創建模型實例成功if (success) {return CommonResponse.success(data.getInstanceId());}logger.error("創建模式實例失敗:{}", data.getErrorMsg());return CommonResponse.failed(data.getErrorMsg()); }我們模擬一下模型實例的創建過程:
參數正常時:
參數出錯時:
這個時候我們再為 InstanceBuildContext 加入新的兩個 ContextHandler:FormInputPreprocessor(表單輸入數據預處理) 和 BizSideCustomProcessor(業務方自定義數據處理)。
@Component public class FormInputPreprocessor implements ContextHandler<InstanceBuildContext> {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean handle(InstanceBuildContext context) {logger.info("--表單輸入數據預處理--");// 假裝進行表單輸入數據預處理return true;} } @Component public class BizSideCustomProcessor implements ContextHandler<InstanceBuildContext> {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean handle(InstanceBuildContext context) {logger.info("--業務方自定義數據處理--");// 先判斷是否存在自定義數據處理,如果沒有,直接返回 true// 調用業務方的自定義的表單數據處理return true;} }此時 buildModelInstance 不需要做任何修改,我們只需要在 “路由表” 里面,將這兩個 ContextHandler 加入到 InstanceBuildContext 關聯的管道中, Spring 啟動的時候,會自動幫我們構建好每種 Context 對應的管道:
再模擬一下模型實例的創建過程:
? 異步處理
管道執行器 PipelineExecutor 中,acceptSync 是個同步的方法。
小蜜:看名字你就知道你悄悄埋伏筆了。
對于步驟繁多的任務,很多時候我們更需要的是異步處理,比如某些耗時長的定時任務。管道處理異步化非常的簡單,我們先定義一個線程池,比如:
<!-- 專門用于執行管道任務的線程池 --> <bean id="pipelineThreadPool"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="4" /> <!-- 核心線程數 --><property name="maxPoolSize" value="8" /> <!-- 最大線程數 --><property name="keepAliveSeconds" value="960" /> <!-- 線程最大空閑時間/秒(根據管道使用情況指定)--><property name="queueCapacity" value="256" /> <!-- 任務隊列大小(根據管道使用情況指定)--><property name="threadNamePrefix" value="pipelineThreadPool" /><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy" /></property> </bean>然后在 PipelineExecutor 中加入異步處理的方法:
/*** 管道線程池*/ @Resource private ThreadPoolTaskExecutor pipelineThreadPool;/*** 異步處理輸入的上下文數據** @param context 上下文數據* @param callback 處理完成的回調*/ public void acceptAsync(PipelineContext context, BiConsumer<PipelineContext, Boolean> callback) {pipelineThreadPool.execute(() -> {boolean success = acceptSync(context);if (callback != null) {callback.accept(context, success);}}); }? 通用處理
比如我們想記錄下每次管道處理的時間,以及在處理前和處理后都打印相關的日志。那么我們可以提供兩個通用的 ContextHandler,分別放在每個管道的頭和尾:
@Component public class CommonHeadHandler implements ContextHandler<PipelineContext> {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean handle(PipelineContext context) {logger.info("管道開始執行:context={}", JSON.toJSONString(context));// 設置開始時間context.setStartTime(LocalDateTime.now());return true;} } @Component public class CommonTailHandler implements ContextHandler<PipelineContext> {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean handle(PipelineContext context) {// 設置處理結束時間context.setEndTime(LocalDateTime.now());logger.info("管道執行完畢:context={}", JSON.toJSONString(context));return true;} }通用頭、尾處理器可以在路由表里面放置,但是每次新加一種 PipelineContext 都要加一次,好像沒有必要 —— 我們直接修改下 管道執行器 PipelineExecutor 中的 acceptSync 方法:
@Component public class PipelineExecutor {......@Autowiredprivate CommonHeadHandler commonHeadHandler;@Autowiredprivate CommonTailHandler commonTailHandler;public boolean acceptSync(PipelineContext context) {......// 【通用頭處理器】處理commonHeadHandler.handle(context);// 管道是否暢通boolean lastSuccess = true;for (ContextHandler<? super PipelineContext> handler : pipeline) {try {// 當前處理器處理數據,并返回是否繼續向下處理lastSuccess = handler.handle(context);} catch (Throwable ex) {lastSuccess = false;logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);}// 不再向下處理if (!lastSuccess) { break; }}// 【通用尾處理器】處理commonTailHandler.handle(context);return lastSuccess;} }總結
通過管道模式,我們大幅降低了系統的耦合度和提升了內聚程度與擴展性:
- ModelService 只負責處理 HSF 請求,不用關心具體的業務邏輯
- PipelineExecutor 只做執行工作,不用關心具體的管道細節
- 每個 ContextHandler 只負責自己那部分的業務邏輯,不需要知道管道的結構,與其他ContextHandler 的業務邏輯解耦
- 新增、刪除 或者 交換子步驟時,都只需要操作路由表的配置,而不要修改原來的調用代碼
如果覺得本文對你有幫助,也可以點進我主頁關注我公眾號,上面有更多技術干貨文章以及相關資料共享,大家一起學習進步!
總結
以上是生活随笔為你收集整理的文件表单带数据一起提交spring_基于 Spring 实现管道模式的最佳实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python怎么运行_程序员大牛讲解,P
- 下一篇: python制作软件封面_用python