android - PublishSubject subscriber is not receiving events -


i have class viewmodel exposes publishsubject binder.

viewmodel

public class viewmodel {      private publishsubject<actionsevent> binder = publishsubject.create();     private service service = createservice();      @override     public observable<actionsevent> getbinder() {         return binder.doonsubscribe(initialize());     }      private action0 initialize() {         return new action0() {             @override             public void call() {                 service.getactions().subscribe(new action1<action>() {                     @override                     public void call(action action) {                         log.d(tag, "so far, good");                         binder.onnext(new actionfetchedevent(action));                     }                 });             }         };     }  } 

and in activity, subscribe action executed when each event fetched.

activity

public class myactivity extends activity {      @override     public void oncreate(bundle savedinstance) {         //more code         viewmodel.getbinder().subscribe(new action1<actionsevent>() {             @override             public void call(actionsevent event) {                 log.d(tag, "this not printed!!");                 paintactioninuserinterface(event.getaction());             }         });     } } 

service

public interface actionsservice {     @get("/actions")     observable<action> getactions(); //performs http request retrofit } 

actionfetchedevent

public class actionfetchedevent implements actionsevent {      private action action;      //getters , setters  } 

but subscriber doesn't receive event. why?

it because not create subject .create() factory-method, , onsubscribe called before callback of subscription, subscribe late , miss element. use bahavioursubject, replay last element, if subscribe.

could please tell want achieve, because think compose observables in way better way, subscribing , posting onnext onto subject.

please have @ example. use rxjava2 environment.

public class viewmodeltest {     class actionsevent {     }      class actionfetchedevent extends actionsevent {         public actionfetchedevent(actionsevent actionevent) {          }     }      interface service {         public observable<actionsevent> getactions();     }      class myviewmodel {         private behaviorsubject<actionsevent> binder;          private service service;          public myviewmodel(service service) {             this.service = service;             this.binder = behaviorsubject.create();         }          public observable<actionsevent> getbinder() {             return binder.doonsubscribe(disposable -> {                 service.getactions().subscribe(action -> {                             binder.onnext(new actionfetchedevent(action));                         }                 );             });         }     }      @test     public void name() throws exception {         service mock = mock(service.class);          myviewmodel viewmodel = new myviewmodel(mock);          when(mock.getactions()).thenanswer(invocation -> {             return observable.just(new actionsevent());         });          testobserver<actionsevent> test = viewmodel.getbinder().test();          test.assertvaluecount(1);     } } 

Comments

Popular posts from this blog

asynchronous - C# WinSCP .NET assembly: How to upload multiple files asynchronously -

aws api gateway - SerializationException in posting new Records via Dynamodb Proxy Service in API -

asp.net - Problems sending emails from forum -