티스토리 뷰
조건 연산자
Observable의 흐름을 제어하는 역할을 합니다.
filter 연산자가 발행된 값을 채택하느냐 기각하느냐 여부에 초점을 맞춘다면, 조건 연산자는 지금까지의 흐름을 어떻게 제어할 것 인지에 초점을 맞춥니다.
amb
amb는 ambigious (모호한)라는 영어 단어의 줄임 말입니다.
여러 개의 Observable 중에서 1 개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행하는 Observable입니다. 나머지 Observable에서 발행하는 데이터는 모두 무시합니다.
첫 번째 Observable인 원을 발행했으므로 사각형을 발행하는 두 번째 Observable에서 발행되는 값은 모두 무시합니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class AmbExample {
fun marbleDiagram() {
val data1 = arrayOf("1", "3", "5")
val data2 = arrayOf("2-R", "4-R")
val sources = listOf(
Observable.fromArray(*data1)
.doOnComplete { Log.d("Observable #1 : onComplete()") },
Observable.fromArray(*data2)
.delay(100, TimeUnit.MILLISECONDS)
.doOnComplete { Log.d("Observable #2 : onComplete()") }
)
Observable.amb(sources)
.doOnComplete { Log.d("Result : onComplete()") }
.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = AmbExample()
demo.marbleDiagram()
}
출력
main | value = 1
main | value = 3
main | value = 5
main | debug = Observable #1 : onComplete()
main | debug = Result : onComplete()
첫 번째 Observable에서 onComplete 이벤트가 발생하면 Observable도 최종 완료가 됩니다.
takeUntil
take 함수에 조건을 설정할 수 있습니다.
인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시 완료합니다.
즉, take 함수처럼 일정 개수만 값을 발행하되, 완료 기준을 다른 Observable에서 값을 발행하는지로 판단하는 것입니다.
takeUntil 함수의 인자로는 값을 발행할 수 있는 다른 Observable이 필요합니다.
다른 Observable에서 값이 발행되면 기존 Observable에서 나오는 값을 더 발행하지 않고 완료합니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
class TakeUntilExample {
fun marbleDiagram() {
val data = arrayOf("1", "2", "3", "4", "5", "6")
val source = Observable.fromArray(*data)
.zipWith(
Observable.interval(100, TimeUnit.MILLISECONDS),
BiFunction { value: String, _: Long -> value }
)
.takeUntil(Observable.timer(500, TimeUnit.MILLISECONDS))
source.subscribe { value -> Log.it(value) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = TakeUntilExample()
demo.marbleDiagram()
}
출력
RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 3
RxComputationThreadPool-2 | value = 4
fromArray 함수에 데이터를 넣고 zipWith과 interval을 활용하여 100ms 간격으로 데이터를 발행합니다.
takeUntil의 인자로는 timer을 호출하여 500ms 후에 값을 발행하도록 했습니다.
timer 함수는 값을 한 번만 발행하므로 takeUntil 함수와 함께 활용하기에 편리합니다.
skipUntil
takeUntil과 정반대로 Observable에서 데이터를 발행할 때까지 값을 건너뜁니다.
takeUntil 함수와는 다르게 다른 Observable에서 화살표가 나올 때까지는 값을 발행하지 않고 건너뛰다가 다른 Observable에서 값을 발행하는 순간부터 원래 Observable에서 값을 정상적으로 발행하기 시작합니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
class SkipUntilExample {
fun marbleDiagram() {
val data = arrayOf("1", "2", "3", "4", "5", "6")
val source = Observable.fromArray(*data)
.zipWith(
Observable.interval(100, TimeUnit.MILLISECONDS),
BiFunction { value: String, _: Long -> value }
)
.skipUntil(Observable.timer(500, TimeUnit.MILLISECONDS))
source.subscribe { value -> Log.it(value) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = SkipUntilExample()
demo.marbleDiagram()
}
출력
RxComputationThreadPool-2 | value = 5
RxComputationThreadPool-2 | value = 6
all
주어진 조건이 100% 맞을 때만 true를 발행하고, 조건에 맞지 않는 데이터가 발행되면 바로 false 값을 발행합니다.
모두 원 모양이어야만 true를 발행합니다.
입력
import common.Log
import common.Shape
import io.reactivex.Observable
class AllFunctionExample {
fun marbleDiagram() {
val data = arrayOf("1", "2", "3", "4")
val source = Observable.fromArray(*data)
.map(Shape()::getShape)
.all(Shape().BALL::equals)
// .all { value -> Shape().BALL == Shape().getShape(value) }
source.subscribe { value -> Log.it(value) }
}
}
fun main() {
val demo = AllFunctionExample()
demo.marbleDiagram()
}
출력
main | value = true
'알려주는 이야기 > RxJava' 카테고리의 다른 글
18. RxJava - 스케줄러 개념 (0) | 2020.07.20 |
---|---|
17. RxJava - delay, timeInterval (0) | 2020.07.19 |
15. RxJava - 결합 연산자 (0) | 2020.07.17 |
14. RxJava - 변환 연산자 (0) | 2020.07.17 |
13. RxJava - 생성 연산자 (0) | 2020.07.10 |