Kotlin实战指南二十:flow
轉載請標明出處:http://blog.csdn.net/zhaoyanjun6/article/details/117370700
本文出自【趙彥軍的博客】
文章目錄
- 往期精彩文章
- flow 是啥
- flow咋用?
- 創建flow
- map 操作符
- 例子1
- 例子2
- catch 捕獲上游出現的異常
- 例子1:不捕獲異常
- 例子2:捕獲異常
- 取消flow
- collectIndexed
- distinctUntilChanged 過濾重復的值
- transform
- filter 過濾
- flowOn 數據發射的線程
- launchIn
- conflate()
- withIndex
- onEach
- onStart 在數據發射之前觸發
- onCompletion
- drop(n) 忽略最開始釋放的值
- dropWhile
- sample
- take 取指定數量的數據
- takeWhile
- 實現一個定時器功能
往期精彩文章
Kotlin實戰指南十九:use 函數魔法
Kotlin實戰指南十八:open、internal 關鍵字使用
Kotlin實戰指南十七:JvmField、JvmStatic使用
flow 是啥
按順序發出值并正常完成或異常完成的冷流異步數據流
flow咋用?
flow 是kotlin coroutines 庫里面的類,所以使用 flow 之前,要確保添加了協程依賴
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.0"引入依賴后,我們寫一段代碼:
flow {emit(1) //發射數字 1emit(2) //發射數字 2}.collect {//接收結果Log.d("flow-", "value $it")}如果你這樣寫就會報錯
意思是:collect 方法是 suspend 修飾的掛起函數,只能在協程里,或者其他掛起函數中使用。我們來修改一下:
輸出結果:
D/flow-: value: 1 D/flow-: value: 2創建flow
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {val flow = flow {emit(1)emit(2)}val flow2 = flowOf(1, 2.3)val flow3 = mutableListOf(1, 2, 3, 4, 5).asFlow()val flow4 = (1..4).asFlow()}} }map 操作符
例子1
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flow {emit(1) //發射數字 1emit(2) //發射數字 2}.map {"map $it"}.collect {//接收結果Log.d("flow-", "value: $it")}}} }輸出結果:
D/flow-: value: map 1 D/flow-: value: map 2例子2
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow: Flow<String> = flow {emit(1) //發射數字 1emit(2) //發射數字 2}.map {"map $it"}GlobalScope.launch {flow.collect {//接收結果Log.d("flow-", "value: $it")}}} } 輸出結果: ```java D/flow-: value: map 1 D/flow-: value: map 2在接收數據的時候,我們用了 collect 方法,collect 英文含義就是收集的意思
catch 捕獲上游出現的異常
當 flow 流操作中發生異常的情況時,程序會發生崩潰:
例子1:不捕獲異常
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow: Flow<String> = flow {emit(1) //發射數字 1emit(2) //發射數字 2}.map {0 / 0 //人為制造異常"map $it"}GlobalScope.launch {flow.collect {//接收結果Log.d("flow-", "value: $it")}}} }輸出結果:程序發生崩潰
例子2:捕獲異常
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow: Flow<String> = flow {emit(1) //發射數字 1emit(2) //發射數字 2}.map {0 / 0 //人為制造異常"map $it"}GlobalScope.launch {flow.catch {//捕獲異常Log.d("flow-", "value: ${it.message}")}.collect {//接收結果Log.d("flow-", "value: $it")}}} }輸出結果:
D/flow-: exception: divide by zero可以看到程序可以正常運行,我們也正常捕獲了異常
取消flow
Flow創建后并不返回可以cancel的句柄,但是一個flow的collect是suspend的,所以可以像取消一個suspend方法一樣取消flow的collection。
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val job = GlobalScope.launch {flow {emit(1)kotlinx.coroutines.delay(3000)emit(2)kotlinx.coroutines.delay(3000)emit(3)}.collect {//接收結果Log.d("flow-", "value: $it")}}findViewById<Button>(R.id.cancel).setOnClickListener {job.cancel()}} }collectIndexed
輸出帶有索引的結果
distinctUntilChanged 過濾重復的值
如果生產的值和上個發送的值相同,值就會被過濾掉
flow {emit(1)emit(1)emit(2)emit(2)emit(3)emit(4) }.distinctUntilChanged()// 結果:1 2 3 4 // 解釋: // 第一個1被釋放 // 第二個1由于和第一個1相同,被過濾掉 // 第一個2被釋放 // 第二個2由于和第一個2相同,被過濾掉 // 第一個3被釋放 // 第一個4被釋放可以傳參(old: T, new: T) -> Boolean,進行自定義的比較
private class Person(val age: Int, val name: String)flow {emit(Person(20, "張三"))emit(Person(21, "李四"))emit(Person(21, "王五"))emit(Person(22, "趙六")) }.distinctUntilChanged{old, new -> old.age == new.age } .collect{ value -> println(value.name) }// 結果:張三 李四 趙六 // 解釋:本例子定義如果年齡相同就認為是相同的值,所以王五被過濾掉了可以用 distinctUntilChangedBy轉換成年齡進行對比
flow {emit(Person(20, "張三"))emit(Person(21, "李四"))emit(Person(21, "王五"))emit(Person(22, "趙六")) }.distinctUntilChangedBy { person -> person.age }// 結果:張三 李四 趙六transform
對每個值進行轉換,用一個新的 FlowCollector 發射數據
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.transform {if (it % 2 == 0) {emit("value-$it")}}.collect {Log.d("flow-", "collect $it")}}} }//輸出結果 D/flow-: collect value-2 D/flow-: collect value-4filter 過濾
val list = mutableListOf(1, 2, 3, 4, 5)val job = GlobalScope.launch {list.asFlow().filter {it > 3}.collect {Log.d("flow-", "value: $it")} }輸出結果:
D/flow-: value: 4D/flow-: value: 5類似的還有 filterNot 反向過濾,這里就不舉例子了
flowOn 數據發射的線程
可以切換CoroutineContext
說明:flowOn只影響該運算符之前的CoroutineContext,對它之后的CoroutineContext沒有任何影響
我們先看一下默認情況下的線程問題
輸出結果
D/flow-: init->thread:DefaultDispatcher-worker-1 D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 1 D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 2我們可以看到默認情況下,flow 數據發射的線程就是當前協程所在的線程。
我們可以用 flowOn 方法指定數據發射的線程
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow = flow {Log.d("flow-", "init->thread:${Thread.currentThread().name}")emit(1)emit(2)}GlobalScope.launch {flow.flowOn(Dispatchers.Main) //指定數數據發射的線程為主線程.collect {Log.d("flow-", "collect->thread:${Thread.currentThread().name} value: $it")}}} }輸出結果
D/flow-: init->thread:main D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 1 D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 2可以看到數據發射的線程已經被切換到了主線程
多個操作符的情況下,如下
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flowDemo()}}suspend fun flowDemo() {flow {Log.d("flow-", "init ${Thread.currentThread().name}")emit(1)}.map {Log.d("flow-", "map ${Thread.currentThread().name}")it}.flowOn(Dispatchers.Main).collect {Log.d("flow-", "collect ${Thread.currentThread().name} value:$it")}} } D/flow-: init main D/flow-: map main D/flow-: collect DefaultDispatcher-worker-1 value:1launchIn
scope.launch { flow.collect() }的縮寫, 代表在某個協程上下文環境中去接收釋放的值
例子:
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {val flow = flow {Log.d("flow-", "init")emit(1)emit(2)}.onEach {Log.d("flow-", "onEach $it")}flow.launchIn(this)}} }輸出結果:
D/flow-: init D/flow-: onEach 1 D/flow-: onEach 2conflate()
如果值的生產速度大于值的消耗速度,就忽略掉中間未來得及處理的值,只處理最新的值。
val flow1 = flow {delay(2000)emit(1)delay(2000)emit(2)delay(2000)emit(3)delay(2000)emit(4) }.conflate()flow1.collect { value ->println(value)delay(5000) }// 結果: 1 3 4 // 解釋: // 2000毫秒后生產了1這個值,交由collect執行,花費了5000毫秒,當1這個值執行collect完成后已經經過了7000毫秒。 // 這7000毫秒中,生產了2,但是collect還沒執行完成又生產了3,所以7000毫秒以后會直接執行3的collect方法,忽略了2這個值 // collect執行完3后,還有一個4,繼續執行。withIndex
將值封裝成IndexedValue對象
onEach
每個值釋放的時候可以執行的一段代碼
onEach 函數,執行一段代碼后,再釋放值
onStart 在數據發射之前觸發
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow = flow {Log.d("flow-", "init->thread:${Thread.currentThread().name}")emit(1)emit(2)emit(3)}GlobalScope.launch {flow.onStart {Log.d("flow-", "onStart->thread:${Thread.currentThread().name}")}.flowOn(Dispatchers.Main) //指定數數據發射的線程為主線程.collect {Log.d("flow-", "collect->thread:${Thread.currentThread().name} value: $it")}}} }輸出結果:
D/flow-: onStart->thread:main D/flow-: init->thread:main D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 1 D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 2 D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 3結論:
onStart 方法在數據發射之前調用,onStart 所在的線程是數據產生的線程。
onCompletion
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {val flow = flow {Log.d("flow-", "init ${Thread.currentThread().name}")emit(1)emit(2)}flow.onStart {Log.d("flow-", "onStart ${Thread.currentThread().name}")}.onCompletion {Log.d("flow-", "onCompletion ${Thread.currentThread().name}")}.collect {Log.d("flow-", "collect ${Thread.currentThread().name} value:$it")}}} }輸出結果:
D/flow-: onStart DefaultDispatcher-worker-1 D/flow-: init DefaultDispatcher-worker-1 D/flow-: collect DefaultDispatcher-worker-1 value:1 D/flow-: collect DefaultDispatcher-worker-1 value:2 D/flow-: onCompletion DefaultDispatcher-worker-1drop(n) 忽略最開始釋放的值
flow {emit(1)emit(2)emit(3)emit(4) }.drop(2)// 結果:3 4 // 解釋: // 最開始釋放的兩個值(1,2)被忽略了dropWhile
判斷第一個值如果滿足(T) -> Boolean這個條件就忽略
flow {emit(1)emit(2)emit(3)emit(4) }.dropWhile {it % 2 == 0 }// 結果:1 2 3 4 // 解釋: // 第一個值不是偶數,所以1被釋放flow {emit(1)emit(2)emit(3)emit(4) }.dropWhile {it % 2 != 0 }// 結果:2 3 4 // 解釋: // 第一個值是偶數,所以1被忽略sample
如果有一個資料來源大概每200ms 就會丟出來數據,但是在更新UI 的時候,我們不需要那么頻繁的更新UI , 就需要用到采樣。比如每 2 秒更新一次。
take 取指定數量的數據
class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow = flow {emit(1)emit(2)emit(3)}GlobalScope.launch {flow.take(2).flowOn(Dispatchers.Main) //指定數數據發射的線程為主線程.collect {Log.d("flow-", "collect->thread:${Thread.currentThread().name} value: $it")}}} }輸出結果:
D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 1 D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 2takeWhile
只會釋放第一個值,但也不是必須釋放,要看條件
判斷第一個值如果滿足(T) -> Boolean這個條件就釋放
flow {emit(1)emit(2)emit(3)emit(4) }.takeWhile { it%2 != 0 }// 結果:1 // 解釋: // 第一個值滿足是奇數條件flow {emit(1)emit(2)emit(3)emit(4) }.takeWhile { it%2 == 0 }// 結果:無 // 解釋: // 第一個值不滿足是奇數條件實現一個定時器功能
GlobalScope.launch {val flow = flow {while (true) {emit(0)//每隔一秒產生一個數據delay(1000)}}flow.collect {UtilLog.d(TAG, "定時任務..")} }總結
以上是生活随笔為你收集整理的Kotlin实战指南二十:flow的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kotlin实战指南十九:use 函数魔
- 下一篇: Android Coroutines C