messagebroker - How do I make my ActiveMQ broker drop offline durable subscribers -


We have an ActiveMQ broker that different clients connect to using JMS, AMQP, and MQTT. For a reason we haven't figured out yet, a specific set of MQTT clients subscribes durably (though not always). This is a test environment where clients are added and removed quite often, the latter sometimes by pulling the plug or rebooting an embedded device, so they cannot unsubscribe. The effect (IIUC) is that the broker piles up "offline durable subscriptions" for devices it might never see again (I can see these under http://my_broker:8161/admin/subscribers.jsp), keeping messages on those topics forever, until it breaks down under its own memory footprint.

The issue at hand here is that these subscribers subscribe durably, and we need to find out why that's the case. However, we decided that clients doing this (unwittingly) shouldn't bring the broker to a grinding halt, so we need to solve this problem independently.

I have found that there are settings for a timeout on offline durable subscriptions and put them into our broker configuration (the last 2 lines):

    <broker
      xmlns="http://activemq.apache.org/schema/core"
      brokerName="my_broker"
      dataDirectory="${activemq.data}"
      useJmx="true"
      advisorySupport="false"
      persistent="false"
      offlineDurableSubscriberTimeout="1800000"
      offlineDurableSubscriberTaskSchedule="60000">

If I understand correctly, the above should check every minute and dismiss clients it hasn't seen for half an hour. However, contrary to the docs, this doesn't seem to work: a consumer I had subscribe and then pulled the plug on days ago is still visible in the list of offline durable subscribers, the broker's memory footprint keeps increasing, and if I delete subscribers manually in the broker's web interface I can see the memory footprint going down.
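To make that expectation concrete, here is a minimal sketch of how I read the two settings. It is not the broker's code: the class and method names are made up for illustration, and only the two millisecond values come from the configuration above.

```java
import java.time.Duration;

/** Hypothetical model of the two broker settings; not ActiveMQ code. */
final class OfflineTimeoutSketch {
    // Value of offlineDurableSubscriberTimeout (30 minutes, in milliseconds).
    static final long TIMEOUT_MS = 1_800_000L;
    // Value of offlineDurableSubscriberTaskSchedule (1 minute, in milliseconds).
    static final long SCHEDULE_MS = 60_000L;

    /** True once a subscriber has been offline for at least the timeout. */
    static boolean isExpired(long offlineSinceMs, long nowMs) {
        return nowMs - offlineSinceMs >= TIMEOUT_MS;
    }

    /** Upper bound on the removal delay: the timeout plus one full check period. */
    static long worstCaseRemovalDelayMs() {
        return TIMEOUT_MS + SCHEDULE_MS;
    }

    public static void main(String[] args) {
        long offlineSince = 0L;
        long threeDaysLater = Duration.ofDays(3).toMillis();
        // A subscriber unplugged three days ago should long since be eligible.
        System.out.println(isExpired(offlineSince, threeDaysLater)); // true
        // Expected upper bound in minutes.
        System.out.println(Duration.ofMillis(worstCaseRemovalDelayMs()).toMinutes()); // 31
    }
}
```

By this reading, a subscriber that has been gone for days should have been dropped after roughly half an hour.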

So here are my questions:

  1. What determines whether an MQTT subscription to a topic on an ActiveMQ broker is durable?
  2. What are we doing wrong in setting the timeout for dropping offline durable subscriptions in the ActiveMQ settings?

I extracted the relevant code (doCleanup()) that removes timed-out durable subscriptions.

In the success case, it executes:

    LOG.info("Destroying durable subscriber due to inactivity: {}", sub);

In the failure case, it executes:

    LOG.error("Failed to remove inactive durable subscriber", e);

Look for the above log lines in your log file and match the details against what you observed using the admin/subscribers.jsp viewer. If it doesn't print either of the lines, the subscriptions might be remaining active for some reason, or you may have stumbled upon a bug.
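If the log is large, a small filter can pull out just those two messages. This is a rough sketch, not part of ActiveMQ: the class name is made up, and the default path `data/activemq.log` is an assumption about where your broker writes its log, so pass the real path as the first argument. Matching is case-insensitive on a fragment of each message.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

/** Hypothetical helper that filters a broker log for the cleanup messages. */
final class CleanupLogScan {
    // Lowercase fragments of the success and failure messages quoted above.
    static final String REMOVED = "destroying durable subscriber";
    static final String FAILED = "inactive durable subscriber";

    /** Returns only the lines that mention a cleanup attempt, in original order. */
    static List<String> cleanupLines(List<String> logLines) {
        return logLines.stream()
                .filter(line -> {
                    String lower = line.toLowerCase(Locale.ROOT);
                    return lower.contains(REMOVED) || lower.contains(FAILED);
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Assumed log location; override with the first command-line argument.
        Path log = Paths.get(args.length > 0 ? args[0] : "data/activemq.log");
        cleanupLines(Files.readAllLines(log)).forEach(System.out::println);
    }
}
```

No output at all would suggest that the cleanup task never considered any subscription expired.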

Also, could you try removing the underscore (_) from the broker name if you can? The manual talks about problems with underscores in broker names.
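I don't know exactly which problems the manual has in mind. One well-known candidate (an assumption on my part, not something confirmed for your setup) is that `java.net.URI` does not treat a name containing an underscore as a valid hostname, so `getHost()` returns null for it. A quick sketch to see this for yourself; the class name is made up, and the `tcp` scheme and port 61616 are only illustrative:

```java
import java.net.URI;

/** Hypothetical check of how java.net.URI parses a broker name used as a host. */
final class BrokerNameCheck {
    /** Host as parsed by java.net.URI, or null if the name is not a valid hostname. */
    static String parsedHost(String brokerName) {
        // Scheme and port are illustrative only.
        return URI.create("tcp://" + brokerName + ":61616").getHost();
    }

    public static void main(String[] args) {
        System.out.println(parsedHost("my_broker")); // null
        System.out.println(parsedHost("my-broker")); // my-broker
    }
}
```

Whether this has anything to do with the cleanup task not firing is unclear, but renaming the broker rules it out cheaply.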

Code:

    public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
                       TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
        // The cleanup timer is only created if both settings are set (neither is -1).
        if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1
                && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
            this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
            this.cleanupTask = new TimerTask() {
                @Override
                public void run() {
                    doCleanup();
                }
            };
            this.cleanupTimer.schedule(cleanupTask,
                    broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),
                    broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
        }
    }

    public void doCleanup() {
        long now = System.currentTimeMillis();
        for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
            DurableTopicSubscription sub = entry.getValue();
            // Only inactive subscriptions with a known offline timestamp are candidates.
            if (!sub.isActive()) {
                long offline = sub.getOfflineTimestamp();
                if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
                    LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
                    try {
                        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
                        info.setClientId(entry.getKey().getClientId());
                        info.setSubscriptionName(entry.getKey().getSubscriptionName());
                        ConnectionContext context = new ConnectionContext();
                        context.setBroker(broker);
                        context.setClientId(entry.getKey().getClientId());
                        removeSubscription(context, info);
                    } catch (Exception e) {
                        LOG.error("Failed to remove inactive durable subscriber", e);
                    }
                }
            }
        }
    }

    // toString method of the DurableTopicSubscription class
    @Override
    public synchronized String toString() {
        return "DurableTopicSubscription-" + getSubscriptionKey()
                + ", id=" + info.getConsumerId()
                + ", active=" + isActive()
                + ", destinations=" + durableDestinations.size()
                + ", total=" + getSubscriptionStatistics().getEnqueues().getCount()
                + ", pending=" + getPendingQueueSize()
                + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
                + ", inflight=" + dispatched.size()
                + ", prefetchExtension=" + getPrefetchExtension();
    }
