Table of contents
Open Table of contents
공유 상태로 인한 문제
아래에 있는 UserDownloader
클래스를 살펴봅시다. 이 클래스에서 아이디로 사용자를 받아오거나, 이전에 전송 받은 모든 사용자를 얻을 수 있습니다.
class UserDownloader(
private val api: NetworkService
) {
private val users = mutableListOf<User>()
fun download(): List<User> = users.toList()
suspend fun fetchUser(id: Int) {
val user = api.fetchUser(id)
users.add(user)
}
}
앞의 구현 방식은 동시 사용에 대한 대비가 되어있지 않습니다.
fetchUser
호출은 users
를 변경합니다. 이 경우 같은 시간에 해당 함수가 한 개의 스레드에서 시작할 경우에만 정상적으로 작동합니다.
같은 시간에 두 개 이상의 스레드에서 함수가 호출되 수 있으므 users
는 공유 상태에 해당하며 보호될 필요가 있습니다.
동시에 리스트를 변경하면 충돌이 일어날 수 있기 때문입니다.
동기화 블로킹
위와 같은 문제는 자바에서 사용되는 전통적인 도구인 syncronized
블록이나 동기화된 컬렉션을 통해 해결할 수 있습니다.
이 방법은 작동하긴 하지만, 몇 가지 문제점이 있습니다. 가장 큰 문제점은 syncronized
블록 내부에서 중단 함수를 사용할 수 없다는 것입니다.
두 번째는 syncronized
블록에서 코루틴이 자기 차례를 기다릴 때, 스레드를 블로킹한다는 것입니다.
블로킹 없이 중단하거나 충돌을 회피하는 방법을 사용해야 합니다.
원자성
자바에는 간단한 경우에 사용할 수 있는 다른 방법이 있습니다. 자바는 다양한 원자값을 가지고 있습니다.
원자값을 사용한 연산은 빠르며 ‘스레드 안전’을 보장합니다. 이러한 연산을 원자성 연상이라 합니다.
원자성 연산은 락 없이 로우 레벨로 구현되어 효율적이고 사용하기가 쉽습니다.
원자성은 하나의 프리미티느 변수 또는 하나의 레퍼런스의 안전을 보장하기 위해 사용되지만, 좀더 복잡한 경우에는 다른 방법을 사용해야 합니다.
싱글스레드로 제한된 디스패처
싱글 스레드 디스패처를 사용하는 것이 공유 상태와 관련된 대부분의 문제를 해결하는 가장 쉬운 방법입니다.
val dispatcher = Dispatchers.IO.limitedParallelism(1)
var counter = 0
fun main() = runBlocking {
massiveRun {
withContext(dispatcher) {
counter++
}
}
println("Counter = $counter") // Counter = 100000
}
두 가지 방법으로 디스패처를 사용할 수 있습니다. 첫 번째 방법은 코스 그레인드 스레드 한정(coarse-grained thread confinement) 으로 알려져 있습니다.
이 방법은 디스패처를 싱글스레드로 제한한 withContext
로 전체 함수를 래핑하는 방법입니다. 사용하기 쉬우며 충돌을 방지할 수 있지만, 함수 전체에서 멀티스레딩의 이점을 누리지 못하는 문제가 있습니다.
만약 블로킹되는 함수 또는 CPU 집약적인 함수를 호출하면 함수 실행이 느려지게 됩니다.
class UserDownloader(
private val api: NetworkService
) {
private val users = mutableListOf<User>()
private val dispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun downloaded(): List<User> =
withContext(dispatcher) {
users.toList()
}
suspend fun fetchUser(id: Int) = withContext(dispatcher) {
val user = api.fetchUser(id)
users += newUser
}
}
두 번째 방법은 파인 그레인드 스레드 한정(find-grained thread confinement) 으로 알려져 있습니다.
이 방법은 상태를 변경하는 구문들만 래핑합니다. 예제에서는 users
를 사용하는 모든 줄에 해당합니다. 파인 그레인드 스레드 한정은 좀 더 번거롭지만
크리티컬 섹션(critical section)이 아닌 부분이 블로킹되거나 CPU 집약적인 경우에 더 나은 성능을 제공합니다.
일반적인 중단 함수에 적용하는 경우에는 성능에 큰 차이가 없습니다.
대부분의 경우, 표준 디스패처가 같은 스레드 풀을 사용하기 때문에 싱글스레드를 가진 디스패처를 사용하는 건 쉬울 뿐 아니라 효율적입니다.
뮤텍스
마지막으로 가장 인기있는 방식은 Mutex
를 사용하는 것입니다.
뮤텍스를 단 하나의 열쇠가 있는 방이라고 생각할 수 있습니다.
뮤텍스에 가장 중요한 기능은 lock
입니다.
첫 번째 코루틴이 lock
을 호출하면 열쇠를 가지고 중단 없이 작업을 수행합니다.
또 다른 코루틴이 lock
을 호춣하면 unlock
을 호출할 때까지 중단됩니다.
또 다른 코루틴이 lock
함수를 호출하면, 마찬가지로 작업을 중단한 뒤에 두 번째 코루틴 다음 순서로 큐에 들어가게 됩니다.
첫 번째 코루틴이 unlock
함수를 호출하면 열쇠를 반납하고 두 번째 코루틴(큐의 첫 번째 코루틴)이 재개한 뒤 lock
함수를 통과하게 됩니다.
따라서 단 하나의 코루틴만이 lock
과 unlock
사이에 있을 수 있습니다.
suspend fun main() = coroutineScope {
repeat(5) {
launch {
mutex.withLock {
println("Locked by $it")
delay(1000)
println("Unlocked by $it")
}
}
}
}
// Locked by 0
// Unlocked by 0
// Locked by 1
// Unlocked by 1
// ...
syncronized
블록과 달리 Mutex
는 스레드를 블로킹하는 대신 코루틴을 중단시킨다는 것입니다.
좀더 안전하고 가벼운 방식입니다.
병렬 실행이 싱글스레드로 제한된 디스패처를 사용하는 것과 비교하면 뮤텍스가 가벼우며 좀 더 나은 성능을 가질 수 있습니다.
하지만 적절히 사용하는 것 또한 어렵습니다. 뮤택스를 사용할 때 맞닥뜨리는 위험한 경우는 코루틴이 락을 두 번 통과할 수 없다는 것입니다.
다음 코드를 실행하면 프로그램은 데드락에 빠지게 됩니다.
suspend fun main() = coroutineScope {
val mutex = Mutex()
val job = launch {
mutex.withLock {
mutex.withLock {
println("Unreachable")
}
}
}
job.join()
}
뮤텍스가 가진 두 번째 문제점은 코루틴이 중단되었을 때 뮤텍스를 풀 수 없다는 점입니다.
다음 코드를 보면 delay
중에 뮤텍스가 잠겨있어 5초가 걸리는 걸 확인할 수 있습니다.
class MessageRepository {
private val messages = mutableListOf<String>()
private val mutex = Mutex()
suspend fun addMessage(message: String) {
mutex.withLock {
delay(5000) // 네트워크 호출이라 가정합니다.
messages.add(message)
}
}
}
suspend fun main() = coroutineScope {
val repository = MessageRepository()
val timeMillis = measureTimeMillis {
coroutineScope {
repeat(5) {
launch {
repository.addMessage("Message $it")
}
}
}
}
println("Time: $timeMillis") // Time: 5000
}
싱글스레드로 제한된 디스패처를 사용하면 이런 문제는 발생하지 않습니다.
delay
나 네트워크 호출이 코루틴을 중단시키면 스레드를 다른 코루틴이 사용합니다.
따라서 전체 함수를 뮤텍스로 래핑하는 건 지양해야 합니다.
뮤텍스를 사용하기로 했다면 락을 두 번 걸지 않고 중단함수를 호출하지 않도록 신경써야 합니다.
세마포어
세마포어는 둘 이상이 접근할 수 있고 사용법이 다릅니다.
Semaphore
는 여러 개의 접근을 허용하므로, acquire
, release
, withPermit
함수를 가지고 있습니다.
suspend fun main() = coroutineScope {
val semaphore = Semaphore(2)
coroutineScope {
repeat(5) {
launch {
semaphore.withPermit {
println("Permit by $it")
delay(1000)
}
}
}
}
}
세마포어는 공유 상태로 인해 생기는 문제는 해결할 수는 없지만, 동시 요청을 처리하는 수를 제한할 때 사용할 수 있어 처리율 제한 장치(rate limiter) 를 구현할 때 도움이 됩니다.