티스토리 뷰
생성 연산자
생성 연산자의 역할은 데이터 흐름을 만드는 것이다. 간단하게 Observable을 만든다고 생각하면 됩니다.
interval
일정 시간 간격으로 데이터 흐름을 생성합니다.
주어진 시간 간격으로 0부터 1 씩 증가하는 Long 객체를 발행합니다.
interval 함수는 기본적으로 영원히 지속 실행되기 때문에 폴링 용도로 많이 사용합니다.
입력
class IntervalExample {
fun printNumber() {
CommonUtils.start()
val source = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { data -> (data + 1) * 100 }
.take(5)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = IntervalExample()
demo.printNumber()
}
출력
RxComputationThreadPool-1 | 228 | value = 100
RxComputationThreadPool-1 | 323 | value = 200
RxComputationThreadPool-1 | 424 | value = 300
RxComputationThreadPool-1 | 523 | value = 400
RxComputationThreadPool-1 | 623 | value = 500
- 100ms 간격으로 0부터 데이터를 발행한 후 map 함수를 통해 1을 더하고 100을 곱한다.
- 따라서 100, 200, 300... 등의 데이터를 발행한다.
- take 함수를 통해 5 개의 값만 가져와서 100, 200, 300, 400, 500 이 출력된다.
- sleep를 호출하는 이유는 다른 스레드에서 실행이 완료될 때까지 기다려야 하기 때문이다.
- sleep를 주석 처리하면 main 스레드에서 할 일이 없기 때문에 바로 종료된다.
위에서 사용된 CommonUtils 클래스와 Log 클래스는 아래와 같습니다.
CommonUtils
class CommonUtils {
companion object {
var startTime: Long? = null
fun start() {
startTime = System.currentTimeMillis()
}
fun sleep(millis: Long) {
try {
Thread.sleep(millis)
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}
}
Log
class Log {
companion object {
fun it(obj: Any) {
when (CommonUtils.startTime) {
null -> println("${Thread.currentThread().name} | value = $obj")
else -> {
val time = System.currentTimeMillis() - CommonUtils.startTime!!
println("${Thread.currentThread().name} | $time | value = $obj")
}
}
}
}
}
위 두 개의 클래스는 앞으로 계속 사용하게 될 것입니다.
아래는 초기 지연 시간을 넣을 수 있는 interval 함수의 두 번째 원형을 활용한 코드입니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class IntervalExample {
fun nonInitialDelay() {
CommonUtils.start()
val source = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.map { data -> data + 100 }
.take(5)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = IntervalExample()
demo.nonInitialDelay()
}
출력
RxComputationThreadPool-1 | 140 | value = 100
RxComputationThreadPool-1 | 237 | value = 101
RxComputationThreadPool-1 | 336 | value = 102
RxComputationThreadPool-1 | 437 | value = 103
RxComputationThreadPool-1 | 537 | value = 104
printNumber() 코드와 비교했을 때 initialDelay 인자만 0으로 입력한 것을 제외하면 변경된 것이 없습니다.
하지만 시간 값이 228에서 140으로 거의 100ms가 줄었다. 초기 지연 값이 100ms에서 0ms로 줄었기 때문입니다.
timer
interval과 유사하지만 한 번만 실행되는 함수입니다.
일정 시간이 지난 후에 한 개의 데이터를 발행하고 onComplete 이벤트가 발생합니다.
전반적으로 interval 함수와 매우 비슷합니다.
현재 스레드가 아닌 계산 스케줄러에서 실행되는 것도 동일하고 발행되는 데이터도 interval 함수의 첫 번째 값인 0입니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
class TimerExample {
fun showTime() {
CommonUtils.start()
val source = Observable.timer(500, TimeUnit.MILLISECONDS)
.map { SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis()) }
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = TimerExample()
demo.showTime()
}
출력
RxComputationThreadPool-1 | 653 | value = 2020/03/24 18:23:59
range
주어진 값(n)부터 m개의 Int 객체를 발행합니다.
interval 함수나 timer 함수와는 다르게 스케줄러에서 실행되지 않고 현재 스레드에서 실행합니다.
range는 반복문을 대체할 수 있습니다.
입력
import common.Log
import io.reactivex.Observable
class RangeExample {
fun forLoop() {
val source = Observable.range(1, 10)
.filter { number -> number % 2 == 0 }
source.subscribe { data -> Log.it(data) }
}
}
fun main() {
val demo = RangeExample()
demo.forLoop()
}
출력
main | value = 2
main | value = 4
main | value = 6
main | value = 8
main | value = 10
현재 스레드에서 실행되기 때문에 sleep를 호출하지 않습니다.
intervalRange
interval과 range를 혼합해놓은 함수입니다.
interval처럼 일정한 시간 간격으로 값을 출력하지만, range 함수처럼 시작 숫자(n)로부터 m개만큼의 값만 생성하고 onComplete 이벤트가 발생합니다.
즉, interval처럼 무한한 데이터 흐름이 발생하지 않습니다.
interval과 range를 혼합한 형태이기 때문에 계산 스케줄러에서 실행된다는 점이 같습니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class IntervalRangeExample {
fun printNumbers() {
val source = Observable.intervalRange(1, 5, 100, 100, TimeUnit.MILLISECONDS)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = IntervalRangeExample()
demo.printNumbers()
}
출력
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 2
RxComputationThreadPool-1 | value = 3
RxComputationThreadPool-1 | value = 4
RxComputationThreadPool-1 | value = 5
main 스레드가 아니기 때문에 sleep를 호출하여 동작이 완료될 때까지 대기했습니다.
아래는 interval로 intervalRange를 만든 코드입니다.
입력
import common.CommonUtils
import common.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class IntervalRangeExample {
fun makeWithInterval() {
val source = Observable.interval(100, TimeUnit.MILLISECONDS)
.map { data -> data + 1 }
.take(5)
source.subscribe { data -> Log.it(data) }
CommonUtils.sleep(1000)
}
}
fun main() {
val demo = IntervalRangeExample()
demo.makeWithInterval()
}
인자가 5개나 되는 intervalRange 보다는 interval, map, take를 조합하여 만들어내는 코드가 더 이해하기 쉽고 자연스러워 보입니다.
ReactiveX 홈페이지를 보면 각종 연산자를 설명하면서 조합이라는 단어를 자주 언급합니다.
실제로 기본적인 함수의 조합을 통해 Observable 클래스에 존재하는 많은 함수를 만들 수 있습니다.
map, flatMap, take, filter, reduce 등의 함수는 함수 조합에 매우 중요합니다.
defer
timer 함수와 비슷하지만 데이터 흐름 생성을 구독자가 subscribe를 호출할 때까지 미룰 수 있습니다.
Observable의 생성이 구독할 때까지 미뤄지기 대문에 최신 데이터를 얻을 수 있습니다.
마블 다이어그램을 보면 빨간색 구독자는 빨간색 도형들을 수신하고 초록색 구독자는 초록색 도형들을 수신하는 것을 볼 수 있습니다.
또한 defer은 스케줄러가 없어서 메인 스레드에서 실행됩니다.
입력
import io.reactivex.Observable
class DeferExample {
private var name = "Im"
fun marbleDiagram() {
val source1 = Observable.just(name)
source1.subscribe { data -> println(data) }
name = "Leaf"
val source2 = Observable.defer { Observable.just(name) }
source2.subscribe { data -> println(data) }
source1.subscribe { data -> println(data) }
}
}
fun main() {
val demo = DeferExample()
demo.marbleDiagram()
}
출력
Im
Leaf
Im
defer을 사용한 source2는 subscribe 한 시점부터 보기 때문에 Leaf라는 값이 출력되었고, source1은 Im이 출력됩니다.
repeat
단순 반복 실행을 합니다.
서버 통신을 하면 해당 서버가 잘 살아있는지 확인하는 코드로 사용합니다.
Observable에서 발행한 빨간색, 초록색, 파란색 원을 무한히 반복해서 발행합니다.
입력
import common.Log
import io.reactivex.Observable
class RepeatExample {
fun marbleDiagram() {
val balls = arrayOf("1", "3", "5")
val source = Observable.fromArray(*balls)
.repeat(3)
source.doOnComplete{ println("onComplete") }
.subscribe { data -> Log.it(data) }
}
}
fun main() {
val demo = RepeatExample()
demo.marbleDiagram()
}
출력
main | value = 1
main | value = 3
main | value = 5
main | value = 1
main | value = 3
main | value = 5
main | value = 1
main | value = 3
main | value = 5
onComplete
정확히 3번 반복한 후에 onComplete 이벤트가 발생합니다.
아래는 repeat를 사용한 서버가 살아있는지 확인하는 heart beat의 코드입니다.
먼저 build.gradle에 아래의 코드를 추가합니다.
dependencies {
implementation 'com.squareup.okhttp3:okhttp:3.2.0'
}
그리고 OkHttpHelper라는 클래스를 만듭니다.
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException
class OkHttpHelper {
private val client: OkHttpClient = OkHttpClient()
@Throws(IOException::class)
fun get(url: String): String {
val request: Request = Request.Builder()
.url(url)
.build()
val response: Response = client.newCall(request).execute()
return response.body().string()
}
}
입력
import common.CommonUtils
import common.Log
import common.OkHttpHelper
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class RepeatExample {
fun heartbeatV1() {
CommonUtils.start()
val serverUrl = "https://api.github.com/zen"
// 2초 간격으로 서버에 ping 보내기
Observable.timer(2, TimeUnit.SECONDS)
.map { OkHttpHelper().get(serverUrl) }
.repeat()
.subscribe { data -> Log.it("Ping Result : $data") }
CommonUtils.sleep(10000)
}
}
fun main() {
val demo = RepeatExample()
demo.heartbeatV1()
}
출력
RxComputationThreadPool-1 | 4461 | value = Ping Result : Half measures are as bad as nothing at all.
RxComputationThreadPool-2 | 6680 | value = Ping Result : Keep it logically awesome.
RxComputationThreadPool-3 | 8903 | value = Ping Result : Avoid administrative distraction.
timer 함수를 사용하여 2 초마다 반복되도록 합니다.
그리고 OkHttpHelper.get()을 통해 URL의 정보를 얻습니다.
여기서 "https://api.github.com/zen"은 매번 다른 문구들을 무작위로 출력하는 Github API입니다.
HTTP GET 명령을 호출하고 결과를 리턴합니다.
첫 번째 실행되는 스레드는 RxComputationThreadPool-1이고 2 초 간격으로 실행됩니다.
원래 timer 함수는 한번 호출된 후에는 종료됩니다.
하지만 repeat를 사용하여 계속 반복해서 실행됩니다.
따라서 구독할 때마다 동작하는 스레드의 번호가 달라집니다.
스레드를 동일하게 하려면 timer와 repeat를 빼고 interval을 대신 넣어서 호출하면 됩니다.
아래는 interval을 넣어서 작성한 heart beat의 코드입니다.
입력
import common.CommonUtils
import common.Log
import common.OkHttpHelper
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
class RepeatExample {
fun heartbeatV2() {
CommonUtils.start()
val serverUrl = "https://api.github.com/zen"
// 2초 간격으로 서버에 ping 보내기
Observable.interval(2, TimeUnit.SECONDS)
.map { OkHttpHelper().get(serverUrl) }
.take(3)
.subscribe { data -> Log.it("Ping Result : $data") }
CommonUtils.sleep(10000)
}
}
fun main() {
val demo = RepeatExample()
demo.heartbeatV2()
}
출력
RxComputationThreadPool-1 | 4393 | value = Ping Result : Responsive is better than fast.
RxComputationThreadPool-1 | 4615 | value = Ping Result : Design for failure.
RxComputationThreadPool-1 | 6350 | value = Ping Result : Speak like a human.
'알려주는 이야기 > RxJava' 카테고리의 다른 글
15. RxJava - 결합 연산자 (0) | 2020.07.17 |
---|---|
14. RxJava - 변환 연산자 (0) | 2020.07.17 |
12. RxJava - Reduce (0) | 2020.07.09 |
11. RxJava - Filter (0) | 2020.07.09 |
10. RxJava - FlatMap (0) | 2020.07.09 |