Skip to content

코틀린 코루틴 채널

Published: at 오후 04:22

Table of contents

Open Table of contents

채널

코루틴끼리의 통신을 위한 기본적인 방법으로 채널 API가 추가되었습니다.

채널은 송신자와 수신자의 수에 제한이 없이며, 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있습니다. (at most once)

Channel 은 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스 입니다.

두 인터페이스는 구분되어 있으며, 채널의 진입점을 제한하기 위해 ReceiveChannel 이나 SendChannel 중 하나만 노출시키는 것도 가능합니다.

sendreceive는 모두 중단 함수라는 것을 확인할 수 있습니다. 원소른 보내고 받는 함수가 중단 함수인 것은 필수적인 특징입니다.

Java API에서 제공하는 BlockingQueue와 유사한데, BlockingQueue는 블로킹 큐이지만, 코루틴은 블로킹 큐가 아닙니다. 코루틴은 블로킹 큐와 달리 코루틴이 중단되는 것이지 스레드가 중단되는 것이 아닙니다.

fun CoroutineScope.produceNumbers(max: Int) = produce<Int> {
    var x = 0
    while (x <= max) send(x++)
}

produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이(끝나거나, 중단되거나, 취소되거나) 채널을 닫습니다. 따라서 close를 반드시 호출합니다. produce 빌더는 채널을 만드는 가장 인기있는 방법이며, 안전하고 편리하다는 등의 많은 장점이 있습니다.

val channel = Channel<Int>()

launch {
    channel.consumeEach { println(it) }
}

수신자는 얼마나 많은 원소를 보내는지 알아야 합니다. 수신자가 이런 정보를 아는 경우는 별로 없기 때문에 송신자가 보내는 만큼 수신자가 기다리는 방식을 선호합니다.

채널이 닫힐 때까지 원소를 받기 위해 for 루프나 consumeEach 함수를 사용할 수 있습니다.

채널 타입

설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있습니다.

버퍼 오버플로일 때

채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때(onBufferOverflow 파라미터)의 행동을 정의할 수 있습니다.

채널 용량 중 Channel.CONFLATED는 용량을 1로 설정하고 onBufferOverflowDROP_OLDEST로 설정한 것임을 알 수 있습니다.

전달되지 않은 원소 핸들러

Channel 함수에서 반드시 알아야 할 또 다른 파라미터는 onUndeliveredElement 입니다. 원소가 어떠한 이유로 처리되지 않을 때 호출됩니다. 대부분 채널이 닫히거나 취소되었음을 의미하지만, send, receive, receiveOrNull 또는 hasNext가 에러를 던질 때 발생할 수도 있습니다.

주로 채널에서 보낸 자원을 닫을 때 사용합니다.

val channel = Channel<Resource>(capacity) { resource ->
    resource.close()
}

or

val channel = Channel<Resource>(
    capacity,
    onUndeliveredElement = { resource -> resource.close() }
)

팬아웃(Fan-out)

여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있습니다. 하지만 원소를 적절하게 처리하려면 반드시 for 루프를 사용해야 합니다 (consumeEach는 여러 개의 코루틴이 사용하기에는 안전하지 않습니다.)

팬인(Fan-in)

여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있습니다.

다수의 채널을 하나의 채널로 합쳐야 할 경우가 있습니다. 이런경우 produce 함수로 여러 개의 채널을 합치는 fanIn 함수를 사용할 수 있습니다.

fun <T> CoroutineScope.fanIn(channels: List<ReceiveChannel<T>>): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (element in channel) send(element)
        }
    }
}

파이프라인

한 채널로부터 받을 원소를 다른 채널로 전송하는 경우가 있습니다. 이를 파이프라인이라고 부릅니다.

fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    repeat(10) { send(it) }
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (number in numbers) send(number * number)
}

통신의 기본 형태로서의 채널

채널은 서로 다른 코루틴이 통신할 때 유용합니다. 충돌이 발생하지 않으며(공유 상태로 인한 문제가 일어나지 않습니다.) 공평함을 보장합니다.

순수한 형태의 채널은 좀더 복잡한 처리를 요하는 경우에 유용합니다.

고전적으로 처리하는 방식은 최적화된 방식이 아닙니다.

모든 것을 하나의 프로세스로 오랫동안 처리하는 건 좋은 생각이 아닙니다.

첫 번째로, 내부적인 예외가 발생하거나 서버가 재개되면 어디서 멈췄는지에 대한 단서를 남지기 않습니다.

두 번째로, 대규모의 상품을 제공하는 판매자가 오랫동안 서버를 붙들고 있으면, 다른 작은 규모의 판매자는 변경하기 위해 한참을 기다려야 합니다.

게다가 변경을 처리하기 위해 필요한 서비스에 너무 많은 네트워크 요청을 같은 시간에 보내면 안됩니다.

파이프라인을 설정하는 것이 해결책이 될 수 있습니다.