/**
  * Decides whether this rule applies to the matched window aggregate.
  *
  * The aggregate is taken from operand 0 of the rule call. An aggregate whose
  * group type is anything other than [[Group.SIMPLE]], or whose `indicator`
  * flag is set, is rejected with an exception rather than silently skipped.
  *
  * @param call the rule call whose first operand is the window aggregate
  * @return always `true` when no exception is thrown
  * @throws TableException if the aggregate uses grouping sets
  */
override def matches(call: RelOptRuleCall): Boolean = {
  val windowAgg: FlinkLogicalWindowAggregate = call.rel(0)

  // Grouping sets show up either as a non-simple group type or as the indicator flag.
  val usesGroupingSets = windowAgg.getGroupType != Group.SIMPLE || windowAgg.indicator
  if (usesGroupingSets) {
    throw new TableException("GROUPING SETS are currently not supported.")
  }

  true
}
object WindowEmitStrategy {

  /**
    * Builds the emit strategy for a group window from the table configuration.
    *
    * The resulting strategy carries:
    *  - whether the window's time attribute is a rowtime (event-time) attribute,
    *  - whether the window is a session window,
    *  - the early-fire and late-fire switches together with their delays in milliseconds,
    *  - the allowed lateness in milliseconds.
    *
    * Allowed lateness is `0L` for session windows and whenever the minimum idle
    * state retention time is negative (i.e. not set); otherwise it equals the
    * minimum idle state retention time.
    *
    * @param tableConfig configuration supplying retention time and emit options
    * @param window      the logical window the strategy is created for
    * @return the emit strategy describing when the window should fire
    */
  def apply(tableConfig: TableConfig, window: LogicalWindow): WindowEmitStrategy = {
    val eventTime = isRowtimeAttribute(window.timeAttribute)
    val sessionWindow = window.isInstanceOf[SessionGroupWindow]

    // Lateness is ignored for session windows because retraction is not supported,
    // and defaults to 0L (no lateness allowed) when no min idle state retention
    // time is configured. Otherwise the min idle state retention time is used.
    val lateness =
      if (sessionWindow || tableConfig.getMinIdleStateRetentionTime < 0) {
        0L
      } else {
        tableConfig.getMinIdleStateRetentionTime
      }

    val earlyFireEnabled =
      tableConfig.getConfiguration.getBoolean(TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED)
    val earlyFireMillis =
      getMillisecondFromConfigDuration(tableConfig, TABLE_EXEC_EMIT_EARLY_FIRE_DELAY)
    val lateFireEnabled =
      tableConfig.getConfiguration.getBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED)
    val lateFireMillis =
      getMillisecondFromConfigDuration(tableConfig, TABLE_EXEC_EMIT_LATE_FIRE_DELAY)

    new WindowEmitStrategy(
      eventTime,
      sessionWindow,
      earlyFireMillis,
      earlyFireEnabled,
      lateFireMillis,
      lateFireEnabled,
      lateness)
  }
/**
 * Processes one incoming record: assigns it to windows, adds its value to the
 * state of every window that is not already late, consults the trigger, and
 * emits and/or purges window contents according to the trigger result.
 *
 * <p>For a {@code MergingWindowAssigner} each assigned window is first added to
 * the merging window set, which may merge it with existing windows; the merged
 * window is then the one that receives the element. Otherwise each assigned
 * window is used directly.
 *
 * <p>If no window accepted the element and the element is late, it is sent to
 * the side output when a late-data tag is configured, or counted as a dropped
 * late record otherwise.
 *
 * @param element the stream record to process
 * @throws Exception propagated from window assignment, state access, the
 *         trigger, or the merge callback
 */
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // Assign the element to its windows.
    final Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    // Merging assigners go through the merging window set; all others use the
    // assigned windows as they are.
    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window: elementWindows) {
            // Iterate over the affected windows and aggregate: the element is added
            // to the window's state held in windowState, after any merge has been
            // applied to that state.

            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {

                    // A merge result whose max timestamp plus the allowed lateness is
                    // not past the current watermark is rejected: merging must not
                    // produce a window that is already late.
                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                                "event-time window cannot become earlier than the current watermark " +
                                "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime()) {
                        // Same check for processing time, without allowed lateness.
                        long currentProcessingTime = internalTimerService.currentProcessingTime();
                        if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + currentProcessingTime +
                                    " window: " + mergeResult);
                        }
                    }

                    // Let the trigger react to the merge in the context of the merge result.
                    triggerContext.key = key;
                    triggerContext.window = mergeResult;

                    triggerContext.onMerge(mergedWindows);

                    // Clear trigger state and cleanup timers of every window that was merged away.
                    for (W m: mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }

                    // Update the state:
                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }

            windowState.setCurrentNamespace(stateWindow);
            // Add the element to the state of the corresponding (possibly merged) window.
            windowState.add(element.getValue());

            // Use the TriggerContext (effectively a proxy for the window's Trigger) to
            // decide whether the window should be emitted. Original author's note: the
            // decision weighs early fire, late fire, watermark time and window end
            // time; that logic lives in the trigger, not in this method.
            triggerContext.key = key;
            triggerContext.window = actualWindow;

            // Ask the trigger whether to fire and/or purge.
            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                // No contents: move on to the next window; this also skips the purge
                // check and cleanup-timer registration below for this window.
                if (contents == null) {
                    continue;
                }
                // Fire the window computation.
                emitWindowContents(actualWindow, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                // No contents: move on to the next window; this also skips the purge
                // check and cleanup-timer registration below for this window.
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}