티스토리 뷰
입력
import common.Log
import io.reactivex.Observable
class HelloRxJava2V2 {
fun emit() {
Observable.just("Hello", "RxJava2!!")
.subscribe { data -> Log.it(data) }
}
}
fun main() {
val demo = HelloRxJava2V2()
demo.emit()
}
출력
main | value = Hello
main | value = RxJava2!!
위에는 just를 사용한 간단한 예제입니다.
지금까지 배운 예제의 공통점은 대부분의 동작이 메인 스레드에서 이루어진다는 것입니다.
요구 사항에 맞게 비동기로 동작할 수 있도록 바꿔야 될 때가 있는데, 이때 스케줄러를 사용한다.
스케줄러는 스레드를 지정할 수 있게 해 줍니다.
스케줄러의 개념에 대해 알아보기 위해 마블 다이어그램을 살펴보겠습니다.
아래는 위 마블 다이어그램의 코드입니다.
입력
import common.CommonUtils
import common.Log
import common.Shape
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
class FlipExample {
fun marbleDiagram() {
val objs = arrayOf("1-S", "2-T", "3-P")
val source = Observable.fromArray(*objs)
.doOnNext { data -> Log.it("Original data = $data") }
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(Shape()::flip)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(500)
}
}
fun main() {
val demo = FlipExample()
demo.marbleDiagram()
}
출력
RxNewThreadScheduler-1 | value = Original data = 1-S
RxNewThreadScheduler-1 | value = Original data = 2-T
RxNewThreadScheduler-1 | value = Original data = 3-P
RxNewThreadScheduler-2 | value = (flipped)1-S
RxNewThreadScheduler-2 | value = (flipped)2-T
RxNewThreadScheduler-2 | value = (flipped)3-P
objs에는 마블 다이어그램의 도형이 담겨 있습니다.
map 함수에 flip 함수를 호출하여 도형을 뒤집습니다.
flip함수는 아래와 같습니다.
입력
class Shape {
val FLIPPED = "(flipped)";
fun flip(item: String): String {
if (item.startsWith(FLIPPED)) return item.replace(FLIPPED, "")
return FLIPPED + item
}
}
doOnNext 함수를 통해 Observable에서 onNext 이벤트가 발생하면 원래의 데이터 값을 확인할 수 있습니다.
그리고 subscribeOn 함수를 통해 구독자가 Observable에서 구독할 때 실행되는 스레드를 지정합니다.
마지막으로 observeOn을 통해 Observable에서 생성한 데이터 흐름이 여기저기 함수를 거치며 처리될 때 동작이 어느 스레드에서 일어나는지 지정할 수 있습니다.
subscribeOn과 observeOn 함수에 인자로 Schedulers.newThread()를 넘겼는데, 이는 새로운 스레드를 생성한다는 의미입니다.
출력 결과를 보면 값을 처리하는 스레드가 다른 것을 볼 수 있습니다.
이처럼 스케줄러를 활용하는 비동기 프로그래밍의 핵심은 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 슬레드를 분리할 수 있다는 것입니다.
아래는 위에 코드에서 observeOn 호출 부분을 제거한 코드입니다.
입력
import common.CommonUtils
import common.Log
import common.Shape
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
class FlipExample {
fun observeOnRemoved() {
val objs = arrayOf("1-S", "2-T", "3-P")
val source = Observable.fromArray(*objs)
.doOnNext { data -> Log.it("Original data = $data") }
.subscribeOn(Schedulers.newThread())
//.observeOn(Schedulers.newThread())
.map(Shape()::flip)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(500)
}
}
fun main() {
val demo = FlipExample()
demo.observeOnRemoved()
}
출력
RxNewThreadScheduler-1 | value = Original data = 1-S
RxNewThreadScheduler-1 | value = (flipped)1-S
RxNewThreadScheduler-1 | value = Original data = 2-T
RxNewThreadScheduler-1 | value = (flipped)2-T
RxNewThreadScheduler-1 | value = Original data = 3-P
RxNewThreadScheduler-1 | value = (flipped)3-P
출력 결과를 보면 하나의 스레드에서 모든 처리를 하는 것을 볼 수 있습니다.
즉, observeOn 함수를 지정하지 않으면 subscribeOn 함수로 지정한 스레드에서 모든 로직을 실행합니다.
지금까지의 설명을 간단하게 정리하면 아래와 같습니다.
- 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.
- subscribeOn 함수와 observeOn 함수를 모두 지정하면 Observable에서 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.
- subscribeOn 함수만 호출하면 Observable의 동일한 스레드에서 실행된다.
- 스케줄러를 별도로 지정하지 않으면 메인 스레드에서 실행한다.
'알려주는 이야기 > RxJava' 카테고리의 다른 글
20. RxJava - 콜백 지옥 (0) | 2020.07.21 |
---|---|
19. RxJava - 스케줄러 종류 (0) | 2020.07.21 |
17. RxJava - delay, timeInterval (0) | 2020.07.19 |
16. RxJava - 조건 연산자 (0) | 2020.07.19 |
15. RxJava - 결합 연산자 (0) | 2020.07.17 |