Apache Kafka - DStream.foreachRDD does not work after transformation
Hi guys!
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
kafkaStream.map(_._2).foreachRDD(rdd => rdd.foreach(println))
This worked: it printed the Kafka messages. But when I run this:
val dstreamsFilterByTableName = kafkaStream.transform(rdd =>
  rdd.map(_._2)
    .map(line => JSON.parse(line).asInstanceOf[JSONObject])
    .filter(json => OtterKafkaParser.filterJsonWithTableAndDB(json, tableName, dbName)))
dstreamsFilterByTableName.foreachRDD(rdd => rdd.foreach(println))
it did not work. When I stepped through it in the debugger, there was data in dstreamsFilterByTableName, but nothing was printed.
Why does this happen?
Thanks for any advice.