spring boot - Async and shared rabbit template -


i have queue config below

 @bean    public connectionfactory connectionfactory() {    cachingconnectionfactory connectionfactory = new            cachingconnectionfactory(hostname);     connectionfactory.setusername(mqusername);     connectionfactory.setpassword(mqpassword);     connectionfactory.setvirtualhost(virtualhost);     return connectionfactory;    }     @bean    rabbittemplate rabbittemplate(connectionfactory connectionfactory) {     rabbittemplate rabbittemplate = new rabbittemplate(connectionfactory);     rabbittemplate.setmessageconverter(new jackson2jsonmessageconverter());     return rabbittemplate;    }    @bean   public amqpadmin amqpadmin() {     rabbitadmin rabbitadmin = new rabbitadmin(connectionfactory());     return rabbitadmin;   } 

and have async config like

@enableasync @configuration public class asyncconfiguration implements asyncconfigurer {      @override     public executor getasyncexecutor() {         return taskexector();     }      @override     public asyncuncaughtexceptionhandler getasyncuncaughtexceptionhandler() {         return new simpleasyncuncaughtexceptionhandler();     }      @bean     public threadpooltaskexecutor taskexector() {         threadpooltaskexecutor taskexecutor = new threadpooltaskexecutor();         taskexecutor.setcorepoolsize(10);         taskexecutor.setmaxpoolsize(10);         taskexecutor.initialize();         return taskexecutor;     }  }        

and in async method using amqp admin , rabbit template bean. per configuration have 10 threads @ max executing tasks, have found after while application hangs , taking dump using actuator found below information, seems deadlock on use of rabbit template/amqp admin bean line number. wrong approach or how make sure multiple threads can access rabbit mq beans.

versions: spring boot 1.4.0.release, java 8.

my service this

@service public class qdispatcherservice implements dispatcherservice {     private final logger logger = loggerfactory.getlogger(this.getclass());      @autowired     private amqpadmin amqpadmin;       @autowired     rabbittemplate rabbittemplate;      @override     public void senddata(data dataobject) throws exception {          try {             //something on properties , have check if queue exist or there messages in decide add message in other queue             properties properties = amqpadmin.getqueueproperties(queuename);             amqpadmin.declarequeue(new queue(queuename));              logger.info("***********************debug 4***********************");             rabbittemplate.convertandsend(queuename, dataobject);          } catch (exception ex) {             ex.printstacktrace();         }     }  } 

{     "threadname": "taskexector-10",     "threadid": 77,     "blockedtime": -1,     "blockedcount": 317,     "waitedtime": -1,     "waitedcount": 379,     "lockname": "com.rabbitmq.utility.blockingvalueorexception@105f30b9",     "lockownerid": -1,     "lockownername": null,     "innative": false,     "suspended": false,     "threadstate": "waiting",     "stacktrace": [       {         "methodname": "wait",         "filename": "object.java",         "linenumber": -2,         "classname": "java.lang.object",         "nativemethod": true       },       {         "methodname": "wait",         "filename": "object.java",         "linenumber": 502,         "classname": "java.lang.object",         "nativemethod": false       },       {         "methodname": "get",         "filename": "blockingcell.java",         "linenumber": 50,         "classname": "com.rabbitmq.utility.blockingcell",         "nativemethod": false       },       {         "methodname": "uninterruptibleget",         "filename": "blockingcell.java",         "linenumber": 89,         "classname": "com.rabbitmq.utility.blockingcell",         "nativemethod": false       },       {         "methodname": "uninterruptiblegetvalue",         "filename": "blockingvalueorexception.java",         "linenumber": 33,         "classname": "com.rabbitmq.utility.blockingvalueorexception",         "nativemethod": false       },       {         "methodname": "getreply",         "filename": "amqchannel.java",         "linenumber": 361,         "classname": "com.rabbitmq.client.impl.amqchannel$blockingrpccontinuation",         "nativemethod": false       },       {         "methodname": "privaterpc",         "filename": "amqchannel.java",         "linenumber": 226,         "classname": "com.rabbitmq.client.impl.amqchannel",         "nativemethod": false       },       {         "methodname": "exnwrappingrpc",         "filename": "amqchannel.java",         "linenumber": 118,         "classname": "com.rabbitmq.client.impl.amqchannel",         "nativemethod": false       },       {         "methodname": "queuedeclare",         "filename": "channeln.java",         "linenumber": 844,         "classname": "com.rabbitmq.client.impl.channeln",         "nativemethod": false       },       {         "methodname": "queuedeclare",         "filename": "channeln.java",         "linenumber": 61,         "classname": "com.rabbitmq.client.impl.channeln",         "nativemethod": false       },       {         "methodname": "invoke",         "filename": null,         "linenumber": -1,         "classname": "sun.reflect.generatedmethodaccessor176",         "nativemethod": false       },       {         "methodname": "invoke",         "filename": "delegatingmethodaccessorimpl.java",         "linenumber": 43,         "classname": "sun.reflect.delegatingmethodaccessorimpl",         "nativemethod": false       },       {         "methodname": "invoke",         "filename": "method.java",         "linenumber": 498,         "classname": "java.lang.reflect.method",         "nativemethod": false       },       {         "methodname": "invoke",         "filename": "cachingconnectionfactory.java",         "linenumber": 916,         "classname": "org.springframework.amqp.rabbit.connection.cachingconnectionfactory$cachedchannelinvocationhandler",         "nativemethod": false       },       {         "methodname": "queuedeclare",         "filename": null,         "linenumber": -1,         "classname": "com.sun.proxy.$proxy166",         "nativemethod": false       },       {         "methodname": "declarequeues",         "filename": "rabbitadmin.java",         "linenumber": 577,         "classname": "org.springframework.amqp.rabbit.core.rabbitadmin",         "nativemethod": false       },       {         "methodname": "access$200",         "filename": "rabbitadmin.java",         "linenumber": 67,         "classname": "org.springframework.amqp.rabbit.core.rabbitadmin",         "nativemethod": false       },       {         "methodname": "doinrabbit",         "filename": "rabbitadmin.java",         "linenumber": 209,         "classname": "org.springframework.amqp.rabbit.core.rabbitadmin$3",         "nativemethod": false       },       {         "methodname": "doinrabbit",         "filename": "rabbitadmin.java",         "linenumber": 206,         "classname": "org.springframework.amqp.rabbit.core.rabbitadmin$3",         "nativemethod": false       },       {         "methodname": "doexecute",         "filename": "rabbittemplate.java",         "linenumber": 1394,         "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",         "nativemethod": false       },       {         "methodname": "execute",         "filename": "rabbittemplate.java",         "linenumber": 1367,         "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",         "nativemethod": false       },       {         "methodname": "execute",         "filename": "rabbittemplate.java",         "linenumber": 1343,         "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",         "nativemethod": false       },       {         "methodname": "declarequeue",         "filename": "rabbitadmin.java",         "linenumber": 206,         "classname": "org.springframework.amqp.rabbit.core.rabbitadmin",         "nativemethod": false       },       {         "methodname": "senddata",         "filename": "qdispatcherservice.java",         "linenumber": 59,         "classname": "com.mycompany.qdispatcherservice",         "nativemethod": false       },        ....         "lockedmonitors": [       {         "classname": "java.lang.object",         "identityhashcode": 285810320,         "lockedstackframe": {           "methodname": "invoke",           "filename": "cachingconnectionfactory.java",           "linenumber": 916,           "classname": "org.springframework.amqp.rabbit.connection.cachingconnectionfactory$cachedchannelinvocationhandler",           "nativemethod": false         },         "lockedstackdepth": 13       }     ],     "lockedsynchronizers": [       {         "classname": "java.util.concurrent.threadpoolexecutor$worker",         "identityhashcode": 372417558       }     ],     "lockinfo": {       "classname": "com.rabbitmq.utility.blockingvalueorexception",       "identityhashcode": 274673849     }   }, 

________________________________________________________________________-

new trace

{ "threadname": "taskexector-10", "threadid": 77, "blockedtime": -1, "blockedcount": 37, "waitedtime": -1, "waitedcount": 1113, "lockname": "java.io.dataoutputstream@33111fc", "lockownerid": 65, "lockownername": "taskexector-8", "innative": false, "suspended": false, "threadstate": "blocked", "stacktrace": [   {     "methodname": "writeframe",     "filename": "socketframehandler.java",     "linenumber": 170,     "classname": "com.rabbitmq.client.impl.socketframehandler",     "nativemethod": false   },   {     "methodname": "writeframe",     "filename": "amqconnection.java",     "linenumber": 542,     "classname": "com.rabbitmq.client.impl.amqconnection",     "nativemethod": false   },   {     "methodname": "transmit",     "filename": "amqcommand.java",     "linenumber": 104,     "classname": "com.rabbitmq.client.impl.amqcommand",     "nativemethod": false   },   {     "methodname": "quiescingtransmit",     "filename": "amqchannel.java",     "linenumber": 337,     "classname": "com.rabbitmq.client.impl.amqchannel",     "nativemethod": false   },   {     "methodname": "transmit",     "filename": "amqchannel.java",     "linenumber": 313,     "classname": "com.rabbitmq.client.impl.amqchannel",     "nativemethod": false   },   {     "methodname": "basicpublish",     "filename": "channeln.java",     "linenumber": 686,     "classname": "com.rabbitmq.client.impl.channeln",     "nativemethod": false   },   {     "methodname": "basicpublish",     "filename": "channeln.java",     "linenumber": 668,     "classname": "com.rabbitmq.client.impl.channeln",     "nativemethod": false   },   {     "methodname": "invoke",     "filename": null,     "linenumber": -1,     "classname": "sun.reflect.generatedmethodaccessor176",     "nativemethod": false   },   {     "methodname": "invoke",     "filename": "delegatingmethodaccessorimpl.java",     "linenumber": 43,     "classname": "sun.reflect.delegatingmethodaccessorimpl",     "nativemethod": false   },   {     "methodname": "invoke",     "filename": "method.java",     "linenumber": 498,     "classname": "java.lang.reflect.method",     "nativemethod": false   },   {     "methodname": "invoke",     "filename": "cachingconnectionfactory.java",     "linenumber": 916,     "classname": "org.springframework.amqp.rabbit.connection.cachingconnectionfactory$cachedchannelinvocationhandler",     "nativemethod": false   },   {     "methodname": "basicpublish",     "filename": null,     "linenumber": -1,     "classname": "com.sun.proxy.$proxy166",     "nativemethod": false   },   {     "methodname": "dosend",     "filename": "rabbittemplate.java",     "linenumber": 1451,     "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",     "nativemethod": false   },   {     "methodname": "doinrabbit",     "filename": "rabbittemplate.java",     "linenumber": 703,     "classname": "org.springframework.amqp.rabbit.core.rabbittemplate$3",     "nativemethod": false   },   {     "methodname": "doexecute",     "filename": "rabbittemplate.java",     "linenumber": 1394,     "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",     "nativemethod": false   },   {     "methodname": "execute",     "filename": "rabbittemplate.java",     "linenumber": 1367,     "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",     "nativemethod": false   },   {     "methodname": "send",     "filename": "rabbittemplate.java",     "linenumber": 699,     "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",     "nativemethod": false   },   {     "methodname": "convertandsend",     "filename": "rabbittemplate.java",     "linenumber": 767,     "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",     "nativemethod": false   },   {     "methodname": "convertandsend",     "filename": "rabbittemplate.java",     "linenumber": 754,     "classname": "org.springframework.amqp.rabbit.core.rabbittemplate",     "nativemethod": false   } 

it's bit unusual declare queue on every send; said, apart inefficiency should work. stack trace indicates stuck in rabbit client waiting reply queue declaration. have seen when more 1 thread using same channel, should never happen here because channel returned cache when current operation (admin, template) completes , can't used multiple threads.

you might want try new 4.0.0.rc1 amqp-client jar because have added logging (which not available in 3.x.x client). might track things down.


Comments

Popular posts from this blog

sql server - Cannot query correctly (MSSQL - PHP - JSON) -

php - trouble displaying mysqli database results in correct order -

C++ Linked List -