Hadoop RPC框架
                                                            生活随笔
收集整理的這篇文章主要介紹了
                                Hadoop RPC框架
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.                        
                                原文:http://blog.csdn.net/thomas0yang/article/details/41211259
----------------------------------------------------------------------------------------------
1、RPC框架概述 1.1 RPC(Remote Procedure Call Protocol)——遠程過程調(diào)用協(xié)議,它是一種通過網(wǎng)絡從遠程計算機程序上請求服務,而不需要了解底層網(wǎng)絡技術(shù)的協(xié)議。RPC協(xié)議假定某些傳輸協(xié)議的存在,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)。在OSI網(wǎng)絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發(fā)包括網(wǎng)絡分布式多程序在內(nèi)的應用程序更加容易。 1.2 RPC通常采用客戶端服務器模型,其框架主要有以下幾部分- 通信模塊:實現(xiàn)請求應該協(xié)議。主要分為同步方式和異步方式。
- stub程序:客戶端和服務器均包含stub程序,可以看做代理程序。使得遠程函數(shù)表現(xiàn)的跟本地調(diào)用一樣,對用戶程序完全透明。
- 調(diào)度程序:接受來自通信模塊的請求消息,根據(jù)標識選擇stub程序處理。并發(fā)量大一般采用線程池處理。
- 客戶程序/服務過程:請求發(fā)出者和請求的處理者。
2、Hadoop RPC基本框架 2.1Hadoop RPC的使用方法見代碼
| 服務 public interface MyBiz extends VersionedProtocol { ??? long PROTOCOL_VERSION = 12321443L; ??? String hello(String name); } public class MyBizImpl implements MyBiz { ??? @Override ??? public long getProtocolVersion(String arg0, long arg1) throws IOException { ??????? return PROTOCOL_VERSION; ??? } ??? @Override ??? public String hello(String name) { ??????? System. out.println( "invoked"); ??????? return "hello " + name; ??? } } 服務器 public class MyServer { ??? public static final String SERVER_ADDRESS = "localhost"; ??? public static final int SERVER_PORT = 12345; ??? public static void main(String[] args) throws IOException { ??????? Server server = RPC. getServer(new MyBizImpl(), SERVER_ADDRESS, SERVER_PORT , new Configuration()); ??????? server.start(); ??? } } 客戶端 public class MyClient { ??? public static void main(String[] args) throws IOException { ??????? MyBiz proxy = (MyBiz) RPC. getProxy(MyBiz.class, MyBiz.PROTOCOL_VERSION, ??????????????? new InetSocketAddress(MyServer. SERVER_ADDRESS,MyServer.SERVER_PORT), ??????????????? new Configuration()); ??????? String result = proxy.hello( "5"); ??????? System. out.println(result); ??????? RPC.stopProxy(proxy); ??? } } | 
2.2 org.apache.hadoop.ipc.RPC類解析 RPC類主要包含三部分:
- ClientCache(成員變量):根據(jù)用戶提供的SocketFactory來緩存Client對象,以便重用Client對象。
- Server(內(nèi)部類):繼承Server抽象類,利用反射實現(xiàn)了call方法,即客戶端請求的方法和對應參數(shù)完成方法調(diào)用。
- Invocation(內(nèi)部類):將要調(diào)用的方法名和參數(shù)打包成可序列化的對象,方便客戶端和服務器之間傳遞。
2.3 客戶端和服務器端的關(guān)系
- Client-NameNode之間,其中NameNode是服務器
- Client-DataNode之間,其中DataNode是服務器
- DataNode-NameNode之間,其中NameNode是服務器
- DataNode-DateNode之間,其中某一個DateNode是服務器,另一個是客戶端
- Call(內(nèi)部類):封裝了一個RPC請求,包含5個成員變量,唯一表示id、函數(shù)調(diào)用信息param、函數(shù)返回值value、函數(shù)異常信息error、函數(shù)完成標識done。Hadoop rpc?server采用異步方式處理客戶端請求,使得遠程過程調(diào)用的發(fā)生順序和返回順序無直接關(guān)系,而客戶端正是通過id識別不同的函數(shù)調(diào)用。當客戶端向服務器發(fā)送請求,只需填充id和param兩個變量,其余3個變量由服務器端根據(jù)函數(shù)執(zhí)行情況填充。
- Connection(內(nèi)部類,一個線程):是client和server之間的一個通信連接,封裝了連接先關(guān)的基本信息和操作。基本信息包括:通信連接唯一標識remoteId(ConnectionId)、與Server端通信的scoket、網(wǎng)絡輸入輸出流in/out、保存RPC請求的哈希表calls(Hashtable<Integer, Call>)。操作包括:addCall將一個Call對象添加到哈希表中;sendParam想服務器端發(fā)送RPC請求;receiveResponse從服務器端接收已經(jīng)處理完成的RPC請求;run調(diào)用receiveResponse方法,等待返回結(jié)果。
- ConnectionId(內(nèi)部類):連接的標記(包括server地址,協(xié)議,其他一些連接的配置項信息)
- ParallelCall(內(nèi)部類):實現(xiàn)并行調(diào)用的請求
- ParallelResults(內(nèi)部類):并行調(diào)用的執(zhí)行結(jié)果
2.4.3 調(diào)用流程分析,當調(diào)用call函數(shù)執(zhí)行某個遠程方法時,有以下幾個步驟: 1)創(chuàng)建一個Connection對象,并將遠程方法調(diào)用信息封裝成Call對象,放到Connection對象中的哈希表中;
2)調(diào)用Connection類中的sendRpcRequest()方法將當前Call對象發(fā)送給Server端;
3)Server端處理完RPC請求后,將結(jié)果通過網(wǎng)絡返回給Client端,Client端通過receiveRpcResponse()函數(shù)獲取結(jié)果;
4)Client檢查結(jié)果處理狀態(tài)(成功還是失敗),并將對應Call對象從哈希表中刪除。
2.4.4 一個Client包含多個連接,private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
2.5?org.apache.hadoop.ipc.Server類解析
2.5.1 背景 Hadoop采用了Master/Slave結(jié)構(gòu),其中Master是整個系統(tǒng)的單點,如NameNode或JobTracker,這是制約系統(tǒng)性能和可擴展性的最關(guān)鍵因素之一;而Master通過ipc.Server接收并處理所有Slave發(fā)送的請求,這就要求ipc.Server 將高并發(fā)和可擴展性作為設計目標。為此,ipc.Server采用了很多提高并發(fā)處理能力的技術(shù),主要包括線程池、事件驅(qū)動和Reactor設計模式等,這些技術(shù)均采用了JDK自帶的庫實現(xiàn),這里重點分析它是如何利用Reactor設計模式提高整體性能的。
2.5.2 reactor設計模式 Reactor是并發(fā)編程中的一種基于事件驅(qū)動的設計模式,它具有以下兩個特點:通過派發(fā)/分離I/O操作事件提高系統(tǒng)的并發(fā)性能;提供了粗粒度的并發(fā)控制,使用單線程實現(xiàn),避免了復雜的同步處理。典型的Reactor實現(xiàn)原理如圖所示。
典型的Reactor模式中主要包括以下幾個角色。
- Reactor:I/O事件的派發(fā)者。
- Acceptor:接受來自Client的連接,建立與Client對應的Handler,并向Reactor注冊此Handler。
- Handler:與一個Client通信的實體,并按一定的過程實現(xiàn)業(yè)務的處理。Handler內(nèi)部往往會有更進一步的層次劃分,用來抽象諸如read、decode、compute、encode和send等過程。在Reactor模式中,業(yè)務邏輯被分散的I/O事件所打破,所以Handler需要有適當?shù)臋C制在所需的信息還不全(讀到一半)的時候保存上下文,并在下一次I/O事件到來的時候(另一半可讀)能繼續(xù)上次中斷的處理。
- Reader/Sender:為了加速處理速度,Reactor模式往往構(gòu)建一個存放數(shù)據(jù)處理線程的線程池,這樣數(shù)據(jù)讀出后,立即扔到線程池中等待后續(xù)處理即可。為此,Reactor模式一般分離Handler中的讀和寫兩個過程,分別注冊成單獨的讀事件和寫事件,并由對應的Reader和Sender線程處理。
| package com.sohu.tv.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * NIO服務端 * @author 小路 */ public class NIOServer { ??? //通道管理器 ??? private Selector selector; ??? /** ???? * 獲得一個ServerSocket通道,并對該通道做一些初始化的工作 ???? * @param port? 綁定的端口號 ???? * @throws IOException ???? */ ??? public void initServer(int port) throws IOException { ??????? // 獲得一個ServerSocket通道 ??????? ServerSocketChannel serverChannel = ServerSocketChannel.open(); ??????? // 設置通道為非阻塞 ??????? serverChannel.configureBlocking(false); ??????? // 將該通道對應的ServerSocket綁定到port端口 ??????? serverChannel.socket().bind(new InetSocketAddress(port)); ??????? // 獲得一個通道管理器 ??????? this.selector = Selector.open(); ??????? //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后, ??????? //當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。 ??????? serverChannel.register(selector, SelectionKey.OP_ACCEPT); ??? } ??? /** ???? * 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件,如果有,則進行處理 ???? * @throws IOException ???? */ ??? @SuppressWarnings("unchecked") ??? public void listen() throws IOException { ??????? System.out.println("服務端啟動成功!"); ??????? // 輪詢訪問selector ??????? while (true) { ??????????? //當注冊的事件到達時,方法返回;否則,該方法會一直阻塞 ????????????selector.select(); ??????????? // 獲得selector中選中的項的迭代器,選中的項為注冊的事件 ??????????? Iterator ite = this.selector.selectedKeys().iterator(); ??????????? while (ite.hasNext()) { ??????????????? SelectionKey key = (SelectionKey) ite.next(); ??????????????? // 刪除已選的key,以防重復處理 ??????????????? ite.remove(); ??????????????? // 客戶端請求連接事件 ??????????????? if (key.isAcceptable()) { ??????????????????? ServerSocketChannel server = (ServerSocketChannel) key ??????????????????????????? .channel(); ??????????????????? // 獲得和客戶端連接的通道 ??????????????????? SocketChannel channel = server.accept(); ??????????????????? // 設置成非阻塞 ??????????????????? channel.configureBlocking(false); ??????????????????? //在這里可以給客戶端發(fā)送信息哦 ??????????????????? channel.write(ByteBuffer.wrap(new String("向客戶端發(fā)送了一條信息").getBytes())); ??????????????????? //在和客戶端連接成功之后,為了可以接收到客戶端的信息,需要給通道設置讀的權(quán)限。 ????????????????????channel.register(this.selector, SelectionKey.OP_READ); ??????????????????? // 獲得了可讀的事件 ??????????????? } else if (key.isReadable()) { ??????????????????? read(key); ??????????????? } ??????????? } ??????? } ??? } ??? /** ???? * 處理讀取客戶端發(fā)來的信息 的事件 ???? * @param key ???? * @throws IOException ???? */ ??? public void read(SelectionKey key) throws IOException{ ??????? // 服務器可讀取消息:得到事件發(fā)生的Socket通道 ??????? SocketChannel channel = (SocketChannel) key.channel(); ??????? // 創(chuàng)建讀取的緩沖區(qū) ??????? ByteBuffer buffer = ByteBuffer.allocate(10); ??????? channel.read(buffer); ??????? byte[] data = buffer.array(); ??????? String msg = new String(data).trim(); ??????? System.out.println("服務端收到信息:"+msg); ??????? ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); ??????? channel.write(outBuffer);// 將消息回送給客戶端 ??? } ??? /** ???? * 啟動服務端測試 ???? * @throws IOException ???? */ ??? public static void main(String[] args) throws IOException { ??????? NIOServer server = new NIOServer(); ??????? server.initServer(8000); ??????? server.listen(); ??? } } package com.sohu.tv.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * NIO客戶端 * @author 小路 */ public class NIOClient { ??? //通道管理器 ??? private Selector selector; ??? /** ???? * 獲得一個Socket通道,并對該通道做一些初始化的工作 ???? * @param ip 連接的服務器的ip ???? * @param port? 連接的服務器的端口號 ???? * @throws IOException ???? */ ??? public void initClient(String ip,int port) throws IOException { ??????? // 獲得一個Socket通道 ??????? SocketChannel channel = SocketChannel.open(); ??????? // 設置通道為非阻塞 ??????? channel.configureBlocking(false); ??????? // 獲得一個通道管理器 ??????? this.selector = Selector.open(); ??????? // 客戶端連接服務器,其實方法執(zhí)行并沒有實現(xiàn)連接,需要在listen()方法中調(diào) ??????? //用channel.finishConnect();才能完成連接 ??????? channel.connect(new InetSocketAddress(ip,port)); ??????? //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_CONNECT事件。 ??????? channel.register(selector, SelectionKey.OP_CONNECT); ??? } ??? /** ???? * 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件,如果有,則進行處理 ???? * @throws IOException ???? */ ??? @SuppressWarnings("unchecked") ??? public void listen() throws IOException { ??????? // 輪詢訪問selector ??????? while (true) { ??????????? selector.select(); ??????????? // 獲得selector中選中的項的迭代器 ??????????? Iterator ite = this.selector.selectedKeys().iterator(); ??????????? while (ite.hasNext()) { ??????????????? SelectionKey key = (SelectionKey) ite.next(); ??????????????? // 刪除已選的key,以防重復處理 ??????????????? ite.remove(); ??????????????? // 連接事件發(fā)生 ??????????????? if (key.isConnectable()) { ??????????????????? SocketChannel channel = (SocketChannel) key ??????????????????????????? .channel(); ??????????????????? // 如果正在連接,則完成連接 ??????????????????? if(channel.isConnectionPending()){ ??????????????????????? channel.finishConnect(); ??????????????????? } ??????????????????? // 設置成非阻塞 ??????????????????? channel.configureBlocking(false); ??????????????????? //在這里可以給服務端發(fā)送信息哦 ??????????????????? channel.write(ByteBuffer.wrap(new String("向服務端發(fā)送了一條信息").getBytes())); ??????????????????? //在和服務端連接成功之后,為了可以接收到服務端的信息,需要給通道設置讀的權(quán)限。 ??????????????????? channel.register(this.selector, SelectionKey.OP_READ); ??????????????????? // 獲得了可讀的事件 ??????????????? } else if (key.isReadable()) { ??????????????????? read(key); ??????????????? } ??????????? } ??????? } ??? } ??? /** ???? * 處理讀取服務端發(fā)來的信息 的事件 ???? * @param key ???? * @throws IOException ???? */ ??? public void read(SelectionKey key) throws IOException{ ??????? //和服務端的read方法一樣 ??? } ??? /** ???? * 啟動客戶端測試 ???? * @throws IOException ???? */ ??? public static void main(String[] args) throws IOException { ??????? NIOClient client = new NIOClient(); ??????? client.initClient("localhost",8000); ??????? client.listen(); ??? } } | 
2.5.4 server處理流程 ipc.Server的主要功能是接收來自客戶端的RPC請求,經(jīng)過調(diào)用相應的函數(shù)獲取結(jié)果后,返回給對應的客戶端。為此,ipc.Server被劃分成3個階段:接收請求、處理請求和返回結(jié)果。 (1)接收請求
? ? ?該階段主要任務是接收來自各個客戶端的RPC請求,并將它們封裝成固定的格式(Call類)放到一個共享隊列(callQueue)中,以便進行后續(xù)處理。該階段內(nèi)部又分為建立連接和接收請求兩個子階段,分別由Listener和Reader兩種線程完成。
? ? ?整個Server只有一個Listener線程,統(tǒng)一負責監(jiān)聽來自客戶端的連接請求,一旦有新的請求到達,它會采用輪詢的方式從線程池中選擇一個Reader線程進行處理,而Reader線程可同時存在多個,它們分別負責接收一部分客戶端連接的RPC請求,至于每個Reader線程負責哪些客戶端連接,完全由Listener決定,當前Listener只是采用了簡單的輪詢分配機制。
? ? ?Listener和Reader線程內(nèi)部各自包含一個Selector對象,分別用于監(jiān)聽SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。對于Listener線程,主循環(huán)的實現(xiàn)體是監(jiān)聽是否有新的連接請求到達,并采用輪詢策略選擇一個Reader線程處理新連接;對于Reader線程,主循環(huán)的實現(xiàn)體是監(jiān)聽(它負責的那部分)客戶端連接中是否有新的RPC請求到達,并將新的RPC請求封裝成Call對象,放到共享隊列callQueue中。
(2)處理請求
? ? ?該階段主要任務是從共享隊列callQueue中獲取Call對象,執(zhí)行對應的函數(shù)調(diào)用,并將結(jié)果返回給客戶端,這全部由Handler線程完成。
? ? ?Server端可同時存在多個Handler線程,它們并行從共享隊列中讀取Call對象,經(jīng)執(zhí)行對應的函數(shù)調(diào)用后,將嘗試著直接將結(jié)果返回給對應的客戶端。但考慮到某些函數(shù)調(diào)用返回結(jié)果很大或者網(wǎng)絡速度過慢,可能難以將結(jié)果一次性發(fā)送到客戶端,此時Handler將嘗試著將后續(xù)發(fā)送任務交給Responder線程。
(3)返回結(jié)果
? ? ?前面提到,每個Handler線程執(zhí)行完函數(shù)調(diào)用后,會嘗試著將執(zhí)行結(jié)果返回給客戶端,但對于特殊情況,比如函數(shù)調(diào)用返回結(jié)果過大或者網(wǎng)絡異常情況(網(wǎng)速過慢),會將發(fā)送任務交給Responder線程。
? ? ?Server端僅存在一個Responder線程,它的內(nèi)部包含一個Selector對象,用于監(jiān)聽SelectionKey.OP_WRITE事件。當Handler沒能將結(jié)果一次性發(fā)送到客戶端時,會向該Selector對象注冊SelectionKey.OP_WRITE事件,進而由Responder線程采用異步方式繼續(xù)發(fā)送未發(fā)送完成的結(jié)果。
總結(jié)
以上是生活随笔為你收集整理的Hadoop RPC框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: java continue goto_J
- 下一篇: oracle函数trunc的使用
