带有Kafka和ZeroMQ的分布式类星体演员
因此,您已經有了使用actor的精美設計,選擇了JVM和Quasar在該主題上的強大而忠實的觀點。 所有明智的決定,但是在集群上進行分配時您有什么選擇呢?
星系
Galaxy是一個非常酷的選擇:快速的內存中數據網格,針對數據局部性進行了優化,具有復制,可選的持久性,分布式參與者注冊表,甚至參與者之間的參與者遷移! 只有一個警告:要發布正式版的生產質量的銀河版,還需要幾個月的時間。 不建議將當前版本的Galaxy用于生產。
如果我們需要在那之前上線怎么辦?
幸運的是,Quasar Actors的阻塞編程模型非常簡單,以至于將其與大多數消息傳遞解決方案集成起來都是輕而易舉的,并且為了證明讓我們用兩種快速,流行且截然不同的模型來做到這一點: Apache Kafka和?MQ 。
代碼和計劃
可以在GitHub上找到以下所有示例,只需快速閱讀簡短的README ,即可立即運行它們。
Kafka和?MQ分別有兩個示例:
- 快速而骯臟的人直接進行發布/投票或發送/接收演員的呼叫。
- 詳細介紹了代理角色,這些代理角色將您的代碼與消息傳遞API隔離開。 為了證明我沒有在說謊,該程序對兩種技術使用了相同的生產者和消費者參與者類 ,并且幾乎使用了相同的引導程序。
卡夫卡
Apache Kafka的采用率急劇上升,這是由于其基于持久性提交日志和用于并行消息使用的使用者組的獨特設計:這種結合形成了快速,可靠,靈活和可擴展的代理。
該API包括兩種類型的生產者:sync和async;一種消費者(僅sync); Comsat包括社區貢獻的,對光纖友好的Kafka生產商集成 。
Kafka生產者句柄是線程安全的,在共享時表現最佳,并且可以像這樣在actor主體(或其他任何地方)中輕松獲得和使用:
final Properties producerConfig = new Properties(); producerConfig.put("bootstrap.servers", "localhost:9092"); producerConfig.put("client.id", "DemoProducer"); producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) {final byte[] myBytes = getMyBytes(); // ...final Future<RecordMetaData> res = producer.send(new ProducerRecord<>("MyTopic", i, myBytes));res.get(); // Optional, blocks the fiber until the record is persisted; thre's also `producer.flush()` }我們用Comsat的FiberKafkaProducer包裝了KafkaProducer對象,以便找回光纖阻塞的未來。
但是,使用者句柄不是線程安全的1,并且僅是線程阻塞的:
final Properties producerConfig = new Properties(); consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {consumer.subscribe(Collections.singletonList(TOPIC));final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L);for (final ConsumerRecord<Integer, byte[]> record : records) {final byte[] v = record.value();useMyBytes(v); // ...} }由于我們不想阻塞光纖的基礎線程池(除了卡夫卡在doRun的線程池,我們無法對其做太多的事情),因此在我們的actor的doRun我們將使用FiberAsync.runBlocking代替FiberAsync.runBlocking來喂入固定的FiberAsync.runBlocking具有異步任務的size執行程序,該任務將阻塞光纖直到poll (將在給定的池中執行)返回之前:
final ExecutorService e = Executors.newFixedThreadPool(2);try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {consumer.subscribe(Collections.singletonList(TOPIC));final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L));for (final ConsumerRecord<Integer, byte[]> record : records) {final byte[] v = record.value();useMyBytes(v); // ...} }其中call是一個定義如下的實用程序方法(如果不是此Java編譯器bug,則沒有必要):
@Suspendable public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution {try {return runBlocking(es, (CheckedCallable<V, Exception>) c::call);} catch (final InterruptedException | SuspendExecution e) {throw e;} catch (final Exception e) {throw new RuntimeException(e);} }在第一個完整的示例中,我們將從生產者角色向消費者發送一千個序列化消息。
?MQ
?MQ(或ZeroMQ)不是集中的代理解決方案,而更多地是各種通信模式(請求/答復,發布/訂閱等)的套接字的一般化。 在我們的示例中,我們將使用最簡單的請求-答復模式。 這是我們的新生產者代碼:
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {trgt.connect("tcp://localhost:8000");final byte[] myBytes = getMyBytes(); // ...trgt.send(baos.toByteArray(), 0 /* flags */)trgt.recv(); // Reply, e.g. ACK }如您所見,上下文充當套接字工廠,并傳遞了要使用的I / O線程數:這是因為?MQ套接字不是連接綁定的OS句柄,而是用于處理的機器的簡單前端重試連接,多個連接,高效的并發I / O甚至為您排隊。 這就是為什么send調用幾乎永遠不會阻塞,而recv調用不是連接上的I / O調用,而是線程與專門的I / O任務之間的同步的原因,該任務將從一個或多個連接中傳入字節。
不過,我們將在角色中阻塞光纖,而不是線程,因此讓我們在read調用上使用FiberAsync.runBlocking ,以防萬一它阻塞甚至在send時阻塞:
final ExecutorService ep = Executors.newFixedThreadPool(2);try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {exec(e, () -> trgt.connect("tcp://localhost:8000"));final byte[] myBytes = getMyBytes(); // ...call(e, trgt.send(myBytes, 0 /* flags */));call(e, trgt::recv); // Reply, e.g. ACK }這是消費者:
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {exec(e, () -> src.bind("tcp://*:8000"));final byte[] v = call(e, src::recv);exec(e, () -> src.send("ACK"));useMyBytes(v); // ... }exec是另一個實用程序函數,類似于call :
@Suspendable public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {try {runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; });} catch (final InterruptedException | SuspendExecution e) {throw e;} catch (final Exception e) {throw new RuntimeException(e);} }這是完整的第一個示例 。
在不改變邏輯的情況下進行分配:與救援人員的松散耦合
很簡單,不是嗎? 但是,有些令人討厭的事情:我們與網絡另一端的參與者打交道的方式與本地參與者不同。 無論他們位于何處或如何連接,這些都是我們愿意寫的演員:
public final class ProducerActor extends BasicActor<Void, Void> {private final ActorRef<Msg> target;public ProducerActor(ActorRef<Msg> target) {this.target = target;}@Overrideprotected final Void doRun() throws InterruptedException, SuspendExecution {for (int i = 0; i < MSGS; i++) {final Msg m = new Msg(i);System.err.println("USER PRODUCER: " + m);target.send(m);}System.err.println("USER PRODUCER: " + EXIT);target.send(EXIT);return null;} }public final class ConsumerActor extends BasicActor<Msg, Void> {@Overrideprotected final Void doRun() throws InterruptedException, SuspendExecution {for (;;) {final Msg m = receive();System.err.println("USER CONSUMER: " + m);if (EXIT.equals(m))return null;}} }幸運的是,每個演員,無論做什么,都具有相同的非常基本的接口:傳入消息隊列,稱為信箱 。 這意味著我們可以在兩個通信參與者之間插入任意數量的中間參與者或代理 ,尤其是我們希望有一個發送代理,它將通過中間件將消息獲取到目標主機,并在其中接收接收代理,以捕獲傳入的消息。并將它們放入目標目的地的郵箱中。
因此,在我們的主程序中,我們將為我們的ProducerActor提供合適的發送代理,然后讓ConsumerActor從合適的接收代理接收:
final ProducerActor pa = Actor.newActor(ProducerActor.class, getSendingProxy()); // ... final ConsumerActor ca = Actor.newActor(ConsumerActor.class); pa.spawn(); System.err.println("USER PRODUCER started"); subscribeToReceivingProxy(ca.spawn()); // ... System.err.println("USER CONSUMER started"); pa.join(); System.err.println("USER PRODUCER finished"); ca.join(); System.err.println("USER CONSUMER finished");讓我們看看如何首先使用Kafka然后使用?MQ來實現這些代理。
卡夫卡男演員代理
代理參與者的工廠將與特定的Kafka主題相關聯:這是因為可以對主題進行分區 ,以使多個使用者可以同時讀取該主題。 我們希望能夠最佳地利用每個主題的最大級別或并發性:
/* ... */ KafkaProxies implements AutoCloseable {/* ... */ KafkaProxies(String bootstrap, String topic) { /* ... */ }// ... }當然,我們希望對多個參與者使用一個主題,因此發送代理將指定接收者參與者ID,接收代理將僅將消息轉發給綁定到該ID的用戶參與者:
/* ... */ <M> ActorRef<M> create(String actorID) { /* ... */ } /* ... */ void drop(ActorRef ref) throws ExecutionException, InterruptedException { /* ... */ } /* ... */ <M> void subscribe(ActorRef<? super M> consumer, String actorID) { /* ... */ } /* ... */ void unsubscribe(ActorRef<?> consumer, String actorID) { /* ... */ }關閉AutoClosable工廠將告訴所有代理終止,并清理簿記參考:
/* ... */ void close() throws Exception { /* ... */ }生產者實現是非常簡單和無趣的,同時給消費者帶來了更多的樂趣,因為它將使用Quasar Actors的選擇性接收將傳入消息保留在其郵箱中,直到至少有一個訂閱的用戶actor可以使用它們為止:
@Override protected Void doRun() throws InterruptedException, SuspendExecution {//noinspection InfiniteLoopStatementfor (;;) {// Try extracting from queuefinal Object msg = tryReceive((Object m) -> {if (EXIT.equals(m))return EXIT;if (m != null) {//noinspection uncheckedfinal ProxiedMsg rmsg = (ProxiedMsg) m;final List<ActorRef> l = subscribers.get(rmsg.actorID);if (l != null) {boolean sent = false;for (final ActorRef r : l) {//noinspection uncheckedr.send(rmsg.payload);sent = true;}if (sent) // Someone was listening, remove from queuereturn m;}}return null; // No subscribers (leave in queue) or no messages});// Something from queueif (msg != null) {if (EXIT.equals(msg)) {return null;}continue; // Go to next cycle -> precedence to queue}// Try receiving//noinspection Convert2Lambdafinal ConsumerRecords<Void, byte[]> records = call(e, () -> consumer.get().poll(100L));for (final ConsumerRecord<Void, byte[]> record : records) {final byte[] v = record.value();try (final ByteArrayInputStream bis = new ByteArrayInputStream(v);final ObjectInputStream ois = new ObjectInputStream(bis)) {//noinspection uncheckedfinal ProxiedMsg rmsg = (ProxiedMsg) ois.readObject();final List<ActorRef> l = subscribers.get(rmsg.actorID);if (l != null && l.size() > 0) {for (final ActorRef r : l) {//noinspection uncheckedr.send(rmsg.payload);}} else {ref().send(rmsg); // Enqueue}} catch (final IOException | ClassNotFoundException e) {e.printStackTrace();throw new RuntimeException(e);}} }由于我們還需要處理郵箱,因此我們以足夠小的超時來輪詢Kafka。 還要注意,許多參與者可以訂閱相同的ID,傳入的消息將廣播給所有參與者。 每個主題創建的接收actor代理(即光纖)的數量,以及池線程和Kafka使用者句柄( consumer是本地線程,因為Kafka使用者不是線程安全的)的數量將等于每個主題的分區數:這使接收吞吐量達到最大。
目前,此實現使用Java序列化在字節之間來回轉換消息,但是當然可以使用其他框架,例如Kryo 。
?MQ演員代理
?MQ模型是完全去中心化的:既沒有經紀人,也沒有話題,因此我們可以簡單地將?MQ地址/端點與一組參與者等同,而無需使用額外的參與者ID:
/* ... */ ZeroMQProxies implements AutoCloseable {/* ... */ ZeroMQProxies(int ioThreads) { /* ... */ }/* ... */ <M> ActorRef<M> to(String trgtZMQAddress) { /* ... */ }/* ... */ void drop(String trgtZMQAddress)/* ... */ void subscribe(ActorRef<? super M> consumer, String srcZMQEndpoint) { /* ... */ }/* ... */ void unsubscribe(ActorRef<?> consumer, String srcZMQEndpoint) { /* ... */ }/* ... */ void close() throws Exception { /* ... */ } }同樣,在這種情況下,并且由于與以前相同的原因,使用者有點有趣,但幸運的是,線程安全性方面的任何問題都因為?MQ套接字在多個線程中可以正常工作:
@Override protected Void doRun() throws InterruptedException, SuspendExecution {try(final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {System.err.printf("PROXY CONSUMER: binding %s\n", srcZMQEndpoint);Util.exec(e, () -> src.bind(srcZMQEndpoint));src.setReceiveTimeOut(100);//noinspection InfiniteLoopStatementfor (;;) {// Try extracting from queuefinal Object m = tryReceive((Object o) -> {if (EXIT.equals(o))return EXIT;if (o != null) {//noinspection uncheckedfinal List<ActorRef> l = subscribers.get(srcZMQEndpoint);if (l != null) {boolean sent = false;for (final ActorRef r : l) {//noinspection uncheckedr.send(o);sent = true;}if (sent) // Someone was listening, remove from queuereturn o;}}return null; // No subscribers (leave in queue) or no messages});// Something processable is thereif (m != null) {if (EXIT.equals(m)) {return null;}continue; // Go to next cycle -> precedence to queue}System.err.println("PROXY CONSUMER: receiving");final byte[] msg = Util.call(e, src::recv);if (msg != null) {System.err.println("PROXY CONSUMER: ACKing");Util.exec(e, () -> src.send(ACK));final Object o;try (final ByteArrayInputStream bis = new ByteArrayInputStream(msg);final ObjectInputStream ois = new ObjectInputStream(bis)) {o = ois.readObject();} catch (final IOException | ClassNotFoundException e) {e.printStackTrace();throw new RuntimeException(e);}System.err.printf("PROXY CONSUMER: distributing '%s' to %d subscribers\n", o, subscribers.size());//noinspection uncheckedfor (final ActorRef s : subscribers.getOrDefault(srcZMQEndpoint, (List<ActorRef>) Collections.EMPTY_LIST))//noinspection uncheckeds.send(o);} else {System.err.println("PROXY CONSUMER: receive timeout");}}} }更多功能
這篇簡短的文章有望使人們一眼就可以看出,由于Quasar的Actor具有順暢的順序流程的特性,因此可以無縫地將Quasar的Actor與消息傳遞解決方案進行接口連接。 當然,可以更進一步,例如:
- 演員查找和發現 :我們如何提供全球演員命名/發現服務? 例如,Kafka使用ZooKeeper,因此可能值得利用,但?MQ大量下注于去中心化,故意不提供預先打包的基礎。
- Actor故障管理 :我們如何支持在不同節點中運行的actor之間的故障管理鏈接和監視?
- 消息路由 :如何在不更改參與者內部邏輯的情況下動態調整節點與參與者之間的消息流?
- 角色移動性 :我們如何將角色移動到其他節點,例如,使其更靠近消息源,以提高性能或移動到具有不同安全性的位置?
- 可伸縮性和容錯性 :如何管理參與者節點的添加,刪除,死亡和分區? 像Galaxy這樣的分布式IMDG和像Kafka這樣的基于代理的解決方案通常已經做到了,但是像?MQ這樣的結構級解決方案通常不這樣做。
- 安全性 :我們如何支持相關的信息安全性屬性?
- 測試,記錄,監視 :我們如何方便地整體測試,跟蹤和監視分布式參與者集合?
這些主題尤其是分布式系統設計的“硬核”,尤其是分布式參與者,因此,有效地解決它們可能需要大量的精力。 Galaxy解決了所有這些問題,但Quasar參與者提供了一個SPI ,涵蓋了上述一些主題,并允許與發行技術更緊密地集成。 您可能還對Akka與Quasar + Galaxy之間的比較感興趣,該比較涵蓋了許多此類方面。
就是這樣,請與您分布的Quasar演員一起玩樂,并在Quasar-Pulsar用戶組中留下有關您的旅程的注釋!
翻譯自: https://www.javacodegeeks.com/2016/05/distributed-quasar-actors-kafka-zeromq.html
總結
以上是生活随笔為你收集整理的带有Kafka和ZeroMQ的分布式类星体演员的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑writer怎么搞中文版(write
- 下一篇: 最常见的Java异常及其对Java开发人