-
[Kotlin] 코루틴을 이해해보자프로그래밍 언어/kotlin 2021. 7. 4. 00:44728x90반응형
새차원님의 코틀린 코루틴 강좌와 공식문서의 코루틴 부분을 참고하여 정리한 내용입니다.
1. Why Coroutines?
코루틴 : 비동기적으로 실행되는 코드를 간소화하기 위해 Android에서 사용할 수 있는 동시 실행 설계 패턴.
- 메인스레드가 블로킹될 수 있는 부분에 대해서 도움을 준다.
- 비동기 처리 코드를 순차적인 코드로 만들 수 있다. ( by replacing callbacks )
- 코루틴은 이전에 자신의 실행이 마지막으로 중단되었던 지점 다음 장소에서 실행을 재개한다.
api를 호출한 후, data로 ui 업데이트 하는 코드
main 스레드에서 api 호출하면 ui 스레드가 블로킹 되어 화면을 그릴 수 없게 되고, 앱은 죽게 된다.
잘 동작하지만 dream code 는 아님
콜백도 없고, 별도의 스레드로 갔다 오는 코드가 없지만, 콜백이랑 똑같이 처리된다.
2. Basics
fun main() { GlobalScope.launch { // launch a new coroutine in background and continue delay(1000L) // non-blocking delay for 1 second println("World!") // print after delay } println("Hello") // main thread continues while coroutine is delayed Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive }
launch 하려면 coroutine scope가 있어야 한다. GlobalScope는 coroutine scope인데, 미리 준비된 객체로, lifetime이 프로그램 전체인 전역 scope이다.
launch은 coroutine builder로서, 내부적으로 stand alone coroutine을 반환한다.
delay 함수는 suspend function(논블로킹). sleep 은 메인 스레드를 블로킹하는 함수이다.
delay 와 sleep 이 혼용되고 있으니까 delay로 통일시켜보자.
fun main() { GlobalScope.launch { delay(1000L) // non-blocking delay for 1 second println("World!") // print after delay } println("Hello") // main thread continues while coroutine is delayed runBlocking { delay(2000L) // block main thread for 2 seconds to keep JVM alive } }
delay는 suspend function이기 때문에, 코루틴이나 다른 suspend function 안에서 호출이 가능하다. 따라서, 명시적으로 블로킹하는 코루틴을 하나 만들어 주어야 한다. runBlocking 도 코루틴 빌더이고, blocking coroutine 하나를 만들어서 반환한다. launch 는 자신을 호출한 스레드를 블로킹하지 않지만, runBlocking은 자신을 호출한 스레드를 블로킹한다.
좀 더 관용적인 형태의 코드는 다음과 같다.
fun main() = runBlocking { // this: CoroutineScope launch { // launch a new coroutine and continue delay(1000L) // non-blocking delay for 1 second (default time unit is ms) println("World!") // print after delay } println("Hello") // main coroutine continues while a previous one is delayed }
runBlocking 으로 전체를 한번 감쌌기 때문에, 안에 있는 코드가 전부 실행되기 전까지는 메인 스레드가 종료되지 않는다.
launch 라는 코루틴 빌더는 job 객체를 반환한다. job 을 join 하게 되면, delay 를 사용하지 않고, 코루틴이 완료될때까지 기다렸다가 메인함수를 종료하도록 할 수 있다.
fun main() = runBlocking { // this: CoroutineScope val job = GlobalScope.launch { delay(1000L) println("World!") } println("Hello") job.join() }
structured concurrency
코루틴을 여러개 실행하면.. 위와 같이 job들을 관리해주기 힘들다. 이런 문제는, GlobalScope 이기 때문이다.
top-level 코루틴(GlobalScope)와 runBlocking 코루틴은 구조적으로 관계가 없다. 따라서 GlobalScope 에서 코루틴이 끝나든 말든 상관없이 join이 없으면 메인함수가 끝나버린다.
그러면.. 서로 구조적으로 관계를 만들어 주면, 서로 끝날때까지 기다려 줄 수 있지 않을까? -> structured concurrency
structured concurrency: top level coroutine을 만들지 말고, runBlocking에 의해 생성된 coroutine의 child로 코루틴을 만들면, 부모 코루틴이 child 코루틴 완료될때까지 기다려준다.
방법은, GlobalScope 에서 launch하지 말고, runBlocking에서 들어온 coroutine scope 에서 launch를 하자.
fun main() = runBlocking { launch { delay(1000L) println("World!") } launch { delay(1000L) println("World!") } println("Hello") }
이렇게 하면, join 없이, 여러 자식 코루틴들이 끝날 때 까지 다 기다려준다.
코루틴 안에서는 suspend function 만 호출가능하다.
fun main() = runBlocking { // this: CoroutineScope launch { doWorld() } println("Hello") } // this is your first suspending function suspend fun doWorld() { delay(1000L) println("World!") }
코루틴은 가볍다.
fun main() = runBlocking { repeat(100_000) { // launch a lot of coroutines launch { delay(5000L) print(".") } } }
프로세스가 끝나면 코루틴도 끝난다.
3. Cacellation and Timeouts
코루틴을 잘 취소해주는 것은 중요하다. 메모리 리소스를 잡아먹는 것이기 때문이다.
우선 runBlocking 으로 메인 스레드는 블로킹 시킨다. 그리고 launch를 이용해서 코루틴을 하나 만든다. 코루틴은 1000번 반복하면서 숫자를 0.5초 간격으로 출력한다. 1.3초정도 지난다음에, 앱이 끝났으면 좋겠다면 cancel()을 호출하면 코루틴을 취소 가능하다.
fun main() = runBlocking { val job = launch { repeat(1000) { i -> println("job: I'm sleeping ${i++} ...") delay(500L) } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
코루틴 취소해도 코루틴이 계속 동작된다. => 코루틴이 취소될려면, 코루틴 코드 자체에서 cancel을 체크해야한다. ( 코루틴 스스로가 취소에 협조적이어야 한다. ) 그러면 cancle 가능한 코드는 어떻게 만들까?
방법 1) 주기적으로 suspend function을 호출. 호출되었다가 다시 재개될 때 cancel되었는지 확인해서 exception을 던져주는 방식
방법 2) 명시적으로 상태를 체크해서 상태가 isActive 아니면 이 코루틴을 종료시키면 되는 방식
코루틴 내에서 suspend 가 되었다가, 다시 재개될 때, 재개될 시점에 suspend function이 exception을 던진다. exception 체크 해볼려면, 코루틴 내에서 try-catch 문으로 확인해볼 수도 있다.
fun main() = runBlocking { val startTime = currentTimeMillis() val job = launch(Dispatchers.Default) { var nextPrintTime = startTime var i = 0 while (i < 5) { // computation loop, just wastes CPU // print a message twice a second if (currentTimeMillis() >= nextPrintTime) { // delay(1L) yield() println("job: I'm sleeping ${i++} ...") nextPrintTime += 500L } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
isActive 를 사용해서 실제로 코루틴 job이 종료되었는지 체크한다.
fun main() = runBlocking { val startTime = currentTimeMillis() val job = launch(Dispatchers.Default) { var nextPrintTime = startTime var i = 0 while (isActive) { // cancellable computation loop // print a message twice a second if (currentTimeMillis() >= nextPrintTime) { println("job: I'm sleeping ${i++} ...") nextPrintTime += 500L } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
코루틴을 종료할 때 리소스를 어떻게 해제할 수 있는지 알아보자. 코루틴을 종료할 때, 코루틴에서 네트워크를 쓰거나 DB를 쓰다가 코루틴이 cancel되면, 리소스를 close해주어야 한다. 리소스를 해제할 위치는 suspend 함수가 exception 발생시키는 위치에서 finally 블록에서 해제해주면 된다.
fun main() = runBlocking { val job = launch { try { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } finally { println("job: I'm running finally") } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
cancel된 코루틴 안에서 다시 또 코루틴을 실행해야 하는 rare한 케이스 ( but 이렇게 할 수도 있다 )
fun main() = runBlocking { val job = launch { try { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } finally { withContext(NonCancellable) { println("job: I'm running finally") delay(1000L) println("job: And I've just delayed for 1 sec because I'm non-cancellable") } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.") }
timeout은 코루틴의 job을 가지고 cancel 하는 것이 아니고, 코루틴을 실행할 때, 해당 시간이 지나면 이 코루틴은 취소된다고 미리 timeout을 지정하는 방식. 아래의 예시는 runBlocking 안에서 실행하고 있어서 TimeoutCancellationException이 발생한다.
fun main() = runBlocking { withTimeout(1300L) { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } } }
위의 exception 던지는 것을 해결하기 위해서는 아래와 같이 withTimeoutOrNull을 활용할 수 있다.
fun main() = runBlocking { val result = withTimeoutOrNull(1300L) { repeat(1000) { i -> println("I'm sleeping $i ...") delay(500L) } "Done" // will get cancelled before it produces this result } println("Result is $result") }
4. Composing Suspending Functions
suspend function 을 어떻게 조합해서 코루틴을 유용하게 사용할 수 있을까
retrofit 호출 같은 것(=heavy 한 비동기 job)을 순차적으로 실행하고 싶으면 어떻게 해야되는가?
코루틴에서는 일반 코드처럼 작성하면, 비동기일지라도 순차적으로 실행되는 것이 기본이다
-> 비동기 실행을 순차적으로 맞춰줄 수 있고, 콜백을 순차 실행한다.
class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?){ super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) initUi() // case1 button_coroutines.setOnClickListener { CoroutineScope(Dispatchers.Main).launch { // 메인 스레드를 블로킹하지 않음 runDreamCode() // Ui 업데이트 됨 } } // case2 button_blocking.setOnClickListener { runBlocking { // 메인 스레드를 블로킹한다. runDreamCode() // Ui 업데이트 안됨 } } } } suspend fun runDreamCode() { val time = measureTimeMillis { val one = doSomethingUsefuleOne() // CPU를 많이 먹는 동작1 val two = doSomethingUsefuleTwo() // CPU를 많이 먹는 동작2 println("The answer is ${one + two}") } println("Completed in $time ms") } suspend fun doSomethingUsefuleOne(): Int { println("doSomethingUsefuleOne") delay(1000L) return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) return 29 }
case1 ) 코루틴 안에서 일반 코드처럼 작성하였고, 심지어 메인 스레드에서 실행되었지만, UI를 블로킹하지 않는다.
case2 ) 코루틴이 아닌 형태로, runBlocking을 활용하여 명시적으로 메인 스레드를 블로킹하도록 한다면 어떻게 될까? -> 모든 UI가 다 멈춘다.
fun main() = runBlocking<Unit> { val time = measureTimeMillis { val one = async { doSomethingUsefulOne() } val two = async { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") } suspend fun doSomethingUsefulOne(): Int { delay(1000L) // pretend we are doing something useful here return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) // pretend we are doing something useful here, too return 29 }
만약 첫번째와 두번째 연산이 dependency가 없다면, 굳이 순차적으로 실행하지 않고, 비동기적으로 실행하고 싶을 것이다. 그럴때는, 다음과 같이 async {} 사용하면 된다.
fun main() = runBlocking<Unit> { val time = measureTimeMillis { val one = async { doSomethingUsefulOne() } val two = async { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") } suspend fun doSomethingUsefulOne(): Int { delay(1000L) // pretend we are doing something useful here return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) // pretend we are doing something useful here, too return 29 }
async 는 Job을 상속한 객체를 반환한다. one.await() 하면 job이 끝날때까지 기다린다.
async 자체는 코루틴 빌더인데, 바로 시작하지 않고 start()를 호출해서 시작 시점을 명시하고 싶다면, 여기에 start옵션으로 LAZY를 걸 수도 있다.
fun main() = runBlocking<Unit> { val time = measureTimeMillis { val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() } val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() } // some computation one.start() // start the first one two.start() // start the second one println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") } suspend fun doSomethingUsefulOne(): Int { delay(1000L) // pretend we are doing something useful here return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) // pretend we are doing something useful here, too return 29 }
다음과 같이 GlobalScope에서 독립적으로 코루틴을 실행시키는 코드는 짜지는 말자.GlobalScope 에 launch 를 사용하면 exception이 났을 때, 돌이킬 수 없는 상황이 발생할 수 있다.
doSomethingUsefuleOne 함수는 일반 함수라서 어디에서든지 사용이 가능하다. coroutine scope 안에 안들어있다. GlobalScope에서 실행되었기 때문에, exception과 관계가 전혀 없어서, 지역에서 exception 난 것이 적용이 안된다. -> exception handling 떨어짐.
// note that we don't have `runBlocking` to the right of `main` in this example fun main() { val time = measureTimeMillis { // we can initiate async actions outside of a coroutine val one = somethingUsefulOneAsync() val two = somethingUsefulTwoAsync() // but waiting for a result must involve either suspending or blocking. // here we use `runBlocking { ... }` to block the main thread while waiting for the result runBlocking { println("The answer is ${one.await() + two.await()}") } } println("Completed in $time ms") } @OptIn(DelicateCoroutinesApi::class) fun somethingUsefulOneAsync() = GlobalScope.async { doSomethingUsefulOne() } @OptIn(DelicateCoroutinesApi::class) fun somethingUsefulTwoAsync() = GlobalScope.async { doSomethingUsefulTwo() } suspend fun doSomethingUsefulOne(): Int { delay(1000L) // pretend we are doing something useful here return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) // pretend we are doing something useful here, too return 29 }
이런 상황은 structured concurrency 를 사용해서 해결할 수 있다.
coroutine scope 로 한번 감싸서, 어디서 쓸 수 있는 형태가 아닌, coroutine scope 안에서만 사용할 수 있도록 바꾼다. 코루틴들 사이에서 exception 이 발생되면, exception이 전파되면서 코루틴이 전부 취소된다.
fun main() = runBlocking<Unit> { val time = measureTimeMillis { println("The answer is ${concurrentSum()}") } println("Completed in $time ms") } suspend fun concurrentSum(): Int = coroutineScope { val one = async { doSomethingUsefulOne() } val two = async { doSomethingUsefulTwo() } one.await() + two.await() } suspend fun doSomethingUsefulOne(): Int { delay(1000L) // pretend we are doing something useful here return 13 }
어떤 코루틴에서 exception이 발생했을 때, 코루틴이 cancel되면, hierarchy로 전파가 된다. 전파가 되면 다 종료가 된다.
fun main() = runBlocking<Unit> { try { failedConcurrentSum() } catch(e: ArithmeticException) { println("Computation failed with ArithmeticException") } } suspend fun failedConcurrentSum(): Int = coroutineScope { val one = async<Int> { try { delay(Long.MAX_VALUE) // Emulates very long computation 42 } finally { println("First child was cancelled") } } val two = async<Int> { println("Second child throws an exception") throw ArithmeticException() } one.await() + two.await() }
5. Coroutines under the hood
어떻게 내가 순차적으로 작성한 코드가 비동기도 되고 콜백도 되고 하는거지?
어떻게 코루틴에서 중단되었다가 재개될 수 있는거지?
코틀린이 내부적으로 어떻게 동작하는 걸까? 마법은 없다..
Continuation-Passing Style = CPS
내부적으로, 콜백같은걸 계속 넘겨주면서 콜백 콜백 되는거다..
kotlin suspending function
CPS transformation
JVM에 들어갈 때는, 우리가 호출한 함수에 Continuation cont 가 인수에 생긴다.
1. Labels 작업
함수가 재개될 때 suspend point 가 필요하다.
suspend fun postItem(item: Item) { // LABEL 0 val token = requestToken() // LABEL 1 val post = createPost(token, item) // LABEL 2 processPost(post) }
2. 내부적으로 switch-case 문으로 디컴파일
fun postItem(item: Item) { switch(label) { case 0: val token = requestToken() case 1: val post = createPost(token, item) case 2: processPost(post) } }
3. Continuation-Passing Style로 디컴파일
fun postItem(item: Item, cont: Continuation) { val sm = object : CoroutineImpl { ... } switch(sm.label) { case 0: requestToken(sm) case 1: createPost(token, item, sm) case 2: processPost(post) } }
continuation 객체가 있어서, 매번 함수 호출할때마다 continuation 넘겨준다. 이는 콜백/인터페이스 같은거라, resume 등의 인터페이스 가진 객체다. sm은 state machine 을 의미하며 각각의 함수가 호출될 때, 상태를 같이 넘겨주는 용도이다. 이 sm 의 정체는 결국 continuation 이다. resume은 자기 자신을 다시 불러준다. 그 때, label 값을 하나 올려서 다른 case를 실행한다.
코루틴 스코프에서 모든 코루틴들을 실행하도록 하고, 화면을 나가면, 코루틴 스코프에 cancel을 해주면 모든 job들이 취소가 된다.
안드로이드에서, 라이프사이클이 있는 클래스인 경우, 라이프사이클에 맞게 코루틴 스코프를 연결해놓고 실행시키면 된다.
6. Coroutine Context and Dispatchers
dispatcher와 thread의 관계에 대해서 알아보자.
코루틴 컨텍스트 요소에는 dispatcher가 있다. dispatcher는 코루틴이 어떤 스레드나 스레드풀에서 실행될지 결정하는 element이다. 코루틴 빌더는 모두 옵션으로 coroutine context를 받는다.
어떤 스레드에서 실행되고 있는지 출력하는 예제코드를 보자.
- runBlocking에서 옵셔널 파라미터 없이 그냥 실행하면, 자신이 실행된 컨텍스트 상속받아서 작업하기 때문에, 메인 스레드(=runBlocking 이랑 같은 컨텍스트)
- unconfined는 메인 스레드에서 실행.
- default는 DefaultDispatcher-worker-1에서 실행. 글로벌 스코프에서 실행했던 그런 코루틴들이 실행되는 스레드(= 기본 스레드)와 같은 스레드.
- newSingleThreadContext는 코루틴 실행할 때마다 스레드 하나 만든다.
fun main() = runBlocking<Unit> { launch { // context of the parent, main runBlocking coroutine println("main runBlocking : I'm working in thread ${Thread.currentThread().name}") } launch(Dispatchers.Unconfined) { // not confined -- will work with main thread println("Unconfined : I'm working in thread ${Thread.currentThread().name}") } launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher println("Default : I'm working in thread ${Thread.currentThread().name}") } newSingleThreadContext("MyOwnThread").use { // will get its own new thread launch(it) { println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}") } } }
코루틴 디버깅 -> JVM 옵션에 코루틴 디버그 옵션 Dkotlinx.coroutines.debug을 설정한다.
스레드를 스위칭 하는 예제를 보자. 아주 중요한 예제라고 한다.withContext를 사용하면 스레드를 점프해서 스위칭할 수 있다.
스레드 context를 만들었을 때, use 를 이용해서 스레드를 close를 해주었다.
fun main() { newSingleThreadContext("Ctx1").use { ctx1 -> newSingleThreadContext("Ctx2").use { ctx2 -> runBlocking(ctx1) { log("Started in ctx1") withContext(ctx2) { log("Working in ctx2") } log("Back to ctx1") } } } }
withContext에 dispatcher를 ctx2로 주게 되면, ctx2스레드에서 코루틴이 실행된다. ( 하나의 코루틴에서 스레드 전환. )
실행 결과:
코루틴 컨텍스트에 job 이라는 중요한 요소가 있다. 어떤 element를 탐색할 수 있다.
fun main() = runBlocking<Unit> { println("My job is ${coroutineContext[Job]}") launch { println("My job is ${coroutineContext[Job]}") } async { println("My job is ${coroutineContext[Job]}") } }
실행 결과 :
어떤 새로운 코루틴이 실행되면, 부모 코루틴의 자식이 된다. GlobalScope는 독립적으로 job이 생성되고 부모-자식 관계가 생성되지 않는다. 부모 코루틴이 cancel 되면 자식 코루틴도 cancel 되는가? -> 당연히 자식 코루틴만 종료가 된다.
fun main() = runBlocking<Unit> { // launch a coroutine to process some kind of incoming request val request = launch { // it spawns two other jobs launch(Job()) { println("job1: I run in my own Job and execute independently!") delay(1000) println("job1: I am not affected by cancellation of the request") } // and the other inherits the parent context launch { delay(100) println("job2: I am a child of the request coroutine") delay(1000) println("job2: I will not execute this line if my parent request is cancelled") } } delay(500) request.cancel() // cancel processing of the request delay(1000) // delay a second to see what happens println("main: Who has survived request cancellation?") }
실행 결과 :
부모 코루틴은 자식 코루틴들이 실행이 모두 완료될때까지 기본적으로 기다려준다.
fun main() = runBlocking<Unit> { // launch a coroutine to process some kind of incoming request val request = launch { repeat(3) { i -> // launch a few children jobs launch { delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms println("Coroutine $i is done") } } println("request: I'm done and I don't explicitly join my children that are still active") } request.join() // wait for completion of the request, including all its children println("Now processing of the request is complete") }
코루틴 스코프에서 모든 코루틴들을 실행하도록 하고, 화면을 나가면(destroy), 코루틴 스코프에 cancel을 해주면 mainScope의 모든 job들이 취소가 된다. 혹은 안드로이드에서, 라이프사이클이 있는 클래스인 경우, 라이프사이클에 맞게 코루틴 스코프를 연결해놓고 실행시키면 된다.
class Activity { private val mainScope = CoroutineScope(Dispatchers.Default) // use Default for test purposes fun destroy() { mainScope.cancel() } fun doSomething() { // launch ten coroutines for a demo, each working for a different time repeat(10) { i -> mainScope.launch { delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc println("Coroutine $i is done") } } } } // class Activity ends fun main() = runBlocking<Unit> { val activity = Activity() activity.doSomething() // run test function println("Launched coroutines") delay(500L) // delay for half a second println("Destroying activity!") activity.destroy() // cancels all coroutines delay(1000) // visually confirm that they don't work }
728x90'프로그래밍 언어 > kotlin' 카테고리의 다른 글
[Kotlin In Action] 코틀린 타입 시스템 (0) 2021.08.06 [Kotlin in Action] 람다로 프로그래밍 (0) 2021.07.26 [Kotlin in action] 클래스, 객체, 인터페이스 (0) 2021.07.25 [Kotlin in Action] 함수의 정의와 호출 (0) 2021.07.13 [코틀린 인 액션] 코틀린 기초 (0) 2021.07.06