본문 바로가기
Swfit/RxSwift

RxSwift-Observable, Subject, Relay 알아보기

by GGShin 2022. 11. 19.

RxSwift에서 가장 핵심이 되는 개념인 Observable과 Observer. 그 중에서도 먼저 알아야 할 Observable에 대해서 정리해보고자 합니다. 이름이 생소하지만, observable이라는 단어 자체를 해석해보면, '관찰할 수 있는/ 볼 수 있는 대상' 이라고 할 수 있습니다. Observable이 가지고 있는 요소들을 통틀어서 'event'라고 하는데, 이 event들을 누군가가 '관찰'할 수 있습니다. 이때 관찰을 하는 쪽을 observer라고 합니다. Observable을 Observer가 관찰하는 것이죠. '관찰할 수 있는 대상을 관찰한다' 라고 해석하면 되는데, 말이 조금 헷갈리네요 ㅎㅎ Observer는 자신이 관찰(구독)하는 observable에 담겨 있는 이벤트 정보들을 받아볼 수 있습니다. Observable의 종류에 따라서 어떤 식으로 정보를 관찰할 수 있는지에 조금씩 차이가 있습니다. 

Observable은 array나 list등과 같은 sequence와 동일한 개념입니다. 순서가 있는 요소들의 집합이라고 생각하면 될 것 같습니다. 

그렇다면 Subject나 Relay는 무엇일까요? 이 둘 역시 일종의 observable입니다. 차이점이 물론 있겠죠? Observable은 read-only인 반면 Subject와 Relay는 요소가 추가될 수도 있습니다. Subject는 완료되거나 에러로 인해 중단이 될 수 있는 반면 Relay는 완료나 에러로 인한 종료가 없습니다. 

 

  Observable Subject Relay
Read O O O
Write X O O
Complete/Error X O X

큰 차이는 이렇고, 세부적인 사항들을 살펴보겠습니다. 

 

1. Observable

Observable은 sequence이며, 여러 event들을 방출하는 대상입니다. Event란, 정상적인 정보나 에러 혹은 완료를 통틀어 지칭하는 단어입니다. RxSwift에서 Event는 enum입니다. 

 

enum Event<Element>  {
    case next(Element)      // next element of a sequence
    case error(Swift.Error) // sequence failed with error
    case completed          // sequence terminated successfully
}

 

next는 다음번에 방출될 요소를 의미하고(에러나 완료가 아닌 정보), error는 모종의 오류가 있을 때 방출되는 것이고 completed는 정상적으로 emit이 모두 완료되었을 때 마지막으로 방출되는 일종의 사인입니다. error가 방출될 경우는 더 이상 다음 next나 completed를 방출하지 않고 종료됩니다. 


각각의 이벤트를 방출하는 methods는 다음과 같습니다.

onNext //on(.next())로 표현하기도 합니다.

onError //on(.error())로 표현하기도 합니다.

onCompleted //on(.completed)로 표현하기도 합니다.

 

onNext는 Observable이 item을 방출할 때 호출하는 메서드입니다. Observable이 방출할 item을 파라미터로 갖습니다.

onError는 필요한 데이터 생성에 실패하거나 오류가 발생하였을 때 호출하는 메서드입니다. 에러를 발생하는 원인을 파라미터로 갖습니다.

onCompleted는 마지막 onNext가 호출된 이후에 호출됩니다. (에러가 없었을 때만 호출)

 

그렇기 때문에 Observer에는 Observable이 호출하는 method의 종류에 따라서 어떠한 작업을 할 것인지 명시해주면 됩니다!

예) onNext일 때는 print("on next"), onError일 때는 print("on error") 등등


Observable을 생성하는 operator들은 아래와 같이 12개나 되는데, 그 중에 몇 가지를 알아보겠습니다.

1-1. Create

Create으로는 Observable을 기초(?)부터 생성할 수 있습니다. 

 

    let observable: Observable<String> = Observable.create { observer -> Disposable in
        observer.onNext("A")
        observer.onNext("B")
        observer.onNext("C")
        
        observer.onCompleted()
        
        return Disposables.create {}
        //sequence 모든 작업이 끝났을 때 Disposable을 return합니다. 
        //여기서는 빈 Disposable을 반환하였지만, 
        //sequence가 완료된 후 필요한 작업이 있다면 {} 안에 명시해주면 됩니다.
        
    }

 

위의 예시를 보면, String type의 요소들을 갖는 Observable을 만드는 과정이 나와있습니다. 클로저를 사용하여 해당 Observable을 구독하는 observer가 어떤 이벤트를 받는지 명시해주고 있습니다. onNext로 A, B, C를 순차적으로 방출받을 것이고 C 방출이 끝나면 complete 됩니다. 

 

이후에 조금 더 다루겠지만, 이렇게 만든 Observable은 아래처럼 구독할 수 있습니다.

        observable.subscribe { next in
            print(next)
        } onError: { error in
            print(error.localizedDescription)
        } onCompleted: {
            print("All emitted")
        }.disposed(by: disposeBag)
        
        //Prints
        //A
        //B
        //C
        //All emitted

onNext인 경우는 방출되는 아이템을 print하고, onError인 경우는 에러메세지를 print하고, onCompleted인 경우는 All emitted라는 문구를 print하도록 하였습니다.

 

(추가예정)

1-2. Of

1-3. From

1-4. Interval

1-5. Timer


🏷공식 문서를 보면 이렇게 Observable을 설명하고 있습니다. 

"Observer가 Observable을 subscribe하고, observer는 observable이 emits하는 item이나 sequence of items에 반응한다." 위에서 설명했던 내용과 비슷합니다. "이러한 패턴은 concurrent하게(동시 다발적으로) 동작하는데, observable이 object를 emit할 때까지 기다리는 것이 아니라 observer 형태의 sentry(군사라는 뜻인데 여기서는 observable이 object를 emit하는 것을 감지하는 대상을 이야기 합니다.)를 만들어서 observable이 object를 emit할 때 적절하게 반응하도록 대기시킵니다. 서로 독립적인 여러 tasks가 있을 때, 동시 다발적으로 실행시킬 수 있어서, 전체 tasks가 종료되기 까지의 시간이 순차적으로 진행될 때보다 적게 걸린다는 장점이 있습니다." 

 

무슨 말인지 잘 이해가 안되어서 여러번 읽으며 개념을 익혔습니다 ㅎㅎ 설명 중에 동시다발적으로 동작한다는 말이 있습니다. 이 부분을 좀 더 명확하게 이해하기 위해 예시를 들어보도록 하겠습니다. 

 

 

하나의 Observable이 있고, 이를 subscribe하는 두개의 (혹은 그 이상의) observer들은 동시에 task를 수행하게 됩니다. 하나의 observer가 방출되는 이벤트들을 모두 받고 처리하면 다른 observer가 이벤트를 받아 처리하는 것이 아니라, Observable이 자신이 가진 event를 순차적으로 방출하고, observer들이 방출되는 이벤트를 하나씩 동시에 처리를 하는 것입니다.  

 

 


 

2. Subject

Subject는 observer이면서 Observable인 proxy 종류입니다. Observer이기 때문에 한개 이상의 Observables를 구독할 수 있습니다. 그리고 Observable이기 때문에 새로운 items emit할 수 있고, observe하여 방출 받은 item을 재방출할 수도 있습니다. (Cold한 observable를 구독하고 있는 경우에는, subject가 subscribe 하는 시점에 emit이 시작됩니다.)

 

Subject에 몇가지 종류가 있습니다. 

 

2-1. AsyncSubject

AsyncSubject는 자신이 subscribe하는 observable(= source Observable)이 emit하는 마지막 item만 emit합니다. 

 

    var asyncSubject: AsyncSubject<String> = AsyncSubject()
     
    func observeAsyncSubject() {
        asyncSubject.subscribe{
            print($0)
        }.disposed(by: disposeBag)
    }
    
     func asyncSubjectMaker() {
         asyncSubject.onNext("A")
         asyncSubject.onNext("B")
         asyncSubject.onNext("C")
         
         asyncSubject.onCompleted()
     }
  
    
    //Prints
    //next(C)
    //completed

 

 

2-2. PublishSubject

PublishSubject는 subscription 시점 이후에 source Observable이 emit한 item부터 emit하기 시작합니다.

 

    /*PublishSubject*/

    var publishSubject: PublishSubject<String> = PublishSubject()
    
    func observePublishSubject() {
        publishSubject
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
    }
    
    func publishSubjectMaker1() {
        publishSubject.onNext("A")
        publishSubject.onNext("B")
        publishSubject.onNext("C")
        
    }
    
    func publishSubjectMaker2() {
        publishSubject.onNext("D")
        publishSubject.onNext("E")
        
        publishSubject.onCompleted()
    }
    
    //호출 순서
    publishSubjectMaker1()
    observePublishSubject()
    publishSubjectMaker2()
    
    //Prints
    //next(D)
	//next(E)
	//completed

 

2-3. BehaviorSubject

BehaviorSubject는 subsription이 시작될 때 source Observable 가장 최근 emit한 item부터 emit하기 시작합니다. BehaviorSubject를 생성할 때는 초기 value를 설정해주어야 합니다.

 

    /*BehaviorSubject*/
    var behaviorSubject: BehaviorSubject<String> = BehaviorSubject(value: "Inital item")
  
    func observeBehaviorSubject() {
        
        behaviorSubject
            .subscribe {
                print($0)
            }
            .disposed(by: disposeBag)
    }
    
    func behaviorSubjectMaker() {
        behaviorSubject.onNext("A")
        behaviorSubject.onNext("B")
        behaviorSubject.onNext("C")
        
        behaviorSubject.onCompleted()
    }
    
    //Prints
    //next(Inital item)
	//next(A)
	//next(B)
	//next(C)
	//completed

 

2-4. ReplaySubject

 

ReplaySubject는 subscription 시점과 관계없이 source Observable(s) 방출했던 모든 item들과 그 이후에 emit될 item들 까지도 방출합니다. bufferSize로 subscription 전 방출되었던 item 중 emit할 item의 갯수를 설정할 수 있습니다. 

 

    /*ReplaySubject*/
    var replaySubject: ReplaySubject<String> = ReplaySubject<String>.create(bufferSize: 2)
    //ReplaySubject의 initializer는 internal이라 바로 호출이 안되고 create method로 생성이 가능했습니다.
    
    func observerReplaySubject() {
        replaySubject.subscribe {
            print($0)
        }
        .disposed(by: disposeBag)
    }
    
    func replaySubjectMaker1() {
        replaySubject.onNext("A")
        replaySubject.onNext("B")
        replaySubject.onNext("C")
    }
    
    func replaySubjectMaker2() {
        replaySubject.onNext("D")
        replaySubject.onNext("E")
        
        replaySubject.onCompleted()
    }
    
    //호출순서
        replaySubjectMaker1()
        observerReplaySubject()
        replaySubjectMaker2()
        
    //Prints
    //next(B)
	//next(C)
	//next(D)
	//next(E)
	//completed

 

3. Relay

Relay는 Subject의 wrapper 입니다. Subject와 거의 동일하지만 error를 방출하지 않고 complete되지 않는다는 특징이 있습니다. 

Relay의 경우는 onNext(_:)가 아닌 accept(_:)  메소드로 value를 추가할 수 있습니다. 뿐만 아니라, error나 completed는 추가할 수 없습니다. 

주의: Relay는 RxSwift가 아니라 RxCocoa에 포함되는 개념입니다. 따라서 RxCocoa를 import해야만 사용이 가능합니다!

 

 

3-1. PublishRelay

PublishRelay는 PublishSubject와 동일합니다. 

 

    var publishRelay: PublishRelay<String> = PublishRelay()
    
    func observePublishRelay() {
        publishRelay.subscribe {
            print($0)
        }
        .disposed(by: disposeBag)
        
    }
    
    func publishRelayMaker() {
        publishRelay.accept("A")
        publishRelay.accept("B")
        publishRelay.accept("C")
        
    }

 

3-2. BehaviorRelay

BehaviorRelay는 BehaviorSubject와 동일합니다. 

    var behaviorRelay: BehaviorRelay<String> = BehaviorRelay(value: "Initial value")
    
    func observeBehaviorRelay() {
        behaviorRelay.subscribe {
            print($0)
        }
        .disposed(by: disposeBag)
        
    }
    
    func behaviorRelayMaker() {
        behaviorRelay.accept("A")
        behaviorRelay.accept("B")
        behaviorRelay.accept("C")
        
    }
    
    //Prints
    //next(Initial value)
	//next(A)
	//next(B)
	//next(C)

 

 

3-3. ReplayRelay

ReplayRelay는 ReplaySubject와 동일합니다. 

 

    var replayRelay: ReplayRelay<String> = ReplayRelay<String>.create(bufferSize: 2)
    
    func observeReplayRelay() {
        replayRelay.subscribe {
            print($0)
        }
        .disposed(by: disposeBag)
        
    }
    
    func replayRelayMaker1() {
        replayRelay.accept("A")
        replayRelay.accept("B")
        replayRelay.accept("C")
        
    }
    
    func replayRelayMaker2() {
        replayRelay.accept("D")
        replayRelay.accept("E")
        replayRelay.accept("F")
        
    }
    
    //호출순서
    
    replayRelayMaker1()
    observeReplayRelay()
    replayRelayMaker2()
    
    //Prints
    //next(B)
	//next(C)
	//next(D)
	//next(E)
	//next(F)

 


 

참고자료

 

https://reactivex.io/documentation/observable.html

 

ReactiveX - Observable

Observable In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Ob

reactivex.io

 

 

[return Disposable 에 대한 설명]

https://jeonyeohun.tistory.com/365

 

[RxSwift] Disposables.create() 왜 하지?

Disposables.create 분석해보기 Rx를 하면서 많이 들었던 의문.. let numbers = Observable.create { observer in observer.onNext(1) observer.onNext(2) observer.onNext(2) return Disposables.create() } 여기서 마지막에 Disposables.create()를

jeonyeohun.tistory.com

 

https://github.com/ReactiveX/RxSwift/blob/main/Documentation/Subjects.md

 

GitHub - ReactiveX/RxSwift: Reactive Programming in Swift

Reactive Programming in Swift. Contribute to ReactiveX/RxSwift development by creating an account on GitHub.

github.com

 

반응형

'Swfit > RxSwift' 카테고리의 다른 글

RxSwift와 RxCocoa의 차이점  (0) 2023.01.01
RxSwift란  (0) 2022.12.18
RxSwift-Operator 종류 알아보기: Interval  (0) 2022.11.18