Swift/Concurrency

AsyncStream / AsyncThrowingStream (feat. RxSwift + Concurrency)

Zedd0202 2022. 2. 7. 08:08
반응형

 

안녕하세요 :) Zedd입니다.

오늘은 AsyncStream에 대해서 공부해보려고 합니다 :D

오늘 공부할 AsyncStream은 반드시 AsyncSequence를 알아야 이해가 가능합니다. 

 

# AsyncStream

✔️ 정의 : 순서가 있고, 비동기적으로 생성된 요소들의 sequence ✔️

정의도 한번에 와닿지 않고, AsyncStream에 대한 이런 저런 이야기가 많지만 딱 하나만 기억하면 됩니다.

📝 AsyncSequence를 생성하는 인터페이스!!!!!! 📝 

그래서 AsyncSequence를 알아야 이해가 가능하다고 말한거였어요

 

정말 간단한 예를 들어봅시다.

비동기랑은 상관없지만;; 1부터 10까지의 요소가 있는 AsyncSequence를 생성하고 싶다고 칩시다.

물론 Swift Concurrency ) AsyncSequence 에서 본 것 처럼 직접 타입을 만들어서 해도 되지만...

AsyncStream을 이용하면 훨씬 간단해집니다.

다음의 step을 따라 AsyncStream을 만들어봅시다!

1. AsyncStream을 만든다.

2. 타입을 지정한다.

3. 클로져안에서 하고싶은 일들을 한다. 

 

✔️ 1. AsyncStream을 만든다. 

let digits = AsyncStream

✔️ 2. 타입을 지정한다.

저는 1부터 10까지의 요소가 있는 Sequence를 만들고 싶기 때문에 Int로 지정해주겠습니다. 

let digits = AsyncStream<Int>

✔️ 3. 클로져안에서 하고싶은 일들을 한다.

let digits = AsyncStream<Int> { continuation in
    for digit in 1...10 {
      continuation.yield(digit)
    }
    continuation.finish()
}

이렇게 하면 얼렁뚱땅 AsyncSequence가 만들어지고,

Sequence이기 때문에 for-in loop에서 사용할 수 있게 됩니다. 

for await digit in digits {
  print(digit)
}
/*
1
2
...
10
*/

3. 클로져안에서 하고싶은 일들을 한다. 에서 

let digits = AsyncStream<Int> { continuation in
    for digit in 1...10 {
      continuation.yield(digit)
    }
    continuation.finish()
}

continuation, yield, finish같은 뭔가 알수없는 것들이 나왔는데요! 하나씩 보겠습니다.

 

# Continuation

continuation은 그냥 parameters죠!

하지만 궁금한건 왜 하필!! continuation이라는 이름이냐..인 것입니다.

continuation 파라미터의 타입이 AsyncStream.Continuation이기 때문입니다. 

 

# yield / finish

let digits = AsyncStream<Int> { continuation in
    for digit in 1...10 {
      continuation.yield(digit)
    }
    continuation.finish()
}

그럼 yield와 finish는 AsyncStream.Continuation의 메소드겠네요. 

맞습니다!

아주 간단하게 설명하면

✔️ yield - 스트림에 Element를 제공

✔️ finish - 정상적으로 스트림을 종료(sequence iterator가 sequence를 종료하는 nil을 생성하도록 함) 

입니다. 

 

다시 코드를 보도록 합시다. 

let digits = AsyncStream<Int> { continuation in
    for digit in 1...10 {
      continuation.yield(digit)
    }
    continuation.finish()
}

내가 원하는건 1부터 10까지의 요소가 있는 AsyncSequence였습니다. 

for문을 돌면서 1부터 10을 yield해주고, 스트림을 종료(finish)시켜 줍니다.

 

# AsyncThrowingStream

Swift Concurrency ) AsyncSequence에서 Throwable한 AsyncSequence도 만들어 봤었는데요.

AsyncStream은 에러를 throw하지 못합니다.

대신!! AsyncThrowingStream이라는 것이 따로 있습니다.

AsyncStream이랑 다 똑같습니다! 다른점은 에러를 던질 수 있는 AsyncSequence를 만드는 친구인거죠.

위에서 했던거랑 똑같이 만들어보겠습니다.

1. AsyncThrowingStream을 만든다.

2. 타입을 지정한다.

3. 클로져안에서 하고싶은 일들을 한다. 

 

✔️ 1. AsyncThrowingStream을 만든다.

let digits = AsyncThrowingStream

✔️ 2.  타입을 지정한다.

let digits = AsyncThrowingStream<Int, Error>

AsyncThrowingStream이기 때문에 Error타입도 같이 지정해줘야합니다.

✔️ 3. 클로져안에서 하고싶은 일들을 한다. 

let digits = AsyncThrowingStream<Int> { continuation in
    for digit in 1...10 {
      continuation.yield(digit)
    }
    continuation.finish()
}

💡 여기서 continuation의 타입은 AsyncThrowingStream.Continuation입니다. 

 

AsyncThrowingStream이니 에러 한번 던져봐야겠죠 

let digits = AsyncThrowingStream<Int, Error> { continuation in
    for digit in 1...10 {
      continuation.yield(digit)
    }
    continuation.finish(throwing: ZeddError.someError)
}

finish()대신 finish(throwing: )을 사용하면 됩니다.

digits를 사용하는 곳에서는

do {
    for try await digit in digits {
        print(digit)
    }
} catch {

}

for-try-await-in loop를 사용해야겠죠?

 

# onTermination

continuation.onTermination = { termination in
    print(termination)
}

이런식으로 onTermination 콜백을 설정할 수도 있습니다.

termination은 AsyncStream.Continuation.Termination enum 타입인데요. 

(물론 AsyncThrowingStream은 AsyncThrowingStream.Continuation.Termination이겠죠?)

finished와 cancelled가 있습니다. 

✔️ finish - 스트림이 finish메소드를 통해 종료되었을 때

✔️  cancelled - 스트림이 취소되었을 때

그래서 다음 코드와 같이 

let digits = AsyncStream(Int.self) { continuation in
    continuation.onTermination = { termination in
        switch termination {
        case .finished:
            print("finished")
        case .cancelled:
            print("cancelled")
        }
    }

    for digit in 1...100 {
        print(digit) ✅
        continuation.yield(digit)
    }
    print("finish before") ✅
    continuation.finish()
    print("finish after") ✅
}

switch case문을 사용하여 print를 찍게 해줬습니다. 

위 코드로 어떤 것들이 찍힐까요? 

1
2
3
4
5
6
7
8
9
10
finish before
finished // onTermination callback
finish after

요렇게 찍히게 됩니다.

 

cancelled를 직접 줄 순 없을까요? 

줄 수 있습니다!!

continuation.onTermination?(.cancelled)

이렇게 해줄 수 있어요. 물론 

continuation.onTermination?(.finished)

도 가능합니다 :D

let digits = AsyncStream(Int.self) { continuation in
    continuation.onTermination = { termination in
        switch termination {
        case .finished:
            print("finished")
        case .cancelled:
            print("cancelled")
        }
    }

    for digit in 1...10 {
        if digit == 5 {
            continuation.onTermination?(.cancelled) ✅
        } else {
            continuation.yield(digit)
        }
        print(digit)
        continuation.yield(digit)
    }
    print("finish before")
    continuation.finish()
    print("finish after")
}

코드가 길지만 봐야할 곳은 하나입니다. 

if digit == 5 {
    continuation.onTermination?(.cancelled)
}

digit이 5일때 cancelled로 onTermination을 호출했습니다.

🤔 : cancelled는 스트림이 취소된다고 했으니까... 

do {
    for try await digit in digits {
        print(digit)
    }
} catch {

}

1, 2, 3, 4 까지만 호출되고 스트림이 종료되는건가..?

했는데, 그냥 1~10까지 정상적으로 다 나오더라구요.

continuation.onTermination = { termination in
    switch termination {
    case .finished:
        print("finished")
    case .cancelled:
        continuation.finish() ✅
    }
}

이렇게 onTermination내에서 취소되었을 때 니가 어떻게 하고싶은지 정의할 수 있게 한 것 같다는 생각이..(추측)

그래서 위 코드처럼 cancelled되었을 때 finish()를 호출해주면

do {
    for try await digit in digits {
        print(digit)
    }
} catch {

}
// 1
// 2
// 3
// 4

요렇게 호출되더라구요 ㅎㅎ

 

# RxSwift 6.5.0

Swift Concurrency ) AsyncSequence에서 RxSwift 6.5.0이야기도 했었죠. 

do {
    for try await value in observable.values {
        print("Got a value:", value)
    }
} catch {
    print("Got an error:", error)
}

저번 글에서는 그냥 observable.values가 AsyncSequence구나!로만 끝냈었는데,

오늘은 우리가 AsyncStream을 배웠잖아요?

그렇다면 이제 values의 내부 구조를 이해할 수 있을 것 같습니다 :D

Observable의 values는 Observable+Concurrency.swift에 구현 되어있습니다. 

var values: AsyncThrowingStream<Element, Error> {
    AsyncThrowingStream<Element, Error> { continuation in
        let disposable = asObservable().subscribe(
            onNext: { value in continuation.yield(value) },
            onError: { error in continuation.finish(throwing: error) },
            onCompleted: { continuation.finish() },
            onDisposed: { continuation.onTermination?(.cancelled) }
        )

        continuation.onTermination = { @Sendable _ in
            disposable.dispose()
        }
    }
}

✔️ 보통 Observable은 error를 낼 수 있으니 AsyncThrowingStream로 정의가 되어있는 것을 확인할 수 있습니다.

✔️ on~ 일때마다 각각 상황에 맞는 메소드를 호출하고 있습니다. 

 

또한 다음과 같이 Infallible, Driver, Signal은 Observable과 달리 에러를 발생시키지 않도록 보장되어있는데요. 

for await value in infallible.values {
    print("Got a value:", value)
}

이 친구들에게도 values가 추가되었습니다.

🤔: Observable.values가 AsyncThrowingStream이었는데..에러를 발생시키지 않는다? AsyncStream..??

맞습니다!! 이 친구들의 values는 Infallible+Concurrency.swift 에 구현 되어있는데요.

var values: AsyncStream<Element> {
    AsyncStream<Element> { continuation in
        let disposable = subscribe(
            onNext: { value in continuation.yield(value) },
            onCompleted: { continuation.finish() },
            onDisposed: { continuation.onTermination?(.cancelled) }
        )

        continuation.onTermination = { @Sendable _ in
            disposable.dispose()
        }
    }
}

이렇게 구현이 되어있습니다 👻

역시나 AsyncStream이네요

 


 

대충 AsyncStream과 AsyncThrowingStream이 어떤 친구들인지 이해가 가시나요!?

저는 AsyncSequence는 뭐고 AsyncStream은 뭐야.. 왜이렇게 새로운거 많이 냈어 갑자기... 얘네 뭔데..

WWDC21 ) Meet AsyncSequence 보는데도 갑자기 AsyncStream이 왜나와... 이랬는데!!!!

이젠 어느정도 이해한 것 같아 기분이 좋네요 🐥

사실 아직 AsyncStream / AsyncThrowingStream에서 안본것들이 좀 있기도하고(BufferingPolicy 같은거)

실제 프로젝트에서 어떻게 쓸 수 있는지도 더 고민해보려고 해요. 

(참고로 AsyncStream / AsyncThrowingStream은 iOS 13부터 사용가능합니다 :D) 

 

이런것들은 시간되면 정리해보려고 합니다! 

틀린점이 있다면 댓글 남겨주세요~ 

 

반응형