A Look at Flink Table's ScalarFunction
Preface
This article takes a look at ScalarFunction in the Flink Table API.
Example
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    public class HashCode extends ScalarFunction {

        private int factor = 0;

        @Override
        public void open(FunctionContext context) throws Exception {
            // access "hashcode_factor" parameter
            // "12" would be the default value if parameter does not exist
            factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12"));
        }

        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // set job parameter
    Configuration conf = new Configuration();
    conf.setString("hashcode_factor", "31");
    env.getConfig().setGlobalJobParameters(conf);

    // register the function
    tableEnv.registerFunction("hashCode", new HashCode());

    // use the function in Java Table API
    myTable.select("string, string.hashCode(), hashCode(string)");

    // use the function in SQL
    tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");

- HashCode extends ScalarFunction and defines an eval method. Its open method reads the hashcode_factor job parameter (falling back to "12"), and eval multiplies the string's hash code by that factor.
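To make the open/eval lifecycle of the example concrete without needing a Flink cluster, here is a minimal Flink-free sketch. `HashCodeSketch`, `ParamContext`, and `contextOf` are hypothetical stand-ins invented for illustration; only the `hashcode_factor` key and the `"12"` default come from the example above.

```java
import java.util.Map;

// Flink-free model of the HashCode example: open() reads a job parameter
// once, then eval() is called for every input value.
final class HashCodeSketch {

    // Hypothetical stand-in for FunctionContext.getJobParameter(key, default).
    interface ParamContext {
        String getJobParameter(String key, String defaultValue);
    }

    static final class HashCodeFn {
        private int factor = 0;

        // Called once before any eval() call.
        void open(ParamContext context) {
            factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
        }

        // Called once per input value.
        int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    // Builds a context backed by a plain map, falling back to the default value.
    static ParamContext contextOf(Map<String, String> params) {
        return (key, defaultValue) -> params.getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) {
        HashCodeFn configured = new HashCodeFn();
        configured.open(contextOf(Map.of("hashcode_factor", "31")));

        HashCodeFn defaulted = new HashCodeFn();
        defaulted.open(contextOf(Map.of()));

        System.out.println(configured.eval("flink")); // multiplied by 31
        System.out.println(defaulted.eval("flink"));  // multiplied by the default, 12
    }
}
```

The point of the sketch is only the ordering: configuration is resolved once in open, so eval stays a cheap per-record call.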
ScalarFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala
    abstract class ScalarFunction extends UserDefinedFunction {

      /**
        * Creates a call to a [[ScalarFunction]] in Scala Table API.
        *
        * @param params actual parameters of function
        * @return [[Expression]] in form of a [[ScalarFunctionCall]]
        */
      final def apply(params: Expression*): Expression = {
        ScalarFunctionCall(this, params)
      }

      // ----------------------------------------------------------------------------------------------

      /**
        * Returns the result type of the evaluation method with a given signature.
        *
        * This method needs to be overridden in case Flink's type extraction facilities are not
        * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
        * method. Flink's type extraction facilities can handle basic types or
        * simple POJOs but might be wrong for more complex, custom, or composite types.
        *
        * @param signature signature of the method the return type needs to be determined
        * @return [[TypeInformation]] of result type or null if Flink should determine the type
        */
      def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null

      /**
        * Returns [[TypeInformation]] about the operands of the evaluation method with a given
        * signature.
        *
        * In order to perform operand type inference in SQL (especially when NULL is used) it might be
        * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
        * By default Flink's type extraction facilities are used for this but might be wrong for
        * more complex, custom, or composite types.
        *
        * @param signature signature of the method the operand types need to be determined
        * @return [[TypeInformation]] of operand types
        */
      def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
        signature.map { c =>
          try {
            TypeExtractor.getForClass(c)
          } catch {
            case ite: InvalidTypesException =>
              throw new ValidationException(
                s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
                s"automatically determined. Please provide type information manually.")
          }
        }
      }
    }

- ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value, and the mapping itself is implemented by user-defined public eval methods. It is also generally recommended to use primitive types for declared parameters and result types, for example int instead of DATE/TIME and long instead of TIMESTAMP.
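Both getResultType and getParameterTypes receive a signature, that is, an array of classes that identifies one particular eval overload. The sketch below models how such a signature can select a public eval method through plain Java reflection. It is an illustration only and not Flink's actual resolution logic; `EvalLookupSketch`, `Concat`, and `findEval` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

final class EvalLookupSketch {

    // Hypothetical user function with two overloaded public eval methods.
    public static final class Concat {
        public String eval(String a, String b) { return a + b; }
        public String eval(String a, Integer n) { return a + n; }
    }

    // Returns the public eval method whose parameter types accept the given signature.
    static Method findEval(Class<?> fn, Class<?>... signature) {
        for (Method m : fn.getMethods()) {
            if (!m.getName().equals("eval")) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != signature.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(signature[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return m;
            }
        }
        throw new IllegalArgumentException(
            "No eval method for signature " + Arrays.toString(signature));
    }

    public static void main(String[] args) throws Exception {
        Method m = findEval(Concat.class, String.class, Integer.class);
        // The declared return type is what a default result-type extraction would start from.
        System.out.println(m.getReturnType().getSimpleName());
        System.out.println(m.invoke(new Concat(), "v", 7));
    }
}
```

This also shows why eval must be public: `getMethods()` only returns public methods, so a non-public eval would never be found by this kind of lookup.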
CRowProcessRunner
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala
    class CRowProcessRunner(
        name: String,
        code: String,
        @transient var returnType: TypeInformation[CRow])
      extends ProcessFunction[CRow, CRow]
      with ResultTypeQueryable[CRow]
      with Compiler[ProcessFunction[Row, Row]]
      with Logging {

      private var function: ProcessFunction[Row, Row] = _
      private var cRowWrapper: CRowWrappingCollector = _

      override def open(parameters: Configuration): Unit = {
        LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
        val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
        LOG.debug("Instantiating ProcessFunction.")
        function = clazz.newInstance()
        FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
        FunctionUtils.openFunction(function, parameters)

        this.cRowWrapper = new CRowWrappingCollector()
      }

      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, CRow]#Context,
          out: Collector[CRow]): Unit = {

        cRowWrapper.out = out
        cRowWrapper.setChange(in.change)
        function.processElement(
          in.row,
          ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
          cRowWrapper)
      }

      override def getProducedType: TypeInformation[CRow] = returnType

      override def close(): Unit = {
        FunctionUtils.closeFunction(function)
      }
    }

- CRowProcessRunner's processElement calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction that is compiled and instantiated in open from the code constructor parameter; that code is generated when DataStreamCalc creates the CRowProcessRunner in translateToPlan.
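The unwrap, delegate, re-wrap shape of processElement can be modelled in a few lines of plain Java. This is a simplified sketch and not Flink code: `ChangeRow`, `RowFunction`, and `Runner` are hypothetical stand-ins for CRow, the generated ProcessFunction[Row, Row], and CRowProcessRunner, and the runtime compilation step in open is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RunnerSketch {

    // Stand-in for CRow: a row plus a change flag.
    static final class ChangeRow {
        final Object[] row;
        final boolean change;

        ChangeRow(Object[] row, boolean change) {
            this.row = row;
            this.change = change;
        }
    }

    // Stand-in for the generated function that works on plain rows.
    interface RowFunction {
        void processElement(Object[] row, Consumer<Object[]> out);
    }

    // Mirrors the shape of CRowProcessRunner.processElement:
    // unwrap the row, delegate, and re-wrap every result with the input's change flag.
    static final class Runner {
        private final RowFunction function;

        Runner(RowFunction function) {
            this.function = function;
        }

        void processElement(ChangeRow in, Consumer<ChangeRow> out) {
            function.processElement(
                in.row,
                result -> out.accept(new ChangeRow(result, in.change)));
        }
    }

    public static void main(String[] args) {
        // Inner function: keeps column 0 and appends its hash code.
        RowFunction inner = (row, out) -> out.accept(new Object[] {row[0], row[0].hashCode()});
        Runner runner = new Runner(inner);

        List<ChangeRow> results = new ArrayList<>();
        runner.processElement(new ChangeRow(new Object[] {"a"}, false), results::add);

        System.out.println(results.get(0).row[1] + " change=" + results.get(0).change);
    }
}
```

The inner function never sees the change flag, which is why the generated code can be written against Row while the surrounding operator works on CRow.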
ProcessFunction
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java
    @PublicEvolving
    public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

        private static final long serialVersionUID = 1L;

        /**
         * Process one element from the input stream.
         *
         * <p>This function can output zero or more elements using the {@link Collector} parameter
         * and also update internal state or set timers using the {@link Context} parameter.
         *
         * @param value The input value.
         * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
         *            a {@link TimerService} for registering timers and querying the time. The
         *            context is only valid during the invocation of this method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

        /**
         * Called when a timer set using {@link TimerService} fires.
         *
         * @param timestamp The timestamp of the firing timer.
         * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
         *            querying the {@link TimeDomain} of the firing timer and getting a
         *            {@link TimerService} for registering timers and querying the time.
         *            The context is only valid during the invocation of this method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

        /**
         * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
         * or {@link #onTimer(long, OnTimerContext, Collector)}.
         */
        public abstract class Context {

            /**
             * Timestamp of the element currently being processed or timestamp of a firing timer.
             *
             * <p>This might be {@code null}, for example if the time characteristic of your program
             * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
             */
            public abstract Long timestamp();

            /**
             * A {@link TimerService} for querying time and registering timers.
             */
            public abstract TimerService timerService();

            /**
             * Emits a record to the side output identified by the {@link OutputTag}.
             *
             * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
             * @param value The record to emit.
             */
            public abstract <X> void output(OutputTag<X> outputTag, X value);
        }

        /**
         * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
         */
        public abstract class OnTimerContext extends Context {
            /**
             * The {@link TimeDomain} of the firing timer.
             */
            public abstract TimeDomain timeDomain();
        }
    }

- ProcessFunction extends AbstractRichFunction and declares the abstract method processElement.
DataStreamCalc
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
    class DataStreamCalc(
        cluster: RelOptCluster,
        traitSet: RelTraitSet,
        input: RelNode,
        inputSchema: RowSchema,
        schema: RowSchema,
        calcProgram: RexProgram,
        ruleDescription: String)
      extends Calc(cluster, traitSet, input, calcProgram)
      with CommonCalc
      with DataStreamRel {

      //......

      override def translateToPlan(
          tableEnv: StreamTableEnvironment,
          queryConfig: StreamQueryConfig): DataStream[CRow] = {

        val config = tableEnv.getConfig

        val inputDataStream =
          getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)

        // materialize time attributes in condition
        val condition = if (calcProgram.getCondition != null) {
          val materializedCondition = RelTimeIndicatorConverter.convertExpression(
            calcProgram.expandLocalRef(calcProgram.getCondition),
            inputSchema.relDataType,
            cluster.getRexBuilder)
          Some(materializedCondition)
        } else {
          None
        }

        // filter out time attributes
        val projection = calcProgram.getProjectList.asScala
          .map(calcProgram.expandLocalRef)

        val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)

        val genFunction = generateFunction(
          generator,
          ruleDescription,
          inputSchema,
          schema,
          projection,
          condition,
          config,
          classOf[ProcessFunction[CRow, CRow]])

        val inputParallelism = inputDataStream.getParallelism

        val processFunc = new CRowProcessRunner(
          genFunction.name,
          genFunction.code,
          CRowTypeInfo(schema.typeInfo))

        inputDataStream
          .process(processFunc)
          .name(calcOpName(calcProgram, getExpressionString))
          // keep parallelism to ensure order of accumulate and retract messages
          .setParallelism(inputParallelism)
      }
    }

- DataStreamCalc's translateToPlan calls CommonCalc's generateFunction to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which is where the eval method of the user-defined ScalarFunction gets called.
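As a rough mental model of what the generated function does per row, the sketch below combines an optional condition and a list of projections into a single per-row function, with one projection calling a user-defined eval. It uses lambdas where Flink generates and compiles source code, so it is an analogy rather than the real mechanism; `CalcSketch`, `HashCodeFn`, and `buildCalc` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

final class CalcSketch {

    // Hypothetical user-defined scalar function.
    static final class HashCodeFn {
        int eval(String s) {
            return s.hashCode() * 31;
        }
    }

    // Builds one per-row function from an optional condition and a projection list.
    // An empty result means the row was filtered out by the condition.
    static Function<Object[], Optional<Object[]>> buildCalc(
            Optional<Predicate<Object[]>> condition,
            List<Function<Object[], Object>> projection) {
        return row -> {
            if (condition.isPresent() && !condition.get().test(row)) {
                return Optional.empty();
            }
            Object[] result = new Object[projection.size()];
            for (int i = 0; i < result.length; i++) {
                result[i] = projection.get(i).apply(row);
            }
            return Optional.of(result);
        };
    }

    public static void main(String[] args) {
        HashCodeFn fn = new HashCodeFn();

        // Models: SELECT string, HASHCODE(string) FROM MyTable WHERE string <> ''
        List<Function<Object[], Object>> projection = new ArrayList<>();
        projection.add(row -> row[0]);
        projection.add(row -> fn.eval((String) row[0])); // the call into the user's eval

        Predicate<Object[]> nonEmpty = row -> !((String) row[0]).isEmpty();
        Function<Object[], Optional<Object[]>> calc =
            buildCalc(Optional.of(nonEmpty), projection);

        System.out.println(calc.apply(new Object[] {"flink"}).isPresent());
        System.out.println(calc.apply(new Object[] {""}).isPresent());
    }
}
```

Fusing filter and projection into one function mirrors the fact that a single Calc node carries both the condition and the project list of the RexProgram.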
Summary
- ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value, and the mapping itself is implemented by user-defined public eval methods. It is also generally recommended to use primitive types for declared parameters and result types, for example int instead of DATE/TIME and long instead of TIMESTAMP.
- CRowProcessRunner's processElement calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction whose source is passed in as CRowProcessRunner's code constructor parameter; that code is generated when DataStreamCalc creates the CRowProcessRunner in translateToPlan. ProcessFunction extends AbstractRichFunction and declares the abstract method processElement.
- DataStreamCalc's translateToPlan calls CommonCalc's generateFunction to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which is where the eval method of the user-defined ScalarFunction gets called.
doc
- Integrating UDFs with the Runtime