RxJava:从未来到可观察
大約4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我?guī)字芮翱吹組atthew在Code Mesh上發(fā)表演講之后,我才對(duì)它有所了解。
它似乎最近變得越來越流行,我注意到,現(xiàn)在有一個(gè)由Netflix編寫的Java版本RxJava 。
我以為可以嘗試通過更改在探索cypher的MERGE函數(shù)時(shí)暴露的Observable而不是Future的代碼來嘗試一下。
回顧一下,我們有50個(gè)線程,我們進(jìn)行了100次迭代,在這些迭代中我們創(chuàng)建了隨機(jī)(用戶,事件)對(duì)。 我們最多創(chuàng)建10個(gè)用戶和50個(gè)事件,并且目標(biāo)是同時(shí)發(fā)送相同對(duì)的請(qǐng)求。
在另一篇文章的示例中,我丟棄了每個(gè)查詢的結(jié)果,而在這里我返回了結(jié)果,因此我有一些要訂閱的內(nèi)容。
代碼的輪廓如下所示:
public class MergeTimeRx {public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}使用RxJava的好處是,沒有提到我們?nèi)绾潍@取ExecutionResult的集合,這并不重要。 我們只有它們的流,并且通過在Observable上調(diào)用訂閱函數(shù),只要有另一個(gè)函數(shù)可用,我們就會(huì)得到通知。
我發(fā)現(xiàn)的大多數(shù)示例都顯示了如何從單個(gè)線程生成事件,但是我想使用線程池,以便可以同時(shí)觸發(fā)許多請(qǐng)求。 processEvents方法最終看起來像這樣:
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}我不確定這是否是使用Observable的正確方法,因此如果我記錯(cuò)了,請(qǐng)?jiān)谠u(píng)論中讓我知道。
我不確定處理錯(cuò)誤的正確方法是什么。 我最初在catch塊中調(diào)用了observer#onError ,但這意味著不會(huì)再產(chǎn)生不是我想要的事件。
如果您想使用它,該代碼可以作為要點(diǎn) 。 我添加了以下依賴關(guān)系以獲得RxJava庫:
<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency> 參考: RxJava : 從未來到我們的JCG合作伙伴 Mark Needham在Mark Needham Blog博客上均可觀察到。翻譯自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html
總結(jié)
以上是生活随笔為你收集整理的RxJava:从未来到可观察的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手机全景app(全景 安卓)
- 下一篇: 阿里云备案要几天没 填写会取消吗(阿里云