
Reactive Streams กับ Project Reactor ลึก ๆ
เวลาอ่านโดยประมาณ 20 นาที — บทความนี้เป็นคู่มือเชิงลึกสำหรับนักพัฒนาสาย Reactive Java ที่อยากเข้าใจตั้งแต่สเปก “Reactive Streams” ไปจนถึงฟีเจอร์ลับของ Project Reactor ใน Spring WebFlux/Spring Boot 3.5+
สารบัญ
- Reactive Streams 101
- สัญญาณ 5 ชนิด & ลำดับชีวิต
- Mono vs Flux ลึกกว่าที่คิด
- Schedulers & Fusion
- Operator Groups สำคัญ
- Error Handling Pattern
- Backpressure Strategy
- Testing & Debugging
- Performance Tuning Checklist
- Demo ครบวงจร
1. Reactive Streams 101
Reactive Streams เป็น สเปก JVM ที่กำหนด interface พื้นฐาน 4 ตัว ได้แก่ Publisher<T>, Subscriber<T>, Subscription, และ Processor<IN, OUT>. จุดประสงค์คือสร้าง back-pressure-aware pipeline ที่ vendor ไหนก็ใช้งานร่วมกันได้ (RxJava 3, Reactor, Akka Streams, Mutiny ฯลฯ)

- Publisher — แหล่งข้อมูล
- Subscriber — ฝั่งบริโภค
- Subscription — สัญญาและ state ระหว่างคู่
- Processor — ทั้ง Producer & Consumer ในตัวเดียว
2. สัญญาณ 5 ชนิด & ลำดับชีวิต
onSubscribe(Subscription s) ← เริ่มต้น ├─| request(n) ← Backpressure demand │ ├─> onNext(T item) ← จัดส่งข้อมูลทีละชิ้น │ └─> ... ├─| onComplete() ← สำเร็จ └─| onError(Throwable t) ← ล้มเหลว cancel() ← ฝั่ง Subscriber ยกเลิก
กฎห้ามผิด:
- ต้อง
onSubscribeก่อนส่งสัญญาณอื่น - หลัง
onComplete/onErrorต้องจบทุกrequest - สัญญาณต้อง serial‐ordered ไม่ concurrent บนแม่น้ำเดียว
3. Mono vs Flux ลึกกว่าที่คิด

- Mono<T> เหมาะสำหรับผลลัพธ์เดี่ยว เช่น REST GET /one
- Flux<T> เหมาะกับ stream/data flow, SSE, WebSocket
- ทั้งคู่แชร์ operators เดียวกัน ~500+ เมธอด (map, flatMap, concat)
4. Schedulers & Fusion

Fusion คือเทคนิครวมหลาย operator เข้าด้วยกันให้ทำงานภายใน queue fusion ชุดเดียว ลดการสร้าง Subscriber ชั้นซ้อน. ส่วน Scheduler มีหลายชนิด:
| Scheduler | ใช้เมื่อ | ข้อดี | ข้อควรระวัง |
|---|---|---|---|
| parallel() | CPU – bound | ForkJoinPool | ไม่ควร block |
| boundedElastic() | Blocking I/O | ขยายได้ถึงค่าสูงสุด | เสี่ยง OOM ถ้า task block นาน |
| single() | Sequence ซิงเกิลเธรด | ง่ายต่อ debug | ผ่านงานได้ทีละ job |
5. Operator Groups สำคัญ
Transform: map, flatMap, concatMap
Filtering: filter, take, distinct
Combining: merge, zip, combineLatest
Error & Retry: retryWhen, onErrorResume
Backpressure: limitRate, onBackpressureBuffer, onBackpressureDrop
6. Error Handling Pattern
// Example: Retry with Exponential Backoff
Flux<String> callApi() {
return webClient.get()
.retrieve()
.bodyToFlux(String.class)
.retryWhen(Retry.backoff(5, Duration.ofMillis(200))
.filter(this::is5xx))
.onErrorResume(throwable -> Mono.just("fallback"));
}
หลักการ:
- จับให้ถูกประเภท (
filterใน Retry) - อย่าลืม Breaker (
timeout/take) กันติดบ่วง retry นิรันดร์ - Log สัญญาณ
.doOnErrorหรือ.log()ก่อนonErrorResume
7. Backpressure Strategy
ดูรายละเอียดเชิงลึกเรื่อง Backpressure ในบทความพี่ ก่อนหน้า แต่ใจความคือ limitRate = ผ่อนส่ง, buffer = เก็บก่อนไหล, drop = ทิ้งของเกิน. การเลือกขึ้นกับ SLA & RAM.
8. Testing & Debugging
@Test
void fluxPipeline_shouldEmitExpectedItems() {
Flux<Integer> pipeline = Flux.range(1, 5)
.map(i -> i * 10)
.log();
StepVerifier.create(pipeline)
.expectNext(10, 20, 30, 40, 50)
.verifyComplete();
}
StepVerifierตรวจ Sequencing, backpressure และ errorHooks.onOperatorDebug()เพิ่ม Stacktrace ละเอียด (แนะนำเฉพาะ Dev)BlockHoundค้นหา blocking call ใน event loop
9. Performance Tuning Checklist
- ปรับ
reactor.netty.io-worker-count= จำนวน CPU logical core - ตั้ง
spring.codec.max-in-memory-sizeเพื่อไม่ให้ buffer JSON เกิน - หลีกเลี่ยง
flatMapที่ไม่จำกัดconcurrency - เปิด Micrometer + Grafana ดู P99 Latency และ request(n)
- ใช้
log()พร้อมcheckpoint()ช่วยวินิจฉัย leak chain
10. Demo ครบวงจร: Database → Cache → REST SSE
@GetMapping(value = "/hot", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Article> hotNews() {
return newsRepository.findTrending() // R2DBC query
.switchIfEmpty(Mono.error(new NoData()))
.publishOn(Schedulers.boundedElastic()) // db I/O
.flatMap(cacheService::cacheIfAbsent) // Redis Reactive
.take(Duration.ofSeconds(30))
.retryBackoff(3, Duration.ofMillis(500))
.log();
}
Pipeline นี้รวบรวม Flux จากฐานข้อมูล reactive → ดึง/เขียน Redis แบบ non-blocking → stream ออกทาง Server-Sent Events (text/event-stream) พร้อม Backpressure โดยอัตโนมัติ.
สรุป
เมื่อเข้าใจกลไก Publisher–Subscriber อย่างลึกซึ้ง, รู้จักจัดการ backpressure, ใช้ operator อย่างมีชั้นเชิง, และตรวจสอบด้วย StepVerifier + BlockHound — คุณจะสามารถสร้างแอป Reactive ที่ เร็ว, เสถียร, และ ดูแลง่าย ได้ทันทีในโลก Spring WebFlux.