spring boot - Async and shared rabbit template -
i have queue config below
@bean public connectionfactory connectionfactory() { cachingconnectionfactory connectionfactory = new cachingconnectionfactory(hostname); connectionfactory.setusername(mqusername); connectionfactory.setpassword(mqpassword); connectionfactory.setvirtualhost(virtualhost); return connectionfactory; } @bean rabbittemplate rabbittemplate(connectionfactory connectionfactory) { rabbittemplate rabbittemplate = new rabbittemplate(connectionfactory); rabbittemplate.setmessageconverter(new jackson2jsonmessageconverter()); return rabbittemplate; } @bean public amqpadmin amqpadmin() { rabbitadmin rabbitadmin = new rabbitadmin(connectionfactory()); return rabbitadmin; } and have async config like
@enableasync @configuration public class asyncconfiguration implements asyncconfigurer { @override public executor getasyncexecutor() { return taskexector(); } @override public asyncuncaughtexceptionhandler getasyncuncaughtexceptionhandler() { return new simpleasyncuncaughtexceptionhandler(); } @bean public threadpooltaskexecutor taskexector() { threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor(); taskexecutor.setcorepoolsize(10); taskexecutor.setmaxpoolsize(10); taskexecutor.initialize(); return taskexecutor; } } and in async method using amqp admin , rabbit template bean. per configuration have 10 threads @ max executing tasks, have found after while application hangs , taking dump using actuator found below information, seems deadlock on use of rabbit template/amqp admin bean line number. wrong approach or how make sure multiple threads can access rabbit mq beans.
versions: spring boot 1.4.0.release, java 8.
my service this
@service public class qdispatcherservice implements dispatcherservice { private final logger logger = loggerfactory.getlogger(this.getclass()); @autowired private amqpadmin amqpadmin; @autowired rabbittemplate rabbittemplate; @override public void senddata(data dataobject) throws exception { try { //something on properties , have check if queue exist or there messages in decide add message in other queue properties properties = amqpadmin.getqueueproperties(queuename); amqpadmin.declarequeue(new queue(queuename)); logger.info("***********************debug 4***********************"); rabbittemplate.convertandsend(queuename, dataobject); } catch (exception ex) { ex.printstacktrace(); } } } { "threadname": "taskexector-10", "threadid": 77, "blockedtime": -1, "blockedcount": 317, "waitedtime": -1, "waitedcount": 379, "lockname": "com.rabbitmq.utility.blockingvalueorexception@105f30b9", "lockownerid": -1, "lockownername": null, "innative": false, "suspended": false, "threadstate": "waiting", "stacktrace": [ { "methodname": "wait", "filename": "object.java", "linenumber": -2, "classname": "java.lang.object", "nativemethod": true }, { "methodname": "wait", "filename": "object.java", "linenumber": 502, "classname": "java.lang.object", "nativemethod": false }, { "methodname": "get", "filename": "blockingcell.java", "linenumber": 50, "classname": "com.rabbitmq.utility.blockingcell", "nativemethod": false }, { "methodname": "uninterruptibleget", "filename": "blockingcell.java", "linenumber": 89, "classname": "com.rabbitmq.utility.blockingcell", "nativemethod": false }, { "methodname": "uninterruptiblegetvalue", "filename": "blockingvalueorexception.java", "linenumber": 33, "classname": "com.rabbitmq.utility.blockingvalueorexception", "nativemethod": false }, { "methodname": "getreply", "filename": "amqchannel.java", "linenumber": 361, "classname": "com.rabbitmq.client.impl.amqchannel$blockingrpccontinuation", "nativemethod": false }, { "methodname": "privaterpc", "filename": "amqchannel.java", "linenumber": 226, "classname": "com.rabbitmq.client.impl.amqchannel", "nativemethod": false }, { "methodname": "exnwrappingrpc", "filename": "amqchannel.java", "linenumber": 118, "classname": "com.rabbitmq.client.impl.amqchannel", "nativemethod": false }, { "methodname": "queuedeclare", "filename": "channeln.java", "linenumber": 844, "classname": "com.rabbitmq.client.impl.channeln", "nativemethod": false }, { "methodname": "queuedeclare", "filename": "channeln.java", "linenumber": 61, "classname": "com.rabbitmq.client.impl.channeln", "nativemethod": false }, { "methodname": "invoke", "filename": null, "linenumber": -1, "classname": "sun.reflect.generatedmethodaccessor176", "nativemethod": false }, { "methodname": "invoke", "filename": "delegatingmethodaccessorimpl.java", "linenumber": 43, "classname": "sun.reflect.delegatingmethodaccessorimpl", "nativemethod": false }, { "methodname": "invoke", "filename": "method.java", "linenumber": 498, "classname": "java.lang.reflect.method", "nativemethod": false }, { "methodname": "invoke", "filename": "cachingconnectionfactory.java", "linenumber": 916, "classname": "org.springframework.amqp.rabbit.connection.cachingconnectionfactory$cachedchannelinvocationhandler", "nativemethod": false }, { "methodname": "queuedeclare", "filename": null, "linenumber": -1, "classname": "com.sun.proxy.$proxy166", "nativemethod": false }, { "methodname": "declarequeues", "filename": "rabbitadmin.java", "linenumber": 577, "classname": "org.springframework.amqp.rabbit.core.rabbitadmin", "nativemethod": false }, { "methodname": "access$200", "filename": "rabbitadmin.java", "linenumber": 67, "classname": "org.springframework.amqp.rabbit.core.rabbitadmin", "nativemethod": false }, { "methodname": "doinrabbit", "filename": "rabbitadmin.java", "linenumber": 209, "classname": "org.springframework.amqp.rabbit.core.rabbitadmin$3", "nativemethod": false }, { "methodname": "doinrabbit", "filename": "rabbitadmin.java", "linenumber": 206, "classname": "org.springframework.amqp.rabbit.core.rabbitadmin$3", "nativemethod": false }, { "methodname": "doexecute", "filename": "rabbittemplate.java", "linenumber": 1394, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "execute", "filename": "rabbittemplate.java", "linenumber": 1367, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "execute", "filename": "rabbittemplate.java", "linenumber": 1343, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "declarequeue", "filename": "rabbitadmin.java", "linenumber": 206, "classname": "org.springframework.amqp.rabbit.core.rabbitadmin", "nativemethod": false }, { "methodname": "senddata", "filename": "qdispatcherservice.java", "linenumber": 59, "classname": "com.mycompany.qdispatcherservice", "nativemethod": false }, .... "lockedmonitors": [ { "classname": "java.lang.object", "identityhashcode": 285810320, "lockedstackframe": { "methodname": "invoke", "filename": "cachingconnectionfactory.java", "linenumber": 916, "classname": "org.springframework.amqp.rabbit.connection.cachingconnectionfactory$cachedchannelinvocationhandler", "nativemethod": false }, "lockedstackdepth": 13 } ], "lockedsynchronizers": [ { "classname": "java.util.concurrent.threadpoolexecutor$worker", "identityhashcode": 372417558 } ], "lockinfo": { "classname": "com.rabbitmq.utility.blockingvalueorexception", "identityhashcode": 274673849 } }, ________________________________________________________________________-
new trace
{ "threadname": "taskexector-10", "threadid": 77, "blockedtime": -1, "blockedcount": 37, "waitedtime": -1, "waitedcount": 1113, "lockname": "java.io.dataoutputstream@33111fc", "lockownerid": 65, "lockownername": "taskexector-8", "innative": false, "suspended": false, "threadstate": "blocked", "stacktrace": [ { "methodname": "writeframe", "filename": "socketframehandler.java", "linenumber": 170, "classname": "com.rabbitmq.client.impl.socketframehandler", "nativemethod": false }, { "methodname": "writeframe", "filename": "amqconnection.java", "linenumber": 542, "classname": "com.rabbitmq.client.impl.amqconnection", "nativemethod": false }, { "methodname": "transmit", "filename": "amqcommand.java", "linenumber": 104, "classname": "com.rabbitmq.client.impl.amqcommand", "nativemethod": false }, { "methodname": "quiescingtransmit", "filename": "amqchannel.java", "linenumber": 337, "classname": "com.rabbitmq.client.impl.amqchannel", "nativemethod": false }, { "methodname": "transmit", "filename": "amqchannel.java", "linenumber": 313, "classname": "com.rabbitmq.client.impl.amqchannel", "nativemethod": false }, { "methodname": "basicpublish", "filename": "channeln.java", "linenumber": 686, "classname": "com.rabbitmq.client.impl.channeln", "nativemethod": false }, { "methodname": "basicpublish", "filename": "channeln.java", "linenumber": 668, "classname": "com.rabbitmq.client.impl.channeln", "nativemethod": false }, { "methodname": "invoke", "filename": null, "linenumber": -1, "classname": "sun.reflect.generatedmethodaccessor176", "nativemethod": false }, { "methodname": "invoke", "filename": "delegatingmethodaccessorimpl.java", "linenumber": 43, "classname": "sun.reflect.delegatingmethodaccessorimpl", "nativemethod": false }, { "methodname": "invoke", "filename": "method.java", "linenumber": 498, "classname": "java.lang.reflect.method", "nativemethod": false }, { "methodname": "invoke", "filename": "cachingconnectionfactory.java", "linenumber": 916, "classname": "org.springframework.amqp.rabbit.connection.cachingconnectionfactory$cachedchannelinvocationhandler", "nativemethod": false }, { "methodname": "basicpublish", "filename": null, "linenumber": -1, "classname": "com.sun.proxy.$proxy166", "nativemethod": false }, { "methodname": "dosend", "filename": "rabbittemplate.java", "linenumber": 1451, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "doinrabbit", "filename": "rabbittemplate.java", "linenumber": 703, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate$3", "nativemethod": false }, { "methodname": "doexecute", "filename": "rabbittemplate.java", "linenumber": 1394, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "execute", "filename": "rabbittemplate.java", "linenumber": 1367, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "send", "filename": "rabbittemplate.java", "linenumber": 699, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "convertandsend", "filename": "rabbittemplate.java", "linenumber": 767, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }, { "methodname": "convertandsend", "filename": "rabbittemplate.java", "linenumber": 754, "classname": "org.springframework.amqp.rabbit.core.rabbittemplate", "nativemethod": false }
it's bit unusual declare queue on every send; said, apart inefficiency should work. stack trace indicates stuck in rabbit client waiting reply queue declaration. have seen when more 1 thread using same channel, should never happen here because channel returned cache when current operation (admin, template) completes , can't used multiple threads.
you might want try new 4.0.0.rc1 amqp-client jar because have added logging (which not available in 3.x.x client). might track things down.
Comments
Post a Comment