티스토리 뷰
결합 연산자
생성 연산자와 변환 연산자는 1 개의 데이터 흐름을 다뤘습니다.
결합 연산자는 여러 개의 Observable을 조합하여 활용합니다.
zip
zip 함수의 특징은 각각의 Observable을 모두 활용해 2 개 혹은 그 이상의 Observable을 결합할 수 있다는 것입니다.
예를 들어 A, B 두 개의 Observable을 결합한다면 2 개의 Observable에서 모두 데이터를 발행할 때까지 발행을 기다립니다.
입력
import common.Log
import common.Shape
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
class ZipExample {
fun marbleDiagram() {
val shapes = arrayOf("BALL", "PENTAGON", "STAR")
val coloredTriangles = arrayOf("2-T", "6-T", "4-T")
val source = Observable.zip(
Observable.fromArray(*shapes).map(Shape()::getSuffix),
Observable.fromArray(*coloredTriangles).map(Shape()::getColor),
BiFunction<String?, String?, String> { suffix, color -> "$color$suffix" }
)
source.subscribe { data -> Log.it(data) }
}
}
fun main() {
val demo = ZipExample()
demo.marbleDiagram()
}
출력
main | value = 2
main | value = 6-P
main | value = 4-S
아래는 위에서 사용한 getSuffix 함수와 getColor 함수의 코드입니다.
입력
class Shape {
val HEXAGON = "HEXAGON"
val OCTAGON = "OCTAGON"
val RECTANGLE = "RECTANGLE"
val TRIANGLE = "TRIANGLE"
val DIAMOND = "DIAMOND"
val PENTAGON = "PENTAGON"
val BALL = "BALL"
val STAR = "STAR"
fun getSuffix(shape: String): String {
if (HEXAGON == shape) return "-H"
if (OCTAGON == shape) return "-O"
if (RECTANGLE == shape) return "-R"
if (TRIANGLE == shape) return "-T"
if (DIAMOND == shape) return "<>"
if (PENTAGON == shape) return "-P"
return if (STAR == shape) "-S"
else "" // 이것은 BALL
}
fun getColor(shape: String): String? {
if (shape.endsWith("<>")) //diamond
return shape.replace("<>", "").trim(' ')
val hyphen = shape.indexOf("-")
return if (hyphen > 0) {
shape.substring(0, hyphen)
} else shape
}
}
getSuffix 함수는 shape에서 값을 받아온 다음, 도형의 모양 접미사를 가져옵니다. ( 예를 들어 STAR은 -S )
그리고 getColor 함수는 값을 받아서 모양의 색상 값으로 변환합니다. ( 예를 들어 2-T는 2 )
위에서는 2 개의 Observable을 결합했지만, zip 함수는 최대 9 개의 Observable을 결합할 수 있습니다.
하지만 보통 2 개, 3 개면 충분합니다.
숫자 결합
아래는 zip 함수를 활용한 숫자 결합의 예제 코드입니다.
입력
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.Function3
class ZipExample {
fun zipNumbers() {
val source = Observable.zip(
Observable.just(100, 200, 300),
Observable.just(10, 20, 30),
Observable.just(1, 2, 3),
Function3 { num1: Int, num2: Int, num3: Int -> num1 + num2 + num3 }
)
source.subscribe { data -> Log.it(data) }
}
}
fun main() {
val demo = ZipExample()
demo.zipNumbers()
}
출력
main | value = 111
main | value = 222
main | value = 333
3 개의 Observable을 결합하였다.
interval 함수를 이용한 시간 결합
아래는 zip과 interval 함수를 사용한 시간 결합의 예제 코드입니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
class ZipExample {
fun zipInterval() {
val source = Observable.zip(
Observable.just("RED", "GREEN", "BLUE"),
Observable.interval(200, TimeUnit.MILLISECONDS),
BiFunction { value: String, _: Long -> value }
)
CommonUtils.start()
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = ZipExample()
demo.zipInterval()
}
출력
RxComputationThreadPool-1 | 223 | value = RED
RxComputationThreadPool-1 | 409 | value = GREEN
RxComputationThreadPool-1 | 609 | value = BLUE
얼핏 생각하면 같은 타입의 데이터만 결합할 수 있을 것으로 생각되지만, 데이터뿐만 아니라 시간과도 결합하는 것을 볼 수 있습니다. 이를 zipInterval 기법이라고 합니다. 데이터를 발행하는 시간을 조절할 수 있습니다.
전기 요금 계산 예제
아래는 전기 요금 계산 예제 코드입니다.
기본요금(원/호) | 전력량 요금(원/kWh) |
200kWh 이하 사용 - 910 | 처음 200kWh까지 - 93.3 |
201 ~ 400kWh 사용 - 1,600 | 다음 200kWh까지 - 187.9 |
400kWh 초과 사용 - 7.300 | 400kWh 초과 - 280.6 |
입력
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.lang.Math.max
import java.lang.Math.min
import java.lang.StringBuilder
import java.text.DecimalFormat
class ZipExample {
fun electricBillV1() {
var index: Int = 0
val data = arrayOf(
"100", //910 + 93.3 * 100 = 10,240원
"300" //1600 + 93.3 * 200 + 187.9 * 100 = 39,050원
)
val basePrice: Observable<Int> = Observable.fromArray(*data)
.map { value -> Integer.parseInt(value) }
.map { value ->
if (value <= 200) return@map 910
if (value <= 400) return@map 1600
return@map 7300
}
val usagePrice = Observable.fromArray(*data)
.map { value -> Integer.parseInt(value) }
.map { value ->
val series1 = min(200, value) * 93.3
val series2 = min(200, max(value - 200, 0)) * 187.9
val series3 = max(0, max(value - 400, 0)) * 280.65
return@map (series1 + series2 + series3).toInt()
}
val source = Observable.zip(
basePrice,
usagePrice,
BiFunction { v1: Int, v2: Int -> v1 + v2 }
)
source.map { value -> DecimalFormat("#,###").format(value) }
.subscribe {value ->
val sb = StringBuilder()
sb.append("Usage: ${data[index]} kWh => ")
sb.append("Price: ${value}원")
Log.it(sb.toString())
index++
}
}
}
fun main() {
val demo = ZipExample()
demo.electricBillV1()
}
출력
main | value = Usage: 100 kWh => Price: 10,240원
main | value = Usage: 300 kWh => Price: 39,050원
기본 요금은 basePrice Observable, 전력량 요금은 usagePrice Observable로 분리했습니다.
이 코드에서 중요한 점은 두 개의 요금을 계산한 것이 아닌, zip을 사용하여 결합한 것입니다.
전기 요금을 출력하기 위해서는 천 원 단위로 콤마를 붙여줘야 하는데 DecimalFormat을 사용하였습니다.
위에 코드에는 문제점이 있습니다. 전력 사용량을 출력하기 위해 멤버 변수 index를 참조하여 부수 효과가 생긴 것입니다.
부수 효과란, 결과 값 이외에 다른 상태를 변경시킬 때 부수 효과가 있다고 말한다. 예를 들어, 함수가 전역변수나 정적 변수를 수정하거나, 인자로 넘어온 것들 중 하나를 변경하거나 화면이나 파일에 데이터를 쓰거나, 다른 부수 효과가 있는 함수에서 데이터를 읽어오는 경우가 있다. 부수 효과는 프로그램의 동작을 이해하기 어렵게 한다.
부수 효과를 없앤 전기 요금 계산 예제
아래는 부수 효과를 없앤 전기 요금 계산 예제 코드입니다.
조건은 아래와 같습니다.
- data를 추가로 넘겨주는 방법
- Pair 클래스의 사용
- zip이 결합하는 3 개의 Observable
입력
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.Function3
import java.lang.Math.max
import java.lang.Math.min
import java.lang.StringBuilder
import java.text.DecimalFormat
class ZipExample {
fun electricBillV2() {
val data = arrayOf(
"100", //910 + 93.3 * 100 = 10,240원
"300" //1600 + 93.3 * 200 + 187.9 * 100 = 39,050원
)
val basePrice: Observable<Int> = Observable.fromArray(*data)
.map { value -> Integer.parseInt(value) }
.map { value ->
if (value <= 200) return@map 910
if (value <= 400) return@map 1600
return@map 7300
}
val usagePrice = Observable.fromArray(*data)
.map { value -> Integer.parseInt(value) }
.map { value ->
val series1 = min(200, value) * 93.3
val series2 = min(200, max(value - 200, 0)) * 187.9
val series3 = max(0, max(value - 400, 0)) * 280.65
return@map (series1 + series2 + series3).toInt()
}
val source = Observable.zip(
basePrice,
usagePrice,
Observable.fromArray(*data),
Function3{ v1: Int, v2: Int, i: String -> Pair(i, v1 + v2) }
)
source.map { value -> Pair(value.first, DecimalFormat("#,###").format(value.second)) }
.subscribe { value ->
val sb = StringBuilder()
sb.append("Usage: ${value.first} kWh => ")
sb.append("Price: ${value.second}원")
Log.it(sb.toString())
}
}
}
fun main() {
val demo = ZipExample()
demo.electricBillV2()
}
출력
main | value = Usage: 100 kWh => Price: 10,240원
main | value = Usage: 300 kWh => Price: 39,050원
zip 함수의 세 번째 인자로 원래 데이터 그대로 넣고 Pair 클래스를 호출하여 Pair 객체를 생성했습니다.
결과를 출력할 땐 Pair 객체의 first와 second를 사용했습니다.
실행 결과는 같지만 위와 같은 방법으로 부수 효과를 없앨 수 있습니다.
아래는 zipWith을 활용한 코드입니다.
입력
import common.Log
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
class ZipExample {
fun zipWithNumbers() {
val source = Observable.zip(
Observable.just(100, 200, 300),
Observable.just(10, 20, 30),
BiFunction { a: Int, b: Int -> a + b })
.zipWith(Observable.just(1, 2, 3), BiFunction { ab: Int, c: Int -> ab + c })
source.subscribe { data -> Log.it(data) }
}
}
fun main() {
val demo = ZipExample()
demo.zipWithNumbers()
}
출력
main | value = 111
main | value = 222
main | value = 333
zipNumbers 함수에서 3 개의 Observable을 결합했지만, 위에 코드는 2 개의 Observable을 zip으로 묶고 세 번째 Observable을 zipWith으로 결합했습니다.
실행 결과는 같습니다.
combineLatest
2 개 이상의 Observable을 기반으로 각각의 값이 변경되었을 때 갱신해주는 함수입니다.
두 Observable에서 값을 모두 발행해야지 결괏값이 나오는 것은 zip과 같습니다. 하지만 발행한 후에 둘 중에 어떤 것이 갱신되던지 최신 결괏값을 보여 주는 것이 다릅니다.
입력
import common.CommonUtils
import common.Log
import common.Shape
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
class CombineLatestExample {
fun marbleDiagram() {
val data1 = arrayOf("6", "7", "4", "2")
val data2 = arrayOf("DIAMOND", "STAR", "PENTAGON")
val source = Observable.combineLatest(
Observable.fromArray(*data1)
.zipWith(
Observable.interval(100, TimeUnit.MILLISECONDS),
BiFunction { shape: String, _: Long -> Shape().getColor(shape) }
),
Observable.fromArray(*data2)
.zipWith(
Observable.interval(150, 200, TimeUnit.MILLISECONDS),
BiFunction { shape: String, _: Long -> Shape().getSuffix(shape) }
),
BiFunction { v1: String, v2: String -> v1 + v2 }
)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = CombineLatestExample()
demo.marbleDiagram()
}
출력
RxComputationThreadPool-2 | value = 6<>
RxComputationThreadPool-1 | value = 7<>
RxComputationThreadPool-1 | value = 4<>
RxComputationThreadPool-2 | value = 4-S
RxComputationThreadPool-1 | value = 2-S
RxComputationThreadPool-2 | value = 2-P
첫 번째 Observable에서는 색상을 얻어오고 두 번째 Observable에서는 도형 모양에 대한 접미사를 얻어옵니다.
첫 번째는 100ms 간격으로 값을 발행하고, 두 번째는 최초에 150ms를 쉬고 200ms 간격으로 값을 발행합니다.
zip 함수와 다르게 어느 1 개의 값만 변경되어도 결과가 발행됩니다.
리액티브 연산자로 합계 구하기
combineLatest의 대표적인 활용 예는 마이크로소프트의 엑셀의 셀입니다.
어떤 셀에 '=A+B'라는 수식을 넣었다면 A셀과 B셀의 어떤 값이 변경되는 즉시 새로운 합의 결과를 표시합니다.
입력
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.functions.BiFunction
import io.reactivex.observables.ConnectableObservable
import java.util.*
class ReactiveSum {
fun run() {
val source: ConnectableObservable<String> = userInput()
val a = source
.filter { str -> str.startsWith("a:") }
.map { str -> str.replace("a:", "") }
.map { str -> Integer.parseInt(str) }
val b = source
.filter { str -> str.startsWith("b:") }
.map { str -> str.replace("b:", "") }
.map { str -> Integer.parseInt(str) }
Observable.combineLatest(
a.startWith(0),
b.startWith(0),
BiFunction { x: Int, y: Int -> x + y }
).subscribe { res -> println("Result: $res") }
source.connect()
}
private fun userInput(): ConnectableObservable<String> {
return Observable.create { emitter: ObservableEmitter<String> ->
val scanner = Scanner(System.`in`)
while (true) {
println("Input: ")
val line = scanner.nextLine()
emitter.onNext(line)
if (line.indexOf("Exit") >= 0) {
scanner.close()
break;
}
}
}.publish()
}
}
fun main() {
ReactiveSum().run()
}
출력
Result: 0
Input:
a:100
Result: 100
Input:
b:2020
Result: 2120
Input:
a:300
Result: 2320
Input:
Exit
먼저 userInput 함수는 Observable.create를 활용하여 사용자에게 값을 받고 받아온 값을 그대로 발행합니다. 그리고 Exit을 입력받으면 종료합니다.
run 함수는 ConnectableObservable 클래스로 userInput 함수에서 Observable을 생성합니다.
첫 번째 Observable인 a는 'a:'로 입력한 경우에만 값을 추출하여 Int로 변환합니다.
두 번째 Observable인 b는 'b:'로 입력한 경우에는 값을 추출하여 Int로 변환합니다.
그리고 2 개의 값을 combineLatest로 결합합니다.
두 Observable 모두 startWith(0)을 추가했습니다. 추가하지 않고 코드를 실행하면 값을 입력해도 결과를 출력하지 않는 것을 확인 할 수 있습니다. 이유는 combineLatest 함수에서 처음 값을 발행하려면 입력 Observable인 a, b 모두 값을 발행해야 하기 때문입니다.
값을 입력했을 때 바로 출력하기를 원하기 때문에 starWith(0)을 호출하여 0으로 초기화했습니다.
이처럼 사용자 입력을 받을 때는 startWith 함수를 유용하게 활용할 수 있습니다.
마지막으로 connect를 호출하여 데이터 흐름을 시작합니다.
merge
가장 단순한 결합 함수입니다. 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행합니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class MergeExample {
fun marbleDiagram() {
val data1 = arrayOf("1", "3")
val data2 = arrayOf("2", "4", "6")
val source1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.map { data -> data.toInt() }
.map { idx -> data1[idx] }
.take(data1.size.toLong())
val source2 = Observable.interval(50, TimeUnit.MILLISECONDS)
.map { data -> data.toInt() }
.map { idx -> data2[idx] }
.take(data2.size.toLong())
val source = Observable.merge(source1, source2)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = MergeExample()
demo.marbleDiagram()
}
출력
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 4
RxComputationThreadPool-1 | value = 3
RxComputationThreadPool-2 | value = 6
첫 번째 Observable은 대기 시간 없이 100ms 간격으로 값을 발행하고, 두 번째 Observable은 50ms 간격으로 값을 발행하므로 2 개의 값이 섞입니다.
하지만 첫 번째와 두 번째 Observable의 데이터 발행이 모두 개별의 스레드에서 이뤄지는 것을 볼 수 있습니다.
concat
2 개 이상의 Observable을 이어 붙여 주는 함수입니다.
첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독합니다.
첫 번째 Observable에 onComplete 이벤트가 발생하지 않게 하면 두 번째 Observable은 영원히 대기합니다. 이는 잠재적인 메모리 누수의 위험을 내포합니다. 따라서 반드시 onComplete 이벤트가 발생하도록 해야 합니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class ConcatExample {
fun marbleDiagram() {
val onCompleteAction = { Log.it("onComplete()") }
val data1 = arrayOf("1", "3", "5")
val data2 = arrayOf("2", "4", "6")
val source1 = Observable.fromArray(*data1)
.doOnComplete(onCompleteAction)
val source2 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { data -> data.toInt() }
.map { idx -> data2[idx] }
.take(data2.size.toLong())
.doOnComplete(onCompleteAction)
val source = Observable.concat(source1, source2)
.doOnComplete(onCompleteAction)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = ConcatExample()
demo.marbleDiagram()
}
출력
main | value = 1
main | value = 3
main | value = 5
main | debug = onComplete()
RxComputationThreadPool-1 | value = 2
RxComputationThreadPool-1 | value = 4
RxComputationThreadPool-1 | value = 6
RxComputationThreadPool-1 | debug = onComplete()
RxComputationThreadPool-1 | debug = onComplete()
concat을 활용할 때는 onComplete 이벤트의 발생 여부 확인이 중요합니다.
따라서 () -> Unit을 사용하여 객체를 생성했습니다. doOnComplete처럼 인자가 없는 람다 표현식에 넣어야 할 때 사용됩니다.
concat은 최대 4 개의 Observable을 결합할 수 있습니다.
'알려주는 이야기 > RxJava' 카테고리의 다른 글
17. RxJava - delay, timeInterval (0) | 2020.07.19 |
---|---|
16. RxJava - 조건 연산자 (0) | 2020.07.19 |
14. RxJava - 변환 연산자 (0) | 2020.07.17 |
13. RxJava - 생성 연산자 (0) | 2020.07.10 |
12. RxJava - Reduce (0) | 2020.07.09 |