kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
前言
之前有文章 《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka 寫過 Flink 將處理后的數(shù)據(jù)后發(fā)到 Kafka 消息隊(duì)列中去,當(dāng)然我們常用的消息隊(duì)列可不止這一種,還有 RocketMQ、RabbitMQ 等,剛好 Flink 也支持將數(shù)據(jù)寫入到 RabbitMQ,所以今天我們就來寫篇文章講講如何將 Flink 處理后的數(shù)據(jù)寫入到 RabbitMQ。
前提準(zhǔn)備
安裝 RabbitMQ
這里我直接用 docker 命令安裝吧,先把 docker 在 mac 上啟動起來。
在命令行中執(zhí)行下面的命令:
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management對這個(gè)命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/
登錄用戶名和密碼分別是:admin / admin ,登錄進(jìn)去是這個(gè)樣子就代表安裝成功了:
依賴
pom.xml 中添加 Flink connector rabbitmq 的依賴如下:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId><version>${flink.version}</version> </dependency>生產(chǎn)者
這里我們依舊自己寫一個(gè)工具類一直的往 RabbitMQ 中的某個(gè) queue 中發(fā)數(shù)據(jù),然后由 Flink 去消費(fèi)這些數(shù)據(jù)。
注意按照我的步驟來一步步操作,否則可能會出現(xiàn)一些錯(cuò)誤!
RabbitMQProducerUtil.java
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducerUtil {public final static String QUEUE_NAME = "zhisheng";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置RabbitMQ相關(guān)信息factory.setHost("localhost");factory.setUsername("admin");factory.setPassword("admin");factory.setPort(5672);//創(chuàng)建一個(gè)新的連接Connection connection = factory.newConnection();//創(chuàng)建一個(gè)通道Channel channel = connection.createChannel();// 聲明一個(gè)隊(duì)列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null);//發(fā)送消息到隊(duì)列中String message = "Hello zhisheng";//我們這里演示發(fā)送一千條數(shù)據(jù)for (int i = 0; i < 1000; i++) {channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));System.out.println("Producer Send +'" + message + i);}//關(guān)閉通道和連接channel.close();connection.close();} }Flink 主程序
import com.zhisheng.common.utils.ExecutionEnvUtil; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** 從 rabbitmq 讀取數(shù)據(jù)*/ public class Main {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;//這些配置建議可以放在配置文件中,然后通過 parameterTool 來獲取對應(yīng)的參數(shù)值final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setVirtualHost("/").setPort(5672).setUserName("admin").setPassword("admin").build();DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig,"zhisheng",true,new SimpleStringSchema())).setParallelism(1);zhisheng.print();//如果想保證 exactly-once 或 at-least-once 需要把 checkpoint 開啟 // env.enableCheckpointing(10000);env.execute("flink learning connectors rabbitmq");} }運(yùn)行 RabbitMQProducerUtil 類,再運(yùn)行 Main 類!
注意??:
1、RMQConnectionConfig 中設(shè)置的用戶名和密碼要設(shè)置成 admin/admin,如果你換成是 guest/guest,其實(shí)是在 RabbitMQ 里面是沒有這個(gè)用戶名和密碼的,所以就會報(bào)這個(gè)錯(cuò)誤:
nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.不出意外的話應(yīng)該你運(yùn)行 RabbitMQProducerUtil 類后,立馬兩個(gè)運(yùn)行的結(jié)果都會出來,速度還是很快的。
2、如果你在 RabbitMQProducerUtil 工具類中把注釋的那行代碼打開的話:
// 聲明一個(gè)隊(duì)列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null);就會出現(xiàn)這種錯(cuò)誤:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)這是因?yàn)槟愦蜷_那個(gè)注釋的話,一旦你運(yùn)行了該類就會創(chuàng)建一個(gè)叫做 zhisheng 的 Queue,當(dāng)你再運(yùn)行 Main 類中的時(shí)候,它又會創(chuàng)建這樣一個(gè)叫 zhisheng 的 Queue,然后因?yàn)橐呀?jīng)有同名的 Queue 了,所以就有了沖突,解決方法就是把那行代碼注釋就好了。
3、該 connector(連接器)中提供了 RMQSource 類去消費(fèi) RabbitMQ queue 中的消息和確認(rèn) checkpoints 上的消息,它提供了三種不一樣的保證:
- Exactly-once(只消費(fèi)一次): 前提條件有,1 是要開啟 checkpoint,因?yàn)橹挥性?checkpoint 完成后,才會返回確認(rèn)消息給 RabbitMQ(這時(shí),消息才會在 RabbitMQ 隊(duì)列中刪除);2 是要使用 Correlation ID,在將消息發(fā)往 RabbitMQ 時(shí),必須在消息屬性中設(shè)置 Correlation ID。數(shù)據(jù)源根據(jù) Correlation ID 把從 checkpoint 恢復(fù)的數(shù)據(jù)進(jìn)行去重;3 是數(shù)據(jù)源不能并行,這種限制主要是由于 RabbitMQ 將消息從單個(gè)隊(duì)列分派給多個(gè)消費(fèi)者。
- At-least-once(至少消費(fèi)一次): 開啟了 checkpoint,但未使用相 Correlation ID 或 數(shù)據(jù)源是并行的時(shí)候,那么就只能保證數(shù)據(jù)至少消費(fèi)一次了
- No guarantees(無法保證): Flink 接收到數(shù)據(jù)就返回確認(rèn)消息給 RabbitMQ
Sink 數(shù)據(jù)到 RabbitMQ
RabbitMQ 除了可以作為數(shù)據(jù)源,也可以當(dāng)作下游,Flink 消費(fèi)數(shù)據(jù)做了一些處理之后也能把數(shù)據(jù)發(fā)往 RabbitMQ,下面演示下 Flink 消費(fèi) Kafka 數(shù)據(jù)后寫入到 RabbitMQ。
public class Main1 {public static void main(String[] args) throws Exception {final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setVirtualHost("/").setPort(5672).setUserName("admin").setPassword("admin").build();//注意,換一個(gè)新的 queue,否則也會報(bào)錯(cuò)data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema()));env.execute("flink learning connectors rabbitmq");} }是不是很簡單?但是需要注意的是,要換一個(gè)之前不存在的 queue,否則是會報(bào)錯(cuò)的。
不出意外的話,你可以看到 RabbitMQ 的監(jiān)控頁面會出現(xiàn)新的一個(gè) queue 出來,如下圖:
總結(jié)
本文先把 RabbitMQ 作為數(shù)據(jù)源,寫了個(gè) Flink 消費(fèi) RabbitMQ 隊(duì)列里面的數(shù)據(jù)進(jìn)行打印出來,然后又寫了個(gè) Flink 消費(fèi) Kafka 數(shù)據(jù)后寫入到 RabbitMQ 的例子!
本文原創(chuàng)地址是: http://www.54tianzhisheng.cn/2019/01/20/Flink-RabbitMQ-sink/ , 未經(jīng)允許禁止轉(zhuǎn)載。
關(guān)注我
微信公眾號:zhisheng
另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。
更多私密資料請加入知識星球!
Github 代碼倉庫
https://github.com/zhisheng17/flink-learning/
以后這個(gè)項(xiàng)目的所有代碼都將放在這個(gè)倉庫里,包含了自己學(xué)習(xí) flink 的一些 demo 和博客。
本文的項(xiàng)目代碼在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-rabbitmq
相關(guān)文章
1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹
2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡單程序入門
3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解
4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
8、《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)
9、《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows
10、《從0到1學(xué)習(xí)Flink》—— Flink 中的幾種 Time 詳解
11、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch
12、《從0到1學(xué)習(xí)Flink》—— Flink 項(xiàng)目如何運(yùn)行?
13、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka
14、《從0到1學(xué)習(xí)Flink》—— Flink JobManager 高可用性配置
15、《從0到1學(xué)習(xí)Flink》—— Flink parallelism 和 Slot 介紹
16、《從0到1學(xué)習(xí)Flink》—— Flink 讀取 Kafka 數(shù)據(jù)批量寫入到 MySQL
17、《從0到1學(xué)習(xí)Flink》—— Flink 讀取 Kafka 數(shù)據(jù)寫入到 RabbitMQ
18、《從0到1學(xué)習(xí)Flink》—— 你上傳的 jar 包藏到哪里去了?
總結(jié)
以上是生活随笔為你收集整理的kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python中字符串运算符及用法_pyt
- 下一篇: easyexcel 导出 代码翻译con