WEB/Reactive

RxJS - Using Subject

비주얼라이즈 2017. 1. 8. 21:28

RxJS - Subject 사용하기



이 글은 xgrommx.github.io의 글을 학습하며 정리한 글입니다. 내용에 문제가 있거나 개선가능한 부분이 있다면 언제든지 연락주세요.



  • Subject class 는 Observable과 Observer를 모두 상속받는다. 즉, Observable이면서 Observer 라는 말이다.
  • Observable에 여러 operator가 있는 것 처럼, Subject에도 여러 operator가 있다. (map, filter, reduce 등).
  • 우리는 Subject를 사용하여 모든 observer를 subscribe 할 수 있고, 그 다음 subscribe를 backend data source에 subject할 수 있다. 
  • 이러한 방식으로, Subject는 subscriber group 그리고 source에 대한 proxy를 수행할 수 있다.
  • Subject를 활용하여 cashing, buffering 및 time shifting 등에 대한 observable을 구성할 수 있다. 
  • 게다가 우리는 Subject를 사용하여 여러 subscribers에게 data를 broadcasting할 수 있다.


  • 기본적으로 Subject는 thread간에 동기화(synchronization)를 수행하지 않는다.
  • 그것들은 scheduler를 사용하지 않고, 모든 serialization과 문법적 정확성(grammatical correctness)이 Subject의 호출자(caller)에 의해 처리된다고 가정한다.

Subject는 thread로부터 안전한 subscribed observer에게 간단하게 broadcasting 된다. 이렇게 하면 overhead를 줄이고 성능을 향상시킬 수있다는 장점이있다.






Subject 사용하기 예제


다음 예제에서는 Subject를 만들고 subject를 subscribe한 다음, 동일한 subject 를 사용하여 observer에게 value를 publish한다. 그렇게함으로써 publication과 subscription을 동일한 출처로 결합한다.


Observer를 사용하는 것 외에도 subscribe 메서드는 onNext에 대한 함수를 취할 수 있다. 즉, item이 게시될 때마다 action이 실행된다. 예제에서 onNext가 호출 될 때 마다 item이 console에 기록된다.



var subject = new Rx.Subject();

var subscription = subject.subscribe(
    x => console.log('next: ' + x),
    e => console.log('error: ' + e.message),
    () => console.log('completed'));

subject.next(1);
// => next: 1

subject.next(2);
// => next: 2

subject.completed();
// => completed

subscription.dispose();


참고로 원본 문서에서는 onNext, onCompleted 이지만, 지금은 next, completed로 사용해주어야한다.






Subject 사용하기 예제 2


다음의 예제에서는 Subject의 proxy 및 broadcast의 특징을 살펴볼 수 있다.


  • 먼저 1초마다 정수(integer) 를 생성하는 source sequence를 만든다. 
  • We then create a Subject, and pass it as an observer to the source so that it will receive all the values pushed out by this source sequence. 
  • 그다음, 우리는 또 다른 두 개의 subscribe를 만든다. 이번에는 subject를 source로 사용한다.
  • subSubject1 및 subSubject2 subscription은 Subject에 의해 소스로부터 전달된 모든 값을 수신한다.

예제2의 코드


// Every second
var source = Rx.Observable.interval(1000);

var subject = new Rx.Subject();

var subSource = source.subscribe(subject);

var subSubject1 = subject.subscribe(
    x => console.log('Value published to observer #1: ' + x),
    e => console.log('onError: ' + e.message),
    () => console.log('onCompleted'));

var subSubject2 = subject.subscribe(
    x => console.log('Value published to observer #2: ' + x),
    e => console.log('onError: ' + e.message),
    () => console.log('onCompleted'));

setTimeout(() => {
    // Clean up
    subject.onCompleted();
    subSubject1.dispose();
    subSubject2.dispose();
}, 5000);

// => Value published to observer #1: 0
// => Value published to observer #2: 0
// => Value published to observer #1: 1
// => Value published to observer #2: 1
// => Value published to observer #1: 2
// => Value published to observer #2: 2
// => Value published to observer #1: 3
// => Value published to observer #2: 3
// => onCompleted
// => onCompleted









기타 Subject 관련 내용 정리

여기에서는 위 예제에 등장했거나, Subject 개념과 관련하여 자주 등장하는 기술 내용들을 추가로 정리해두었습니다.


  • .dispose() :  observer에 대한 모든 subscribe를 취소하고 리소스를 해제한다.[각주:1] 








  1. http://xgrommx.github.io/rx-book/content/subjects/subject/index.html#rxsubjectprototypedispose [본문으로]