[Coroutine] 채널(Channel)

업데이트:

채널

채널은 동시성 코드 간에 서로 안전한 통신을 할 수 있도록 해주는 도구이다. 채널은 동시성 코드가 메시지를 보내 통신할 수 있도록 해준다.

채널은 실행 중인 스레드에 상관없이 서로 다른 코루틴 간에 메시지를 안전하게 보내고 받기 위한 파이프라인으로 생각할 수 있다.

버퍼와 배압

채널의 send() 는 일시 중단 함수다. 그 이유는 실제로 데이터를 수신하는 누군가가 있을 때까지 전송하는 코드를 일시 중지하고 싶을 수도 있기 때문이다. 흔히 배압 이라고 하며 리시버가 실제로 처리할 수 있는 것보다 더 많은 요소들로 채널이 넘치지 않도록 도와준다.

배압을 구성하기 위해 채널에 대한 버퍼를 정의할 수 있다. 채널을 통ㅎ해 데이터를 보내는 코루틴은 채널 안의 요소가 버퍼 크기에 도달하면 일시 중단된다. 그리고 채널에서 요소가 제거되는 즉시, 송신자는 다시 재개된다.

언버퍼드 채널

버퍼가 없는 채널이다. 랑데부 채널이라고도 부른다. 채널에서 send() 를 호출하면 리시버가 receive()를 호출할 때까지 일시 중지된다.

랑데부 채널은 아래와 같은 방법으로 초기화 한다.

//1. 파라미터 없이 호출
val channel1 = Channel<Int>()

//2. 버퍼 용량을 0으로 전달
val channel2 = Channel<Int> (0)

//3. 랑데뷰 상수 값 전달. 상수 값은 0이므로 2번과 동일하다
val channel3 = Channel<Int>(Channel.RENDEZVOUS)

아래 예에서 sender는 채널을 통해 최대 10개까지 숫자를 보낼수 있다. 그리나 실행이 끝니가 전에 채널로부터 수신하는 요소가 두 개 뿐이어서 두 요소만 전달이 된다.

runBlocking {
    val time = measureTimeMillis {
        val channel = Channel<Int> ()
        GlobalScope.launch {
            repeat(10) {
                channel.send(it)
                println("Sent $it")
            }
        }

        channel.receive()
        channel.receive()
    }
    println("Took $time ms")

}

이 코루틴의 결과는 다음과 같다.

Sent 0
Sent 1
Took 27 ms

버퍼드 채널

버퍼를 가지는 채널이다. 채널 내 요소의 수가 버퍼의 크기와 같을 때 송신자의 실행을 중지한다.

Linked List Channel

버퍼가 무한대로 생각하면 되며 송신자는 절대 중단되지 않는다.

val ch = Channel<Int>(Channel.UNLIMITED)

Array Channel

버퍼 크기가 1 ~ int.MAX_VALUE 까지 가지며, 가지고 있는 요소의 양이 버퍼 크기에 이르면 송신자를 일시 중단한다.

val ch = Channel<Int>(50)

버퍼가 가득차면 송신자를 일시 중지하고, 하나 이상의 항목이 검색되면 다시 재개한다.

runBlocking {
    val time = measureTimeMillis {
        val channel = Channel<Int> (5)
        GlobalScope.launch {
            repeat(10) {
                channel.send(it)
                println("Sent $it")
            }
        }
        delay(1500)
        println("take 2")

        channel.receive()
        channel.receive()
        delay(1500)

    }
    println("Took $time ms")
}

예제에서는 송신자는 최대 10개를 보낼수 있지만 버퍼 사이즈 때문에 5개를 전송하고 중단이 되며 2개의 요소가 수신이 되면 2개를 다시 보낸다.

Sent 0
Sent 1
Sent 2
Sent 3
Sent 4
take 2
Sent 5
Sent 6
Took 3050 ms

Conflated Channel

채널에 1개의 버퍼만 가지고 있고, 새로운 요소의 버퍼만 갖고 있고, 새로운 요소가 보내질 때마다 이전 요소는 유실된다.

val channel = Channel<Int>(Channel.CONFLATED)

Channel.CONFLATED 파라미터로 Channel() 인스턴스를 생성한다

val time = measureTimeMillis {
    runBlocking {
        val channel = Channel<Int>(Channel.CONFLATED)
        GlobalScope.launch {
            repeat(5) {
                channel.send(it)
                println("Sent $it")
            }
        }
        println("Received ${channel.receive()}")
        delay(500)
    }
}
println("${time} ms")

위 코드를 실행하면 수신자는 가장 마지막 요소만 읽는다.

Sent 0
Sent 1
Sent 2
Sent 3
Sent 4
Received 4
563 ms

보내기 전 검증

Send Channel

채널에 요소를 전송하기 전에 채널이 닫히지 않았는지 확인을 해야한다. 이를 위해 isClosedForSend 를 사용할 수 있다.

val channel = Channel<Int>()
channel.isClosedForSend //false
channel.close() 
channel.isClosedForSend //true
//

만약 채널이 닫힌 채로 send()  호출하면 ClosedSendChannelException  발생한다.

~~~kotlin
val channel = Channel<Int>(1)
channel.close()
channel.send(1) //ClosedSendChannelException 발생

Receive Channel

수신자는 채널을 수신하기 전에 채널이 닫혔는지 체크를 해야한다. 이를 위해 isClosedForReceive 속성을 확인한다.

val channel = Channel<Int>(1)
channel.isClosedForReceive
channel.close()
channel.isClosedForReceive

닫힌 채널에서 receive가 호출되면, ClosedReceiveChannelException 이 발생한다.

val channel = Channel<Int>(1)
channel.close()
channel.receive() //ClosedReceiveChannelException 발생

다음은 isEmpty를 속성을 사용하여 채널에 수신할 것이 있는지를 확인 할수 있다.

val channel = Channel<Int>(1)
channel.isEmpty //true
channel.send(10)
channel.isEmpty //false

Reference

코틀린 동시성 프로그래밍(에이콘)

댓글남기기