"Please move 'proctime(r_proctime)' to the end of the schema", plus rowtime vs. proctime
Code:
List<Tuple3<String, Long, Timestamp>> ratesHistoryData = new ArrayList<>();
ratesHistoryData.add(Tuple3.of("US Dollar", 102L, new Timestamp(1L)));
ratesHistoryData.add(Tuple3.of("Euro", 114L, new Timestamp(1L)));
ratesHistoryData.add(Tuple3.of("Yen", 1L, new Timestamp(1L)));
ratesHistoryData.add(Tuple3.of("Euro", 116L, new Timestamp(1L)));
ratesHistoryData.add(Tuple3.of("Euro", 119L, new Timestamp(1L)));
// Convert the List to a DataStream
DataStream<Tuple3<String, Long, Timestamp>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
// Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
// Convert the DataStream to a Table (this is the line that throws)
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_proctime").proctime());
Full error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'proctime(r_proctime)' to the end of the schema.
    at org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.validateProcTimeAttributeAppended(FieldInfoUtils.java:512)
    at org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.visit(FieldInfoUtils.java:483)
    at org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.visit(FieldInfoUtils.java:459)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
    at org.apache.flink.table.typeutils.FieldInfoUtils.lambda$extractFieldInfosFromTupleType$6(FieldInfoUtils.java:421)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfosFromTupleType(FieldInfoUtils.java:422)
    at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:264)
    at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:233)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lambda$asQueryOperation$0(StreamTableEnvironmentImpl.java:384)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.java:383)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:230)
    at JoinwithTemporalTable.main(JoinwithTemporalTable.java:51)
First, a quick word on proctime and rowtime. According to the documentation [1]:
| Function | Meaning | Full example |
| --- | --- | --- |
| proctime() | Processing Time | Full example |
| rowtime() | Event Time | Full example |
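Note: you cannot fix this problem by looking at a single line of code. You also have to look at the data that comes before it.
Solution (rowtime approach):
If you want to use rowtime, that is, Event Time,
then change the code to: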
        List<Tuple3<String, Long, Timestamp>> ratesHistoryData = new ArrayList<>();
        ratesHistoryData.add(Tuple3.of("US Dollar", 102L, new Timestamp(1L)));
        ratesHistoryData.add(Tuple3.of("Euro", 114L, new Timestamp(1L)));
        ratesHistoryData.add(Tuple3.of("Yen", 1L, new Timestamp(1L)));
        ratesHistoryData.add(Tuple3.of("Euro", 116L, new Timestamp(1L)));
        ratesHistoryData.add(Tuple3.of("Euro", 119L, new Timestamp(1L)));
        DataStream<Tuple3<String, Long, Timestamp>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
        // rowtime() may mark the existing third field; the stream must already have
        // event-time timestamps and watermarks assigned for this to work at runtime
        Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_rowtime").rowtime());
Solution (proctime approach):
If you want to use proctime, that is, Processing Time,
then change the code to:
        // The raw data has only two fields; o_proctime is appended as a third column
        DataStream<Tuple2<String, Long>> orderA = env.fromCollection(Arrays.asList(
                Tuple2.of("US Dollar", 102L),
                Tuple2.of("US Dollar", 102L),
                Tuple2.of("US Dollar", 102L)));
        tEnv.createTemporaryView("Orders", orderA, $("o_currency"), $("o_rate"), $("o_proctime").proctime());
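Two things to note:
① Both solutions above deliberately include the raw data.
That is because this error is tied directly to the raw data. If you only stare at the line that throws, you will not be able to fix it.
②
With proctime, total number of columns = number of raw data columns + 1.
Processing Time is a new column that we add, so you obviously cannot take a timestamp that is already in the data and mark it as Processing Time with proctime.
With rowtime used as in the fix above, total number of columns = number of raw data columns.
That is because Event Time comes from the data itself, so no extra column has to be added during processing.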
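To make the column-count rule in ② concrete, here is a small plain-Java sketch. It is only a toy model of the rule as described above, not Flink's actual validation code, and every name in it (`TimeAttributeRule`, `Kind`, `check`) is made up for illustration. It assumes that a rowtime column may sit either on an existing field or at the end, while a proctime column may only sit past the last input field.

```java
/** Toy model of the positional schema rule; hypothetical, not Flink's implementation. */
class TimeAttributeRule {

    /** How a declared schema column is marked in this toy model. */
    enum Kind { PLAIN, PROCTIME, ROWTIME }

    /**
     * Checks the declared columns against the number of fields in the input tuple.
     * Returns null if the schema is acceptable, otherwise a message describing the problem.
     */
    static String check(int inputArity, Kind... kinds) {
        for (int i = 0; i < kinds.length; i++) {
            // A column at a position below the arity maps onto a field that already exists.
            boolean mapsToExistingField = i < inputArity;
            if (kinds[i] == Kind.PROCTIME && mapsToExistingField) {
                return "proctime at position " + i
                        + " would replace an existing field; move it to the end of the schema";
            }
            if (kinds[i] == Kind.PLAIN && !mapsToExistingField) {
                return "plain column at position " + i + " has no matching input field";
            }
            // ROWTIME is accepted in either position in this model.
        }
        return null;
    }

    public static void main(String[] args) {
        // Tuple3 input, proctime on the third field: the situation from the error above.
        System.out.println(check(3, Kind.PLAIN, Kind.PLAIN, Kind.PROCTIME));
        // Tuple2 input, proctime appended as a third column: the proctime fix.
        System.out.println(check(2, Kind.PLAIN, Kind.PLAIN, Kind.PROCTIME));
        // Tuple3 input, rowtime on the third field: the rowtime fix.
        System.out.println(check(3, Kind.PLAIN, Kind.PLAIN, Kind.ROWTIME));
    }
}
```

The first call models the failing schema (three input fields, proctime declared third), and the other two model the two fixes: drop the timestamp from the data and append proctime, or keep the timestamp and mark it with rowtime.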
Reference:
[1] Time Attributes
[2] Apache Flink Advanced Tutorial (2): Time in Depth