Apache Spark: How to get the entity from aggregateMessages instead of just the VertexId
package com.mypackage

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by sidazhang on 11/8/16.
 */
case class person(age: Int)

case class edgeimpl()

/**
 * Builds a tiny follower graph (vertices 2, 3 and 4 each have an edge pointing to
 * vertex 1) and uses `aggregateMessages` to gather, at every vertex that receives
 * at least one message, an array of its followers' `person` attributes.
 */
object graphapp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkmain").setMaster("local[1]")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if the job fails.
    try {
      val vertices = sc.parallelize(Array(
        (1L, person(10)), (2L, person(15)), (3L, person(20)), (4L, person(30))))

      // Create an RDD of edges: 2 -> 1, 3 -> 1, 4 -> 1.
      val relationships = sc.parallelize(Array(
        Edge(2L, 1L, edgeimpl()), Edge(3L, 1L, edgeimpl()), Edge(4L, 1L, edgeimpl())))

      val graph = Graph(vertices, relationships)

      // Send each edge's source attribute (the follower) to its destination vertex.
      // No age comparison is made here, so every follower is collected.
      val olderFollowers: VertexRDD[Array[person]] = graph.aggregateMessages[Array[person]](
        ctx => ctx.sendToDst(Array(ctx.srcAttr)),
        // Merge two partial arrays of followers.
        (a, b) => a ++ b
      )

      // Here we have the id of the person and the list of followers.
      // How do we get the vertex (the person) itself?
      olderFollowers.collect().foreach { case (id, followers) =>
        followers.foreach(follower => println((id, follower)))
      }
    } finally {
      sc.stop()
    }
  }
}

// The question: through the aggregateMessages API I only end up with the VertexId.
// How do I get the actual vertex?
(The question is asked inline, in the code comments.)
You have to join the result back with the original vertex data:
// someMergingFunction: (VertexId, VD, U) => VD. joinVertices keeps the vertex type VD;
// use outerJoinVertices (or VertexRDD.innerJoin) if the joined result needs a new type.
graph.joinVertices(olderFollowers)(someMergingFunction).vertices
Comments
Post a Comment