Knative Eventing 之 Sequence 介绍
在處理數據時,往往會涉及到一個數據需要進行多次加工,這時候我們一般是通過Pipeline的方式進行處理。那么在Knative Eventing中是否也能支持對一個事件進行分步驟多次處理? 這個還真有。從 0.7 版本開始,Knative Eventing中提供了一個 Sequence 資源模型,可用于事件Pipeline處理。
Sequence 定義
首先我們看一下Sequence Spec定義:
apiVersion: messaging.knative.dev/v1alpha1 kind: Sequence metadata:name: test spec:channelTemplate:apiVersion: messaging.knative.dev/v1alpha1kind: InMemoryChannelsteps:- ref:apiVersion: serving.knative.dev/v1alpha1kind: Servicename: testreply:kind: BrokerapiVersion: eventing.knative.dev/v1alpha1name: testSequence Spec包括3個部分:
在 Broker/Trigger 模型中使用 Sequence
我們將創建以下邏輯配置。創建一個 cronjobsource,向 Broker 提供事件,然后創建一個 filter,將這些事件連接到由3個 step 組成的 Sequence 中。然后,我們獲取最后的step返回結果事件發送給給Broker,并創建另一個 Trigger,該 Trigger 隨后將顯示事件結果。
對于這個例子,這里設置一個 Broker 程序、一個 InMemoryChannel 以及一個 Knative Service(用于顯示事件結果)。示例使用 default namespace。
如果要使用不同類型的Channel,則需要修改sequence.spec.channeltemplate以創建對應的 Channel 資源。
創建 Knative Service
首先創建3個Knative Service,用于 Sequence 中服務處理
apiVersion: serving.knative.dev/v1alpha1 kind: Service metadata:name: first spec:template:spec:containers:- image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700env:- name: STEPvalue: "0"--- apiVersion: serving.knative.dev/v1alpha1 kind: Service metadata:name: second spec:template:spec:containers:- image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700env:- name: STEPvalue: "1" --- apiVersion: serving.knative.dev/v1alpha1 kind: Service metadata:name: third spec:template:spec:containers:- image: us.gcr.io/probable-summer-223122/cmd-03315b715ae8f3e08e3a9378df706fbb@sha256:2656f39a7fcb6afd9fc79e7a4e215d14d651dc674f38020d1d18c6f04b220700env:- name: STEPvalue: "2"---執行創建命令:
kubectl -n default create -f ./steps.yaml創建 Sequence
創建Sequence,這里依次順序執行[first->second->third]這3個服務。將最終處理的結果發送到broker-test中。
apiVersion: messaging.knative.dev/v1alpha1 kind: Sequence metadata:name: sequence spec:channelTemplate:apiVersion: messaging.knative.dev/v1alpha1kind: InMemoryChannelsteps:- ref:apiVersion: serving.knative.dev/v1alpha1kind: Servicename: first- ref:apiVersion: serving.knative.dev/v1alpha1kind: Servicename: second- ref:apiVersion: serving.knative.dev/v1alpha1kind: Servicename: thirdreply:kind: BrokerapiVersion: eventing.knative.dev/v1alpha1name: broker-test執行如下命令:
kubectl -n default create -f ./sequence.yaml創建CronJobSource指向Broker
這里將創建一個 cronjobsource,它將每2分鐘發送一個{"message": "Hello world!"} 信息到 broker-test 中。
apiVersion: sources.eventing.knative.dev/v1alpha1 kind: CronJobSource metadata:name: cronjob-source spec:schedule: "*/2 * * * *"data: '{"message": "Hello world!"}'sink:apiVersion: eventing.knative.dev/v1alpha1kind: Brokername: broker-test執行命令如下:
kubectl -n default create -f ./cron-source.yaml為Sequence創建Trigger
創建訂閱事件類型為:dev.knative.cronjob.event 的 Trigger, 用于Sequence 進行消費處理。
apiVersion: eventing.knative.dev/v1alpha1 kind: Trigger metadata:name: sequence-trigger spec:filter:sourceAndType:type: dev.knative.cronjob.eventsubscriber:ref:apiVersion: messaging.knative.dev/v1alpha1kind: Sequencename: sequence執行如下命令:
kubectl -n default create -f ./trigger.yaml創建結果訂閱 Trigger
創建結果訂閱 Trigger,訂閱samples.http.mod3?的事件類型,對 sequence 執行的結果進行顯示
apiVersion: serving.knative.dev/v1alpha1 kind: Service metadata:name: sequence-display spec:template:spec:containers:- image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display --- apiVersion: eventing.knative.dev/v1alpha1 kind: Trigger metadata:name: sequence-trigger spec:filter:sourceAndType:type: samples.http.mod3subscriber:ref:apiVersion: serving.knative.dev/v1alpha1kind: Servicename: sequence-display ---結論
通過 Sequence 資源模型,我們很容易在 Knative Eventing 中實現事件處理的 Pipeline。對于需要多步驟處理的服務尤為適合。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Knative Eventing 之 Sequence 介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从电商到软件市场,阿里双11战火蔓延
- 下一篇: 【从入门到放弃-ZooKeeper】Zo