Table of contents
Open Table of contents
채널
코루틴끼리의 통신을 위한 기본적인 방법으로 채널 API가 추가되었습니다.
채널은 송신자와 수신자의 수에 제한이 없이며, 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있습니다. (at most once)
Channel
은 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스 입니다.
SendChannel
은 원소를 보내거나(또는 더하거나) 채널을 닫는 용도로 사용됩니다.ReceiveChannel
은 원소를 받거나(또는 꺼낼 때) 사용됩니다.
두 인터페이스는 구분되어 있으며, 채널의 진입점을 제한하기 위해 ReceiveChannel
이나 SendChannel
중 하나만 노출시키는 것도 가능합니다.
send
와 receive
는 모두 중단 함수라는 것을 확인할 수 있습니다. 원소른 보내고 받는 함수가 중단 함수인 것은 필수적인 특징입니다.
receive
를 호출했는데 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단됩니다. 공유 책장으로 비유하면, 누군가 책을 찾으러 갔는데 책장이 비어있 경우, 다른 사람이 책을 넣을 때까지 기다려야 하는 상황입니다.- 반면
send
는 채널의 용량이 다 찼을 때 중단됩니다. 대부분읜 채널은 용량이 제한되어 있다는 걸 나중에 확인할 수 있습니다. 공유 책장으로 비유하면, 누군가 책을 넣으러 갔는데 책장이 가득찬 경우, 다른사람이 책을 가져가 공간이 생길 때까지 기다려야 하는 상황입니다.
Java API에서 제공하는 BlockingQueue와 유사한데, BlockingQueue는 블로킹 큐이지만, 코루틴은 블로킹 큐가 아닙니다. 코루틴은 블로킹 큐와 달리 코루틴이 중단되는 것이지 스레드가 중단되는 것이 아닙니다.
fun CoroutineScope.produceNumbers(max: Int) = produce<Int> {
var x = 0
while (x <= max) send(x++)
}
produce
함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이(끝나거나, 중단되거나, 취소되거나) 채널을 닫습니다.
따라서 close
를 반드시 호출합니다. produce
빌더는 채널을 만드는 가장 인기있는 방법이며, 안전하고 편리하다는 등의 많은 장점이 있습니다.
val channel = Channel<Int>()
launch {
channel.consumeEach { println(it) }
}
수신자는 얼마나 많은 원소를 보내는지 알아야 합니다. 수신자가 이런 정보를 아는 경우는 별로 없기 때문에 송신자가 보내는 만큼 수신자가 기다리는 방식을 선호합니다.
채널이 닫힐 때까지 원소를 받기 위해 for 루프나 consumeEach
함수를 사용할 수 있습니다.
채널 타입
설정한 용량 크기에 따라 채널을 네 가지로 구분할 수 있습니다.
- 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진
Channel
입니다. 이 채널은 보내는 쪽이 빠르면 받는 쪽이 느려도 문제가 없습니다. 이 채널은Channel.UNLIMITED
로 설정된 채널로,send
가 중단되지 않습니다. - 버퍼(Buffered): 특정 용량 크기를 가진 또는
Channel.BUFFERED
(기본값은 64이 JVM의 channels.defaultBuffer를 설정하면 오버라이드 할 수 있습니다)로 설정된 채널 - 랑데뷰(Rendezvous): 용량이 0이거나
Channel.RENDEZVOUS
로 설정된 채널로, 송신자와 수신자가 만날 때만 원소를 교환합니다(책장이 아닌 책을 교환하는 장소와 비슷합니다). - 융합(Conflated): 버퍼 크기가 1인
Channel.CONFLATED
로 설정된 채널로, 최신 원소만 유지합니다. 즉, 새로운 원소가 들어오면 이전 원소는 삭제됩니다.
버퍼 오버플로일 때
채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때(onBufferOverflow 파라미터)의 행동을 정의할 수 있습니다.
SUSPEND
(기본옵션): 버퍼가 가득 찾을 때,send
메서드가 중단됩니다.DROP_OLDEST
: 버퍼가 가득 찾을 때, 가장 오래된 원소를 삭제하고 새로운 원소를 추가합니다.DROP_LATEST
: 버퍼가 가득 찾을 때, 가장 최근 원소를 삭제하고 새로운 원소를 추가합니다.
채널 용량 중 Channel.CONFLATED
는 용량을 1로 설정하고 onBufferOverflow
를 DROP_OLDEST
로 설정한 것임을 알 수 있습니다.
전달되지 않은 원소 핸들러
Channel
함수에서 반드시 알아야 할 또 다른 파라미터는 onUndeliveredElement
입니다. 원소가 어떠한 이유로 처리되지 않을 때 호출됩니다.
대부분 채널이 닫히거나 취소되었음을 의미하지만, send
, receive
, receiveOrNull
또는 hasNext
가 에러를 던질 때 발생할 수도 있습니다.
주로 채널에서 보낸 자원을 닫을 때 사용합니다.
val channel = Channel<Resource>(capacity) { resource ->
resource.close()
}
or
val channel = Channel<Resource>(
capacity,
onUndeliveredElement = { resource -> resource.close() }
)
팬아웃(Fan-out)
여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있습니다. 하지만 원소를 적절하게 처리하려면 반드시 for 루프를 사용해야 합니다
(consumeEach
는 여러 개의 코루틴이 사용하기에는 안전하지 않습니다.)
팬인(Fan-in)
여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있습니다.
다수의 채널을 하나의 채널로 합쳐야 할 경우가 있습니다. 이런경우 produce
함수로 여러 개의 채널을 합치는 fanIn
함수를 사용할 수 있습니다.
fun <T> CoroutineScope.fanIn(channels: List<ReceiveChannel<T>>): ReceiveChannel<T> = produce {
for (channel in channels) {
launch {
for (element in channel) send(element)
}
}
}
파이프라인
한 채널로부터 받을 원소를 다른 채널로 전송하는 경우가 있습니다. 이를 파이프라인이라고 부릅니다.
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
repeat(10) { send(it) }
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (number in numbers) send(number * number)
}
통신의 기본 형태로서의 채널
채널은 서로 다른 코루틴이 통신할 때 유용합니다. 충돌이 발생하지 않으며(공유 상태로 인한 문제가 일어나지 않습니다.) 공평함을 보장합니다.
순수한 형태의 채널은 좀더 복잡한 처리를 요하는 경우에 유용합니다.
고전적으로 처리하는 방식은 최적화된 방식이 아닙니다.
모든 것을 하나의 프로세스로 오랫동안 처리하는 건 좋은 생각이 아닙니다.
첫 번째로, 내부적인 예외가 발생하거나 서버가 재개되면 어디서 멈췄는지에 대한 단서를 남지기 않습니다.
두 번째로, 대규모의 상품을 제공하는 판매자가 오랫동안 서버를 붙들고 있으면, 다른 작은 규모의 판매자는 변경하기 위해 한참을 기다려야 합니다.
게다가 변경을 처리하기 위해 필요한 서비스에 너무 많은 네트워크 요청을 같은 시간에 보내면 안됩니다.
파이프라인을 설정하는 것이 해결책이 될 수 있습니다.