การ Integrate กับ Kafka (Reactive Kafka / Kafka Streams)

Sharing is caring!

ภาพหน้าปก: รวมพลัง Reactive กับ Apache Kafka และ Kafka Streams

การ Integrate กับ Kafka (Reactive Kafka / Kafka Streams)

เวลาอ่านโดยประมาณ 20 นาที — บทความนี้จะพาคุณดำดิ่งสู่การเชื่อมต่อ Spring / Project Reactor กับ Apache Kafka ในแบบ Reactive (non-blocking) ครบทุกแง่มุม ตั้งแต่ producer, consumer, backpressure, ไปจนถึง stateful stream processing ด้วย Kafka Streams


สารบัญ

  1. Kafka + Reactive = ทำไมต้องสน?
  2. เตรียม Project & Dependency
  3. Reactive Producer
  4. Reactive Consumer & Backpressure
  5. Kafka Streams Topology
  6. Error-Handling & Retry Pattern
  7. State Store & Queryable State
  8. Testing กับ Embedded Kafka
  9. Observability & Tuning
  10. Demo ครบวงจร

1. Kafka + Reactive = ทำไมต้องสน?

  • Throughput สูง: Kafka รองรับ data หลายแสนข้อความต่อวินาที
  • Backpressure-aware: Project Reactor ควบคุมแรงดันข้อมูลแบบ non-blocking
  • End-to-End Streaming: Kafka Streams สร้าง topology ประมวลผลแบบ stateful
  • Minimal Thread: ใช้ event-loop + boundedElastic เฉพาะ block I/O
ภาพที่ 1 : การไหลของข้อมูลจาก Reactive Application ไป Kafka

2. เตรียม Project & Dependency

ตัวอย่างใช้ Spring Boot 3.5 + Spring Kafka 3.2 + Reactor Kafka 1.3

<dependency>
    <groupId>org.springframework.kafka<groupId>
    <artifactId>spring-kafka<artifactId>
<dependency>
<dependency>
    <groupId>io.projectreactor.kafka<groupId>
    <artifactId>reactor-kafka<artifactId>
<dependency>
<dependency>
    <groupId>org.apache.kafka<groupId>
    <artifactId>kafka-streams<artifactId>
<dependency>

3. Reactive Producer

@Bean
public KafkaSender<String, OrderEvent> kafkaSender(ProducerFactory<String, OrderEvent> pf) {
    SenderOptions<String, OrderEvent> opts = SenderOptions.create(pf.getConfigurationProperties());
    return KafkaSender.create(opts);
}

public Mono<RecordMetadata> send(OrderEvent evt) {
    SenderRecord<String, OrderEvent, String> record =
        SenderRecord.create(new ProducerRecord<>("order-events", evt.getId(), evt), evt.getId());
    return kafkaSender.send(Mono.just(record))
                      .next()           // รับผล send แรก
                      .map(SenderResult::recordMetadata);
}

จุดเด่น: KafkaSender ส่งแบบ non-blocking; Mono เก็บ ack & metadata

4. Reactive Consumer & Backpressure

@Bean
public Flux<ReceiverRecord<String, OrderEvent>> orderStream(
        ReceiverOptions<String, OrderEvent> options) {

    return KafkaReceiver.create(
                options.subscription(Collections.singleton("order-events")))
            .receive()                      // Flux แห่งข้อความ
            .publishOn(Schedulers.parallel())  // ประมวลผล CPU-bound
            .limitRate(1024)                // Backpressure จาก downstream
            .doOnNext(rec -&gt; rec.receiverOffset().acknowledge());
}
ภาพที่ 2 : Producer → Kafka → Reactive Consumer

5. Kafka Streams Topology

ภาพที่ 3 : Topology Source → Processor → Sink
StreamsBuilder builder = new StreamsBuilder();

KStream<String, OrderEvent> orders = builder.stream("order-events");

KStream<String, OrderSummary> summary = orders
      .groupByKey()
      .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
      .aggregate(OrderSummary::new,
                 (key, event, agg) -> agg.add(event),
                 Materialized.<String, OrderSummary, WindowStore<Bytes, byte[]>>as("order-store")
                              .withValueSerde(orderSummarySerde))
      .toStream()
      .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg));

summary.to("order-summary");

ไฮไลต์:

  • aggregate ใช้ State Store ใน Memory + RocksDB
  • windowedBy สร้าง Tumbling Window 1 นาที
  • ผลลัพธ์ถูกส่งไป topic order-summary

6. Error-Handling & Retry Pattern

Reactive Consumer: ใช้ retryBackoff

orderStream
   .flatMap(this::processEvent)
   .retryBackoff(5, Duration.ofMillis(500))
   .onErrorContinue((ex, obj) -&gt; log.error("skip {}", obj, ex));

Kafka Streams: ใช้ setDefaultDeserializationExceptionHandler

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);

7. State Store & Queryable State

Kafka Streams สามารถเปิด HTTP endpoint เพื่อ query store แบบ Interactive-Queries ได้ เช่น

@RestController
public class OrderQueryController {

  private final ReadOnlyWindowStore<String, OrderSummary> store;

  public OrderQueryController(StreamsBuilderFactoryBean factoryBean) {
      store = factoryBean.getKafkaStreams()
                         .store(StoreQueryParameters.fromNameAndType(
                                "order-store",
                                QueryableStoreTypes.windowStore()));
  }

  @GetMapping("/summary/{id}")
  public List<OrderSummary> summary(@PathVariable String id) {
      return StreamsUtils.fetchAll(store, id);
  }
}

8. Testing กับ Embedded Kafka

@EmbeddedKafka(partitions = 1, topics = { "order-events" })
@SpringBootTest
class KafkaIntegrationTest {

  @Autowired KafkaTemplate<String, OrderEvent> template;

  @Test
  void shouldConsumeOrder() {
      template.send("order-events", "1", new OrderEvent(...));
      StepVerifier.create(orderStream.filter(r -&gt; r.key().equals("1")))
                  .expectNextCount(1)
                  .thenCancel()
                  .verify();
  }
}

9. Observability & Tuning

  • เปิด spring.kafka.properties.metrics.recording.level=DEBUG
  • Micrometer + Prometheus: Export kafka.consumer.records-lag
  • ปรับ max.poll.interval.ms & max.poll.records ให้สอดคล้อง backpressure
  • Kafka Streams: ใช้ num.stream.threads = core

10. Demo ครบวงจร

docker compose up -d zookeeper kafka schema-registry
./mvnw spring-boot:run -pl reactive-producer
./mvnw spring-boot:run -pl reactive-consumer
./mvnw spring-boot:run -pl kafka-streams-app
curl -X POST http://localhost:8080/orders  # ส่ง event
kafka-console-consumer --bootstrap-server localhost:29092 --topic order-summary

คุณจะเห็น Messages ไหลจาก Producer → Broker → Streams → Topic order-summary และ Consumer อ่านแบบ Reactive พร้อม Backpressure


สรุป

การผสมผสาน Reactive Programming กับ Apache Kafka ช่วยให้ระบบของคุณ รับ-ส่งข้อความเร็ว, ปรับตัวตามโหลด, และ ประมวลผลสตรีมอย่างยืดหยุ่น ตั้งแต่ ingestion ไปจนถึง analytic. เมื่อจับคู่กับ Kafka Streams คุณยังสามารถสร้าง Topology ประมวลผล stateful ได้โดยไม่ต้องย้ายข้อมูลออกจากคลัสเตอร์. อย่าลืมทดสอบด้วย Embedded Kafka และเปิด Metric Monitoring เพื่อความปลอดภัยใน Production!

Leave a Reply

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *