java - Call to collect() doesn't return when called on an RDD produced by combineByKey -
Disclaimer: new to Spark.
I have an RDD that looks like:
[(t,[tina, thomas]), (t,[tolis]), (c,[cory, christine]), (j,[joseph, jimmy, james, jackeline, juan]), (j,[jimbo, jina])]
and I call combineByKey on it, with the result being a JavaPairRDD<Character, Integer>.
This call seems to work correctly (the control flow passes this point, and in the debugger foo seems to have some kind of value):

JavaPairRDD<Character, Integer> foo = rdd.combineByKey(createAcc, addAndCount, combine);
System.out.println(foo.collect());

My problem is that the program doesn't return after the call to foo.collect(). Any ideas? I tried to debug with the Eclipse debugger but had no luck at all.
I am using Spark version 2.0.0 and Java 8.
Edit: the functions called in combineByKey are the following (it is dummy code since I am new to Spark; the goal is to call combineByKey to find the total length of the lists of Strings belonging to each key):
Function<Iterable<String>, Integer> createAcc = new Function<Iterable<String>, Integer>() {
    public Integer call(Iterable<String> x) {
        int counter = 0;
        Iterator<String> it = x.iterator();
        while (it.hasNext()) {
            counter++;
        }
        return counter;
    }
};

Function2<Integer, Iterable<String>, Integer> addAndCount = new Function2<Integer, Iterable<String>, Integer>() {
    public Integer call(Integer acc, Iterable<String> x) {
        int counter = 0;
        Iterator<String> it = x.iterator();
        while (it.hasNext()) {
            counter++;
        }
        return counter + acc;
    }
};

Function2<Integer, Integer, Integer> combine = new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) {
        return x + y;
    }
};
Update 2: the requested log is the following:

16/11/11 17:21:32 INFO SparkContext: Starting job: count at Foo.java:265
16/11/11 17:21:32 INFO DAGScheduler: Got job 9 (count at Foo.java:265) with 3 output partitions
16/11/11 17:21:32 INFO DAGScheduler: Final stage: ResultStage 20 (count at Foo.java:265)
16/11/11 17:21:32 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 19, ShuffleMapStage 18)
16/11/11 17:21:32 INFO DAGScheduler: Missing parents: List()
16/11/11 17:21:32 INFO DAGScheduler: Submitting ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264), which has no missing parents
16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 6.7 KB, free 1946.0 MB)
16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1946.0 MB)
16/11/11 17:21:32 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on xxx.xxx.xx.xx:55712 (size: 3.4 KB, free: 1946.1 MB)
16/11/11 17:21:32 INFO SparkContext: Created broadcast 12 from broadcast at DAGScheduler.scala:1012
16/11/11 17:21:32 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264)
16/11/11 17:21:32 INFO TaskSchedulerImpl: Adding task set 20.0 with 3 tasks
16/11/11 17:21:32 INFO TaskSetManager: Starting task 0.0 in stage 20.0 (TID 30, localhost, partition 0, ANY, 5288 bytes)
16/11/11 17:21:32 INFO Executor: Running task 0.0 in stage 20.0 (TID 30)
16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 3 blocks
16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
It is a simple Java issue: your "while" loops never call it.next(), so they never end (and the task hangs forever).
Change them to:

while (it.hasNext()) {
    it.next();
    counter++;
}
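To see the fix in isolation, here is a minimal, plain-Java sketch of the corrected counting logic (no Spark needed; the class and helper names are made up for illustration). The key point is that it.next() is what advances the iterator; without it, hasNext() stays true forever:

```java
import java.util.Arrays;
import java.util.Iterator;

public class CountDemo {
    // Counts the elements of an Iterable by walking its iterator.
    static int count(Iterable<String> xs) {
        int counter = 0;
        Iterator<String> it = xs.iterator();
        while (it.hasNext()) {
            it.next();   // advance the iterator, or the loop never terminates
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("tina", "thomas"))); // prints 2
    }
}
```

In Java 8 the same loop is often written as a for-each (for (String s : xs) counter++;), which advances the iterator implicitly and makes this class of bug impossible.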