package com.example.demo.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka // Spring Boot adds this automatically; required for @KafkaListener in plain Spring
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // acks=all: the leader waits for all in-sync replicas before acknowledging a write
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        // retry transient send failures up to three times before surfacing the error
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "user-service-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Trust the exact package containing the event DTOs ("*" trusts everything,
        // which is acceptable only in demos).
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        // Start from the beginning of the topic when the group has no committed offset.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(config);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Three listener threads; useful only if the topic has at least three partitions.
        factory.setConcurrency(3);
        return factory;
    }
}

package com.example.demo.service;
import com.example.demo.dto.UserEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
    private static final String USER_EVENTS_TOPIC = "user-events";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendUserEvent(UserEvent event) {
        // Key by userId so all events for one user land on the same partition,
        // which preserves their ordering.
        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(USER_EVENTS_TOPIC, event.getUserId().toString(), event);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                logger.info("Sent message=[{}] with offset=[{}]",
                        event, result.getRecordMetadata().offset());
            } else {
                // Pass the exception itself so the stack trace is logged, not just its message.
                logger.error("Unable to send message=[{}]", event, ex);
            }
        });
    }

    public void sendUserCreated(Long userId, String email) {
        sendUserEvent(new UserEvent("USER_CREATED", userId, email));
    }

    public void sendUserUpdated(Long userId, String email) {
        sendUserEvent(new UserEvent("USER_UPDATED", userId, email));
    }
}

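Both services depend on com.example.demo.dto.UserEvent, which isn't shown in this section. A minimal sketch of a compatible shape, assuming a plain POJO whose field names are inferred from the calls above (the no-arg constructor and getters are what Jackson needs to round-trip it as JSON):

package com.example.demo.dto;

public class UserEvent {
    private String eventType; // e.g. "USER_CREATED" or "USER_UPDATED"
    private Long userId;
    private String email;

    // No-arg constructor required by Jackson when deserializing.
    public UserEvent() {
    }

    public UserEvent(String eventType, Long userId, String email) {
        this.eventType = eventType;
        this.userId = userId;
        this.email = email;
    }

    public String getEventType() { return eventType; }
    public Long getUserId() { return userId; }
    public String getEmail() { return email; }

    @Override
    public String toString() {
        return "UserEvent{eventType='" + eventType + "', userId=" + userId + ", email='" + email + "'}";
    }
}
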
package com.example.demo.service;
import com.example.demo.dto.UserEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    // Binds to the default "kafkaListenerContainerFactory" bean, so three concurrent
    // consumers in "user-service-group" share the topic's partitions.
    @KafkaListener(topics = "user-events", groupId = "user-service-group")
    public void consumeUserEvent(@Payload UserEvent event,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                 @Header(KafkaHeaders.OFFSET) long offset) {
        logger.info("Received user event: {} from partition: {} with offset: {}",
                event, partition, offset);
        switch (event.getEventType()) {
            case "USER_CREATED":
                handleUserCreated(event);
                break;
            case "USER_UPDATED":
                handleUserUpdated(event);
                break;
            default:
                logger.warn("Unknown event type: {}", event.getEventType());
        }
    }

    private void handleUserCreated(UserEvent event) {
        logger.info("Processing user created: {}", event.getUserId());
        // Send welcome email, create user profile, etc.
    }

    private void handleUserUpdated(UserEvent event) {
        logger.info("Processing user updated: {}", event.getUserId());
        // Invalidate cache, notify other services, etc.
    }
}

Apache Kafka provides distributed messaging for event-driven architectures, and Spring Kafka simplifies producer and consumer implementations: KafkaTemplate sends messages to topics, while @KafkaListener consumes them asynchronously. I configure serializers and deserializers for the message format (JSON, Avro, or Protobuf). Partitioning distributes load across consumers, and consumer groups enable parallel processing; note that the partition count caps the useful concurrency within a group. Acknowledgment modes control reliability: manual acknowledgment for critical messages, automatic commits for throughput (sketched below). Error handling uses retries with back-off, retry topics, and dead-letter queues (also sketched below). Transaction support gives exactly-once semantics within Kafka's read-process-write cycle (final sketch below). Headers carry metadata, such as the type information the JSON deserializer relies on. Kafka underpins microservices communication, event sourcing, and real-time data pipelines; the pub-sub model decouples services, improving scalability and resilience. Proper topic design and a sound partitioning strategy are therefore crucial for performance.
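
A minimal sketch of the manual-acknowledgment mode mentioned above, reusing the container factory from KafkaConfig (AckMode and Acknowledgment are standard Spring Kafka APIs; handleEvent is a hypothetical processing step):

// In kafkaListenerContainerFactory(): require explicit offset commits.
factory.getContainerProperties()
        .setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL);

// In the listener: accept an Acknowledgment parameter and commit only on success.
@KafkaListener(topics = "user-events", groupId = "user-service-group")
public void consumeUserEvent(UserEvent event,
                             org.springframework.kafka.support.Acknowledgment ack) {
    handleEvent(event);  // hypothetical business logic
    ack.acknowledge();   // offset committed here; an exception above triggers redelivery
}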
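
For retries and a dead-letter queue, one common wiring is Spring Kafka's DefaultErrorHandler with a DeadLetterPublishingRecoverer; a sketch assuming Spring Kafka 2.8+ (by default the recoverer publishes failed records to "<topic>.DLT", here "user-events.DLT"):

import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    // Retry a failing record twice, one second apart, then publish it to the DLT.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
}

// Then register it on the listener container factory in KafkaConfig:
// factory.setCommonErrorHandler(errorHandler);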
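
Transactions require a transaction-id prefix on the producer factory; the "tx-" prefix and the audit topic below are illustrative, not part of the code above. Exactly-once covers Kafka's read-process-write cycle, and downstream consumers must set isolation.level=read_committed to see only committed records:

// In producerFactory(): a transaction-id prefix switches the factory into
// transactional mode (this also forces idempotent producers).
DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(config);
factory.setTransactionIdPrefix("tx-");
return factory;

// All sends inside the callback commit or abort together.
kafkaTemplate.executeInTransaction(ops -> {
    ops.send("user-events", userId.toString(), event);
    ops.send("user-audit", userId.toString(), event); // hypothetical second topic
    return null;
});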