typescript - On error swap to another observable after at least one successful response


I'm using RxJS observables to communicate with a device that is accessible over HTTP. The HTTP calls are started before the device is online, therefore the calls can fail at the beginning. At some point the device comes online, an update is started, and the progress is polled periodically. At the end of the update the device has to restart, and afterwards it will provide the progress on a different URL.

Now I want to map this behaviour to RxJS:

    successfulCall: boolean = false;

    testGet() {
      // poll once per second, starting immediately
      return Observable.timer(0, 1000)
        .concatMap(() => this.http.get(this.url))
        .map(response => {
          let result: string = String.fromCharCode.apply(null, new Uint16Array(response.arrayBuffer()));
          this.successfulCall = true;
          return result;
        })
        .timeout(2000)
        // intended as the switch: retry before the first success, fail afterwards
        .retryWhen(errors => {
          if (this.successfulCall) {
            return Observable.throw("offline");
          }
          else {
            return errors.delay(2000);
          }
        })
        .onErrorResumeNext(this.observableTwo);
    }

My idea was to use retryWhen as a switch, so that I can choose the other observable. But the second observable is never started; only the first one is called. Depending on the device status (online or offline) I either receive timeouts or valid answers, and the calls continue. I know that using an exception in the normal control flow is not a pretty idea.

I made a little flowchart to visualize my idea, because I imagine my description of the problem is complicated.

wasSuccessful has to be initialized with false, and the processing of the observable results is not shown.

flowchart of observable
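
Since the flowchart image itself is not reproduced here, the decision logic described in the text can be sketched roughly as follows. This is plain TypeScript rather than RxJS, written only to make the intended control flow explicit. The names `Outcome`, `Action`, `step` and `run` are hypothetical and do not come from the original post; only `wasSuccessful` is taken from it.

```typescript
// Hypothetical model of the intended control flow (not RxJS code).
type Outcome = "success" | "error";
type Action = "emit" | "retryAfterDelay" | "switchToSecond";

interface PollState {
  wasSuccessful: boolean; // false until the first call succeeds
}

// Decide what to do with the outcome of a single poll.
function step(state: PollState, outcome: Outcome): { state: PollState; action: Action } {
  if (outcome === "success") {
    // A valid answer: remember it and pass the result on.
    return { state: { wasSuccessful: true }, action: "emit" };
  }
  // An error before the first success means the device is still booting,
  // an error after it means the device is restarting.
  return state.wasSuccessful
    ? { state, action: "switchToSecond" }
    : { state, action: "retryAfterDelay" };
}

// Feed a sequence of poll outcomes through the model and collect the actions.
function run(outcomes: Outcome[]): Action[] {
  let state: PollState = { wasSuccessful: false };
  const actions: Action[] = [];
  for (const outcome of outcomes) {
    const result = step(state, outcome);
    state = result.state;
    actions.push(result.action);
    if (result.action === "switchToSecond") {
      break; // the first source is abandoned from here on
    }
  }
  return actions;
}
```

For example, `run(["error", "error", "success", "error"])` retries twice, emits once and then switches to the second source. The key point is that the decision is re-evaluated for every single error.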

Other (maybe prettier/better) approaches are also okay; I want to solve the problem and do not insist on my approach.

So you want to poll the remote device until it comes online and, after the initial bootup has succeeded, continue with the update process. After it goes offline you want to start polling again.

    // mock implementation of polling until success
    var attempts = 0;
    function startup() {
      attempts++;
      if (attempts < 10) {
        return Rx.Observable.throw(new Error('not booted yet'));
      }
      attempts = 0;
      return Rx.Observable.of('remote device online');
    }

    // keeps retrying startup() every 100 ms until it succeeds
    const bootupPollingSource = Rx.Observable.of('')
      .concatMap(_ =>
        Rx.Observable.defer(() => startup())
          .do(null, err => console.log('[debug] bootup err:' + err.message))
          .retryWhen(attempts => attempts.delay(100))
      );

    // emits the update progress, then errors when the device reboots;
    // the error is caught by switching back to the bootup polling
    const regularOperationsSource = Rx.Observable.from(['starting update', '...', '...'])
      .concat(Rx.Observable.throw(new Error('device rebooting')))
      .catch(err => bootupPollingSource);

    bootupPollingSource
      .concat(regularOperationsSource)
      .subscribe(console.log, console.log, () => console.log('completed'));
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/rx.js"></script>

