
การ Integrate กับ Kafka (Reactive Kafka / Kafka Streams)
เวลาอ่านโดยประมาณ 20 นาที — บทความนี้จะพาคุณดำดิ่งสู่การเชื่อมต่อ Spring / Project Reactor กับ Apache Kafka ในแบบ Reactive (non-blocking) ครบทุกแง่มุม ตั้งแต่ producer, consumer, backpressure, ไปจนถึง stateful stream processing ด้วย Kafka Streams
สารบัญ
- Kafka + Reactive = ทำไมต้องสน?
- เตรียม Project & Dependency
- Reactive Producer
- Reactive Consumer & Backpressure
- Kafka Streams Topology
- Error-Handling & Retry Pattern
- State Store & Queryable State
- Testing กับ Embedded Kafka
- Observability & Tuning
- Demo ครบวงจร
1. Kafka + Reactive = ทำไมต้องสน?
- Throughput สูง: Kafka รองรับ data หลายแสนข้อความต่อวินาที
- Backpressure-aware: Project Reactor ควบคุมแรงดันข้อมูลแบบ non-blocking
- End-to-End Streaming: Kafka Streams สร้าง topology ประมวลผลแบบ stateful
- Minimal Thread: ใช้ event-loop + boundedElastic เฉพาะ block I/O

2. เตรียม Project & Dependency
ตัวอย่างใช้ Spring Boot 3.5 + Spring Kafka 3.2 + Reactor Kafka 1.3
<dependency> <groupId>org.springframework.kafka<groupId> <artifactId>spring-kafka<artifactId> <dependency> <dependency> <groupId>io.projectreactor.kafka<groupId> <artifactId>reactor-kafka<artifactId> <dependency> <dependency> <groupId>org.apache.kafka<groupId> <artifactId>kafka-streams<artifactId> <dependency>
3. Reactive Producer
@Bean public KafkaSender<String, OrderEvent> kafkaSender(ProducerFactory<String, OrderEvent> pf) { SenderOptions<String, OrderEvent> opts = SenderOptions.create(pf.getConfigurationProperties()); return KafkaSender.create(opts); } public Mono<RecordMetadata> send(OrderEvent evt) { SenderRecord<String, OrderEvent, String> record = SenderRecord.create(new ProducerRecord<>("order-events", evt.getId(), evt), evt.getId()); return kafkaSender.send(Mono.just(record)) .next() // รับผล send แรก .map(SenderResult::recordMetadata); }
จุดเด่น: KafkaSender
ส่งแบบ non-blocking; Mono
เก็บ ack & metadata
4. Reactive Consumer & Backpressure
@Bean public Flux<ReceiverRecord<String, OrderEvent>> orderStream( ReceiverOptions<String, OrderEvent> options) { return KafkaReceiver.create( options.subscription(Collections.singleton("order-events"))) .receive() // Flux แห่งข้อความ .publishOn(Schedulers.parallel()) // ประมวลผล CPU-bound .limitRate(1024) // Backpressure จาก downstream .doOnNext(rec -> rec.receiverOffset().acknowledge()); }

5. Kafka Streams Topology

StreamsBuilder builder = new StreamsBuilder(); KStream<String, OrderEvent> orders = builder.stream("order-events"); KStream<String, OrderSummary> summary = orders .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))) .aggregate(OrderSummary::new, (key, event, agg) -> agg.add(event), Materialized.<String, OrderSummary, WindowStore<Bytes, byte[]>>as("order-store") .withValueSerde(orderSummarySerde)) .toStream() .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg)); summary.to("order-summary");
ไฮไลต์:
aggregate
ใช้ State Store ใน Memory + RocksDBwindowedBy
สร้าง Tumbling Window 1 นาที- ผลลัพธ์ถูกส่งไป topic
order-summary
6. Error-Handling & Retry Pattern
Reactive Consumer: ใช้ retryBackoff
orderStream .flatMap(this::processEvent) .retryBackoff(5, Duration.ofMillis(500)) .onErrorContinue((ex, obj) -> log.error("skip {}", obj, ex));
Kafka Streams: ใช้ setDefaultDeserializationExceptionHandler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
7. State Store & Queryable State
Kafka Streams สามารถเปิด HTTP endpoint เพื่อ query store แบบ Interactive-Queries ได้ เช่น
@RestController public class OrderQueryController { private final ReadOnlyWindowStore<String, OrderSummary> store; public OrderQueryController(StreamsBuilderFactoryBean factoryBean) { store = factoryBean.getKafkaStreams() .store(StoreQueryParameters.fromNameAndType( "order-store", QueryableStoreTypes.windowStore())); } @GetMapping("/summary/{id}") public List<OrderSummary> summary(@PathVariable String id) { return StreamsUtils.fetchAll(store, id); } }
8. Testing กับ Embedded Kafka
@EmbeddedKafka(partitions = 1, topics = { "order-events" }) @SpringBootTest class KafkaIntegrationTest { @Autowired KafkaTemplate<String, OrderEvent> template; @Test void shouldConsumeOrder() { template.send("order-events", "1", new OrderEvent(...)); StepVerifier.create(orderStream.filter(r -> r.key().equals("1"))) .expectNextCount(1) .thenCancel() .verify(); } }
9. Observability & Tuning
- เปิด
spring.kafka.properties.metrics.recording.level=DEBUG
- Micrometer + Prometheus: Export
kafka.consumer.records-lag
- ปรับ
max.poll.interval.ms
&max.poll.records
ให้สอดคล้อง backpressure - Kafka Streams: ใช้
num.stream.threads = core
10. Demo ครบวงจร
docker compose up -d zookeeper kafka schema-registry ./mvnw spring-boot:run -pl reactive-producer ./mvnw spring-boot:run -pl reactive-consumer ./mvnw spring-boot:run -pl kafka-streams-app curl -X POST http://localhost:8080/orders # ส่ง event kafka-console-consumer --bootstrap-server localhost:29092 --topic order-summary
คุณจะเห็น Messages ไหลจาก Producer → Broker → Streams → Topic order-summary
และ Consumer อ่านแบบ Reactive พร้อม Backpressure
สรุป
การผสมผสาน Reactive Programming กับ Apache Kafka ช่วยให้ระบบของคุณ รับ-ส่งข้อความเร็ว, ปรับตัวตามโหลด, และ ประมวลผลสตรีมอย่างยืดหยุ่น ตั้งแต่ ingestion ไปจนถึง analytic. เมื่อจับคู่กับ Kafka Streams คุณยังสามารถสร้าง Topology ประมวลผล stateful ได้โดยไม่ต้องย้ายข้อมูลออกจากคลัสเตอร์. อย่าลืมทดสอบด้วย Embedded Kafka และเปิด Metric Monitoring เพื่อความปลอดภัยใน Production!