Skip to content

Commit b65a577

Browse files
committed
Code refactoring
1 parent 48d99fe commit b65a577

File tree

11 files changed

+155
-47
lines changed

11 files changed

+155
-47
lines changed

spring-kafka-example/inventory-event-consumer/src/main/java/com/howtodoinjava/demo/kafka/consumer/consumer/InventoryEventsConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.howtodoinjava.demo.kafka.consumer.consumer;
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.howtodoinjava.demo.kafka.consumer.model.InventoryEvent;
45
import com.howtodoinjava.demo.kafka.consumer.service.InventoryEventService;
56
import lombok.extern.slf4j.Slf4j;
67
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -16,7 +17,7 @@ public class InventoryEventsConsumer {
1617
private InventoryEventService inventoryEventService;
1718

1819
@KafkaListener(topics = {"inventory-events"}, groupId = "inventory-consumer-group-1")
19-
public void onMessage(ConsumerRecord<Integer, String> consumerRecord) throws JsonProcessingException {
20+
public void onMessage(ConsumerRecord<Integer, InventoryEvent> consumerRecord) throws JsonProcessingException {
2021
inventoryEventService.processLibraryEvent(consumerRecord);
2122
log.info("Consumer Record: {}", consumerRecord);
2223
}
@@ -30,7 +31,7 @@ public void onMessage(ConsumerRecord<Integer, String> consumerRecord) throws Jso
3031
@PartitionOffset(partition = "0", initialOffset = "0"),
3132
@PartitionOffset(partition = "2", initialOffset = "0")}))
3233
*/
33-
public void onMessage_PartitionIntialOffset(ConsumerRecord<Integer, String> consumerRecord) {
34+
public void onMessage_PartitionIntialOffset(ConsumerRecord<Integer, InventoryEvent> consumerRecord) {
3435
log.info("Consumer Record: {}", consumerRecord);
3536
}
3637

@@ -40,7 +41,7 @@ public void onMessage_PartitionIntialOffset(ConsumerRecord<Integer, String> cons
4041
*
4142
@KafkaListener(topicPartitions = @TopicPartition(topic = "inventory-events", partitions = { "0", "1" }))
4243
*/
43-
public void onMessage_PartitionNoOffset(ConsumerRecord<Integer, String> consumerRecord) {
44+
public void onMessage_PartitionNoOffset(ConsumerRecord<Integer, InventoryEvent> consumerRecord) {
4445
log.info("Consumer Record: {}", consumerRecord);
4546
}
4647
}

spring-kafka-example/inventory-event-consumer/src/main/java/com/howtodoinjava/demo/kafka/consumer/consumer/InventoryEventsConsumerManualOffset.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.howtodoinjava.demo.kafka.consumer.consumer;
22

3+
import com.howtodoinjava.demo.kafka.consumer.model.InventoryEvent;
34
import com.howtodoinjava.demo.kafka.consumer.service.InventoryEventService;
45
import lombok.SneakyThrows;
56
import lombok.extern.slf4j.Slf4j;
@@ -12,14 +13,14 @@
1213

1314
//@Component
1415
@Slf4j
15-
public class InventoryEventsConsumerManualOffset implements AcknowledgingMessageListener<Integer, String> {
16+
public class InventoryEventsConsumerManualOffset implements AcknowledgingMessageListener<Integer, InventoryEvent> {
1617

1718
@Autowired
1819
private InventoryEventService inventoryEventService;
1920

2021
@Override
2122
@KafkaListener(topics = "inventory-events", groupId = "inventory-consumer-group-1")
22-
public void onMessage(ConsumerRecord<Integer, String> consumerRecord, Acknowledgment acknowledgment) {
23+
public void onMessage(ConsumerRecord<Integer, InventoryEvent> consumerRecord, Acknowledgment acknowledgment) {
2324
log.info("Consumer Record: {}", consumerRecord);
2425
acknowledgment.acknowledge();
2526
}

spring-kafka-example/inventory-event-consumer/src/main/java/com/howtodoinjava/demo/kafka/consumer/service/InventoryEventService.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,23 @@ public class InventoryEventService {
1919
ObjectMapper objectMapper;
2020

2121
@Autowired
22-
KafkaTemplate<Integer,String> kafkaTemplate;
22+
KafkaTemplate<Integer, Object> kafkaTemplate;
2323

24-
public void processLibraryEvent(ConsumerRecord<Integer,String> consumerRecord) throws JsonProcessingException {
25-
InventoryEvent inventoryEvent = objectMapper.readValue(consumerRecord.value(), InventoryEvent.class);
24+
public void processLibraryEvent(ConsumerRecord<Integer,InventoryEvent> consumerRecord) throws JsonProcessingException {
25+
InventoryEvent inventoryEvent = consumerRecord.value();
2626
log.info("libraryEvent : {} ", inventoryEvent);
2727

2828
if(inventoryEvent.getInventoryId() != null && ( inventoryEvent.getInventoryId() == 999 )){
2929
throw new RecoverableDataAccessException("Temporary Network Issue");
3030
}
3131

32-
switch(inventoryEvent.getInventoryEventType()){
33-
case NEW:
34-
save(inventoryEvent);
35-
break;
36-
case UPDATE:
32+
switch (inventoryEvent.getInventoryEventType()) {
33+
case NEW -> save(inventoryEvent);
34+
case UPDATE -> {
3735
validate(inventoryEvent);
3836
save(inventoryEvent);
39-
break;
40-
default:
41-
log.info("Invalid Library Event Type");
37+
}
38+
default -> log.info("Invalid Library Event Type");
4239
}
4340

4441
}

spring-kafka-example/inventory-event-consumer/src/main/resources/application-dev.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ spring:
1313
consumer:
1414
bootstrap-servers: localhost:9092
1515
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
16-
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
16+
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
17+
properties:
18+
spring.json.trusted.packages: '*'
1719
producer:
1820
bootstrap-servers: localhost:9092
1921
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
20-
value-serializer: org.apache.kafka.common.serialization.StringSerializer
22+
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

spring-kafka-example/inventory-event-consumer/src/main/resources/application-prod.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ spring:
1313
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
1414
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
1515
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
16+
properties:
17+
spring.json.trusted.packages: '*'
1618
producer:
1719
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
1820
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer

spring-kafka-example/inventory-event-consumer/src/test/java/com/howtodoinjava/demo/kafka/consumer/consumer/InventoryEventsConsumerTest.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
import com.fasterxml.jackson.core.JsonProcessingException;
55
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.howtodoinjava.demo.kafka.consumer.model.InventoryEvent;
7+
import com.howtodoinjava.demo.kafka.consumer.model.InventoryEventType;
8+
import com.howtodoinjava.demo.kafka.consumer.model.Product;
69
import com.howtodoinjava.demo.kafka.consumer.service.InventoryEventService;
710
import org.apache.kafka.clients.consumer.Consumer;
811
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -59,7 +62,7 @@ class InventoryEventsConsumerTest {
5962
EmbeddedKafkaBroker embeddedKafkaBroker;
6063

6164
@Autowired
62-
KafkaTemplate<Integer, String> kafkaTemplate;
65+
KafkaTemplate<Integer, Object> kafkaTemplate;
6366

6467
@Autowired
6568
KafkaListenerEndpointRegistry endpointRegistry;
@@ -100,8 +103,20 @@ void tearDown() {
100103
@Test
101104
void publishNewInventoryEvent_Success() throws ExecutionException, InterruptedException, JsonProcessingException {
102105
//given
103-
String json = " {\"inventoryId\":null,\"inventoryEventType\":\"NEW\",\"product\":{\"productId\":456,\"productName\":\"Samsung S23\",\"price\":\"750000\",\"quantity\":50}}";
104-
kafkaTemplate.send("inventory-events", json).get();
106+
Product product = Product.builder()
107+
.productId(456)
108+
.productName("Samsung S23")
109+
.price("75000")
110+
.quantity(50)
111+
.build();
112+
113+
InventoryEvent inventoryEvent = InventoryEvent.builder()
114+
.inventoryId(null)
115+
.inventoryEventType(InventoryEventType.NEW)
116+
.product(product)
117+
.build();
118+
119+
kafkaTemplate.send("inventory-events", inventoryEvent).get();
105120

106121
//when
107122
CountDownLatch latch = new CountDownLatch(1);
@@ -115,9 +130,22 @@ void publishNewInventoryEvent_Success() throws ExecutionException, InterruptedEx
115130
@Test
116131
void publishModifyInventoryEvent_Null_InventoryId_DeadLetter_Topic() throws JsonProcessingException, InterruptedException, ExecutionException {
117132
//given
118-
Integer inventoryId = null;
119-
String json = "{\"inventoryId\":" + inventoryId + ",\"inventoryEventType\":\"UPDATE\",\"product\":{\"productId\":456,\"productName\":\"Samsung S23\",\"price\":\"750000\",\"quantity\":50}}";
120-
kafkaTemplate.send("inventory-events", json).get();
133+
Product product = Product.builder()
134+
.productId(456)
135+
.productName("Samsung S23")
136+
.price("75000")
137+
.quantity(50)
138+
.build();
139+
140+
InventoryEvent inventoryEvent = InventoryEvent.builder()
141+
.inventoryId(null)
142+
.inventoryEventType(InventoryEventType.UPDATE)
143+
.product(product)
144+
.build();
145+
146+
String json = objectMapper.writeValueAsString(inventoryEvent);
147+
148+
kafkaTemplate.send("inventory-events", inventoryEvent).get();
121149
//when
122150
CountDownLatch latch = new CountDownLatch(1);
123151
latch.await(3, TimeUnit.SECONDS);
@@ -130,6 +158,7 @@ void publishModifyInventoryEvent_Null_InventoryId_DeadLetter_Topic() throws Json
130158
consumer = new DefaultKafkaConsumerFactory<>(configs, new IntegerDeserializer(), new StringDeserializer()).createConsumer();
131159
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, deadLetterTopic);
132160

161+
//then
133162
ConsumerRecords<Integer, String> consumerRecords = KafkaTestUtils.getRecords(consumer);
134163

135164
var deadletterList = new ArrayList<ConsumerRecord<Integer, String>>();
@@ -141,7 +170,7 @@ void publishModifyInventoryEvent_Null_InventoryId_DeadLetter_Topic() throws Json
141170

142171
var finalList = deadletterList.stream()
143172
.filter(record -> record.value().equals(json))
144-
.collect(Collectors.toList());
173+
.toList();
145174

146175
assert finalList.size() == 1;
147176
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.howtodoinjava.demo.kafka.producer.config;
2+
3+
import org.apache.kafka.clients.producer.ProducerConfig;
4+
import org.apache.kafka.common.serialization.ByteArraySerializer;
5+
import org.apache.kafka.common.serialization.IntegerSerializer;
6+
import org.apache.kafka.common.serialization.StringSerializer;
7+
import org.springframework.beans.factory.annotation.Value;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.support.GenericApplicationContext;
11+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
12+
import org.springframework.kafka.core.KafkaTemplate;
13+
import org.springframework.kafka.core.ProducerFactory;
14+
import org.springframework.kafka.core.RoutingKafkaTemplate;
15+
import org.springframework.kafka.support.serializer.JsonSerializer;
16+
17+
import java.util.HashMap;
18+
import java.util.LinkedHashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.regex.Pattern;
22+
23+
@Configuration
24+
public class KafkaProducerConfig {
25+
26+
@Value("${spring.kafka.producer.bootstrap-servers}")
27+
private List<String> bootstrapAddress;
28+
29+
@Bean
30+
public ProducerFactory<?, ?> producerFactory() {
31+
Map<String, Object> configProps = new HashMap<>();
32+
configProps.put(
33+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
34+
configProps.put(
35+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
36+
configProps.put(
37+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
38+
return new DefaultKafkaProducerFactory<>(configProps);
39+
}
40+
41+
@Bean
42+
public KafkaTemplate<?, ?> kafkaTemplate() {
43+
return new KafkaTemplate<>(producerFactory());
44+
}
45+
46+
//@Bean
47+
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
48+
ProducerFactory<Object, Object> pf) {
49+
50+
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
51+
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
52+
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
53+
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
54+
55+
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
56+
map.put(Pattern.compile("default-topic"), bytesPF);
57+
map.put(Pattern.compile("inventory-events"), pf); // Default PF with StringSerializer
58+
return new RoutingKafkaTemplate(map);
59+
}
60+
61+
}

spring-kafka-example/inventory-event-producer/src/main/java/com/howtodoinjava/demo/kafka/producer/producer/InventoryEventProducer.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,56 +24,50 @@ public class InventoryEventProducer {
2424
public String topic;
2525

2626
@Autowired
27-
private KafkaTemplate<Integer, String> kafkaTemplate;
27+
private KafkaTemplate<Integer, Object> kafkaTemplate;
2828

29-
@Autowired
30-
private ObjectMapper objectMapper;
31-
32-
public CompletableFuture<SendResult<Integer, String>> sendInventoryEvent_Async(InventoryEvent inventoryEvent) throws JsonProcessingException {
29+
public CompletableFuture<SendResult<Integer, Object>> sendInventoryEvent_Async(InventoryEvent inventoryEvent) throws JsonProcessingException {
3330

3431
var key = inventoryEvent.getInventoryId();
35-
var value = objectMapper.writeValueAsString(inventoryEvent);
3632

37-
var completableFuture = kafkaTemplate.send(topic, key, value);
33+
var completableFuture = kafkaTemplate.send(topic, key, inventoryEvent);
3834

3935
return completableFuture.whenComplete(((sendResult, throwable) -> {
4036
if (throwable != null) {
41-
handleFailure(key, value, throwable);
37+
handleFailure(key, inventoryEvent, throwable);
4238
} else {
43-
handleSuccess(key, value, sendResult);
39+
handleSuccess(key, inventoryEvent, sendResult);
4440
}
4541
}));
4642
}
4743

48-
public CompletableFuture<SendResult<Integer, String>> sendInventoryEvent_ProducerRecord(InventoryEvent inventoryEvent) throws JsonProcessingException {
44+
public CompletableFuture<SendResult<Integer, Object>> sendInventoryEvent_ProducerRecord(InventoryEvent inventoryEvent) throws JsonProcessingException {
4945

5046
var key = inventoryEvent.getInventoryId();
51-
var value = objectMapper.writeValueAsString(inventoryEvent);
52-
53-
var producerRecord = buildProducerRecord(key, value);
47+
var producerRecord = buildProducerRecord(key, inventoryEvent);
5448

5549
var completableFuture = kafkaTemplate.send(producerRecord);
5650

5751
return completableFuture.whenComplete(((sendResult, throwable) -> {
5852
if (throwable != null) {
59-
handleFailure(key, value, throwable);
53+
handleFailure(key, inventoryEvent, throwable);
6054
} else {
61-
handleSuccess(key, value, sendResult);
55+
handleSuccess(key, inventoryEvent, sendResult);
6256
}
6357
}));
6458
}
6559

66-
private ProducerRecord<Integer, String> buildProducerRecord(Integer key, String value) {
60+
private ProducerRecord<Integer, Object> buildProducerRecord(Integer key, Object value) {
6761
List<Header> recordHeader = List.of(new RecordHeader("event-source", "library-event-producer".getBytes()));
6862
return new ProducerRecord<>(topic, null, key, value, recordHeader);
6963
}
7064

71-
private void handleSuccess(Integer key, String value, SendResult<Integer, String> sendResult) {
65+
private void handleSuccess(Integer key, Object value, SendResult<Integer, Object> sendResult) {
7266
log.info("Message sent successfully for the key: {} and the value: {}, partition is: {}",
7367
key, value, sendResult.getRecordMetadata().partition());
7468
}
7569

76-
private void handleFailure(Integer key, String value, Throwable throwable) {
70+
private void handleFailure(Integer key, Object value, Throwable throwable) {
7771
log.error("Error sending message and exception is {}", throwable.getMessage(), throwable);
7872
}
7973
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.howtodoinjava.demo.kafka.producer.producer;
2+
3+
import com.howtodoinjava.demo.kafka.producer.model.InventoryEvent;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.kafka.core.RoutingKafkaTemplate;
6+
import org.springframework.stereotype.Component;
7+
8+
@Slf4j
9+
@Component
10+
public class RoutingKafkaProducer {
11+
12+
private RoutingKafkaTemplate routingTemplate;
13+
14+
public void sendDefaultTopic(String message) {
15+
routingTemplate.send("default-topic", message.getBytes());
16+
}
17+
18+
public void sendInventoryEvent(InventoryEvent inventoryEvent) {
19+
20+
routingTemplate.send("inventory-events", inventoryEvent);
21+
}
22+
}

spring-kafka-example/inventory-event-producer/src/main/resources/application-dev.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ spring:
1010
producer:
1111
bootstrap-servers: localhost:9092
1212
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
13-
value-serializer: org.apache.kafka.common.serialization.StringSerializer
13+
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
1414
properties:
1515
acks: all
1616
retries: 10

0 commit comments

Comments
 (0)