rxjava 被观察者_RxJava:从未来到可观察
rxjava 被觀察者
大約4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我幾周前看到Matthew在Code Mesh上發表演講后,我才對它有所了解。
最近它似乎越來越流行,我注意到Netflix編寫了一個Java版本RxJava 。
我以為可以嘗試通過更改在探索cypher的MERGE函數時暴露的Observable而不是Future的代碼來嘗試一下。
回顧一下,我們有50個線程,我們進行了100次迭代,在這些迭代中我們創建了隨機(用戶,事件)對。 我們最多創建10個用戶和50個事件,目標是同時發送相同對的請求。
在另一篇文章的示例中,我丟棄了每個查詢的結果,而在這里我返回了結果,因此我有一些要訂閱的內容。
代碼的輪廓如下所示:
public class MergeTimeRx {public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}使用RxJava的好處是,沒有提到我們如何獲取ExecutionResult的集合,這并不重要。 我們只有它們的流,并且通過在Observable上調用訂閱函數,只要有另一個函數可用,我們就會得到通知。
我發現的大多數示例都顯示了如何從單個線程生成事件,但是我想使用線程池,以便可以同時觸發許多請求。 processEvents方法最終看起來像這樣:
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}我不確定這是否是使用Observable的正確方法,因此如果我記錯了,請在評論中讓我知道。
我不確定處理錯誤的正確方法是什么。 我最初在catch塊中調用了observer#onError ,但這意味著不會再產生不是我想要的事件。
如果您想使用它,該代碼可以作為要點 。 我添加了以下依賴關系以獲取RxJava庫:
<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency> 參考: RxJava : 從未來到我們的JCG合作伙伴 Mark Needham在Mark Needham Blog博客上均可觀察到。翻譯自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html
rxjava 被觀察者
總結
以上是生活随笔為你收集整理的rxjava 被观察者_RxJava:从未来到可观察的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 华硕推出 VG249QL3A 游戏显示器
- 下一篇: 淘宝问问 AI 助手测试版上线:无需申请