Spring Cloud Stream not setting the Kafka key in the message?
The story is this: I have a Kafka broker, and a particular object (which I JSON-ify and send through the topics) has an id that I want to use as the key.
Currently I'm using the 'partitionKeyExtractorClass' configuration to set a class that extracts the id and returns it as the key.
It looks like this:
    def extractKey(Message<?> message) {
        log.info('extracting key message')
        // Parse the JSON payload and read the id from it
        String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
        log.info("got = ${id}")
        return id
    }
My actual problem is that when I browse a message on the topic, the ConsumerRecord that holds the message says the key is null...
Is this a bug? Am I doing something wrong? The documentation on this doesn't go any further than this.
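Look, you are mixing up the partition and the key. The class configured through 'partitionKeyExtractorClass' is only used to pick the target partition; it does not set the key of the Kafka record.
Currently KafkaMessageChannelBinder doesn't provide an option to determine the key from the message.
The only existing functionality you can use is the KafkaHeaders.MESSAGE_KEY header: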
    Object messageKey = this.messageKeyExpression != null
            ? this.messageKeyExpression.getValue(this.evaluationContext, message)
            : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);
So, before you send the message to the output, you should calculate the key and place it in the KafkaHeaders.MESSAGE_KEY header.
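To make the idea concrete, here is a minimal sketch in plain Java with no Spring on the classpath. It only models the flow: the key is attached as a header before sending, and the header is what gets read back when no key expression is configured. `SimpleMessage`, `withKey` and `resolveKey` are hypothetical stand-ins invented for this illustration, not binder API, and the header name `kafka_messageKey` is assumed to be the value behind KafkaHeaders.MESSAGE_KEY.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class KeyHeaderSketch {

    // Assumption: the string value of KafkaHeaders.MESSAGE_KEY in spring-kafka.
    static final String MESSAGE_KEY_HEADER = "kafka_messageKey";

    // Hypothetical stand-in for a Spring Message: a payload plus headers.
    static final class SimpleMessage {
        final byte[] payload;
        final Map<String, Object> headers;

        SimpleMessage(byte[] payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Returns a copy of the message with the key placed in the key header.
    static SimpleMessage withKey(SimpleMessage message, String id) {
        Map<String, Object> headers = new HashMap<>(message.headers);
        headers.put(MESSAGE_KEY_HEADER, id);
        return new SimpleMessage(message.payload, headers);
    }

    // Models only the header branch of the binder snippet above:
    // with no key expression configured, the key is read from the header.
    static Object resolveKey(SimpleMessage message) {
        return message.headers.get(MESSAGE_KEY_HEADER);
    }

    public static void main(String[] args) {
        byte[] json = "{\"id\":\"42\"}".getBytes(StandardCharsets.UTF_8);
        SimpleMessage plain = new SimpleMessage(json, new HashMap<>());

        // Without the header there is nothing to use as the record key.
        System.out.println(resolveKey(plain));

        // Take the id from the domain object before sending, then attach it.
        SimpleMessage keyed = withKey(plain, "42");
        System.out.println(resolveKey(keyed));
    }
}
```

In a real application the equivalent step would probably be something like MessageBuilder.fromMessage(message).setHeader(KafkaHeaders.MESSAGE_KEY, id).build() before sending to the output channel. Keep in mind that the key's type has to match whatever key serializer the producer is configured with.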