kafka streams_Kafka REST Proxy MapR Streams入门
kafka streams
MapR生態系統軟件包2.0(MEP)隨附了一些與MapR流有關的新功能:
- 用于MapR Streams的Kafka REST代理為MapR Streams和Kafka集群提供了RESTful接口,使其易于使用和產生消息以及執行管理操作。
- Kafka Connect for MapR Streams是一個實用程序,用于在MapR Streams與Apache Kafka和其他存儲系統之間流式傳輸數據。
MapR生態系統軟件包(MEP)是一種與核心升級脫鉤的生態系統升級工具,可讓您獨立于MapR融合數據平臺升級工具。 您可以在本文中了解有關MEP 2.0的更多信息 。
在此博客中,我們描述了如何使用Kafka REST代理向MapR流發布消息和從MapR流中使用消息。 REST代理是MapR融合數據平臺的重要補充,它允許任何編程語言使用MapR流。
MapR Streams工具隨附的Kafka REST Proxy可以與MapR Streams(默認)以及Apache Kafka(在混合模式下)一起使用。 在本文中,我們將重點介紹MapR流。
先決條件
- 具有MEP 2.0的MapR融合數據平臺5.2
- 使用MapR Streams工具
- curl,wget或任何HTTP / REST客戶端工具
創建MapR流和主題
流是主題的集合,您可以通過以下方式將其分組管理:
您可以在文檔中找到有關MapR Streams概念的更多信息。
在您的MapR群集或沙盒上,運行以下命令:
$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p $ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3 $ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3啟動Kafka控制臺的生產者和消費者
打開兩個終端窗口,并使用以下命令運行使用者的Kafka實用程序:
消費者
- 主題傳感器-json
- 主題傳感器二進制
這兩個終端窗口將允許您查看有關不同主題的消息。
使用Kafka REST代理
端點/ topics / [topic_name]允許您獲取有關該主題的一些信息。 在MapR Streams中,主題是路徑標識的流的一部分; 要通過REST API訪問主題,您必須輸入完整路徑并在URL中進行編碼; 例如:
- / apps / iot-stream:sensor-json將使用%2Fapps%2Fiot-stream%3Asensor-json進行編碼
運行以下命令,以獲取有關sensor-json主題的信息:
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json注意:為簡單起見,我從運行Kafka REST代理的節點上運行命令,因此可以使用localhost 。
您可以通過添加Python命令,以漂亮的方式打印JSON,例如:
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool默認流
如上所述,流路徑是您必須在命令中使用的主題名稱的一部分。 但是,可以將MapR Kafka REST代理配置為使用默認流。 對于此配置,您應該在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下屬性:
- stream.default.stream = / apps / iot-stream
更改Kafka REST代理配置時,必須使用maprcli或MCS重新啟動服務。
使用streams.default.stream屬性的主要原因是簡化應用程序使用的URL。 例如:
- 通過streams.default.stream ,可以使用curl -X GET http:// localhost:8082 / topics /
- 如果沒有此配置,或者要使用特定的流,則必須在URL中指定它: http:// localhost:8082 / topics /%2Fapps%2Fiot-stream%3Asensor-json
在本文中,所有URL都包含編碼的流名稱,因此您可以在不更改配置的情況下開始使用Kafka REST代理,也可以將其用于其他流。
用于MapR流的Kafka REST代理允許應用程序將消息發布到MapR流。 消息可以作為JSON或二進制內容(base64編碼)發送。
要發送JSON消息:
- 查詢應該是HTTP POST
- 內容類型應為:application / vnd.kafka.json.v1 + json
- 身體:
完整的請求是:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"} }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json您應該在/ apps / iot-stream:sensor-json使用者正在運行的終端窗口中看到打印的消息。
發送二進制消息:
- 查詢應該是HTTP POST
- 內容類型應為:application / vnd.kafka.binary.v1 + json
- 身體:
請注意, SGVsbG8gV29ybGQ =是在Base64中編碼的字符串“ Hello World”。
完整的請求是:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary您應該在/ apps / iot-stream:sensor-binary使用者正在運行的終端窗口中看到打印的消息。
發送多個消息:
HTTP正文的記錄字段允許您發送多個消息; 例如,您可以發送:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"} }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"} } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json該命令將發送2條消息,并將偏移量增加2。您可以通過在JSON數組中添加新元素來對二進制內容執行相同的操作; 例如:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary您可能知道,可以將密鑰綁定到消息,以確保所有具有相同密鑰的消息都將到達同一分區。 為此,將key屬性添加到消息中,如下所示:
{"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"} }] }現在,您知道如何使用REST代理將消息發布到MapR Streams主題,讓我們看看如何使用消息。
消費信息
REST代理還可以用于消費主題消息。 為此,您需要:
創建使用者實例
以下請求創建使用者實例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json服務器的響應如下所示:
{"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer" }請注意,我們已經使用/ consumers / [topic_name]創建使用者。 后續請求將使用base_uri從主題獲取消息。 與任何MapR Streams / Kafka使用者一樣, auto.offset.reset定義其行為。 在此示例中,該值設置為最早 ,這意味著使用者將從頭開始閱讀消息。 您可以在MapR Streams文檔中找到有關使用者配置的更多信息。
消費信息
要使用消息,只需將MapR Streams主題添加到使用者實例的URL。
以下請求使用了該主題的消息:
curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json此調用返回JSON文檔中的消息:
[{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3} ]每次對API的調用都會根據上一次調用的偏移量返回發布的新消息。
請注意,消費者將被銷毀:
- 在Consumer.instance.timeout.ms設置了一些空閑時間(默認值設置為300000ms / 5分鐘)后,使用REST API調用銷毀了該空閑時間(請參見下文)。
消費二進制格式的消息
如果需要使用二進制消息,則方法是相同的:您需要更改格式和Accept標頭。
調用此URL為二進制主題創建使用者實例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary然后使用消息,accept標頭設置為application / vnd.kafka.binary.v1 + json :
curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary該調用返回JSON文檔中的消息,并且該值在Base64中編碼:
[{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2} ]刪除使用者實例
如前所述,將根據REST Proxy的consumer.instance.timeout.ms配置自動銷毀使用者 。 也可以使用使用者實例URI和HTTP DELETE調用銷毀實例,如下所示:
curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer結論
在本文中,您學習了如何將Kafka REST代理用于MapR流,該代理允許任何應用程序使用在MapR融合數據平臺中發布的消息。
您可以在MapR文檔和以下資源中找到有關Kafka REST代理的更多信息:
- MapR Streams入門
- Ted Dunning和Ellen Friedman撰寫的“流傳輸體系結構:使用Apache Kafka和MapR流的新設計”電子書
翻譯自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams-2.html
kafka streams
總結
以上是生活随笔為你收集整理的kafka streams_Kafka REST Proxy MapR Streams入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 战地5用什么电脑系统(战地5用什么电脑系
- 下一篇: 电脑声音不能调(电脑声音不能调试)