javascript
RxJS笔记
RxJS
《深入淺出RxJS》讀書筆記遺留問題
chapter1
html部分
測試你對時間的感覺按住我一秒鐘然后松手你的時間:毫秒jquery實現
var time = new Date().getTime(); $("#hold-me").mousedown(function(event) {time = new Date().getTime();}).mouseup(function(event) {if (time) {var elapse = new Date().getTime() - time;$("#hold-time").text(elapse);// 重置time 避免直接觸發mouseup事件,例如在A處點擊然后在B處uptime = null;}});RxJS實現
const holdMeButton = document.getElementById("hold-me");const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown");const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup");// 獲取間隔時間const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{return mouseUpEvent.timestamp - mouseDownEvent.timestamp});holdTime$.subscribe(ms=>{document.getElementById("hold-time").innerText = ms;})holdTime$.flatMap(ms=>{return Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/'+ms)}).map(e=>e.response).subscribe(res=>{document.getElementById("rank").innerText = `你超過了${res.rank}% 的用戶`})chapter2
Koa2的使用
主要用來加載靜態資源,所以使用到了 koa,koa-static const path = require("path"); const koa = require("koa"); const serve = require("koa-static"); const app = new koa();app.use(async function (ctx,next) {console.log("收到請求...")await next()console.log(`"${ctx.path}"請求 已處理...`) })app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){if(err) throw err;console.log("程序啟動成功") });Observable 和 Observer
Observable 可被觀察的對象,Observer觀察者,Observer通過subscribe來觀察Observable對象RxJS的數據流就是Observable對象:
舉個栗子
// 使用 deep-link方式引入函數 const Observable = require("rxjs").Observable;/** 定義Observable對象的行為,會產生數據,調用訂閱者的next方法* 1. 此處的Observer與訂閱者行為 theObserver并不是同一個對象,而是對theObserver的包裝* 2. 如果observer.error被調用,之后的complete或者next就不會被調用啦,同理,complete被調用之后,也不會* 再調用next或者error* 3. 如果error或者complete一直未調用,則observer就一直在內存中等待被調用 */ const onSubscribe = observer =>{observer.next(1);observer.error(2);observer.complete(3); } // 產生一個Observable對象 const source$ = new Observable(onSubscribe); // 定義觀察者的行為 消費Observable對象產生的數據 const theObserver = {next:item => console.log(item),error:item => console.error(item),complete:item => console.log("已完成"), } // 建立Observable與Observer的關系 source$.subscribe(theObserver)退訂subscribe
在訂閱一段事件之后observer不再響應吐出的信息了,這時可以退訂,但是Observeable還會一直產生數據 const Observable = require("rxjs").Observable;const onSubscribe = observer =>{let n = 1;const handle = setInterval(()=>{console.log(`in onSubscribe ${n}`)// if(n>3){// observer.complete()// }observer.next(n++);},1000)return {unsubscribe(){// clearInterval(handle)}} }const source$ = new Observable(onSubscribe);const theObserver = {next:item => console.log(item) }let subscription = source$.subscribe(theObserver)setTimeout(()=>{// 此處的unsubscribe也是封裝過的subscription.unsubscribe() },3500)在node中執行,會一直打印 in onSubscribe *,但是source$不會再響應
Chapter3 操作符基礎
const Observable = require("rxjs/Observable").Observable; const of = require("rxjs/observable/of").of; const map = require("rxjs/operator/map").map; // 新建一個操作符 // 此處this是外部變量,導致此operator不再是純函數 Observable.prototype.double = function(){// return this::map(x=>x*2)return map.call(this,x=>x*2) }const source$ = of(1,3,4); const result$ = source$.double();result$.subscribe(value=>console.log(value))lettable/pipeable操作符
解決需要使用call或者bind改變this的操作,這樣是依賴外部環境的,不屬于純函數,也會喪失TS的類型檢查優勢-
lettable將Observable對象傳遞給下文,避免使用this
const Observable = require("rxjs/Observable").Observable; require("rxjs/add/observable/of").of; require("rxjs/add/operator/map").map; require("rxjs/add/operator/let").let;const source$ = Observable.of(1,2,3); const double$ = obs$ => obs$.map(v=>v*2); // 接受上文,傳遞到下文 const result$ = source$.let(double$);result$.subscribe(console.log)
// ES5實現
function map(project){
}
// 添加操作符
var result$ = source$.let(map(x => x * 3));
// ES6實現
const map6 = fn => obj$ =>
// 添加操作符
var result$ = source$.let(map6(x => x * 4));
of列舉數據
import {Observable} from "rxjs/Observable"; import "rxjs/add/observable/of" // 依次吐出數據,一次性emit const source$ = Observable.of(1,2,3); // 訂閱 // 第一個參數是next,第二個參數是error回調,第三個參數是complete回調 source$.subscribe(console.log,null,()=>{console.log("Complete")})range產生指定范圍的數據
const sourc$ = Observable.range(/*初始值*/1,/*個數*/100); // 每次只能步進 1generate循環創建
相當于for循環 const source$ = Observable.generate(// 初始值2,// 判斷條件value=> value < 10,// 步進value=> value+0.5,// 函數體,產生的結果value=> value*value )使用generate代替range
const range = function(min,count){const max = min + count;return Observable.generate(min,v=>vv+1,v=>v*v) }repeat重復數據的數據流
實例操作符,通過import 'rxjs/add/operator/repeat'引入
-
如果沒有observer.complete()repeat不會被調用
repeat以complete為契機會再次執行數據源,如果上游一直沒有complete下游就不會執行 - 因為repeat的存在,第一次數據源執行完(以complete為契機)后并不會執行observer的complete回調
創建異步數據的Observable對象
interval和timer
interval類似于setInterval
require('rxjs/add/observable/interval') // 每隔1000ms產生一個數據,初始值為0,步進為1 Observable.interval(1000)'timer 是setTimeout的超集
// 1000ms后開始產生數據,之后每隔1000ms產生一個數據,功能相當于interval Observable.timer(1000,1000) // 指定日期 Observable.time(new Date(new Date().getTime() + 12000))from 把一切轉化為Observable
fromPromise異步處理的對接
const Observable = require("rxjs").Observable; require("rxjs/add/observable/fromPromise");const promise = Promise.resolve(123); Observable.fromPromise(promise).subscribe(console.log, null, () =>console.log("Complete") ); //123 //Complete const promise1 = Promise.reject("error"); Observable.from(console.log,err => console.log("catch", err),() => console.log("Complete!") ); // 未捕獲的Promise錯誤 // (node:765) UnhandledPromiseRejectionWarning: error // (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing // inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). ( // rejection id: 1) // (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.fromEvent連接DOM與RxJS的橋梁
const event$ = Observable.fromEvent(document.getElementById("btn"),"click"); event$.subscribe(event=>{// Do something })在NodeJs中可以與EventEmitter交互
const Observable = require("rxjs").Observable; const EventEmitter = require("events"); require("rxjs/add/observable/fromEvent")const emitter = new EventEmitter();const source$ = Observable.fromEvent(emitter,"msg"); source$.subscribe(console.log,null,()=>console.log("Complete"))emitter.emit("msg",1) // 1 emitter.emit("msg","haha") // haha emitter.emit("a-msg","haha") // emitter.emit("msg",'nihao') // nihaofromEvent是Hot Observable,也就是數據的產生和訂閱無關,對于fromEvent來說,數據源是外部產生的,不受RxJS控制,這是Hot Observable對象的特點
fromEventPattern針對不規范的事件源
規范的事件源:DOM事件,EventEmitter事件repeatWhen
例如 在上游事件結束之后的一段時間再重新訂閱 const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeatWhen")const notifier = ()=>{return Observable.interval(1000); }const source$ = Observable.of(1,2,3); // const source$ = Observable.create(observer=>{ // observer.next(111); // return { // unsubscribe(){ // console.log("on Unsubscribe") // } // } // }); const repeat$ = source$.repeatWhen(notifier);repeat$.subscribe(console.log,null,()=>console.log("Complete")) // 每隔一秒產生一次 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // ^Cdefer延遲創建Observable
針對Observable占用內存比較大的情況,懶加載 const Observable = require("rxjs").Observable; require("rxjs/add/observable/defer"); require("rxjs/add/observable/of");const observableFactory = ()=>Observable.of(1,2,3); const source$ = Observable.defer(observableFactory)合并數據流
| 把多個數據流以首尾相連的方式合并 | concat,concatAll |
| 把多個數據流以先到先得的方式合并 | merge,mergeAll |
| 把多個數據流中的數據以一一對應的方式合并 | zip和zipAll |
| 持續合并多個數據流中最新產生的數據 | combineLatest,combineAll,withLatestFrom |
| 從多個數據流中選取第一個產生內容的數據流 | race |
| 在數據流前面添加一個指定數據 | startWith |
| 只獲取多個數據流最后產生的數據 | forkJoin |
| 從高階數據流中切換數據源 | switch,exhaust |
concat
-
實例方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/operator/concat")const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6);source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete")) -
靜態方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/observable/concat")const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6);Observable.concat(source$1,source$2).subscribe(console.log,null,()=>console.log("Complete"))
merge先到先得
merge用在同步數據的情況下和concat表現只,不建議使用 const Observable = require("rxjs").Observable; require("rxjs/add/operator/merge"); require("rxjs/add/operator/map"); require("rxjs/add/observable/timer");const source$1 = Observable.timer(0, 1000).map(x => x + "A"); const source$2 = Observable.timer(500, 1000).map(x => x + "B"); const source$3 = Observable.timer(1000, 1000).map(x => x + "C");// 此時 source$1與source$2永遠不會停止,所以 source$1.merge(source$2, source$3, /*此參數限制了合并的Observable的個數*/ 2).subscribe(console.log, null, () => console.log("Complete"));// 0A // 0B // 1A // 1B // 2A // 2B // 3A // 3B // 4A // 4B // ^C總結
- 上一篇: 梦到捉了好多知了猴什么意思
- 下一篇: 做梦梦到剪脚趾甲什么意思