FunDA(7)- Reactive Streams to fs2 Pull Streams
? ? Reactive-Stream不只是簡單的push-model-stream, 它還帶有“拖式”(pull-model)性質。這是因為在Iteratee模式里雖然理論上由Enumerator負責主動推送數據,實現了push-model功能。但實際上Iteratee也會根據自身情況,通過提供callback函數通知Enumerator可以開始推送數據,這從某種程度上也算是一種pull-model。換句話講Reactive-Streams是通過push-pull-model來實現上下游Enumerator和Iteratee之間互動的。我們先看個簡單的Iteratee例子:
def showElements: Iteratee[Int,Unit] = Cont {case Input.El(e) =>println(s"EL($e)")showElementscase Input.Empty => showElementscase Input.EOF =>println("EOF")Done((),Input.EOF) } //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit] val enumNumbers = Enumerator(1,2,3,4,5) //> enumNumbers : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473 enumNumbers |>> showElements //> EL(1)//| EL(2)//| EL(3)//| EL(4)//| EL(5)//| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))我們看到:enumNumbers |>> showElements立刻啟動了運算。但并沒有實際完成數據發送,因為showElements并沒有收到Input.EOF。首先,我們必須用Iteratee.run來完成運算:
val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1)//| El(2)//| El(3)//| El(4)//| El(5)//| El(6)//| El(7)//| El(8)//| EOF//| it : scala.concurrent.Future[Int] = Success(99)這個run函數是這樣定義的:
/*** Extracts the computed result of the Iteratee pushing an Input.EOF if necessary* Extracts the computed result of the Iteratee, pushing an Input.EOF first* if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.* In case of error, an exception may be thrown synchronously or may* be used to complete the returned Promise; this indeterminate behavior* is inherited from fold().** @return a [[scala.concurrent.Future]] of the eventually computed result*/def run: Future[A] = fold({case Step.Done(a, _) => Future.successful(a)case Step.Cont(k) => k(Input.EOF).fold({case Step.Done(a1, _) => Future.successful(a1)case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")case Step.Error(msg, e) => sys.error(msg)})(dec)case Step.Error(msg, e) => sys.error(msg)})(dec)再一個問題是:enumNumbers |>> showElements是個封閉的運算,我們無法逐部分截取數據流,只能取得整個運算結果。也就是說如果我們希望把一個Enumerator產生的數據引導到fs2 Stream的話,只能在所有數據都讀入內存后才能實現了。這樣就違背了使用Reactive-Streams的意愿。那我們應該怎么辦?一個可行的方法是使用一個存儲數據結構,用兩個線程,一個線程里Iteratee把當前數據存入數據結構,另一個線程里fs2把數據取出來。fs2.async.mutable包提供了個Queue類型,我們可以用這個Queue結構來作為Iteratee與fs2之間的管道:Iteratee從一頭把數據壓進去(enqueue),fs2從另一頭把數據取出來(dequeue)。
我們先設計enqueue部分,這部分是在Iteratee里進行的:
def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {case Input.EOF =>q.enqueue1(None).unsafeRunDone((),Input.EOF)case Input.Empty => enqueueTofs2(q)case Input.El(e) =>q.enqueue1(Some(e)).unsafeRunenqueueTofs2(q) } //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]先分析一下這個Iteratee:我們直接把enqueueTofs2放入Cont狀態,也就是等待接受數據狀態。當收到數據時運行q.enqueue1把數據塞入q,然后不斷循環運行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是個同步運算,在未成功完成數據enqueue1的情況下會一直占用線程。所以,q另一端的dequeue部分必須是在另一個線程里運行,否則會造成整個程序的死鎖。fs2的Queue類型款式是:Queue[F,A],所以我們必須用Stream.eval來對這個Queue進行函數式的操作:
val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>//run Enumerator-Iteratee and enqueue data in thread 1//dequeue data and en-stream in thread 2(current thread)}因為Stream.eval運算結果是Stream[Task,Int],所以我們可以得出這個flatMap內的函數款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我們先考慮如何實現數據enqueue部分:這部分是通過Iteratee的運算過程產生的。我們提到過這部分必須在另一個線程里運行,所以可以用Task來選定另一線程如下:
Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()現在這個Task就在后面另一個線程里自己去運算了。但它的運行進展則會依賴于另一個線程中dequeue數據的進展。我們先看看fs2提供的兩個函數款式:
/** Repeatedly calls `dequeue1` forever. */def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat/*** Halts the input stream at the first `None`.** @example {{{* scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList* res0: List[Int] = List(1, 2)* }}}*/def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =_ repeatPull { _.receive {case (hd, tl) =>val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })if (out.size == hd.size) Pull.output(out) as tlelse if (out.isEmpty) Pull.doneelse Pull.output(out) >> Pull.done}}剛好,dequeue產生Stream[F,A]。而unNoneTerminate可以根據Stream(None)來終止運算。現在我們可以把這個Reactive-Streams到fs2-pull-streams轉換過程這樣來定義:
implicit val strat = Strategy.fromFixedDaemonPool(4)//> strat : fs2.Strategy = Strategy val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuturepipe.unNoneTerminate(q.dequeue) } //> fs2Stream : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)現在這個stream應該已經變成fs2.Stream[Task,Int]了。我們可以用前面的log函數來試運行一下:
def log[A](prompt: String): Pipe[Task,A,A] =_.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}//> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A] fs2Stream.through(log("")).run.unsafeRun //> > 1//| > 2//| > 3//| > 4//| > 5我們成功的把Iteratee的Reactive-Stream轉化成fs2的Pull-Model-Stream。
下面是這次討論的源代碼:
import play.api.libs.iteratee._ import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global import scala.collection.mutable._ import fs2._ object iteratees { def showElements: Iteratee[Int,Unit] = Cont {case Input.El(e) =>println(s"EL($e)")showElementscase Input.Empty => showElementscase Input.EOF =>println("EOF")Done((),Input.EOF) } val enumNumbers = Enumerator(1,2,3,4,5)enumNumbers |>> showElementsIteratee.flatten(enumNumbers |>> showElements).rundef enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {case Input.EOF =>q.enqueue1(None).unsafeRunDone((),Input.EOF)case Input.Empty => enqueueTofs2(q)case Input.El(e) =>q.enqueue1(Some(e)).unsafeRunenqueueTofs2(q) } implicit val strat = Strategy.fromFixedDaemonPool(4) val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuturepipe.unNoneTerminate(q.dequeue) }def log[A](prompt: String): Pipe[Task,A,A] =_.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}fs2Stream.through(log("")).run.unsafeRun}?
?
?
?
?
?
?
?
?
?
?
?
轉載于:https://www.cnblogs.com/tiger-xc/p/6370363.html
總結
以上是生活随笔為你收集整理的FunDA(7)- Reactive Streams to fs2 Pull Streams的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 图片压缩之 PNG
- 下一篇: Python中的eval()、exec(