Reactive Streams กับ Project Reactor ลึก ๆ

Sharing is caring!

ภาพหน้าปก: เส้นทางลูกศรสีสันบอกเล่าโลก Reactive Programming

Reactive Streams กับ Project Reactor ลึก ๆ

เวลาอ่านโดยประมาณ 20 นาที — บทความนี้เป็นคู่มือเชิงลึกสำหรับนักพัฒนาสาย Reactive Java ที่อยากเข้าใจตั้งแต่สเปก “Reactive Streams” ไปจนถึงฟีเจอร์ลับของ Project Reactor ใน Spring WebFlux/Spring Boot 3.5+


สารบัญ

  1. Reactive Streams 101
  2. สัญญาณ 5 ชนิด & ลำดับชีวิต
  3. Mono vs Flux ลึกกว่าที่คิด
  4. Schedulers & Fusion
  5. Operator Groups สำคัญ
  6. Error Handling Pattern
  7. Backpressure Strategy
  8. Testing & Debugging
  9. Performance Tuning Checklist
  10. Demo ครบวงจร

1. Reactive Streams 101

Reactive Streams เป็น สเปก JVM ที่กำหนด interface พื้นฐาน 4 ตัว ได้แก่ Publisher<T>, Subscriber<T>, Subscription, และ Processor<IN, OUT>. จุดประสงค์คือสร้าง back-pressure-aware pipeline ที่ vendor ไหนก็ใช้งานร่วมกันได้ (RxJava 3, Reactor, Akka Streams, Mutiny ฯลฯ)​

ภาพที่ 1 : ไลฟ์ไซเคิลของ Publisher → Subscription → Subscriber
  • Publisher — แหล่งข้อมูล
  • Subscriber — ฝั่งบริโภค
  • Subscription — สัญญาและ state ระหว่างคู่
  • Processor — ทั้ง Producer & Consumer ในตัวเดียว

2. สัญญาณ 5 ชนิด & ลำดับชีวิต

onSubscribe(Subscription s)   ← เริ่มต้น
  ├─| request(n)             ← Backpressure demand
  │      ├─> onNext(T item)  ← จัดส่งข้อมูลทีละชิ้น
  │      └─> ...
  ├─| onComplete()           ← สำเร็จ
  └─| onError(Throwable t)   ← ล้มเหลว
cancel()                      ← ฝั่ง Subscriber ยกเลิก

กฎห้ามผิด:

  1. ต้อง onSubscribe ก่อนส่งสัญญาณอื่น
  2. หลัง onComplete/onError ต้องจบทุก request
  3. สัญญาณต้อง serial‐ordered ไม่ concurrent บนแม่น้ำเดียว

3. Mono vs Flux ลึกกว่าที่คิด

ภาพที่ 2 : Mono = 0..1, Flux = 0..N
  • Mono<T> เหมาะสำหรับผลลัพธ์เดี่ยว เช่น REST GET /one
  • Flux<T> เหมาะกับ stream/data flow, SSE, WebSocket
  • ทั้งคู่แชร์ operators เดียวกัน ~500+ เมธอด (map, flatMap, concat)​

4. Schedulers & Fusion

ภาพที่ 3 : Fusion ทำให้ map→filter→flatMap รวมกันเป็นสายเดียว ช่วยลด allocation

Fusion คือเทคนิครวมหลาย operator เข้าด้วยกันให้ทำงานภายใน queue fusion ชุดเดียว ลดการสร้าง Subscriber ชั้นซ้อน. ส่วน Scheduler มีหลายชนิด:

Schedulerใช้เมื่อข้อดีข้อควรระวัง
parallel()CPU – boundForkJoinPoolไม่ควร block
boundedElastic()Blocking I/Oขยายได้ถึงค่าสูงสุดเสี่ยง OOM ถ้า task block นาน
single()Sequence ซิงเกิลเธรดง่ายต่อ debugผ่านงานได้ทีละ job

5. Operator Groups สำคัญ

Transform: map, flatMap, concatMap
Filtering: filter, take, distinct
Combining: merge, zip, combineLatest
Error & Retry: retryWhen, onErrorResume
Backpressure: limitRate, onBackpressureBuffer, onBackpressureDrop

6. Error Handling Pattern

// Example: Retry with Exponential Backoff
Flux<String> callApi() {
    return webClient.get()
        .retrieve()
        .bodyToFlux(String.class)
        .retryWhen(Retry.backoff(5, Duration.ofMillis(200))
                        .filter(this::is5xx))
        .onErrorResume(throwable -> Mono.just("fallback"));
}

หลักการ:

  1. จับให้ถูกประเภท (filter ใน Retry)
  2. อย่าลืม Breaker (timeout/take) กันติดบ่วง retry นิรันดร์
  3. Log สัญญาณ .doOnError หรือ .log() ก่อน onErrorResume

7. Backpressure Strategy

ดูรายละเอียดเชิงลึกเรื่อง Backpressure ในบทความพี่ ก่อนหน้า แต่ใจความคือ limitRate = ผ่อนส่ง, buffer = เก็บก่อนไหล, drop = ทิ้งของเกิน. การเลือกขึ้นกับ SLA & RAM.

8. Testing & Debugging

@Test
void fluxPipeline_shouldEmitExpectedItems() {
    Flux<Integer> pipeline = Flux.range(1, 5)
                                  .map(i -> i * 10)
                                  .log();

    StepVerifier.create(pipeline)
                .expectNext(10, 20, 30, 40, 50)
                .verifyComplete();
}
  • StepVerifier ตรวจ Sequencing, backpressure และ error
  • Hooks.onOperatorDebug() เพิ่ม Stacktrace ละเอียด (แนะนำเฉพาะ Dev)
  • BlockHound ค้นหา blocking call ใน event loop

9. Performance Tuning Checklist

  1. ปรับ reactor.netty.io-worker-count = จำนวน CPU logical core
  2. ตั้ง spring.codec.max-in-memory-size เพื่อไม่ให้ buffer JSON เกิน
  3. หลีกเลี่ยง flatMap ที่ไม่จำกัด concurrency
  4. เปิด Micrometer + Grafana ดู P99 Latency และ request(n)
  5. ใช้ log() พร้อม checkpoint() ช่วยวินิจฉัย leak chain

10. Demo ครบวงจร: Database → Cache → REST SSE

@GetMapping(value = "/hot", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Article> hotNews() {
    return newsRepository.findTrending()     // R2DBC query
                         .switchIfEmpty(Mono.error(new NoData()))
                         .publishOn(Schedulers.boundedElastic()) // db I/O
                         .flatMap(cacheService::cacheIfAbsent)  // Redis Reactive
                         .take(Duration.ofSeconds(30))
                         .retryBackoff(3, Duration.ofMillis(500))
                         .log();
}

Pipeline นี้รวบรวม Flux จากฐานข้อมูล reactive → ดึง/เขียน Redis แบบ non-blocking → stream ออกทาง Server-Sent Events (text/event-stream) พร้อม Backpressure โดยอัตโนมัติ.


สรุป

เมื่อเข้าใจกลไก Publisher–Subscriber อย่างลึกซึ้ง, รู้จักจัดการ backpressure, ใช้ operator อย่างมีชั้นเชิง, และตรวจสอบด้วย StepVerifier + BlockHound — คุณจะสามารถสร้างแอป Reactive ที่ เร็ว, เสถียร, และ ดูแลง่าย ได้ทันทีในโลก Spring WebFlux.

Leave a Reply

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *