基于消息中间件RabbitMQ实现简单的RPC服务
轉(zhuǎn)載自??基于消息中間件RabbitMQ實現(xiàn)簡單的RPC服務(wù)
RPC(Remote Procedure Call,遠(yuǎn)程過程調(diào)用),是一種計算機(jī)通信協(xié)議。對于兩臺機(jī)器而言,就是A服務(wù)器上的應(yīng)用程序調(diào)用B服務(wù)器上的函數(shù)或者方法,由于不在同一個內(nèi)存空間或機(jī)器上運(yùn)行,因此需要借助于網(wǎng)絡(luò)通信。
1. RPC框架
我們首先通過一張圖理解RPC的工作流程:
因此,實現(xiàn)一個最簡單的RPC服務(wù),只需要Client、Server和Network,本文就是利用消息中間件RabbitMQ作為Network載體傳輸信息,實現(xiàn)簡單的RPC服務(wù)。簡單原理可如下圖所示:
即:當(dāng)Client發(fā)送RPC請求時,Client端是消息生產(chǎn)者,Server端是消息消費(fèi)者;當(dāng)Server返回結(jié)果時,Server端是消息生產(chǎn)者,Client是消息消費(fèi)者;發(fā)送和返回使用不同的隊列。
接下來我們通過代碼,詳細(xì)展示一個計算斐波那契數(shù)列的RPC服務(wù)。
2. RPCServer實現(xiàn)
2.1 Server初始化
/*** 隊列名、交換機(jī)名、路由鍵*/ private static final String EXCHANGE_NAME = "rpc_exchange"; private static final String QUEUE_NAME = "request_rpc_queue"; private static final String ROUTING_KEY = "rpc_routing_key";private Connection connection = null; private Channel channel = null; private QueueingConsumer consumer = null;/*** Server的構(gòu)造函數(shù)*/ private RPCServer() {try {//創(chuàng)建鏈接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();//創(chuàng)建信道channel = connection.createChannel();//設(shè)置AMQP的通信結(jié)構(gòu)channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);//設(shè)置消費(fèi)者consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);} catch (Exception e) {LOG.error("build connection failed!", e);} }初始化就是聲明RabbitMQ的鏈接工廠、鏈接、信道、隊列、交換機(jī)等等,并做了綁定,由此構(gòu)成了AMQP的通信結(jié)構(gòu)。
2.2 監(jiān)聽隊列并反饋
/*** 開啟server*/ private void startServer() {try {LOG.info("Waiting for RPC calls.....");while (true) {//獲得文本消息QueueingConsumer.Delivery delivery = consumer.nextDelivery();BasicProperties props = delivery.getProperties();//返回消息的屬性BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();long receiveTime = System.currentTimeMillis();JSONObject json = new JSONObject();try {String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);LOG.info("Got a request: fib(" + message + ")");json.put("status", "success");json.put("result", fib(n));} catch (Exception e) {json.put("status", "fail");json.put("reason", "Not a Number!");LOG.error("receive message failed!", e);} finally {long responseTime = System.currentTimeMillis();json.put("calculateTime", (responseTime - receiveTime));channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}} catch (Exception e) {LOG.error("server failed!", e);} finally {if (connection != null) {try {connection.close();} catch (Exception e) {LOG.error("close failed!", e);}}} }在該方法中使用了一個無限循環(huán),每次處理一條消息。通過調(diào)用消費(fèi)者對象的nextDelivery方法來獲得RabbitMQ隊列的最新一條消息。同時通過getProperties獲取到消息中的反饋信息屬性,用于標(biāo)記客戶端Client的屬性。然后計算斐波那契數(shù)列的結(jié)果。最后通過basicAck使用消息信封向RabbitMQ確認(rèn)了該消息。
到這里就實現(xiàn)了計算斐波那契數(shù)列RPC服務(wù)的Server端。
3. RPCClient實現(xiàn)
3.1 初始化CLient
/*** 消息請求的隊列名、交換機(jī)名、路由鍵*/ private static final String EXCHANGE_NAME = "rpc_exchange"; private static final String QUEUE_NAME = "request_rpc_queue"; private static final String ROUTING_KEY = "rpc_routing_key";/*** 消息返回的隊列名、交換機(jī)名、路由鍵*/ private static final String RESPONSE_QUEUE = "response_rpc_queue"; private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";/*** RabbitMQ的實體*/ private Connection connection = null; private Channel channel = null; private QueueingConsumer consumer = null;/*** 構(gòu)造客戶端* @throws Exception*/ private RPCClient() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);consumer = new QueueingConsumer(channel);channel.basicConsume(RESPONSE_QUEUE, true, consumer); }這里聲明AMQP結(jié)構(gòu)體的方式和Server端類似,只不過Client端需要多聲明一個隊列,用于RPC的response。
3.2 發(fā)送/接收消息
/*** 請求server* @param message* @return* @throws Exception*/ private String requestMessage(String message) throws Exception {String response = null;String corrId = UUID.randomUUID().toString();BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();if (delivery.getProperties().getCorrelationId().equals(corrId)) {response = new String(delivery.getBody(),"UTF-8");break;}}return response; }BasicProperties用于存儲你請求消息的屬性,這里我設(shè)置了correlationId和replyTo屬性,用于Server端的返回識別。
4. 運(yùn)行測試
Client端發(fā)送:
Server端接收并處理:
Client收到計算結(jié)果:
由于我運(yùn)行RabbitMQ的服務(wù)器是租用的阿里云的,差不多傳輸時延在60ms左右,如果把RPC服務(wù)和消息中間件同機(jī)房部署的話延時基本上就在ms級別。
5. FAQ
5.1 說明
需要體驗完整的過程,你需要如下環(huán)境:
JDK1.6以上 + Maven + RabbitMQ5.2 源代碼
完整代碼代碼請戳:github
其中Server的代碼在:
rpc.RPCServerClient端的代碼位置:
rpc.RPCClient總結(jié)
以上是生活随笔為你收集整理的基于消息中间件RabbitMQ实现简单的RPC服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么找dedecms 5.7 后台路径找
- 下一篇: 安卓n是什么(安卓n系统)