响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono
響應(yīng)式 Web 第三節(jié)
- 服務(wù)調(diào)用中的三種耦合
- 響應(yīng)式流規(guī)范與接口
- 響應(yīng)式流中的流量控制
- Web中的響應(yīng)式與請求/響應(yīng)式的區(qū)別
- 流式處理中的Source/Sink模型
- RXJava2 觀察者模式同步與異步實現(xiàn)
- Project Reactor 中的 Flux、Mono
- Flux、Mono 同步靜態(tài)創(chuàng)建與異步動態(tài)創(chuàng)建
- WebFlux
服務(wù)當中的耦合
在調(diào)用服務(wù)的時候,總會有耦合,基于rmi的
1、技術(shù)耦合:dubbo,典型的基于rpc的遠程服務(wù)調(diào)用,兩邊都是java才能調(diào)用。
2、空間耦合:兩臺機器的依賴
3、時間耦合:服務(wù)的可用、不可用
微服務(wù) 就巧妙地解決了這三個維度上的耦合,但是所有調(diào)用幾乎都是同步調(diào)用。
異步調(diào)用能提升整體的性能嗎?不能,但是它能夠提高整體的吞吐量,防止雪崩
對于傳統(tǒng)編程模型的web服務(wù):
- 訪問量過大,web服務(wù)可能會oom,瀏覽器/app一次接這么多數(shù)據(jù)可能也會扛不住
- 而且前端的展示要等待傳輸?shù)倪^程
解決方法:分頁。
分頁缺點:只能追加,不能在中間插入,否則會在分頁取數(shù)據(jù)的時候發(fā)生混亂。也可以通過編碼解決,但是會增加整體業(yè)務(wù)的復(fù)雜度。如果使用私有數(shù)據(jù)的話,你會和別人看到的數(shù)據(jù)不一樣。
分頁缺點解決方法:響應(yīng)式編程,基于發(fā)布/訂閱模型
發(fā)布/訂閱模型
- mq:做數(shù)據(jù)緩沖、通知,不做持久化,數(shù)據(jù)可以推過去,或者主動去拉也可以
- zk
- sse:server sent push
List底層是數(shù)組,是固定長度的;Flux底層是流,是可變長度的,流的大小取決于緩沖區(qū)的大小。
響應(yīng)式數(shù)據(jù)庫:例如,某個用戶發(fā)送短信超過100條之后,會反過來去回調(diào)服務(wù)的接口。
設(shè)置邊界:到達邊界之后,就流向server/service,要考慮一次流多少:如果流多了,會造成流量過大,解決方法:加緩沖區(qū),可以在服務(wù)端加緩沖區(qū),也可以在客戶端加緩沖區(qū)。
推送數(shù)據(jù):超過客戶端的臨界值怎么辦?丟棄策略
拉取數(shù)據(jù):rocketmq是拉數(shù)據(jù)
推數(shù)據(jù)和拉數(shù)據(jù),都是流式計算的概念
流式計算
Flume,Flink都是處理流的。
大數(shù)據(jù)技術(shù)棧中,引入了很多先進的概念,web架構(gòu)中沒有的。
Flume用來做大數(shù)據(jù)中對于日志的拉取。
Flink
source,channel,sink
source:數(shù)據(jù)源
channel:緩沖區(qū)
sink:目的地
處理數(shù)據(jù):同步/異步
Flux<T>:可以裝0~n個數(shù)據(jù)
Mono:只能裝一個數(shù)據(jù)
背壓處理,慢消費,同一線程,好控制
響應(yīng)式流的規(guī)范:Reactive規(guī)范
- Reactive是響應(yīng)式,jdk9引入了響應(yīng)式的接口。
- Project Reactor,RXJava是響應(yīng)式的框架。RXJava在安卓領(lǐng)域用的比較多
- webflux也是響應(yīng)式框架,將servlet換成了netty或servlet3
代碼示例
Project Reactor
官網(wǎng)
https://projectreactor.io/
Reactor 是Spring5中構(gòu)建各個響應(yīng)式組件的基礎(chǔ)框架,內(nèi)部提供了Flux和Mono兩個代表異步數(shù)據(jù)序列的核心組件。
Flux
靜態(tài)方法生成
// 靜態(tài)方法生成FluxString[] s = new String[] {"xx","oo"};// just 已知元素數(shù)量和內(nèi)容 使用// Flux<String> flux1 = Flux.just(s); // flux1.subscribe(System.out::println);Flux<String> flux2 = Flux.just("xx","xxx"); // flux2.subscribe(System.out::println);//fromArray方法List<String> list = Arrays.asList("hello", "world");Flux<String> flux3 = Flux.fromIterable(list);// flux3.subscribe(System.out::println);//fromStream方法Stream<String> stream = Stream.of("hi", "hello");Flux<String> flux4 = Flux.fromStream(stream);// flux4.subscribe(System.out::println);//range方法Flux<Integer> range = Flux.range(0, 5);// range.subscribe(System.out::println);//interval方法, take方法限制個數(shù)為5個Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);longFlux.subscribe(System.out::println);//鏈式Flux.range(1, 5).subscribe(System.out::println); } //鏈式Flux.range(1, 5).subscribe(System.out::println);// 合并Flux<String> mergeWith = flux3.mergeWith(flux4);mergeWith.subscribe(System.out::println);System.out.println("---");// 結(jié)合為元祖Flux<String> source1 = Flux.just("111", "world","333");Flux<String> source2 = Flux.just("2111", "xxx");Flux<Tuple2<String, String>> zip = source1.zipWith(source2);zip.subscribe(tuple -> {System.out.println(tuple.getT1() + " -> " + tuple.getT2());}); // 跳過兩個Flux<String> flux = Flux.just("1111", "222", "333");Flux<String> skip = flux.skip(2);skip.subscribe(System.out::println);// 拿前幾個Flux<String> flux2 = Flux.just("1111", "222", "333");Flux<String> skip2 = flux2.take(2);skip2.subscribe(System.out::println);// 過濾Flux<String> flux = Flux.just("xx", "oo", "x1x");Flux<String> filter = flux.filter(s -> s.startsWith("x"));filter.subscribe(System.out::println);// 去重Flux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Flux<String> filter = flux.filter(s -> s.startsWith("x")).distinct();filter.subscribe(System.out::println);// 轉(zhuǎn) MonoFlux<String> flux = Flux.just("xx", "oo", "x1x","x2x");Mono<List<String>> mono = flux.collectList();mono.subscribe(System.out::println);// 邏輯運算 all 與 anyFlux<String> flux = Flux.just("xx", "oox", "x1x","x2x");Mono<Boolean> mono = flux.all(s -> s.contains("x"));mono.subscribe(System.out::println);Mono 連接
Flux<String> concatWith = Mono.just("100").concatWith(Mono.just("100"));concatWith.subscribe(System.out::println);異常處理
Mono.just("100").concatWith(Mono.error(new Exception("xx"))).onErrorReturn("xxx").subscribe(System.out::println)動態(tài)創(chuàng)建
// 同步動態(tài)創(chuàng)建,next 只能被調(diào)用一次Flux.generate(sink -> {sink.next("xx");sink.complete();}).subscribe(System.out::print);} Flux.create(sink -> {for (int i = 0; i < 10; i++) {sink.next("xxoo:" + i);}sink.complete();}).subscribe(System.out::println);}WebFlux
RXJava2
http://reactivex.io/#
Reactive Extensions
同步
哪個線程產(chǎn)生就在哪個線程消費
maven依賴
<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava --> <dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId> </dependency>main
public static void main(String[] args) {Observable<String> girl = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("1");emitter.onNext("2");emitter.onNext("3");emitter.onNext("4");emitter.onNext("5");emitter.onComplete();}});// 觀察者Observer<String> man = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// TODO Auto-generated method stubSystem.out.println("onSubscribe" + d);}@Overridepublic void onNext(String t) {// TODO Auto-generated method stubSystem.out.println("onNext " + t);}@Overridepublic void onError(Throwable e) {// TODO Auto-generated method stubSystem.out.println("onError " + e.getMessage());}@Overridepublic void onComplete() {// TODO Auto-generated method stubSystem.out.println("onComplete");}};girl.subscribe(man);}異步
| Schedulers.computation() | 適用于計算密集型任務(wù) |
| Schedulers.io() | 適用于 IO 密集型任務(wù) |
| Schedulers.trampoline() | 在某個調(diào)用 schedule 的線程執(zhí)行 |
| Schedulers.newThread() | 每個 Worker 對應(yīng)一個新線程 |
| Schedulers.single() | 所有 Worker 使用同一個線程執(zhí)行任務(wù) |
| Schedulers.from(Executor) | 使用 Executor 作為任務(wù)執(zhí)行的線程 |
下節(jié)課,我們講WebFlux的應(yīng)用~
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術(shù)人生總結(jié)
以上是生活随笔為你收集整理的响应式web(三):服务当中的三种耦合,流式计算,RXJava2,Flux,Mono的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: P8实战(二):分布式锁前置技能 etc
- 下一篇: leetcode 101. 对称二叉树