RSocket协议初识
文章目錄
- 前言
- RSocket是什么?
- RSocket設計目標是什么?
- RSocket與其他協議有什么區別?
- 對比Http1.x
- 對比Http2.x
- 對比grpc
- 對比TCP
- 對比WebSocket
- 結論
- RSocket適用于哪些場景?
- 1、移動設備與服務器的連接。
- 2、微服務場景。
- 3、由于微服務和移動設備的普及,RSocket火起來應該就是這幾年的事兒。
前言
前幾天無聊翻SpringBoot官方文檔,無意中發現文檔中增加了一個章節叫RSocket協議的鬼東西,遂研究了一下。
RSocket是什么?
RSocket是一種二進制字節流傳輸協議,位于OSI模型中的5~6層,底層可以依賴TCP、WebSocket、Aeron協議。
RSocket設計目標是什么?
1、支持對象傳輸,包括request\response、request\stream、fire and forget、channel
2、支持應用層流量控制
3、支持單連接雙向、多次復用
4、支持連接修復
5、更好的使用WebSocket和Aeron協議
RSocket與其他協議有什么區別?
對比Http1.x
Http1.x只支持request\response,但是現實應用中并不是所有請求都需要有回應(Fire And Forget)、有的需求需要一個請求返回一個數據流(request\stream)、有的還需要雙向數據傳輸(channel)。
對比Http2.x
http2.x不支持應用層流量控制、偽雙向傳輸,即服務端push數據本質上還是對客戶端請求的響應,而不是直接推送。RSocket做到了真正的雙向傳輸,使得服務端可以調用客戶端服務,使得服務端和客戶端在角色上完全對等,即兩邊同時是Requester和Responder。
對比grpc
grpc需要依賴protobuf,本質上還是http2.x。RSocket不限制編解碼,可以是json、protobuf等等。
性能上grpc要差一些:詳見壓測對比,https://dzone.com/articles/rsocket-vs-grpc-benchmark
對比TCP
其實兩者不在一個層面,為啥要作比較呢,因為netty讓tcp層的編程也很容易,但是需要自定義傳輸協議,比如定義header、body長度等等,用起來還是很麻煩的。
對比WebSocket
websocket不支持應用層流量控制,本質上也是一端請求另一端響應,不支持連接修復。
RSocket協議的形式是什么?
連接上傳輸的數據是流(Stream)
流(Stream)由幀(Frame)組成
幀(Frame)包含了元數據(MetaData)與業務數據(Data)
結論
基于RSocket協議,我們的業務數據會被打包成幀,并以幀流的形式在客戶端與服務端互相傳輸。所以RSocket的所有特性都是基于這個幀流實現的。后續有時間會針對每個幀類型做解析。
RSocket適用于哪些場景?
1、移動設備與服務器的連接。
數據雙向傳輸,且支持流量控制。支持背壓,背壓的意思:如果客戶端請求服務端過快,那么服務端會堆積請求,最終耗光資源。有了背壓服務端可以根據自己的資源來控制客戶端的請求速度,即調用客戶端告訴他別發那么快。
支持連接修復,比如手機進地鐵之后,網絡斷開一段時間,其他協議需要重新建立連接,RSocket則可以修復連接繼續傳輸幀數據。
2、微服務場景。
spring cloud目前支持的http協議,不能fire and forget、不能請求流數據、不能單連接雙向調用;替換成RSocket之后可以滿足以上需求的同時提高性能。且針對服務治理、負載均衡等RSocket都在慢慢完善。
3、由于微服務和移動設備的普及,RSocket火起來應該就是這幾年的事兒。
BB了這么多你給我上個代碼
SpringBoot中的使用
step1、構建SpringBoot項目,引入依賴
step2、編寫需要傳輸的消息類和服務器類
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.time.Instant;@Data @AllArgsConstructor @NoArgsConstructor public class Message {private String from;private String to;private long index;private long created = Instant.now().getEpochSecond();public Message(String from, String to) {this.from = from;this.to = to;this.index = 0;}public Message(String from, String to, long index) {this.from = from;this.to = to;this.index = index;}} import java.time.Duration; import java.util.ArrayList; import java.util.List; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.annotation.ConnectMapping; import org.springframework.stereotype.Controller; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux;@Slf4j @Controller public class RSocketController {private final List<RSocketRequester> CLIENTS = new ArrayList<>();@MessageMapping("request-response")public Message requestResponse(Message request) {log.info("收到請求: {}", request);return new Message("服務端", "客戶端");}@MessageMapping("fire-and-forget")public void fireAndForget(Message request) {log.info("收到fire-and-forget請求: {}", request);}@MessageMapping("stream")Flux<Message> stream(Message request) {log.info("收到流式請求: {}", request);return Flux.interval(Duration.ofSeconds(1)).map(index -> new Message(”服務端“, "客戶端", index)).log();}@MessageMapping("channel")Flux<Message> channel(final Flux<Duration> settings) {return settings.doOnNext(setting -> log.info("發射間隔為 {} 秒.", setting.getSeconds())).switchMap(setting -> Flux.interval(setting).map(index -> new Message("服務端", "客戶端", index))).log();}}step3、配置文件里增加配置項
spring.main.lazy-initialization=true spring.rsocket.server.port=7000step4、編寫客戶端代碼
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.time.Instant;@Data @AllArgsConstructor @NoArgsConstructor public class Message {private String from;private String to;private long index;private long created = Instant.now().getEpochSecond();public Message(String from, String to) {this.from = from;this.to = to;this.index = 0;}public Message(String from, String to, long index) {this.from = from;this.to = to;this.index = index;}} import java.time.Duration; import javax.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import lombok.extern.slf4j.Slf4j; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;@Slf4j @RestController public class RSocketClient {private final RSocketRequester rsocketRequester;private static Disposable disposable;@Autowiredpublic RSocketClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {this.rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies).connectTcp("localhost", 7000).block();this.rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("發生錯誤,鏈接關閉")).doFinally(consumer -> log.info("鏈接關閉")).subscribe();}@PreDestroyvoid shutdown() {rsocketRequester.rsocket().dispose();}@GetMapping("request-response")public Message requestResponse() {Message message = this.rsocketRequester.route("request-response").data(new Message("客戶端", "服務器")).retrieveMono(Message.class).block();log.info("客戶端request-response收到響應 {}", message);return message;}@GetMapping("fire-and-forget")public String fireAndForget() {this.rsocketRequester.route("fire-and-forget").data(new Message("客戶端", "服務器")).send().block();return "fire and forget";}@GetMapping("stream")public String stream() {disposable = this.rsocketRequester.route("stream").data(new Message("客戶端", "服務器")).retrieveFlux(Message.class).subscribe(message -> log.info("客戶端stream收到響應 {}", message));return "stream";}@GetMapping("channel")public String channel() {Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));Flux<Duration> settings = Flux.concat(setting1, setting2, setting3).doOnNext(d -> log.info("客戶端channel發送消息 {}", d.getSeconds()));disposable = this.rsocketRequester.route("channel").data(settings).retrieveFlux(Message.class).subscribe(message -> log.info("客戶端channel收到響應 {}", message));return "channel";}}step5、啟動服務端、啟動客戶端,打開瀏覽器訪問localhost:8080/fire-and-forget等測試效果
代碼解析
- @MessageMapping:Spring提供的注解,用于路由,與@GetMapping等功能類似
- Mono:響應式編程里用于返回0-1個結果
- Flux:響應式編程里用于返回0-N個結果
- Disposable:斷流器,為true的時候兩邊不能傳輸數據
What Next?
- 協議原理解析
- 由于RSocket社區還不夠活躍,Git上的代碼也是剛剛起步,還在不斷更新中,相關功能也在不斷完善中,后續隨著官方新內容的更新我也會跟著更新。
- RSocket中很多概念如Mono、Flux、Disposable、背壓、流式處理等都是響應式編程中的概念,想了解響應式編程可以查看:http://reactivex.io/ 中的文檔,其中包括了RXJava等RX系列的各種語言的Demo。
總結
以上是生活随笔為你收集整理的RSocket协议初识的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenCV中cornerSubPixe
- 下一篇: spark 2.2 读取 Hadoop3