grpc简单使用 java_gRPC学习记录(四)-官方Demo - Java 技术驿站-Java 技术驿站
了解proto3后,接下來看官方Demo作為訓練,這里建議看一遍之后自己動手搭建出來,一方面鞏固之前的知識,一方面是對整個流程更加熟悉.
官方Demo地址: https://github.com/grpc/grpc-java
例子是一個簡單的路由映射的應用,它允許客戶端獲取路由特性的信息,生成路由的總結,以及交互路由信息,如服務器和其他客戶端的流量更新.
1.1定義服務
也就是寫proto文件
//指定proto3格式
syntax = "proto3";
//一些生成代碼的設置
option java_multiple_files = true;//以外部類模式生成
option java_package = "cn.mrdear.route";//所在包名
option java_outer_classname = "RouteProto";//最外層類名稱
//定義服務
service RouteGuide{
//得到指定點的feature
//一個 簡單 RPC , 客戶端使用存根發送請求到服務器并等待響應返回,就像平常的函數調用一樣。
rpc GetFeature(Point) returns (Feature) {}
//獲取一個矩形內的點
//一個 服務器端流式 RPC , 客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,
//直到里面沒有任何消息。從例子中可以看出,通過在 響應 類型前插入 stream 關鍵字,可以指定一個服務器端的流方法。
rpc ListFeatures(Rectangle) returns (stream Feature){}
//記錄該點
//一個 客戶端流式 RPC , 客戶端寫入一個消息序列并將其發送到服務器,同樣也是使用流。一旦客戶端完成寫入消息,
//它等待服務器完成讀取返回它的響應。通過在 請求 類型前指定 stream 關鍵字來指定一個客戶端的流方法。
rpc RecordRoute(stream Point) returns (RouteSummary){}
//路由交流
//一個 雙向流式 RPC 是雙方使用讀寫流去發送一個消息序列。兩個流獨立操作,因此客戶端和服務器
//可以以任意喜歡的順序讀寫:比如, 服務器可以在寫入響應前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,
//或者其他讀寫的組合。每個流中的消息順序被預留。你可以通過在請求和響應前加 stream 關鍵字去制定方法的類型。
rpc RouteChat(stream RouteNote) returns (stream RouteNote){}
}
//代表經緯度
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
//由兩個點確定的一個方塊
message Rectangle{
Point lo = 1;
Point hi = 2;
}
//某一位置的名稱
message Feature {
string name = 1;
Point location = 2;
}
// Not used in the RPC. Instead, this is here for the form serialized to disk.
message FeatureDatabase {
repeated Feature feature = 1;
}
//給某一點發送消息
message RouteNote{
Point location = 1;
string message = 2;
}
//記錄收到的信息
message RouteSummary{
int32 point_count = 1;
int32 feture_count = 2;
int32 distance = 3;
int32 elapsed_time = 4;
}
執行mvn compile生成如下代碼:
1.2編寫RouteGuideService
該類就是這個項目所提供給外部的功能.該類需要繼承RouteGuideGrpc.RouteGuideImplBase,這個類提供了我們所定義分服務接口,繼承后覆蓋需要實現的自定義方法.
簡單 RPC
簡單RPC和普通方法調用形式差不多,客戶端傳來一個實體,服務端返回一個實體.
@Override
public void getFeature(Point request, StreamObserver responseObserver) {
System.out.println("getFeature得到的請求參數: " + request.toString());
// responseObserver.onError(); 代表請求出錯
responseObserver.onNext(checkFeature(request));//包裝返回信息
responseObserver.onCompleted();//結束一次請求
}
//找到復核的feature
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
其中StreamObserver是一個應答觀察者,用于封裝返回的信息,服務器把該信息傳給客戶端.請求結束要調用onCompleted()方法.
服務器端流式 RPC
在proto文件中聲明了stream,但是從接口上看不出來和簡單RPC的區別,代碼中最主要的區別是多次調用responseObserver.onNext()的方法,最后完成時寫回數據.
@Override
public void listFeatures(Rectangle request, StreamObserver responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
//如果不存在則繼續
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
//找到符合的就寫入
responseObserver.onNext(feature);
}
}
//最后標識完成
responseObserver.onCompleted();
}
客戶端流式 RPC
服務端就需要一直監控客戶端寫入情況,因此需要一個StreamObserver接口,其中onNext方法會在客戶端每次寫入時調用,當寫入完畢時調用onCompleted()方法.具體還要到后面客戶端調用分析.
@Override
public StreamObserver recordRoute(StreamObserver responseObserver) {
return new StreamObserver() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
//客戶端每寫入一個Point,服務端就會調用該方法
@Override
public void onNext(Point point) {
System.out.println("recordRoute得到的請求參數: " + point.toString());
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
System.err.println("Encountered error in recordRoute");
}
//客戶端寫入結束時調用
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFetureCount(featureCount)
.setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
雙向流式 RPC
和客戶端流式RPC差不多.
@Override
public StreamObserver routeChat(StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(RouteNote note) {
List notes = getOrCreateNotes(note.getLocation());
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
notes.add(note);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
System.err.println("Encountered error in routeChat");
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
1.3創建服務端
和Helloworld一樣的形式,最主要的是addService(new RouteGuideService(features)),這里把需要注冊的服務給注冊上.
public class RouteGuideServer {
private final int port;//服務端端口
private final Server server;//服務器
public RouteGuideServer(int port) throws IOException {
this.port = port;
//獲取初始化數據
List features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
//初始化Server參數
server = ServerBuilder.forPort(port)
//添加指定服務
.addService(new RouteGuideService(features))
.build();
}
/**
* 啟動服務
*/
public void start() throws IOException {
server.start();
System.out.println("Server started, listening on " + port);
//程序退出時關閉資源
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
RouteGuideServer.this.stop();
System.err.println("*** server shut down");
}));
}
/**
* 關閉服務
*/
public void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* 使得server一直處于運行狀態
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
RouteGuideServer server = new RouteGuideServer(50051);
server.start();
server.blockUntilShutdown();
}
}
1.4編寫客戶端
客戶端需要一個channel和一個存根blockingStub或者asyncStub根據業務需要選擇同步或者異步.
private final ManagedChannel channel;//grpc信道,需要指定端口和地址
private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;//阻塞/同步存根
private final RouteGuideGrpc.RouteGuideStub asyncStub;//非阻塞,異步存根
public RouteGuideClient(String host,int port) {
//創建信道
channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
//創建存根
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
/**
* 關閉方法
*/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
簡單grpc
和調用普通方法形式差不多.
public void getFeature(int lat,int lon){
System.out.println("start getFeature");
Point request = Point.newBuilder()
.setLatitude(lat)
.setLongitude(lon)
.build();
Feature feature;
try {
//同步阻塞調用
feature = blockingStub.getFeature(request);
System.out.println("getFeature服務端返回 :" + feature);
} catch (StatusRuntimeException e) {
System.out.println("RPC failed " +e.getStatus());
}
}
調用代碼:
public static void main(String[] args) throws InterruptedException {
RouteGuideClient client = new RouteGuideClient("localhost", 50051);
try {
client.getFeature(409146138, -746188906);//成功案例
client.getFeature(0, 0);//失敗案例
} finally {
client.shutdown();
}
}
客戶端日志
服務端日志(參數都為0的時候,這邊并沒拿到參數)
服務器端流式 RPC
和簡單RPC差不多,只不過返回的是一個集合類.
//2.服務端流式RPC
public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon){
System.out.println("start listFeatures");
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator features;
try {
features = blockingStub.listFeatures(request);
for (int i = 1; features.hasNext(); i++) {
Feature feature = features.next();
System.out.println("getFeature服務端返回 :" + feature);
}
} catch (Exception e) {
System.out.println("RPC failed " +e.getMessage());
}
}
客戶端日志:
服務端日志:
客戶端流式 RPC
該種方式兩遍都是異步操作,所以需要互相監聽,也因此需要使用阻塞存根.服務端監聽Point的寫入,客戶端監聽RouteSummary的寫回.
public void recordRoute(List features, int numPoints) throws InterruptedException {
System.out.println("start recordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
//建一個應答者接受返回數據
StreamObserver responseObserver = new StreamObserver() {
@Override
public void onNext(RouteSummary summary) {
System.out.println("recordRoute服務端返回 :" + summary);
}
@Override
public void onError(Throwable t) {
System.out.println("RecordRoute Failed");
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("RecordRoute finish");
finishLatch.countDown();
}
};
//客戶端寫入操作
StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);
Random random = new Random();
try {
for (int i = 0; i < numPoints; ++i) {
int index = random.nextInt(features.size());
Point point = features.get(index).getLocation();
System.out.println("客戶端寫入point:" + point);
requestObserver.onNext(point);
Thread.sleep(random.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
//標識已經寫完
requestObserver.onCompleted();
// Receiving happens asynchronously
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
System.out.println("recordRoute can not finish within 1 minutes");
}
}
客戶端日志:
服務端日志:
雙向流式 RPC
和客戶端流式RPC比較接近,同樣都需要雙方監控.
public CountDownLatch routeChat() {
System.out.println("start routeChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
//寫入監聽
StreamObserver requestObserver =
//寫回監聽
asyncStub.routeChat(new StreamObserver() {
//服務端每寫回一個操作就調用
@Override
public void onNext(RouteNote note) {
System.out.println("服務端寫回: " + note);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
System.out.println("RouteChat Failed:");
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
System.out.println("客戶端寫入:" + request);
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
//標識寫完
requestObserver.onCompleted();
return finishLatch;
}
這里調用需要特殊處理下;
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
System.out.println("routeChat can not finish within 1 minutes");
}
客戶端日志:
服務端日志:
官方Demo之后,入門算結束,接下來就要看詳細的官方文檔,然后在項目中使用,這個過程會遇到不少問題,解決這些問題就是對這個技術的熟練.
附錄:
總結
以上是生活随笔為你收集整理的grpc简单使用 java_gRPC学习记录(四)-官方Demo - Java 技术驿站-Java 技术驿站的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机能力挑战赛_蓝桥杯、PAT、CCF
- 下一篇: Js拼接嵌套php代码,分享一个js文件