messagebroker - How do I make my ActiveMQ broker drop offline durable subscribers?
We have an ActiveMQ broker that different clients connect to using JMS, AMQP, and MQTT. For some reason we haven't figured out yet, a specific set of the MQTT clients often (not always) subscribes durably. This is a test environment where clients are added and removed quite often, the latter sometimes by pulling the plug or rebooting an embedded device, so they cannot unsubscribe properly. The effect (IIUC) is that the broker piles up "offline durable subscriptions" for devices it might never see again (I can see these under http://my_broker:8161/admin/subscribers.jsp), keeping messages on those topics forever, until it finally breaks down under its own memory footprint.
The issue at hand here is that the subscribers subscribe durably, and we need to find out why that's the case. However, we decided that clients doing this (unwittingly) shouldn't bring the broker to a grinding halt, so we need to solve this problem independently.
I have found that there are settings for a timeout on offline durable subscriptions and put them into our broker configuration (the last two attributes):
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="my_broker"
        dataDirectory="${activemq.data}"
        useJmx="true"
        advisorySupport="false"
        persistent="false"
        offlineDurableSubscriberTimeout="1800000"
        offlineDurableSubscriberTaskSchedule="60000">
If I understand correctly, the above should check every minute and dismiss clients it hasn't seen for half an hour. However, contrary to the docs, this doesn't seem to work: a consumer I had subscribe and then pulled the plug on days ago is still visible in the list of offline durable subscribers, the broker's memory footprint is constantly increasing, and if I delete subscribers manually in the broker's web interface I can see the memory footprint going down.
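As a sanity check on the units, here is a minimal sketch that derives the two millisecond values from the intended durations (30 minutes and 1 minute). The class and method names are made up for illustration and are not part of ActiveMQ.

```java
import java.time.Duration;

class TimeoutUnits {
    // Intended timeout: drop subscribers that have been offline for half an hour.
    static long timeoutMillis() {
        return Duration.ofMinutes(30).toMillis();
    }

    // Intended schedule: run the cleanup check once a minute.
    static long scheduleMillis() {
        return Duration.ofMinutes(1).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("offlineDurableSubscriberTimeout=" + timeoutMillis());
        System.out.println("offlineDurableSubscriberTaskSchedule=" + scheduleMillis());
    }
}
```

The values come out as 1800000 and 60000, matching the configuration above, so the numbers themselves are probably not the problem.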
So here are my questions:
- What determines whether an MQTT subscription to a topic on an ActiveMQ broker is durable?
- What are we doing wrong in setting up the timeout for dropping offline durable subscriptions in the ActiveMQ settings?
I extracted the relevant code (doCleanup()) that removes timed-out durable subscriptions.
In the success case, it executes:
LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
In the failure case, it executes:
LOG.error("Failed to remove inactive durable subscriber", e);
Look for the above log lines in your log file and match the details with what you observed using the admin/subscribers.jsp viewer. If it doesn't print either of the lines, the subscriptions might be remaining active for some reason, or you may have stumbled onto a bug.
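If the log is large, a small filter can pull out just those two messages. This is a minimal sketch, assuming the broker writes a plain-text log file. The class name CleanupLogScan is hypothetical, and the log path is passed as an argument because its location depends on your installation.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

class CleanupLogScan {
    // Message texts taken from the two log statements quoted above.
    static final String SUCCESS = "Destroying durable subscriber due to inactivity";
    static final String FAILURE = "Failed to remove inactive durable subscriber";

    // Keeps only the lines that mention either cleanup message.
    static List<String> cleanupLines(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains(SUCCESS) || line.contains(FAILURE))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to the broker log file (installation-specific).
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        cleanupLines(lines).forEach(System.out::println);
    }
}
```

If this prints nothing even though offline subscribers have been listed for longer than the timeout, the cleanup task is most likely either not running or not considering those subscriptions eligible.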
Also, could you try removing the underscore (_) from the broker name, if you can? The manual talks about problems with underscores in broker names.
Code:

    public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
                       TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
        // The cleanup timer is only created when both settings differ from -1.
        if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1
                && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
            this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
            this.cleanupTask = new TimerTask() {
                @Override
                public void run() {
                    doCleanup();
                }
            };
            this.cleanupTimer.schedule(cleanupTask,
                    broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),
                    broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
        }
    }

    public void doCleanup() {
        long now = System.currentTimeMillis();
        for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
            DurableTopicSubscription sub = entry.getValue();
            if (!sub.isActive()) {
                long offline = sub.getOfflineTimestamp();
                if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
                    LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
                    try {
                        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
                        info.setClientId(entry.getKey().getClientId());
                        info.setSubscriptionName(entry.getKey().getSubscriptionName());
                        ConnectionContext context = new ConnectionContext();
                        context.setBroker(broker);
                        context.setClientId(entry.getKey().getClientId());
                        removeSubscription(context, info);
                    } catch (Exception e) {
                        LOG.error("Failed to remove inactive durable subscriber", e);
                    }
                }
            }
        }
    }

    // toString method of the DurableTopicSubscription class
    @Override
    public synchronized String toString() {
        return "DurableTopicSubscription-" + getSubscriptionKey()
                + ", id=" + info.getConsumerId()
                + ", active=" + isActive()
                + ", destinations=" + durableDestinations.size()
                + ", total=" + getSubscriptionStatistics().getEnqueues().getCount()
                + ", pending=" + getPendingQueueSize()
                + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
                + ", inflight=" + dispatched.size()
                + ", prefetchExtension=" + getPrefetchExtension();
    }