RxJS Introduction


source: https://www.youtube.com/watch?v=R62iQvZ0bdQ

Observables


An observable is:

  • a collection
  • any number of values
  • any amount of tim
  • an async abstraction similar to a Promsie
  • Lazy: wait for subscription
  • cancellable: you can unsubscribe
  • not streams, but can create streams

Most importantly, observerables are just functions that tie a producer to a consumer and return a cancellation function.

For example, the idea of observable comes from the following:

version #1: idea

// Note: the obsrver object must implement a next() function
const interval = (observer) => {
   let i = 0;
   const id = setInterval(() => observer.next(i++), 1000);
   return () => clearInterval(id);
};

const unsubscribe = interval({
   next(x) { console.log(x); }
});

// some time later...
unsubscribe();

Above code can be changed to the following use Observable:

version #2: real code

const interval = new Observable((observer) => {
   let i = 0;
   const id = setInterval(() => observer.next(i++), 1000);
   return () => clearInterval(id);
});

const subscription = interval.subscribe({
   next(x) { console.log(x); }
});

// some time later...
subscription.unsubscribe();

Operators


Operators:

  • transform streams into new streams
  • subscribe to a source stream
  • transform the values in some way
  • return those values in a new stream

Most importantly, operators are just functions which return an observable that subscribes to another observable and (generally) join their subscriptions.

The above definition can also be read as: operators are just functions which return a function that takes an observer and returns a cancellation function, that calls another function that takes an observer and returns a cancellation function.

Idea comes from the following:

version #1: idea

const map = (source, projection) => {
   // note: a function takes an observer is an observable
   return (observer) => {
   
      // subscribe to the source observable, and return the subscription
      return source({
      
         // look at the implementation, it seems apply 'map' on an observable is 
         // not chaining that observable;
         // but chaining the observer.
         next(value) { observer.next(projection(value)); }
      });
   };
};

const mapped = map(interval, (x) => x + '!!!');

const unsubscribe = mapped({
   next(x) { console.log(x); }
});

Above code can be changed to the following use Observable:

version #2: idea

const map = (source, projection) => {
   return new Observable((observer) => {
      return source.subscribe({
         next(value) { observer.next(projection(value)); }
      });
   });
};

const mapped = map(interval, (x) => x + '!!!');

const subscription = mapped.subscribe({
   next(x) { console.log(x); }
});

However, if you want to chaining the map, the result will become really gross.

const mapped = map(map(interval, (x) => x + '!!!'), (y) => y + '?');

Wouldn’t be better if we can chain map together like this:

const mapped = interval.map((x) => x + '!!!').map((y) => y + '?');

That means we need to also change the implementation of the map. This is the final implementation:

version #3: real code

Observable.prototype.map = (projection) => {
   return new Observable((observer) => {
   
      // now, source become 'this'
      return this.subscribe({
         next(value) { observer.next(projection(value)); }
      });
   });
};

const mapped = interval.map((x) => x + '!!!').map((y) => y + '?');

Common Operators


  • map
  • filter
  • reduce
  • scan
  • merge
  • mergeAll
  • concat
  • switch
  • mergeMap
  • concatMap
  • switchMap
  • zip
  • combineLatest
  • withLatestFrom
  • expand
map

const mapped = source.map(x => x + x);

// source 
------1-----2----3---4---|

// mapped
------2-----4----6---8---|

// Note:
// dash '-' stands for time you subscribe but noting occur
// number means get a value from the source
// pipe '|' means unsubscribe 
IMPORTANT: What if an observerable returns MORE observables?

const mapped = source.map(x => http.get(x));

// source
------------1-----------2---------3-------4----|

// mapped
------------?-----------?---------?-------?----|

// Observable<Observable<HttpResponse>>

// If we subscribe directly to the mapped one,
// we find that the result we will get is Observable<HttpResponse> 
// which is not useful at all.
// we want HttpResponse, WITHOUT the Observable wrapper.

Notice: in most case, when we have observable of observables, it’s likely that the return value of the observable of observables is what we are looking for. Thus, once we see an observable that returns a list of observables in a sequence. We want to merge those observables back to the source observable.

Basic Merging Strategies: merge, concat, switch

mergeAll

IMMEDIATELY merge all observables generated by source, terminate when all observables and the source observable are complete

const result = observables.mergeAll();

// observable of observables
---------A-------B------C-----D-----|
                                    ^ source complete but result not done, because C is not done yet

// A     ---a-----a------a--|
         ^ subscribe to A immediately
                            ^ unsubscribe A
            ^ A return me a value 'a'
                     
// B             ---b------b----b-|
                 ^ subscribe to B immediately
                                  ^ unsubscribe B
// C                    ----c------c-----|
// D                          ---d-|

// result
------------a-----a-b----a-bc---bd-c-----|

merge strategy:

  • will subscribe to all observables
  • and forward all of their values
  • until all observables are complete (including the source observable)
concatAll

merge all observables generated by source ONE at a time, terminate when the last one complete

const result = observables.concatAll();

// observable of observables
---------A-------B-----|

// A     -----a-----a-----a--|
         ^ subscribe to A immediately
                 ^ meet B but just stack it without subscribing it 
                 
// B                         --------b-----b--b--|
                             ^ A is done, start subscribe on B

// result
--------------a-----a-----a----------b-----b--b--| 

concat strategy:

  • subscribe to all observables, but ONE at a time
  • other observables are waiting in queue, will subscribe once the active one is doen
  • does not complate untill all observable and the source are complete
switch (the most useful one)

once the source observable generates a new observable, throw away the old one, and switch to the new one

usage: throw away useless ajax request

const result = observables.switch();

// observable of observables
-----------A----------B--------C-------|

// A       ----a----a---a---a---a----|
           ^ subscribe A
                      ^ unsubscribe A because B comes
                      
// B                  ----b---b---b-|
                      ^ subscribe B
// C                           ---c---c-----c--|

// result
---------------a----a-----b---b---c---c-----c--|

switch strategy:

  • subscribe to each observable as SOON as it arrives, but only ONE subscription at a time
  • if one arrives while another is active, the active subscription is unsubscribed and thrown away.
Mapping and Merging

// notice observable.map will return Observable<Observable<X>>

observable.map(x => ???).mergeAll()    <->    observable.mergeMap(x => ???)

observable.map(x => ???).concatAll()   <->    observable.concatMap(x => ???)

observable.map(x => ???).switch()      <->    observable.switchMap(x => ???)

Notice: flatMap is just mergeMap

‘Hot’ vs ‘Cold’ Observables


Hot vs Cold Observables” just means “Active vs Inactive Producers”.

‘Cold’ Observable Example

Before you subscribe, the Producer(WebSoceket) isn’t active.

const coldObservable = new Observable(observer => {
   const socket = new WebSocket('ws://someserver');
   socket.onmessage = (e) => observer.next(e.data);
   socket.onerr = (e) => observer.err(e);
   socket.onclose = () => observer.complete();
   return () => socket.close();
});

If you create 3 of above observables, it will create 3 websockets.

‘Hot’ Observable Example

Before you subscribe, the Producer(WebSoceket) IS active.

// Just simple activate the producer
const socket = new WebSocket('ws://someserver');

const hotObservable = new Observable(observer => {
   socket.onmessage = (e) => observer.next(e.data);
   socket.onerr = (e) => observer.err(e);
   socket.onclose = () => observer.complete();
   return () => socket.close(); // <= But, there's a problem
});

Problem: since before subscribe, the WebSoceket is already active. If you create 3 of above observables, the first one unsubscribe will terminate the websocket.

Hot to make an Observable “Hot”

function makeHot(coldObservable) {
   // subject here acts like a wire
   const subject = new Subject();
   
   // coldObserver is now the producer activated(via subscribe) 
   // outside of the rturned Observable
   coldObservable.subscribe(subject); 

   
   return new Observable(observer => {
      return subject.subscribe(observer);
   });
}
Publish/Connect - make observable “hot” but don’t do it right away, do it after ‘connect’

function publish(coldObservable) {
   const subject = new Subject();
   const published = new Observable(observer => {
      return subject.subscribe(observer);
   });
   
   publish.connect = () => coldObservable.subscribe(subject);
   return published;
}

const foo = publish(source);

foo.subscribe(x => console.log(x));

// until you call 'connect', the observable is still cold
foo.connect();