package com.eksad.masterdata.event; import com.eksad.masterdata.common.Constants; import io.vertx.core.json.JsonObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaEvent { @Autowired private KafkaTemplate kafkaTemplate; @KafkaListener(topics = Constants.topicStep1, groupId = "my-group") public void listenStep1(String message) { JsonObject jsonMessage=new JsonObject(message); String transactionId=jsonMessage.getString("transactionId"); System.out.println( String.format("Starting Step 1 - new taskid %s with paylod [[ %s ]]", transactionId, jsonMessage)); //do something here jsonMessage.put("resultStep1","step 1 a success"); //lets continue to other queue kafkaTemplate.send(Constants.topicStep2,jsonMessage.encode()); System.out.println( String.format("done Step 1 for taskid %s %n%n",transactionId)); } @KafkaListener(topics = Constants.topicStep2, groupId = "my-group") public void listenStep2(String message) { JsonObject jsonMessage=new JsonObject(message); String transactionId=jsonMessage.getString("transactionId"); System.out.println( String.format("Starting Step 2 - new taskid %s with paylod [[ %s ]]", transactionId, jsonMessage)); //do something here jsonMessage.put("resultStep2","step 2 a success"); System.out.println( String.format("done Step 2 for taskid %s with final result [[ %s ]] %n%n", transactionId, jsonMessage.encode())); } }