scala - akka-http queries do not run in parallel -


i new akka-http , have troubles run queries on same route in parallel.

i have route may return result (if cached) or not (heavy cpu multithreaded computations). run these queries in parallel, in case short 1 arrives after long 1 heavy computation, not want second call wait first finish.

however seems these queries not run in parallel if on same route (run in parallel if on different routes)

i can reproduice in basic project:

calling server 3 time in parallel (with 3 chrome's tab on http://localhost:8080/test) causes responses arrive respectively @ 3.0s, 6.0-s , 9.0-s. suppose queries not run in parallel.

running on 6 cores (with ht) machine on windows 10 jdk 8.

build.sbt

name := "akka-http-test"  version := "1.0"  scalaversion := "2.11.8"  librarydependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11" 

*akkahttptest.scala**

import java.util.concurrent.executors  import akka.actor.actorsystem import akka.http.scaladsl.http import akka.http.scaladsl.server.directives._ import akka.stream.actormaterializer  import scala.concurrent.{executioncontext, future}  object akkahttptest extends app {    implicit val actorsystem = actorsystem("system") // no application.conf here   implicit val executioncontext =                               executioncontext.fromexecutor(executors.newfixedthreadpool(6))   implicit val actormaterializer = actormaterializer()    val route = path("test") {     oncomplete(slowfunc()) { slowfuncresult =>       complete(slowfuncresult)     }   }    def slowfunc()(implicit ec: executioncontext): future[string] = future {     thread.sleep(3000)     "waited 3s"   }    http().bindandhandle(route, "localhost", 8080)    println("server started") } 

what doing wrong here ?

thanks help

edit: @ramon j romero y vigil, added future wrapping, problem still persists

def slowfunc()(implicit ec : executioncontext) : future[string] = future {   thread.sleep(3000)   "waited 3.0s" }  val route = path("test") {   oncomplete(slowfunc()) { slowfuncresult =>     complete(slowfuncresult)   } } 

tries default thread pool, 1 defined above in config file, , fixed thread pool (6 threads).

it seems oncomplete directive still waits future complete , block route (with same connection).

same problem flow trick

import akka.stream.scaladsl.flow  val parallelism = 10  val reqflow =    flow[httprequest].filter(_.geturi().path().equalsignorecase("/test"))                    .mapasync(parallelism)(_ => slowfunc())                    .map(str => httpresponse(status=statuscodes.ok, entity=str))  http().bindandhandle(reqflow, ...) 

thanks help

each incomingconnection handled same route, therefore when "call server 3 times in parallel" using same connection , therefore same route.

the route handling 3 incoming httprequest values in akka-stream fashion, i.e. route composed of multiple stages each stage can processes 1 element @ given time. in example "complete" stage of stream call thread.sleep each incoming request , process each request one-at-a-time.

to multiple concurrent requests handled @ same time should establish unique connection each request.

an example of client side connection pool can created similar documentation examples:

import akka.http.scaladsl.http  val connpoolflow = http().newhostconnectionpool("localhost", 8080) 

this can integrated stream makes requests:

import akka.http.scaladsl.model.uri._ import akka.http.scaladsl.model.httprequest  val request = httprequest(uri="/test")  import akka.stream.scaladsl.source  val reqstream =    source.fromiterator(() => iterator.continually(request).take(3))         .via(connpoolflow)         .via(flow.mapasync(3)(identity))         .to(sink foreach { resp => println(resp)})         .run() 

route modification

if want each httprequest processed in parallel can use same route must spawn off futures inside of route , use oncomplete directive:

def slowfunc()(implicit ec : executioncontext) : future[string] = future {   thread.sleep(1500)   "waited 1.5s" }  val route = path("test") {   oncomplete(slowfunc()) { slowfuncresult =>     complete(slowfuncresult)   } } 

one thing aware of: if don't specify different executioncontext sleeping function same thread pool routes used sleeping. may exhaust available threads way. should use seperate ec sleeping...

flow based

one other way handle httprequests a stream flow:

import akka.stream.scaladsl.flow  val parallelism = 10  val reqflow =    flow[httprequest].filter(_.geturi().path().equalsignorecase("/test"))                    .mapasync(parallelism)(_ => slowfunc())                    .map(str => httpresponse(status=statuscodes.ok, entity=str))  http().bindandhandle(reqflow, ...) 

Comments

Popular posts from this blog

aws api gateway - SerializationException in posting new Records via Dynamodb Proxy Service in API -

asp.net - Problems sending emails from forum -