apache kafka - DStream.foreachRDD do not work ,after transformation -


guys!

val kafkastream = kafkautils.createdirectstream[string, string, stringdecoder, stringdecoder](ssc, kafkaparams, topicset) kafkastream.map(_._2).foreachrdd(rdd => rdd.foreach(println)) 

it worked ,it print kafka message.but when run this:

val dstreamsfilterbytablename = kafkastream.transform(rdd => rdd.map(_._2).map(line => json.parse(line).asinstanceof[jsonobject])   .filter(json => otterkafkaparser.filterjsonwithtableanddb(json, tablename, dbname)))  dstreamsfilterbytablename.foreachrdd(rdd =>   rdd.foreach(println)) 

it did not work.when debugged in,there data in dstreamsfilterbytablename,but did not print.

why did happen?

thanks advice.


Comments

Popular posts from this blog

asynchronous - C# WinSCP .NET assembly: How to upload multiple files asynchronously -

aws api gateway - SerializationException in posting new Records via Dynamodb Proxy Service in API -

asp.net - Problems sending emails from forum -