TypeScript: on error, swap to another Observable after at least one successful response
I'm using RxJS Observables to communicate with a device that is accessible over HTTP. The HTTP calls are started before the device is online, so the calls can fail at the beginning. At some point the device comes online, the update is started, and its progress is polled periodically. At the end of the update the device has to restart, and afterwards it provides the progress on a different URL.
Now I want to map this behaviour with RxJS:
    successfulCall: boolean = false;

    testGet() {
      return Observable.timer(0, 1000)
        .concatMap(() => this.http.get(this.url))
        .map(response => {
          let result: string = String.fromCharCode.apply(null, new Uint16Array(response.arrayBuffer()));
          this.successfulCall = true;
          return result;
        })
        .timeout(2000)
        .retryWhen(errors => {
          if (this.successfulCall) {
            return Observable.throw("offline");
          } else {
            return errors.delay(2000);
          }
        })
        .onErrorResumeNext(this.observableTwo);
    }
My idea was to use retryWhen as a switch, so that I can choose the other Observable. However, the second Observable is never started; only the first one is called. Depending on the device status (online or offline) I either receive timeouts or valid answers, and the calls continue. I know that using an exception for normal control flow is not a pretty idea.
I made a little flowchart to visualize the idea, because I imagine the description of the problem is complicated.
In the flowchart, wasSuccessful is initialized to false, and the processing of the Observable results is not shown.
Other (maybe prettier or better) approaches are also okay; I want to solve the problem and do not insist on this approach.
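One likely reason the switch never happens in the attempt above: in RxJS 5 the function passed to retryWhen is invoked once, at subscription, with the stream of errors, so a flag checked directly inside it is read before any call has succeeded. The decision in the flowchart has to be made per error instead. The following is only a minimal, RxJS-free sketch of that per-error decision; the names Outcome, Action, decide and simulate are hypothetical and exist purely for illustration.

```typescript
// Hypothetical model of the flowchart; none of these names come from RxJS.
type Outcome = { ok: true; value: string } | { ok: false; reason: string };
type Action = "emit" | "retry" | "switch";

// Decide what to do with ONE outcome, given whether any earlier call succeeded.
function decide(wasSuccessful: boolean, outcome: Outcome): Action {
  if (outcome.ok) return "emit";
  return wasSuccessful ? "switch" : "retry";
}

// Walk a scripted list of outcomes and record the action taken for each,
// stopping at the first "switch" (where the second source would take over).
function simulate(outcomes: Outcome[]): Action[] {
  const actions: Action[] = [];
  let wasSuccessful = false; // initialized to false, as in the flowchart
  for (const outcome of outcomes) {
    const action = decide(wasSuccessful, outcome);
    actions.push(action);
    if (action === "emit") wasSuccessful = true;
    if (action === "switch") break;
  }
  return actions;
}

const timeout: Outcome = { ok: false, reason: "timeout" };
const progress: Outcome = { ok: true, value: "progress" };

// Device offline twice, then online twice, then rebooting.
console.log(simulate([timeout, timeout, progress, progress, timeout, progress]).join(","));
// prints "retry,retry,emit,emit,switch"
```

In an Observable pipeline the same check would sit inside the error stream (evaluated for each error) rather than in the body of the retryWhen callback.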
So you want to poll a remote device until it comes online and, after the initial bootup has succeeded, continue with the update process; after it goes offline you want to start polling again.
    // mock implementation of polling until success
    var attempts = 0;
    function startup() {
      attempts++;
      if (attempts < 10) {
        return Rx.Observable.throw(new Error('not booted yet'));
      }
      attempts = 0; // reset so the next bootup phase has to poll again
      return Rx.Observable.of('remote device online');
    }

    // keeps retrying startup() every 100 ms until it succeeds
    const bootupPollingSource = Rx.Observable.of('')
      .concatMap(_ => Rx.Observable.defer(() => startup())
        .do(null, err => console.log('[debug] bootup err:' + err.message))
        .retryWhen(attempts => attempts.delay(100))
      );

    // emits update progress, then fails when the device reboots;
    // on that error, fall back to bootup polling
    const regularOperationsSource = Rx.Observable.from(['starting update', '...', '...'])
      .concat(Rx.Observable.throw(new Error('device rebooting')))
      .catch(err => bootupPollingSource);

    bootupPollingSource
      .concat(regularOperationsSource)
      .subscribe(console.log, console.log, () => console.log('completed'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/rx.js"></script>