
การ Integrate กับ Kafka (Reactive Kafka / Kafka Streams)
เวลาอ่านโดยประมาณ 20 นาที — บทความนี้จะพาคุณดำดิ่งสู่การเชื่อมต่อ Spring / Project Reactor กับ Apache Kafka ในแบบ Reactive (non-blocking) ครบทุกแง่มุม ตั้งแต่ producer, consumer, backpressure, ไปจนถึง stateful stream processing ด้วย Kafka Streams
สารบัญ
- Kafka + Reactive = ทำไมต้องสน?
- เตรียม Project & Dependency
- Reactive Producer
- Reactive Consumer & Backpressure
- Kafka Streams Topology
- Error-Handling & Retry Pattern
- State Store & Queryable State
- Testing กับ Embedded Kafka
- Observability & Tuning
- Demo ครบวงจร
1. Kafka + Reactive = ทำไมต้องสน?
- Throughput สูง: Kafka รองรับ data หลายแสนข้อความต่อวินาที
- Backpressure-aware: Project Reactor ควบคุมแรงดันข้อมูลแบบ non-blocking
- End-to-End Streaming: Kafka Streams สร้าง topology ประมวลผลแบบ stateful
- Minimal Thread: ใช้ event-loop + boundedElastic เฉพาะ block I/O

2. เตรียม Project & Dependency
ตัวอย่างใช้ Spring Boot 3.5 + Spring Kafka 3.2 + Reactor Kafka 1.3
<dependency>
<groupId>org.springframework.kafka<groupId>
<artifactId>spring-kafka<artifactId>
<dependency>
<dependency>
<groupId>io.projectreactor.kafka<groupId>
<artifactId>reactor-kafka<artifactId>
<dependency>
<dependency>
<groupId>org.apache.kafka<groupId>
<artifactId>kafka-streams<artifactId>
<dependency>
3. Reactive Producer
@Bean
public KafkaSender<String, OrderEvent> kafkaSender(ProducerFactory<String, OrderEvent> pf) {
SenderOptions<String, OrderEvent> opts = SenderOptions.create(pf.getConfigurationProperties());
return KafkaSender.create(opts);
}
public Mono<RecordMetadata> send(OrderEvent evt) {
SenderRecord<String, OrderEvent, String> record =
SenderRecord.create(new ProducerRecord<>("order-events", evt.getId(), evt), evt.getId());
return kafkaSender.send(Mono.just(record))
.next() // รับผล send แรก
.map(SenderResult::recordMetadata);
}
จุดเด่น: KafkaSender ส่งแบบ non-blocking; Mono เก็บ ack & metadata
4. Reactive Consumer & Backpressure
@Bean
public Flux<ReceiverRecord<String, OrderEvent>> orderStream(
ReceiverOptions<String, OrderEvent> options) {
return KafkaReceiver.create(
options.subscription(Collections.singleton("order-events")))
.receive() // Flux แห่งข้อความ
.publishOn(Schedulers.parallel()) // ประมวลผล CPU-bound
.limitRate(1024) // Backpressure จาก downstream
.doOnNext(rec -> rec.receiverOffset().acknowledge());
}

5. Kafka Streams Topology

StreamsBuilder builder = new StreamsBuilder();
KStream<String, OrderEvent> orders = builder.stream("order-events");
KStream<String, OrderSummary> summary = orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.aggregate(OrderSummary::new,
(key, event, agg) -> agg.add(event),
Materialized.<String, OrderSummary, WindowStore<Bytes, byte[]>>as("order-store")
.withValueSerde(orderSummarySerde))
.toStream()
.map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg));
summary.to("order-summary");
ไฮไลต์:
aggregateใช้ State Store ใน Memory + RocksDBwindowedByสร้าง Tumbling Window 1 นาที- ผลลัพธ์ถูกส่งไป topic
order-summary
6. Error-Handling & Retry Pattern
Reactive Consumer: ใช้ retryBackoff
orderStream
.flatMap(this::processEvent)
.retryBackoff(5, Duration.ofMillis(500))
.onErrorContinue((ex, obj) -> log.error("skip {}", obj, ex));
Kafka Streams: ใช้ setDefaultDeserializationExceptionHandler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
7. State Store & Queryable State
Kafka Streams สามารถเปิด HTTP endpoint เพื่อ query store แบบ Interactive-Queries ได้ เช่น
@RestController
public class OrderQueryController {
private final ReadOnlyWindowStore<String, OrderSummary> store;
public OrderQueryController(StreamsBuilderFactoryBean factoryBean) {
store = factoryBean.getKafkaStreams()
.store(StoreQueryParameters.fromNameAndType(
"order-store",
QueryableStoreTypes.windowStore()));
}
@GetMapping("/summary/{id}")
public List<OrderSummary> summary(@PathVariable String id) {
return StreamsUtils.fetchAll(store, id);
}
}
8. Testing กับ Embedded Kafka
@EmbeddedKafka(partitions = 1, topics = { "order-events" })
@SpringBootTest
class KafkaIntegrationTest {
@Autowired KafkaTemplate<String, OrderEvent> template;
@Test
void shouldConsumeOrder() {
template.send("order-events", "1", new OrderEvent(...));
StepVerifier.create(orderStream.filter(r -> r.key().equals("1")))
.expectNextCount(1)
.thenCancel()
.verify();
}
}
9. Observability & Tuning
- เปิด
spring.kafka.properties.metrics.recording.level=DEBUG - Micrometer + Prometheus: Export
kafka.consumer.records-lag - ปรับ
max.poll.interval.ms&max.poll.recordsให้สอดคล้อง backpressure - Kafka Streams: ใช้
num.stream.threads = core
10. Demo ครบวงจร
docker compose up -d zookeeper kafka schema-registry ./mvnw spring-boot:run -pl reactive-producer ./mvnw spring-boot:run -pl reactive-consumer ./mvnw spring-boot:run -pl kafka-streams-app curl -X POST http://localhost:8080/orders # ส่ง event kafka-console-consumer --bootstrap-server localhost:29092 --topic order-summary
คุณจะเห็น Messages ไหลจาก Producer → Broker → Streams → Topic order-summary และ Consumer อ่านแบบ Reactive พร้อม Backpressure
สรุป
การผสมผสาน Reactive Programming กับ Apache Kafka ช่วยให้ระบบของคุณ รับ-ส่งข้อความเร็ว, ปรับตัวตามโหลด, และ ประมวลผลสตรีมอย่างยืดหยุ่น ตั้งแต่ ingestion ไปจนถึง analytic. เมื่อจับคู่กับ Kafka Streams คุณยังสามารถสร้าง Topology ประมวลผล stateful ได้โดยไม่ต้องย้ายข้อมูลออกจากคลัสเตอร์. อย่าลืมทดสอบด้วย Embedded Kafka และเปิด Metric Monitoring เพื่อความปลอดภัยใน Production!