티스토리 뷰
변환 연산자
데이터 흐름을 원하는 대로 변형할 수 있습니다.
concatMap
flatMap와 매우 비슷합니다.
flatMap은 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리 결과가 먼저 출력될 수도 있습니다. 하지만 concatMap은 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장합니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class ConcatExample {
fun marbleDiagram() {
CommonUtils.start()
val balls = arrayOf("1", "3", "5")
val source = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { idx -> balls[idx.toInt()] }
.take(balls.size.toLong())
.concatMap {
ball -> Observable.interval(200, TimeUnit.MILLISECONDS)
.map { "$ball ◇" }
.take(2)
}
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(2000)
}
}
fun main() {
val demo = ConcatExample()
demo.marbleDiagram()
}
출력
RxComputationThreadPool-2 | 456 | value = 1 ◇
RxComputationThreadPool-2 | 653 | value = 1 ◇
RxComputationThreadPool-3 | 854 | value = 3 ◇
RxComputationThreadPool-3 | 1054 | value = 3 ◇
RxComputationThreadPool-4 | 1256 | value = 5 ◇
RxComputationThreadPool-4 | 1456 | value = 5 ◇
- 100ms 간격으로 interval을 호출한 후 0부터 발생하는 Long을 Int로 변환하면서 숫자 1, 3, 5를 문자열로 변환합니다.
- 입력인 1이 100ms 간격으로 발생하지만 출력인 ◇는 200ms 간격으로 발생하기 때문에 입력과 출력의 순서가 역전될 수 있다. 이것을 concatMap으로 잡아 줍니다.
- take(2)를 사용하여 다이아몬드 2 개를 발행합니다.
아래는 concatMap을 flatMap으로 바꾼 코드입니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class ConcatExample {
fun interleaving() {
CommonUtils.start()
val balls = arrayOf("1", "3", "5")
val source = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { idx -> balls[idx.toInt()] }
.take(balls.size.toLong())
.flatMap { ball ->
Observable.interval(200, TimeUnit.MILLISECONDS)
.map { "$ball ◇" }
.take(2)
}
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(2000)
}
}
fun main() {
val demo = ConcatExample()
demo.interleaving()
}
출력
RxComputationThreadPool-2 | 483 | value = 1 ◇
RxComputationThreadPool-3 | 565 | value = 3 ◇
RxComputationThreadPool-4 | 667 | value = 5 ◇
RxComputationThreadPool-2 | 681 | value = 1 ◇
RxComputationThreadPool-3 | 766 | value = 3 ◇
RxComputationThreadPool-4 | 867 | value = 5 ◇
원이 들어가는 속도보다 다이아몬드가 나오는 속도가 느리므로 섞이게 됩니다.
실행 시간은 concatMap을 사용했을 때보다 훨씬 빠릅니다. 이유는 인터리빙을 허용하기 때문입니다.
즉, concatMap의 순서를 보장해주려면 추가 시간이 필요하다는 사실을 알 수 있습니다.
switchMap
concatMap이 인터리빙이 발생할 수 있는 상황에서 동작의 순서를 보장해 준다면, switchMap은 순서를 보장하기 위해 기존에 진행 중이던 작업을 바로 중단합니다.
중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문에 여러 개의 값이 발행 되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용합니다.
빨간색의 경우 정상적으로 처리했지만 초록색을 처리하는 도중에 파란색이 들어와서 초록색의 처리는 중단하고 파란색을 처리하는 것을 볼 수 있습니다.
아래는 concatMap을 switchMap으로 바꾼 코드입니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class SwitchMapExample {
fun marbleDiagram() {
CommonUtils.start()
val balls = arrayOf("1", "3", "5")
val source = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { idx -> balls[idx.toInt()] }
.take(balls.size.toLong())
.switchMap { ball ->
Observable.interval(200, TimeUnit.MILLISECONDS)
.map { "$ball ◇" }
.take(2)
}
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(2000)
}
}
fun main() {
val demo = SwitchMapExample()
demo.marbleDiagram()
}
출력
RxComputationThreadPool-4 | 816 | value = 5 ◇
RxComputationThreadPool-4 | 1015 | value = 5 ◇
아래는 doOnNext를 통해 위 코드가 어떻게 동작했는지 알아보는 코드입니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class SwitchMapExample {
fun usingDoOnNext() {
CommonUtils.start()
val balls = arrayOf("1", "3", "5")
val source = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { idx -> balls[idx.toInt()] }
.take(balls.size.toLong())
.doOnNext { data -> Log.it(data) }
.switchMap { ball ->
Observable.interval(200, TimeUnit.MILLISECONDS)
.map { "$ball ◇" }
.take(2)
}
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(2000)
}
}
fun main() {
val demo = SwitchMapExample()
demo.usingDoOnNext()
}
출력
RxComputationThreadPool-1 | 293 | value = 1
RxComputationThreadPool-1 | 381 | value = 3
RxComputationThreadPool-1 | 480 | value = 5
RxComputationThreadPool-4 | 682 | value = 5 ◇
RxComputationThreadPool-4 | 882 | value = 5 ◇
위 결과를 통해 새로운 사실을 알 수 있습니다.
-
Observable은 데이터를 발행하는 스레드와 값을 전달하는 스레드를 다르게 사용합니다. switchMap 예제를 보면 1번 스레드 없이 2, 3, 4 번만 발생한 것을 볼 수 있다. 이 말은 1 번 스레드를 값을 발행하는 데 사용했다는 것입니다.
-
5◇만 출력을 했습니다. 원은 100ms 간격으로 발행하고 다이아몬드는 200ms 간격으로 발행하기 때문에 1◇가 발행되기 전에 5가 발행되었습니다. 따라서 중간에 있던 3의 발행이 취소되고 5를 처리한 결과인 5◇만 두 번 출력됩니다.
switchMap은 센서 등의 값을 얻어와서 동적으로 처리하는 경우에 매우 유용합니다. 센서 값은 중간 값보다는 최종적인 값으로 결과를 처리하는 경우가 많기 때문입니다.
flatMap으로 매번 새로운 결과를 검사하지 말고 switchMap을 사용하면 됩니다.
groupBy
어떤 기준으로 단일 Observable을 여러 개로 이루어진 Observable 그룹으로 만듭니다.
입력
import common.Shape
import io.reactivex.Observable
import io.reactivex.observables.GroupedObservable
class GroupByExample {
fun marbleDiagram() {
val objs = arrayOf("6", "4", "2-T", "2", "6-T", "4-T")
val source : Observable<GroupedObservable<String, String>> = Observable.fromArray(*objs).groupBy(Shape()::getShape)
source.subscribe { obj -> obj.subscribe { data -> println("GROUP: ${obj.key} \t Value: $data") } }
}
}
fun main() {
val demo = GroupByExample()
demo.marbleDiagram()
}
출력
GROUP: BALL Value: 6
GROUP: BALL Value: 4
GROUP: TRIANGLE Value: 2-T
GROUP: BALL Value: 2
GROUP: TRIANGLE Value: 6-T
GROUP: TRIANGLE Value: 4-T
GroupedObservable은 Observable과 동일하지만 getKey라는 메서드를 제공하여 구분된 그룹을 알 수 있게 해 줍니다.
source는 objs [] 배열에서 입력 데이터를 가져온다. 그룹은 Shape(). getShape를 호출한 것입니다.
또한, source.subscribe에 전달하는 obj는 GroupedObservable 객체입니다. 그룹 별로 1 개씩 생성되므로 생성된 obj 별로 subscribe를 한 번 더 호출해야 합니다.
Shape 클래스는 아래와 같습니다.
Shape 클래스
class Shape {
fun getShape(obj: String): String {
if (obj == "") return "NO-SHAPE"
if (obj.endsWith("-H")) return "HEXAGON"
if (obj.endsWith("-O")) return "OCTAGON"
if (obj.endsWith("-R")) return "RECTANGLE"
if (obj.endsWith("-T")) return "TRIANGLE"
if (obj.endsWith("◇")) return "DIAMOND"
return "BALL"
}
}
아래는 특정 그룹만 처리하기 위해서 filter 함수를 추가한 코드입니다.
입력
import common.Shape
import io.reactivex.Observable
import io.reactivex.observables.GroupedObservable
class GroupByExample {
fun filterBallGroup() {
val objs = arrayOf("6", "4", "2-T", "2", "6-T", "4-T")
val source : Observable<GroupedObservable<String, String>> = Observable.fromArray(*objs).groupBy(Shape()::getShape)
source.subscribe { obj ->
obj.filter { obj.key.equals("BALL") }
.subscribe { data -> println("GROUP: ${obj.key} \t Value: $data") }
}
}
}
fun main() {
val demo = GroupByExample()
demo.filterBallGroup()
}
출력
GROUP: BALL Value: 6
GROUP: BALL Value: 4
GROUP: BALL Value: 2
source.subscribe 부분에 obj.filter만 추가되었습니다.
실행 결과는 BALL 그룹만 출력합니다.
scan
reduce 함수와 비슷한 함수입니다. 하지만 reduce 함수는 Observable에서 모든 데이터가 입력된 후 그것을 종합하여 마지막 1 개의 데이터 만을 구독자에게 발행한다면, scan 함수는 실행할 때마다 입력 값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행합니다.
입력
import common.Log
import io.reactivex.Observable
class ScanExample {
fun marbleDiagram() {
val balls = arrayOf("1", "3", "5")
val source = Observable.fromArray(*balls)
.scan { ball1, ball2 -> "$ball2($ball1)" }
source.subscribe { data -> Log.it(data) }
}
}
fun main() {
val demo = ScanExample()
demo.marbleDiagram()
}
출력
main | value = 1
main | value = 3(1)
main | value = 5(3(1))
reduce와 scan의 다른 점은 source의 타입이 reduce는 Maybe, scan은 Observable입니다.
reduce의 경우 마지막 값이 입력되지 않거나 onComplete 이벤트가 발생하지 않으면 구독자에게 값을 발행하지 않습니다. 따라서 값을 발행하지 않고 종료할 수도 있으므로 Maybe 클래스 타입입니다.
scan은 값이 입력될 때마다 구독자에게 값을 발행합니다. 따라서 Observable입니다.
'알려주는 이야기 > RxJava' 카테고리의 다른 글
16. RxJava - 조건 연산자 (0) | 2020.07.19 |
---|---|
15. RxJava - 결합 연산자 (0) | 2020.07.17 |
13. RxJava - 생성 연산자 (0) | 2020.07.10 |
12. RxJava - Reduce (0) | 2020.07.09 |
11. RxJava - Filter (0) | 2020.07.09 |