Drools:fireAllRules,fireUntilHalt和Timers内部代码清理的详细说明
在六月,我們在博客上發(fā)布了一個新的內(nèi)部狀態(tài)機,用于管理用戶,計時器和引擎線程之間的交互。 現(xiàn)在,我們對該代碼進行了另一次大的內(nèi)部清理,以使其更易于閱讀和理解。
如前所述,所有操作(插入,更新,刪除等)現(xiàn)在都放入線程安全傳播隊列中。 執(zhí)行這些操作時,用戶線程再也不會接觸引擎,甚至Alpha網(wǎng)絡也不會。 這樣可以提高線程安全性。 相反,當引擎啟動時,它首先排空并評估此隊列,這可能導致在進行規(guī)則評估和觸發(fā)之前進行alpha網(wǎng)絡評估。
除了用戶線程和引擎線程的分離之外,狀態(tài)機的另一個目標是協(xié)調(diào)Timer線程。 當計時器啟動時,發(fā)動機可能不起作用或正在運行。 如果引擎處于活動狀態(tài),則計時器應只向傳播隊列提交一個條目,并讓當前執(zhí)行的線程處理該作業(yè)。 如果引擎未處于活動狀態(tài)并且計時器規(guī)則是異步的,則計時器線程應通過executeTask方法來進行評估和觸發(fā)。 狀態(tài)機旨在最小化同步和鎖定,以使爭用最小。
引擎現(xiàn)在可以處于5種可能的狀態(tài)。INACTIVE是啟動狀態(tài)。
引擎評估和規(guī)則觸發(fā)具有三個潛在的入口點fireAllRules,fireUntilHalt和異步計時器規(guī)則-后者是通過executeTask部分完成的。 我們將fireAllRules和fireUntilHalt統(tǒng)一到單個fireLoop方法中,該方法使用作為參數(shù)傳遞的策略類來處理循環(huán)的潛在休息狀態(tài)。 如果沒有規(guī)則觸發(fā),沒有更多要評估的議程組以及隊列為空,則認為引擎處于靜止狀態(tài)。
然后,FireAllRules所有規(guī)則會將引擎設置為INACTIVE,然后退出循環(huán)。 FireUntilHalt將使當前線程等待,直到更多工作進入隊列進行處理。 這里已經(jīng)進行了工作,以確保在這些狀態(tài)轉換期間沒有間隙和執(zhí)行損失。
當線程想要轉換為FIRE_ALL_RULES或FIRE_UNTIL_HALT或EXECUTE_TASK時,它必須通過waitAndEnterExecutionState。 如果引擎為非活動狀態(tài),則可以立即轉換,否則,將進入等待狀態(tài),直到當前執(zhí)行線程完成并將引擎返回至非活動狀態(tài):
private void waitAndEnterExecutionState( ExecutionState newState ) {if (currentState != ExecutionState.INACTIVE) {try {stateMachineLock.wait();} catch (InterruptedException e) {throw new RuntimeException( e );}}setCurrentState( newState ); }讓我們看看fireAllRules()如何使用它。 首先請注意,如果引擎已經(jīng)在運行,因為先前已調(diào)用fireAllRules或fireUntilHalt并仍在運行,則它將退出。 第二個注意事項是,它僅將同步點保持足夠長的時間,以退出或進行所需的過渡。 一旦引擎進入FIRE_ALL_RULES狀態(tài),它就可以釋放同步塊,并且狀態(tài)機將停止任何干擾。
public int fireAllRules(AgendaFilter agendaFilter,int fireLimit) {synchronized (stateMachineLock) {if (currentState.isFiring()) {return 0;}waitAndEnterExecutionState( ExecutionState.FIRING_ALL_RULES );}int fireCount = fireLoop(agendaFilter, fireLimit, RestHandler.FIRE_ALL_RULES);return fireCount; }fireLoop現(xiàn)在是通用的,并且由fireAllRules和fireUntilHalt共同使用,并使用RestHandler策略來處理引擎到達靜止點時的邏輯。
private int fireLoop(AgendaFilter agendaFilter,int fireLimit,RestHandler restHandler) {// The engine comes to potential rest (inside the loop) when there are no propagations and no rule firings. // It's potentially at rest, because we cannot guarantee it is at rest. // This is because external async actions (timer rules) can populate the queue that must be executed immediately. // A final takeAll within the sync point determines if it can safely come to rest. // if takeAll returns null, the engine is now safely at rest. If it returns something // the engine is not at rest and the loop continues. // // When FireUntilHalt comes to a safe rest, the thread is put into a wait state, // when the queue is populated the thread is notified and the loop begins again. // // When FireAllRules comes to a safe rest it will put the engine into an INACTIVE state // and the loop can exit. // // When a halt() command is added to the propagation queue and that queue is flushed // the engine is put into a HALTING state. At this point isFiring returns false and // no more rules can fire and the loop exits.int fireCount = 0;try {PropagationEntry head = workingMemory.takeAllPropagations();int returnedFireCount = 0;boolean limitReached = fireLimit == 0; // -1 or > 0 will return false. No reason for user to give 0, just handled for completeness.boolean loop = true;while ( isFiring() ) {if ( head != null ) {// it is possible that there are no action propagations, but there are rules to fire. this.workingMemory.flushPropagations(head);head = null;}// a halt may have occurred during the flushPropagations, // which changes the isFiring state. So a second isFiring guard is needed if (!isFiring()) {break;}evaluateEagerList();InternalAgendaGroup group = getNextFocus();if ( group != null && !limitReached ) {// only fire rules while the limit has not reached.returnedFireCount = fireNextItem( agendaFilter, fireCount, fireLimit, group );fireCount += returnedFireCount;limitReached = ( fireLimit > 0 && fireCount >= fireLimit );head = workingMemory.takeAllPropagations();} else {returnedFireCount = 0; // no rules fired this iteration, so we know this is 0 group = null; // set the group to null in case the fire limit has been reached }if ( returnedFireCount == 0 && head == null && ( group == null || !group.isAutoDeactivate() ) ) {// if true, the engine is now considered potentially at rest head = restHandler.handleRest( workingMemory, this );}}if ( this.focusStack.size() == 1 && getMainAgendaGroup().isEmpty() ) {// the root MAIN agenda group is empty, reset active to false, so it can receive more activations. getMainAgendaGroup().setActive( false );}} finally {// makes sure the engine is inactive, if an exception is thrown. // if it safely returns, then the engine should already be inactive// it also notifies the state machine, so that another thread can take over immediateHalt();}return fireCount; }fire循環(huán)在執(zhí)行takeAll()時會經(jīng)過單個同步點,這是返回當前head實例的簡單操作,同時還使成員head字段為空,從而使隊列為空。 在此takeAll()期間,這意味著任何用戶或計時器操作都必須等待同步釋放后才能添加到隊列中。 在執(zhí)行完其余方法之后,可以執(zhí)行評估返回的項目列表以及評估網(wǎng)絡和觸發(fā)規(guī)則的過程,而無需進行另一個同步或鎖定。
其余處理程序都是兩個非常簡單的代碼段:
interface RestHandler {RestHandler FIRE_ALL_RULES = new FireAllRulesRestHandler();RestHandler FIRE_UNTIL_HALT = new FireUntilHaltRestHandler();PropagationEntry handleRest(InternalWorkingMemory wm, DefaultAgenda agenda);class FireAllRulesRestHandler implements RestHandler {@Override public PropagationEntry handleRest(InternalWorkingMemory wm, DefaultAgenda agenda) {synchronized (agenda.stateMachineLock) {PropagationEntry head = wm.takeAllPropagations();if (head == null) {agenda.halt();}return head;}}}class FireUntilHaltRestHandler implements RestHandler {@Override public PropagationEntry handleRest(InternalWorkingMemory wm, DefaultAgenda agenda) {return wm.handleRestOnFireUntilHalt( agenda.currentState );}} }@Overridepublic PropagationEntry handleRestOnFireUntilHalt(DefaultAgenda.ExecutionState currentState) {// this must use the same sync target as takeAllPropagations, to ensure this entire block is atomic, up to the point of wait synchronized (propagationList) {PropagationEntry head = takeAllPropagations();// if halt() has called, the thread should not be put into a wait state // instead this is just a safe way to make sure the queue is flushed before exiting the loop if (head == null && currentState == DefaultAgenda.ExecutionState.FIRING_UNTIL_HALT) {propagationList.waitOnRest();head = takeAllPropagations();}return head;} }請注意,FireAllRulesRestHandler必須在執(zhí)行最后的takeAll時獲取stateMachineLock,然后才能知道它真正安全地返回。 這是由于可能放置在隊列中的計時器需要立即觸發(fā)。 如果要返回引擎,計時器將不會立即觸發(fā)-這就是我們所說的行為“差距”,現(xiàn)在可以避免。
FireUntilHalt鎖定了傳播隊列,因為除了執(zhí)行takeAll之外,它還必須原子地執(zhí)行空值檢查和等待操作。 再一次,如果空檢查不在同步點之內(nèi),我們最終會在行為上出現(xiàn)另一個潛在的差距,現(xiàn)在可以避免這種情況。
難題的最后一部分是executeTask。 這允許以最佳方式發(fā)生異步操作(通常是計時器任務)。 如果引擎由于FireAllRules或FireUntilHalt而已經(jīng)在運行,則只需將任務提交到隊列中,然后讓當前運行的線程來處理它。 如果不是,則進入EXECUTING_TASK狀態(tài)并在當前線程中執(zhí)行它。
@Overridepublic void executeTask( ExecutableEntry executable ) {synchronized (stateMachineLock) {// state is never changed outside of a sync block, so this is safe. if (isFiring()) {executable.enqueue();return;} else if (currentState != ExecutionState.EXECUTING_TASK) {waitAndEnterExecutionState( ExecutionState.EXECUTING_TASK );}}try {executable.execute();} finally {immediateHalt();} }我應該補充說,halt()現(xiàn)在作為命令提交,并作為標準隊列消耗的一部分進行評估。 執(zhí)行時,它將在同步塊內(nèi)將引擎更改為暫停狀態(tài)。 這將允許外部循環(huán)退出:
public void halt() {synchronized (stateMachineLock) {if (currentState.isFiring()) {setCurrentState( ExecutionState.HALTING );}} }因此,我們現(xiàn)在有了真正健壯的代碼,可以以可理解的方式處理用戶,計時器和引擎線程的交互。 我們在清理中付出了很多努力,以便希望每個人都能理解代碼和行為。
發(fā)動機的最后一部分仍然被認為是不安全的。 在引擎運行時,用戶可以在此在一個線程上調(diào)用插入事實的設置方法。 這顯然可以流下眼淚。 我們計劃允許用戶將任務提交到此隊列,以便可以使用與正在運行的引擎相同的線程來執(zhí)行任務。 這將允許用戶從引擎外部的另一個線程提交pojo更新,作為任務來安全執(zhí)行。
翻譯自: https://www.javacodegeeks.com/2015/12/drools-detailed-description-internal-code-cleanups-fireallrules-fireuntilhalt-timers.html
總結
以上是生活随笔為你收集整理的Drools:fireAllRules,fireUntilHalt和Timers内部代码清理的详细说明的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: adf时间作用域_ADF:在任务流终结器
- 下一篇: 手机内存卡使用有哪些注意事项