WEB BE Repository/JAVA & Kotlin

[코틀린 코루틴] 플로우

조금씩 차근차근 2026. 1. 8. 19:00

플로우는 코루틴을 기반으로 한 리액티브 스트림이다.

 

플로우를 사용하면 시간이 지남에 따라 나타나는 여러 값을 다루는 상황에서 코루틴의 동시성 매커니즘을 활용할 수 있다.

 

플로우의 여러 유형과 이를 생성/변환/소비하는 방법을 알아보자.

목차

  • 플로우란 무엇인가?
  • 콜드 플로우
  • 핫 플로우

플로우란 무엇인가?

일시 중단 함수는 스트림 형태의 데이터에는 취약하다.

다음 코드를 실행해보자.

private var zeroTime = System.currentTimeMillis()  
fun log(message: Any?) = println("[${System.currentTimeMillis() - zeroTime} ms] ${Thread.currentThread().name}: $message")  

suspend fun createValues(): List<Int> {  
    return buildList {  
        add(1)  
        delay(1000)  
        add(2)  
        delay(1000)  
        add(3)  
        delay(1000)  
    }  
}  

fun main() = runBlocking {  
    val list = createValues()  
    list.forEach{  
        log(it)  
    }  
}


모든 값이 3초 뒤에 출력이 되는 것을 확인할 수 있다.

 

하지만, 실제 값들은 1초마다 하나씩 사용가능했을 것이다.
이것을 활용하고 싶어질 때, 플로우(리액티브 스트림)이 유용하다.

플로우 처음 사용해보기

fun createValuesFlow(): Flow<Int> {  
    return flow {  
        emit(1)  
        delay(1000)  
        emit(2)  
        delay(1000)  
        emit(3)  
        delay(1000)  
    }  
}  

fun main() = runBlocking {  
    val flow = createValuesFlow()  
    flow.collect{  
        log(it)  
    }  
}

이 코드의 경우, 해당 값들이 사용 가능해지는 순간마다 바로바로 사용할 수 있음을 확인할 수 있다.

플로우의 유형

플로우는 크게 두 가지로 분류할 수 있다.

  • 콜드 플로우
    • 값이 실제로 소비되기 시작할 때만 값을 배출한다.
    • 수집자가 하나 있다.
    • 수집자는 모든 값들을 받는다.
    • 보통은 완료된다.
    • 하나의 코루틴에서 값이 emit된다.(channelFlow 사용 시 예외)
  • 핫 플로우
    • 기본적으로 활성화된다.
    • 여러 구독자가 있고, 브로드캐스트 방식으로 동작한다.
    • 구독자는 구독 시작 시점부터 값을 받는다.
    • 완료되지 않는다.
    • 여러 코루틴에서 배출할 수 있다.

이 설명이 아직은 추상적으로 느껴질 수 있다.

 

이제 실제 예시를 살펴보며 둘의 공통점과 차이점을 이해해보자.


콜드 플로우

앞서 기본적인 콜드 플로우의 사용법을 살펴봤다.
이제 해당 코드를 좀 더 자세히 살펴보고 다음 정보들을 이해해보자.

  • 플로우의 생성
  • 플로우의 작동 시점과 방식
  • 취소하는 방법
  • 플로우의 내부 구현
  • 동시성을 사용하는 방법

콜드 플로우의 생성 - flow 빌더 함수

컬렉션과 마찬가지로, flow 라고 하는 플로우를 생성할 수 있는 빌더 함수를 이용할 수 있다.

  • 기본적으로 플로우는 동기적으로 동작한다.
  • 빌더 함수의 블록 안에서는 emit 함수를 호출해 플로우의 수집자에게 값을 제공하고, 수집자가 해당 값을 처리할 때까지 빌더 함수의 실행을 중단한다.

이는 다음 코드를 실행해보면 확실하게 이해할 수 있다.

fun createValuesFlow(): Flow<Int> {  
    return flow {  
        emit(1)  
        delay(1000)  
        emit(2)  
        delay(1000)  
        emit(3)  
        delay(1000)  
    }  
}  

fun main() = runBlocking {  
    val flow = createValuesFlow()  
    log("start")  
    delay(2000)  
    flow.collect{  
        log(it)  
    }  
}


결과를 보면, 2초를 기다렸다고 이미 모든 값들이 emit되어 있는 것이 아닌, 2초 뒤부터 1초마다 하나씩 값이 emit 되는것을 확인할 수 있다.

 

플로우는 처음에 비활성 상태이며, 최종 연산자(terminal operator)가 호출돼야만 빌더에서 정의된 계산이 시작된다.

 

이것이 이 플로우가 콜드 플로우라고 불리는 이유이다.
기본적으로 수집되기 시작할 때까지 비활성 상태이기 때문이다.

플로우의 작동 시점과 방식 - suspend collect, emit

위에서 설명했듯, 콜드 플로우는 수집되기 전까지 작업을 수행하지 않는다.

 

cold flow를 수집하는collect함수와 플로우를 방출하는 emit 함수는 모두 suspend 함수인데, 이를 보여주기 위해 간단한 예제를 추가했다.

fun createValuesFlow(): Flow<Int> {  
    return flow {  
        emit(1)  
        delay(500)  
        emit(2)  
    }  
}  

fun main() = runBlocking {  
    val flow = createValuesFlow()  
    log("start")  
    flow.collect{  
        log(it)  
        delay(200)  
    }  
}

위 코드는 어떻게 동작할까?

지금 메모장을 켜서 이 코드가 어떻게 동작할지 시나리오를 그려보는 것을 추천한다.

  • 0.5초 후 완료?
  • 0.7초 후 완료?
  • 0.9초 후 완료?

결과는 다음과 같다.

collect는 플로우 내 원소들 하나하나에 대하여, 나오는 대로 주어진 람다를 수행한다.
이를 그림으로 표현하면 다음과 같다.

이 경우 프로그램은 다음과 같은 과정으로 동작한다.

  • 수집자가 플로우 빌더에 정의된 실행을 촉발해서 첫 번째 배출을 발생시킨다.
  • 수집자와 연결된 람다가 호출되면서 메시지를 기록하고 200밀리초 동안 지연된다.
  • 그 후 플로우 람다가 계속 실행되며 500밀리초 동안 추가 지연과 배출(그리고 수집)이 발생한다.

시퀀스가 최종 연산자를 사용할 때마다 다시 평가되는 것처럼 콜드 플로우에서 collect를 여러 번 호출하면 그 코드가 여러번 실행된다.

val letters = flow {  
        emit(1)  
        delay(500)  
        emit(2)  
}  

fun main() = runBlocking {  
    log("start")  
    letters.collect{  
        log(it)  
        delay(200)  
    }  
    letters.collect{  
        log(it)  
        delay(200)  
    }  
    log("end")  
}

 

만약 플로우에 네트워크 요청과 같은 side effect가 있다면 이 사실에 유의하자.

플로우 수집 취소

코루틴 흐름에 따라 플로우 또한 수집을 취소하고 싶을 수 있다.
이때 다음 사실을 기억해두면 좋다.

  • collect 함수는 suspend 함수여서, 취소 가능 지점이다.
  • emit 함수는 suspend 함수여서, 취소 가능 지점이다.

이 코드의 경우, 5/200 인 25번 출력을 진행하고 종료된다.

콜드 플로우의 내부 구현

이쯤되면 자연스레 콜드 플로우의 내부 구현 방식에 대해 궁금해질 것이다.

코틀린의 콜드 플로우는 suspend 함수와 lambda with receiver의 조합이다.


lambda with receiver

lambda with receiver(수신 객체 지정 람다)는 "람다 블록 안에서 특정 객체를 this로 취급하도록” 만든 람다를 의미한다.

  • 일반 람다
    val f: (StringBuilder) -> Unit = { sb ->
      sb.append("A")
      sb.append("B")
    }
  • lambda with receiver
    val g: StringBuilder.() -> Unit = {
      append("A")
      append("B")
    }

콜트 플로우의 내부 구현을 살펴보자.

위 문서를 보면 알듯이, FlowFlowCollector라는 2가지 인터페이스만으로 구현되어 있다.
이들은 한 함수만 정의하며 순서대로 collectemit을 정의한다.

즉, 함수형 인터페이스이다.
그러므로 지금부턴 Flow와 FlowCollector 대신 collect와 emit을 주로 사용하겠다.

여기서

letter ->  
    println(letter)  
    delay(200.milliseconds)

이 부분이 emit 함수(FlowCollector 객체)을 정의하고,

delay(300.milliseconds)  
emit("A")  
delay(300.milliseconds)  
emit("B")

이 코드블록은 collect 함수(Flow 객체)를 정의한다.

 

위 코드의 메인 함수의 흐름을 보며 내부 구조를 정확히 명명하고 이해해보자.

  1. collect 호출은 "플로우 빌더 함수와 연결된 람다"를 호출한다.
  2. 이 경우 람다(collect)는 0.3초동안 일시 중단된다.
  3. emit 호출은 collect 에 전달된 람다(emit)를 실행하고, 이 람다는 print문을 실행한 후 0.2초 지연된다.
  4. 그 후 제어 흐름은 플로우 빌더의 람다(collect)로 돌아가 두 번째 실행(0.3초 지연)을 반복한다.
  5. 이후 똑같이 다시 emit 함수로 돌아가 0.2초 지연을 수행하고 collect 함수의 수행을 완료한다.

채널 플로우를 사용한 동시성 플로우

지금까지는 동기적으로 동작하는 플로우만을 살펴보았다.
그렇다면, 동시성을 사용하는 플로우는 없을까?
물론 있다.

 

먼저 간단한 문제를 정의해보자.
한번 연산하는데 0.5초가 걸리는 연산을 다음과 같이 정의해서 플로우로 사용한다고 가정하자.

이 코드에서 수행하는 작업(난수 생성)은 서로 독립이므로, 동시성으로 수행하기 좋다고 생각이 든다.

val randomNumbers = flow {  
    coroutineScope {  
        repeat(10) {  
            launch { emit(getRandomNumber()) }  
        }
    }  
}

하지만, 다음과 같이 코드를 작성하면 예외가 발생한다.


플로우는 메모리 가시성 제어를 위해, 서로 다른 코루틴에서 하나의 플로우에 동시 접근을 제한한다.

 

이럴 때는 동시성 전용 플로우인 channelFlow 빌더 함수를 이용해야 한다.

channelFlow

채널 플로우는 순차적으로 배출하는 emit 함수를 제공하지 않는다.
대신 여러 코루틴에서 send를 사용해 값을 제공할 수 있다.

 

플로우의 수집자는 여전히 값을 순차적으로 수신하며, collect 내 람다가 그 역할을 수행한다.

채널 플로우는 기본적으로 새로운 코루틴을 생성할 수 있는 코루틴 스코프를 제공하기에, 다음과 같이 코드를 작성할 수 있다.

그 결과 다음과 같이 빠르게 결과를 받을 수 있다.

지금까지 동시 작업을 위한 채널 플로우를 알아봤다.
하지만 동시성은 관리하기 위한 비용이 들기 때문에, 되도록이면 기본적으로 일반 flow를 쓰고, 플로우 안에서 새로운 코루틴을 시작해야 하는 경우(성능 이점)만 채널 플로우를 사용하도록 하자.


핫 플로우

기본적으로 콜드 플로우는 수집자와 직접적으로 연관되어 있다.
각 수집자는 플로우에 지정된 코드를 독립적으로 실행한다.

 

핫 플로우는 코루틴 간에 값을 브로드캐스트 방식으로 통신하고, 동시성 시스템에서 상태를 관리하는 데 적용할 수 있는 유형의 플로우이다.

 

즉, 각 수집자가 플로우 로직을 독립적으로 촉발하는 대신, 여러 구독자(subscriber)라고 불리는 수집자들이 배출된 항목을 공유한다.

 

핫 플로우는 항상 활성 상태이기 때문에 구독자의 유무에 관계없이 배출이 발생할 수 있다.

 

코틀린 코루틴에는 2가지 핫 플로우 구현이 기본적으로 제공된다.

  • 공유 플로우: 값을 브로드캐스트하기 위해 사용된다.
  • 상태 플로우: 상태를 전달하는 특별한 경우에 사용된다.

실제로는 상태 플로우를 공유 플로우보다 더 자주 사용하게 될 것이다.

살짝 먼저 이야기하자면, StateFlow는 간편하게 구현할 수 있는 옵저버 패턴이다.
그래서, 안드로이드와 같은 UI 개발자는 더욱 그렇다.

 

하지만 활용을 위해 둘 모두 이해하는 것은 충분히 유용하다.

그러므로 공유 플로우부터 알아보도록 하자.


공유 플로우

공유 플로우는 값을 구독자에게 브로드캐스트한다.

이는 fan-out 방식과 유사하다.

간단한 공유 플로우 구현을 함께 살펴보며 이해해보자.

코드부터 만드는 방식이 핫 플로우와는 많이 다르다는 것을 알 수 있다.
플로우 빌더 대신 가변 플로우에 대한 참조를 얻는다.
배출이 구독자 유무와 관계없이 발생하므로 우리가 실제로 배출을 수행하는 코루틴을 시작할 책임이 있다.

 

구독자를 추가하는 방법은 콜드 플로우를 수집하는 것과 동일하다.
그냥 collect를 호출하면 된다.

단, 구독자는 구독 시작 이후에 배출된 값만 수신한다는 점에 유의해야 한다.


위 출력 결과를 보면, 구독자가 0.5초에 발행된 0은 수집하지 못한 것을 확인할 수 있다.

 

공유 플로우는 브로드캐스트 방식으로 작동하기 때문에, 구독자를 추가해서 이미 존재하는 messageFlow 의 발행을 수신할 수 있다.

 


위의 경우, A와 B 모두 성공적으로 수집한 것을 확인할 수 있다.

구독자를 위한 값 재생

기본적으로 공유 플로우는 구독 시점 이전의 데이터는 제공하지 않는다.
구독자가 구독 이전에 배출된 원소도 받기를 원한다면, MutableSharedFlow를 생성할 때 replay 파라미터 를 활용해 새 구독자를 위해 제공할 값의 캐시를 설정할 수 있다.

 

아래 코드는 마지막 5개의 값을 재생하도록 MessageFlow를 설정한다.

private val _messageFlow = MutableSharedFlow<Int>(replay = 5)

shareIn으로 콜드 플로우를 공유 플로우로 전환

코틀린에서는 시간이 지남에 따라 여러 값을 계산하는 작업을 단순한 콜드 플로우로 노출하고, 필요할 때 이 콜드 플로우를 핫 플로우로 변환하는 패턴이 자주 사용된다.


현재 이 코드의 경우, launch 로 플로우를 두번 수집하고 있다.

 

만약 플로우 내에서 DB 요청이 있을경우, 각각이 동일한 요청을 받아야 한다면, 하나의 플로우 값을 둘이 공유할 수 있어야 한다.

 

shareIn을 활용하면, 같은 플로우를 공유하도록 만들 수 있다.


두번째 매개변수 started에 SharingStarted.Lazily 를 줬는데, 여기서 여러가지 다른 동작을 지정할 수 있다.

  • Eagerly는 플로우 수집을 즉시 시작한다.
  • Lazily는 첫 번째 구독자가 나타나야만 수집을 시작한다.
  • whileSubscribed는 첫 번째 구독자가 나타나야 수집을 시작하고, 마지막 구독자가 사라지면 플로우 수집을 취소한다.

상태 플로우

실시간 시스템에서 자주 발생하는 특별한 사례는 시간이 지남에 따라 변할 수 있는 값, 즉 상태를 추적하는 것이다.

 

상태 플로우는 변수의 상태 변화를 쉽게 추적할 수 있는 공유 플로우의 특별한 버전이다.

 

상태 플로우에 대해 다음 네가지를 알아보자.

  • 상태 플로우를 생성하고 구독자에게 노출시키는 방법
  • 상태 플로우의 값을 (병렬로 접근해도) 안전하게 갱신하는 방법
  • 값이 실제로 변경될 때만 상태 플로우가 값을 배출하게 하는 equality-based conflation 방법
  • 콜드 플로우를 상태 플로우로 변환하기

MutableStateFlow - 상태 플로우를 생성하고 구독자에게 노출시키는 방법


StateFlow는 MutableStateFlow로 정의 가능하다.

UPDATE 함수 - 상태 플로우의 값을 (병렬로 접근해도) 안전하게 갱신하는 방법

위 코드에서 ++ 연산을 사용하지 않고 update 연산을 사용했는데, update 함수는 StateFlow의 값을 원자적으로 변경한다.

값이 실제로 변경될 때만 상태 플로우가 값을 배출하게 하는 equality-based conflation 방법


위 코드의 출력 결과는 어떻게 될까?

LEFT
RIGHT
RIGHT
LEFT
LEFT

위가 나올 것 같지만, stateFlow는 실제로 값이 변경될 때만 emit을 수행한다.

따라서 출력 결과는 다음과 같다.

stateIn으로 콜드 플로우를 상태 플로우로 변환하기

콜드 플로우는 stateIn 함수를 이용해 상태 플로우로 변환할 수 있다.


여기서 특이한 점으로는, 공유 플로우와 다르게 stateIn 함수에게 시작 전략을 전달하지 않는다.

 

이 함수는 항상 주어진 코루틴 스코프 안에서 플로우를 시작하고, 코루틴 스코프가 취소될 때까지 구독자에게 value 프로퍼티를 통해 최신 값을 제공한다.


일반적인 규칙으로, 네트워크 요청이나 DB 읽기와 같은 서비스를 제공하는 함수는 콜드 플로우를 사용해 선언된다.
이를 사용하는 다른 클래스나 함수의 경우,

  • 콜드 플로우를 직접 수집하거나
  • 필요한 경우 이 정보를 시스템에 다른 부분에 제공하기 위해, 상태 플로우나 공유 플로우로 변환할 수 있다.

참고자료: 코틀린 인 액션

 

Kotlin in Action 2/e | 세바스티안 아이그너 | 에이콘출판사 - 예스24

안드로이드 공식 언어인 코틀린은 실용성과 간결성, 자바와의 상호 운용성으로 인해 서버 프로그래밍 등 다양한 분야에 쓰이는 경우가 늘고 있다. 코틀린 언어의 가장 큰 특징이라면 실용성을

www.yes24.com