javascript - buffering multiple subscribers to rx.js observable -
i have
var subject = new rx.subject(); var stream = rx.observable.fromevent(blah, 'event') .filter(blah) .map(blah) .subscribe(subject); return subject; then pass subject several different handlers going process event in different ways , @ different speeds.
have in each handler
subject.subscribe(async function (x) { const func = self[x.eventname]; if (func) { await eventhandlerwrapper(self.handlername, func, x); } }) i have 2 questions, a) if events come in super fast handler going process them synchronously , in right order given way have it? , b) if different handlers handle event @ different speeds going wait till slowest handler through before next event provided? or sort of buffer , handle @ they're own pace?
thank you, r
first of all, creation of subject can simplified this:
const subject = rx.observable.fromevent(blah, 'event') .filter(blah) .map(blah) .share(); the share method create subject stream. if return subject instance every subscriber, you'll same behaviour , looks better.
a) if events come in super fast handler going process them synchronously , in right order given way have it? events going pushed through entire chain 1 one , in correct order. meaning, event comes in through 'fromevent' pushed through entire chain until point subscribed it, before handling next value (unless there's async operator in between :)). ben lesh explained @ angular connect 2015: https://www.youtube.com/watch?v=koot7barvhq (you can watch whole talk it's around min 17 compares arrays observables).
b) if different handlers handle event @ different speeds going wait till slowest handler through before next event provided? or sort of buffer , handle @ they're own pace? they handle events @ own pace. check following example:
let interval$ = rx.observable.interval(1000).share(); interval$.concatmap((val) => { console.log('called'); return rx.observable.of(val).delay(3000) }) .subscribe((val) => console.log("slow ", val)); interval$.subscribe((val) => console.log("fast ", val)); here use interval observable convert subject. send out event every second. have 1 subscription taking value, handling value (which takes 2seconds) , taking next 1 (with concatmap). , subscription processes them immediately. if run code (jsbin here: https://jsbin.com/zekalab/edit?js,console), you'll see both handle events @ own pace.
so not wait slowest handler , buffered internally.
the situation describing have potentially dangerous situation if slowest processor slower frequency events thrown. in case, buffer keep growing , application crash. concept called pressure. getting events faster processing them. in case, need use operators 'buffer' or 'window' on slowest processors avoid situation.
Comments
Post a Comment