kotlin的Flow可以连续异步发出多个数据。
1. 普通flow,冷流类似于一个函数,当开始收集时才开始运行
val coldStream = flow {
for (i in 1..5) {
delay(100L)
emit(i)
}
}
val collect1 = buildString {
coldStream.collect { append(it).append(", ") }
}.removeSuffix(", ")
- 在原来的的 CoroutineContext 中运行, 如果flow{}中运行完毕那流就结束了。
- flow中的emit需在当前的上下文中调用,不可以切换到其它线程中发送;逻辑可以在其它Context执行,emit需要在当前上下文。
- 监听者在collect()时是堵塞当前协程的,可以通过独立的上下文job保存,或者withTimeoutOrNull()的方式中途退出。
2. 把Callback转换为Flow流
val flow = callbackFlow<Long> {
val dis = Observable.interval(200, TimeUnit.MILLISECONDS).subscribe {
this.offer(it)
}
awaitClose {
println("flow Closed.")
dis.dispose()
}
}
withTimeoutOrNull(5000) {
flow.collect {
println("Recv $it")
}
}
- awaitClose:当收集代码退出时也及时通知数据发送方停止。
- 上面的offer()提示Deprecated, 但是改为trySend会报错,很奇怪。那就继续使用offer()吧。
3. SharedFlow热流,类似于 rxJava.PublishSubject 或者 LiveData。
val sharedFlow = MutableSharedFlow<Int>()
scope.launch(Dispatchers.Default) {
for (i in 1..10) {
delay(200L)
sharedFlow.emit(i)
println("Emit: $i - ${threadName()}")
}
}
scope.launch(Dispatchers.Default) {
sharedFlow.collect {
println("Col1: $it - ${threadName()}")
}
}
在UI中跟生命周期结合,由UI监听ViewModel的数据变动及时更新界面:
lifecycleScope.launch {
viewModel.dataListFlow.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
.collect {
adapter.datas = it
adapter.notifyDataSetChanged()
}
}
- Channel也是有点类似,可以使用这个代替Channel的使用,还不用学习多一套接口。
4. StateFlow也是热流,能保留最后一个状态值,有数据防抖功能,相同数据不会真正发出。
val stateFlow = MutableSharedFlow<Int>()
// 启动一个 Job 来发射数据
val job = launch {
repeat(100) { count ->
delay(200) // 模拟一些工作
val e = count/2*2
println("Emit $e")
stateFlow.emit(e) // 更新 StateFlow 的值
}
}
// 订阅 StateFlow
val subscription = launch {
stateFlow.collect { value ->
println("StateFlow collect: $value")
if (value >= 8) {
println("Cancelling subscription...")
cancel() // 取消收集
}
}
}
// 等待一段时间,确保 Job 运行
delay(1200)
job.cancelAndJoin() // 取消数据源的 Job
subscription.cancelAndJoin() // 取消收集的 Job
- Compose中的UI状态很多都是使用 mutableStateOf() 就是这个 StateFlow()。