javascript - buffering multiple subscribers to rx.js observable -


i have

  var subject = new rx.subject();   var stream =     rx.observable.fromevent(blah, 'event')                   .filter(blah)                   .map(blah)                   .subscribe(subject);                  return subject; 

then pass subject several different handlers going process event in different ways , @ different speeds.
have in each handler

subject.subscribe(async function (x) {         const func = self[x.eventname];         if (func) {           await eventhandlerwrapper(self.handlername, func, x);         }       }) 

i have 2 questions, a) if events come in super fast handler going process them synchronously , in right order given way have it? , b) if different handlers handle event @ different speeds going wait till slowest handler through before next event provided? or sort of buffer , handle @ they're own pace?

thank you, r

first of all, creation of subject can simplified this:

const subject = rx.observable.fromevent(blah, 'event')               .filter(blah)               .map(blah)               .share(); 

the share method create subject stream. if return subject instance every subscriber, you'll same behaviour , looks better.

 a) if events come in super fast handler going process  them synchronously , in right order given way have it? 

events going pushed through entire chain 1 one , in correct order. meaning, event comes in through 'fromevent' pushed through entire chain until point subscribed it, before handling next value (unless there's async operator in between :)). ben lesh explained @ angular connect 2015: https://www.youtube.com/watch?v=koot7barvhq (you can watch whole talk it's around min 17 compares arrays observables).

b) if different handlers handle event @ different speeds  going wait till slowest handler through before     next event provided? or sort of buffer , handle @   they're own pace? 

they handle events @ own pace. check following example:

let interval$ = rx.observable.interval(1000).share();  interval$.concatmap((val) => {     console.log('called');     return rx.observable.of(val).delay(3000)   })   .subscribe((val) => console.log("slow ", val));  interval$.subscribe((val) => console.log("fast ", val)); 

here use interval observable convert subject. send out event every second. have 1 subscription taking value, handling value (which takes 2seconds) , taking next 1 (with concatmap). , subscription processes them immediately. if run code (jsbin here: https://jsbin.com/zekalab/edit?js,console), you'll see both handle events @ own pace.

so not wait slowest handler , buffered internally.

the situation describing have potentially dangerous situation if slowest processor slower frequency events thrown. in case, buffer keep growing , application crash. concept called pressure. getting events faster processing them. in case, need use operators 'buffer' or 'window' on slowest processors avoid situation.


Comments

Popular posts from this blog

sql server - Cannot query correctly (MSSQL - PHP - JSON) -

php - trouble displaying mysqli database results in correct order -

C++ Linked List -