AsyncStream / AsyncThrowingStream (feat. RxSwift + Concurrency)
안녕하세요 :) Zedd입니다.
오늘은 AsyncStream에 대해서 공부해보려고 합니다 :D
오늘 공부할 AsyncStream은 반드시 AsyncSequence를 알아야 이해가 가능합니다.
# AsyncStream
✔️ 정의 : 순서가 있고, 비동기적으로 생성된 요소들의 sequence ✔️
정의도 한번에 와닿지 않고, AsyncStream에 대한 이런 저런 이야기가 많지만 딱 하나만 기억하면 됩니다.
📝 AsyncSequence를 생성하는 인터페이스!!!!!! 📝
그래서 AsyncSequence를 알아야 이해가 가능하다고 말한거였어요
정말 간단한 예를 들어봅시다.
비동기랑은 상관없지만;; 1부터 10까지의 요소가 있는 AsyncSequence를 생성하고 싶다고 칩시다.
물론 Swift Concurrency ) AsyncSequence 에서 본 것 처럼 직접 타입을 만들어서 해도 되지만...
AsyncStream을 이용하면 훨씬 간단해집니다.
다음의 step을 따라 AsyncStream을 만들어봅시다!
1. AsyncStream을 만든다.
2. 타입을 지정한다.
3. 클로져안에서 하고싶은 일들을 한다.
✔️ 1. AsyncStream을 만든다.
let digits = AsyncStream
✔️ 2. 타입을 지정한다.
저는 1부터 10까지의 요소가 있는 Sequence를 만들고 싶기 때문에 Int로 지정해주겠습니다.
let digits = AsyncStream<Int>
✔️ 3. 클로져안에서 하고싶은 일들을 한다.
let digits = AsyncStream<Int> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish()
}
이렇게 하면 얼렁뚱땅 AsyncSequence가 만들어지고,
Sequence이기 때문에 for-in loop에서 사용할 수 있게 됩니다.
for await digit in digits {
print(digit)
}
/*
1
2
...
10
*/
3. 클로져안에서 하고싶은 일들을 한다. 에서
let digits = AsyncStream<Int> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish()
}
continuation, yield, finish같은 뭔가 알수없는 것들이 나왔는데요! 하나씩 보겠습니다.
# Continuation
continuation은 그냥 parameters죠!
하지만 궁금한건 왜 하필!! continuation이라는 이름이냐..인 것입니다.
→ continuation 파라미터의 타입이 AsyncStream.Continuation이기 때문입니다.
# yield / finish
let digits = AsyncStream<Int> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish()
}
그럼 yield와 finish는 AsyncStream.Continuation의 메소드겠네요.
맞습니다!
아주 간단하게 설명하면
✔️ yield - 스트림에 Element를 제공
✔️ finish - 정상적으로 스트림을 종료(sequence iterator가 sequence를 종료하는 nil을 생성하도록 함)
입니다.
다시 코드를 보도록 합시다.
let digits = AsyncStream<Int> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish()
}
내가 원하는건 1부터 10까지의 요소가 있는 AsyncSequence였습니다.
for문을 돌면서 1부터 10을 yield해주고, 스트림을 종료(finish)시켜 줍니다.
# AsyncThrowingStream
Swift Concurrency ) AsyncSequence에서 Throwable한 AsyncSequence도 만들어 봤었는데요.
AsyncStream은 에러를 throw하지 못합니다.
대신!! AsyncThrowingStream이라는 것이 따로 있습니다.
AsyncStream이랑 다 똑같습니다! 다른점은 에러를 던질 수 있는 AsyncSequence를 만드는 친구인거죠.
위에서 했던거랑 똑같이 만들어보겠습니다.
1. AsyncThrowingStream을 만든다.
2. 타입을 지정한다.
3. 클로져안에서 하고싶은 일들을 한다.
✔️ 1. AsyncThrowingStream을 만든다.
let digits = AsyncThrowingStream
✔️ 2. 타입을 지정한다.
let digits = AsyncThrowingStream<Int, Error>
AsyncThrowingStream이기 때문에 Error타입도 같이 지정해줘야합니다.
✔️ 3. 클로져안에서 하고싶은 일들을 한다.
let digits = AsyncThrowingStream<Int> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish()
}
💡 여기서 continuation의 타입은 AsyncThrowingStream.Continuation입니다.
AsyncThrowingStream이니 에러 한번 던져봐야겠죠
let digits = AsyncThrowingStream<Int, Error> { continuation in
for digit in 1...10 {
continuation.yield(digit)
}
continuation.finish(throwing: ZeddError.someError)
}
finish()대신 finish(throwing: )을 사용하면 됩니다.
digits를 사용하는 곳에서는
do {
for try await digit in digits {
print(digit)
}
} catch {
}
for-try-await-in loop를 사용해야겠죠?
# onTermination
continuation.onTermination = { termination in
print(termination)
}
이런식으로 onTermination 콜백을 설정할 수도 있습니다.
termination은 AsyncStream.Continuation.Termination enum 타입인데요.
(물론 AsyncThrowingStream은 AsyncThrowingStream.Continuation.Termination이겠죠?)
finished와 cancelled가 있습니다.
✔️ finish - 스트림이 finish메소드를 통해 종료되었을 때
✔️ cancelled - 스트림이 취소되었을 때
그래서 다음 코드와 같이
let digits = AsyncStream(Int.self) { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("finished")
case .cancelled:
print("cancelled")
}
}
for digit in 1...100 {
print(digit) ✅
continuation.yield(digit)
}
print("finish before") ✅
continuation.finish()
print("finish after") ✅
}
switch case문을 사용하여 print를 찍게 해줬습니다.
위 코드로 어떤 것들이 찍힐까요?
1
2
3
4
5
6
7
8
9
10
finish before
finished // onTermination callback
finish after
요렇게 찍히게 됩니다.
cancelled를 직접 줄 순 없을까요?
줄 수 있습니다!!
continuation.onTermination?(.cancelled)
이렇게 해줄 수 있어요. 물론
continuation.onTermination?(.finished)
도 가능합니다 :D
let digits = AsyncStream(Int.self) { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("finished")
case .cancelled:
print("cancelled")
}
}
for digit in 1...10 {
if digit == 5 {
continuation.onTermination?(.cancelled) ✅
} else {
continuation.yield(digit)
}
print(digit)
continuation.yield(digit)
}
print("finish before")
continuation.finish()
print("finish after")
}
코드가 길지만 봐야할 곳은 하나입니다.
if digit == 5 {
continuation.onTermination?(.cancelled)
}
digit이 5일때 cancelled로 onTermination을 호출했습니다.
🤔 : cancelled는 스트림이 취소된다고 했으니까...
do {
for try await digit in digits {
print(digit)
}
} catch {
}
1, 2, 3, 4 까지만 호출되고 스트림이 종료되는건가..?
했는데, 그냥 1~10까지 정상적으로 다 나오더라구요.
continuation.onTermination = { termination in
switch termination {
case .finished:
print("finished")
case .cancelled:
continuation.finish() ✅
}
}
이렇게 onTermination내에서 취소되었을 때 니가 어떻게 하고싶은지 정의할 수 있게 한 것 같다는 생각이..(추측)
그래서 위 코드처럼 cancelled되었을 때 finish()를 호출해주면
do {
for try await digit in digits {
print(digit)
}
} catch {
}
// 1
// 2
// 3
// 4
요렇게 호출되더라구요 ㅎㅎ
# RxSwift 6.5.0
Swift Concurrency ) AsyncSequence에서 RxSwift 6.5.0이야기도 했었죠.
do {
for try await value in observable.values {
print("Got a value:", value)
}
} catch {
print("Got an error:", error)
}
저번 글에서는 그냥 observable.values가 AsyncSequence구나!로만 끝냈었는데,
오늘은 우리가 AsyncStream을 배웠잖아요?
그렇다면 이제 values의 내부 구조를 이해할 수 있을 것 같습니다 :D
Observable의 values는 Observable+Concurrency.swift에 구현 되어있습니다.
var values: AsyncThrowingStream<Element, Error> {
AsyncThrowingStream<Element, Error> { continuation in
let disposable = asObservable().subscribe(
onNext: { value in continuation.yield(value) },
onError: { error in continuation.finish(throwing: error) },
onCompleted: { continuation.finish() },
onDisposed: { continuation.onTermination?(.cancelled) }
)
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
✔️ 보통 Observable은 error를 낼 수 있으니 AsyncThrowingStream로 정의가 되어있는 것을 확인할 수 있습니다.
✔️ on~ 일때마다 각각 상황에 맞는 메소드를 호출하고 있습니다.
또한 다음과 같이 Infallible, Driver, Signal은 Observable과 달리 에러를 발생시키지 않도록 보장되어있는데요.
for await value in infallible.values {
print("Got a value:", value)
}
이 친구들에게도 values가 추가되었습니다.
🤔: Observable.values가 AsyncThrowingStream이었는데..에러를 발생시키지 않는다? AsyncStream..??
맞습니다!! 이 친구들의 values는 Infallible+Concurrency.swift 에 구현 되어있는데요.
var values: AsyncStream<Element> {
AsyncStream<Element> { continuation in
let disposable = subscribe(
onNext: { value in continuation.yield(value) },
onCompleted: { continuation.finish() },
onDisposed: { continuation.onTermination?(.cancelled) }
)
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
이렇게 구현이 되어있습니다 👻
역시나 AsyncStream이네요
대충 AsyncStream과 AsyncThrowingStream이 어떤 친구들인지 이해가 가시나요!?
저는 AsyncSequence는 뭐고 AsyncStream은 뭐야.. 왜이렇게 새로운거 많이 냈어 갑자기... 얘네 뭔데..
WWDC21 ) Meet AsyncSequence 보는데도 갑자기 AsyncStream이 왜나와... 이랬는데!!!!
이젠 어느정도 이해한 것 같아 기분이 좋네요 🐥
사실 아직 AsyncStream / AsyncThrowingStream에서 안본것들이 좀 있기도하고(BufferingPolicy 같은거)
실제 프로젝트에서 어떻게 쓸 수 있는지도 더 고민해보려고 해요.
(참고로 AsyncStream / AsyncThrowingStream은 iOS 13부터 사용가능합니다 :D)
이런것들은 시간되면 정리해보려고 합니다!
틀린점이 있다면 댓글 남겨주세요~