admin 管理员组

文章数量: 888526

kotlin之冷流转热流

stateflow和sharedflow以及channel都属于热流,但是如何将一个cold flow转成hotflow? kotlin提供了一个shareIn和stateIn的方法,看一下方法的定义:

public fun <T> Flow<T>.stateIn(scope: CoroutineScope,started: SharingStarted,initialValue: T
): StateFlow<T>
public fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T> 

shareIn方法将流转换为SharedFlow

stateIn方法将流转换为StateFLow

参数说明:

scope:flow需要在给定的conrutinue里面进行

started:开始流的方式,这里有三种:

​ 1.SharingStarted.Eagerly: 无论当前有没有订阅者,流都会开始,订阅者只能接收到replay个缓冲区的值

​ 2.SharingStarted.Lazily:当有第一个订阅者时,流才会开始,后面的订阅者只能接收到replay个缓冲区的值,当没有订阅者时流还是活跃的

​ 3.SharingStarted.WhileSubscribed

定义如下:

/*
Sharing is started when the first subscriber appears, immediately stops when the last subscriber disappears (by default), keeping the replay cache forever (by default).
*/
public fun WhileSubscribed(stopTimeoutMillis: Long = 0,replayExpirationMillis: Long = Long.MAX_VALUE
)

这个参数的功能类似livedata的生命周期感知的功能。

源码分析:

我们主要分析shareIn方法:

public fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T> {val config = configureSharing(replay)val shared = MutableSharedFlow<T>(replay = replay,extraBufferCapacity = config.extraBufferCapacity,onBufferOverflow = config.onBufferOverflow)@Suppress("UNCHECKED_CAST")val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)return ReadonlySharedFlow(shared, job)
}

这个方法主要就是生成了一个SharedFlow,并且在指定的协程中去启动collect。

private fun <T> CoroutineScope.launchSharing(context: CoroutineContext,upstream: Flow<T>,shared: MutableSharedFlow<T>,started: SharingStarted,initialValue: T
): Job =launch(context) { // the single coroutine to rule the sharing// Optimize common built-in started strategieswhen {started === SharingStarted.Eagerly -> {// collect immediately & foreverupstream.collect(shared)}started === SharingStarted.Lazily -> {// start collecting on the first subscriber - wait for it firstshared.subscriptionCount.first { it > 0 }upstream.collect(shared)}else -> {// other & custom strategiesstartedmand(shared.subscriptionCount).distinctUntilChanged() // only changes in command have effect.collectLatest { // cancels block on new emissionwhen (it) {SharingCommand.START -> upstream.collect(shared) // can be cancelledSharingCommand.STOP -> { /* just cancel and do nothing else */ }SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {if (initialValue === NO_VALUE) {shared.resetReplayCache() // regular shared flow -> reset cache} else {shared.tryEmit(initialValue) // state flow -> reset to initial value}}}}}}}

这里面就是根据启动的策略去进行collect。Eagerly和Lazily都好很好理解,主要就是WhileSubscribed的方式的处理。

initialValue:转为stateflow的初始值

replay:转换为SharedFlow之后,当有新的订阅者的时候发送缓存中值的个数

@Test
fun cold2hotflow()= runBlocking {val flow = flowOf(1,2,3,4,5).shareIn(GlobalScope,SharingStarted.Lazily)val job = GlobalScope.launch {println("start collect")flow.collect {println("job---$it")}}job.join()
}

执行结果:

start collect
job---1
job---2
job---3
job---4
job---5

当使用SharingStarted.Eagerly时的结果:

start collect

当使用SharingStarted.WhileSubscribed时

val flow = flowOf(1,2,3,4,5).shareIn(GlobalScope,SharingStarted.WhileSubscribed())
start collect
job---1
job---2
job---3
job---4
job---5

本文标签: kotlin之冷流转热流