리액티브 연산자
연산자 종류 | 연산자 함수 |
---|---|
생성 연산자 | just(), fromXXX(), create(), interval(), range(), timer(), intervalRange(), defer(), repeat() |
변환 연산자 | map(), flatMap(), concatMap(), switchMap), groupBy(), scan(), buffer(), window() |
필터 연산자 | filter(), take(), skip(), distinct() |
결합 연산자 | zip(), combineLatest(), Merge(), concat() |
조건 연산자 | amb(), takeUntil(), skipUntil(), all() |
에러 처리 연산자 | onErrorReturn(), onErrorResumeNext(), retry(), retryUntil() |
기타 연산자 | subscribe(), subscribeOn(), observeOn(), reduce(), count() |
조건 연산자는 Observeble의 흐름을 제어하는 역할을 합니다. 필터 연산자가 발행된 값을 채택하느냐 기각하느냐 여부에 초점을 맞춘다면, 조건연산자는 지금까지의 흐름을 어떻게 제어할 것인지에 초점을 맞춤니다.
여러 개의 Observable 중에서 가장 먼저 데이터를 발행하는 1개의 Observable를 선택합니다. 이후 나머지 Observable에서 발행하는 데이터는 모두 무시합니다.
takeUntil() 함수는 take() 함수에 조건을 설정할 수 있습니다. 인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시 완료(onComplete)합니다. 즉 take() 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable에서 값을 발행하는지로 판단하는 것입니다.
예시코드
String[] data = {"1", "2", "3", "4", "5", "6"};
Observable<String> source = Observable.fromArray(data)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(val, notUsed) -> val)
.takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
source.subscribe(Log::i);
CommonUtils.sleep(1000);
결과
RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 3
RxComputationThreadPool-2 | value = 4
skipUntil(other) 함수는 takeUntil() 함수와 정반대의 함수입니다.
takeUntil() 함수와는 다르게 other Observable에서 화살표가 나올때 까지는 값을 발행하지 않고 건너뛰다가 other Observable에서 값을 발행하는 순간부터 원래 Observable에서 값을 정상적으로 발행하기 시작합니다.
all() 함수는 단순합니다. 주어진 조건에 100% 맞을 때만 true값을 발행하고 조건에 맞지 않는 데이터가 발행되면 바로 false값을 발행합니다.
기타 연산자
- delay() 함수 유틸리티 연산자로 보조 역할을 한다 인자로 전달받는 time과 시간 단위 (ms,m 등) 만큼 입력받은 Observable의 데이터 발행을 지연시켜주는 역할을 한다.
TimeInterval() 함수 어떤 값을 발행했을 때 이전 값을 발행한 이후 시간이 얼마나 흘렀는지 알려준다.
예시코드
String[] data = {"RED", "GREEN", "ORANGE"}; CommonUtils.exampleStart(); Observable<Timed<String>> source = Observable.fromArray(data) .delay(item -> { CommonUtils.doSomething(); return Observable.just(item); }) .timeInterval(); source.subscribe(Log::it);
결과
main | 165 | value = Timed[time=14, unit=MILLISECONDS, value=RED] main | 264 | value = Timed[time=99, unit=MILLISECONDS, value=GREEN] main | 306 | value = Timed[time=42, unit=MILLISECONDS, value=ORANGE]
reference