scala - akka-http queries do not run in parallel -
i new akka-http , have troubles run queries on same route in parallel.
i have route may return result (if cached) or not (heavy cpu multithreaded computations). run these queries in parallel, in case short 1 arrives after long 1 heavy computation, not want second call wait first finish.
however seems these queries not run in parallel if on same route (run in parallel if on different routes)
i can reproduice in basic project:
calling server 3 time in parallel (with 3 chrome's tab on http://localhost:8080/test) causes responses arrive respectively @ 3.0s, 6.0-s , 9.0-s. suppose queries not run in parallel.
running on 6 cores (with ht) machine on windows 10 jdk 8.
build.sbt
name := "akka-http-test" version := "1.0" scalaversion := "2.11.8" librarydependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11"
*akkahttptest.scala**
import java.util.concurrent.executors import akka.actor.actorsystem import akka.http.scaladsl.http import akka.http.scaladsl.server.directives._ import akka.stream.actormaterializer import scala.concurrent.{executioncontext, future} object akkahttptest extends app { implicit val actorsystem = actorsystem("system") // no application.conf here implicit val executioncontext = executioncontext.fromexecutor(executors.newfixedthreadpool(6)) implicit val actormaterializer = actormaterializer() val route = path("test") { oncomplete(slowfunc()) { slowfuncresult => complete(slowfuncresult) } } def slowfunc()(implicit ec: executioncontext): future[string] = future { thread.sleep(3000) "waited 3s" } http().bindandhandle(route, "localhost", 8080) println("server started") }
what doing wrong here ?
thanks help
edit: @ramon j romero y vigil, added future wrapping, problem still persists
def slowfunc()(implicit ec : executioncontext) : future[string] = future { thread.sleep(3000) "waited 3.0s" } val route = path("test") { oncomplete(slowfunc()) { slowfuncresult => complete(slowfuncresult) } }
tries default thread pool, 1 defined above in config file, , fixed thread pool (6 threads).
it seems oncomplete directive still waits future complete , block route (with same connection).
same problem flow trick
import akka.stream.scaladsl.flow val parallelism = 10 val reqflow = flow[httprequest].filter(_.geturi().path().equalsignorecase("/test")) .mapasync(parallelism)(_ => slowfunc()) .map(str => httpresponse(status=statuscodes.ok, entity=str)) http().bindandhandle(reqflow, ...)
thanks help
each incomingconnection handled same route, therefore when "call server 3 times in parallel" using same connection , therefore same route.
the route handling 3 incoming httprequest
values in akka-stream fashion, i.e. route composed of multiple stages each stage can processes 1 element @ given time. in example "complete" stage of stream call thread.sleep
each incoming request , process each request one-at-a-time.
to multiple concurrent requests handled @ same time should establish unique connection each request.
an example of client side connection pool can created similar documentation examples:
import akka.http.scaladsl.http val connpoolflow = http().newhostconnectionpool("localhost", 8080)
this can integrated stream makes requests:
import akka.http.scaladsl.model.uri._ import akka.http.scaladsl.model.httprequest val request = httprequest(uri="/test") import akka.stream.scaladsl.source val reqstream = source.fromiterator(() => iterator.continually(request).take(3)) .via(connpoolflow) .via(flow.mapasync(3)(identity)) .to(sink foreach { resp => println(resp)}) .run()
route modification
if want each httprequest
processed in parallel can use same route must spawn off futures inside of route , use oncomplete
directive:
def slowfunc()(implicit ec : executioncontext) : future[string] = future { thread.sleep(1500) "waited 1.5s" } val route = path("test") { oncomplete(slowfunc()) { slowfuncresult => complete(slowfuncresult) } }
one thing aware of: if don't specify different executioncontext sleeping function same thread pool routes used sleeping. may exhaust available threads way. should use seperate ec sleeping...
flow based
one other way handle httprequests a stream flow:
import akka.stream.scaladsl.flow val parallelism = 10 val reqflow = flow[httprequest].filter(_.geturi().path().equalsignorecase("/test")) .mapasync(parallelism)(_ => slowfunc()) .map(str => httpresponse(status=statuscodes.ok, entity=str)) http().bindandhandle(reqflow, ...)
Comments
Post a Comment