Custom Operator ด้วย Reactor

Sharing is caring!

ภาพหน้าปก: เส้นทางสร้าง Operator แบบกำหนดเองบน Reactor

Custom Operator ด้วย Reactor

เวลาอ่าน ≈ 20 นาที — บทความนี้เจาะลึกวิธีสร้าง Custom Operator ใน Project Reactor (Flux/Mono) เพื่อขยายความสามารถ และรีไซเคิล logic แบบประกอบได้ พร้อมโค้ดตัวอย่าง, รูปอธิบาย, Lifecycle, และแนวทางทดสอบ


สารบัญ

  1. แนวคิดเบื้องหลัง Operator
  2. ทางเลือกสร้าง Custom Operator
  3. สร้าง Operator ด้วย lift
  4. ใช้ transform / compose เมื่อต้องการ Reuse
  5. Operator Lifecycle & Signal Chain
  6. จัดการ Context และ Backpressure
  7. ทดสอบ Operator ด้วย StepVerifier
  8. จูนประสิทธิภาพ & Troubleshooting
  9. Demo ครบวงจร

1. แนวคิดเบื้องหลัง Operator

Operator ใน Reactive Streams คือ ฟังก์ชันบริสุทธิ์ แปลง Publisher<X> เป็น Publisher<Y>. Reactor ให้ Operator กว่า 500 รายการ (map, flatMap, zip ฯลฯ). แต่หลายกรณีเราต้องการ behaviour เฉพาะ — เช่น caching, duplicate suppression, metric enrichment — จึงต้องสร้าง Custom Operator เอง

ภาพที่ 1 : โครงสร้าง Publisher → Custom Operator → Subscriber

2. ทางเลือกสร้าง Custom Operator

  • lift — สร้าง Operator ระดับ Core (รับ/คืน Subscriber)
  • transform — ห่อ logic เป็น Function ระดับ Publisher
  • compose — เหมือน transform แต่สร้าง stateful function ใหม่ทุก subscribe
  • Extension Method / Utility — สร้าง static เมธอด ที่เรียก chain ของ Operator มาตรฐาน
ภาพที่ 2 : เปรียบเทียบ lift / transform / compose

3. สร้าง Operator ด้วย lift

ตัวอย่าง: ทำ “delay first element until upstream count ≥ N”

public static <T> FluxOperator<T, T> delayUntilEmits(int n) {
    return source -> source.lift((sc, actual) -> new DelaySubscriber<>(actual, n));
}

static final class DelaySubscriber<T> implements CoreSubscriber<T> {

    final CoreSubscriber<? super T> actual;
    final int threshold;
    int count;

    DelaySubscriber(CoreSubscriber<? super T> actual, int threshold) {
        this.actual = actual;
        this.threshold = threshold;
    }

    @Override public void onSubscribe(Subscription s) { actual.onSubscribe(s); }

    @Override public void onNext(T t) {
        if (++count >= threshold) actual.onNext(t);
    }

    @Override public void onError(Throwable t) { actual.onError(t); }

    @Override public void onComplete() { actual.onComplete(); }
}

ใช้งาน:

Flux.range(1, 10)
    .transform(delayUntilEmits(3))
    .subscribe(System.out::println); // พิมพ์ 4..10

4. ใช้ transform และ compose

transform ถูกสร้างครั้งเดียว แชร์ระหว่าง Subscriber ทั้งหมด; compose สร้างใหม่ทุกครั้ง — ดังนั้นเหมาะกับการเก็บ state ภายใน

Function<Flux<Integer>, Publisher<Integer>> dedupe =
        f -> f.distinctUntilChanged();

Flux.range(1, 5)
    .repeat(2)
    .transform(dedupe)
    .subscribe(System.out::println);  // 1 2 3 4 5

5. Operator Lifecycle & Signal Chain

ภาพที่ 3 : สัญญาณ onSubscribe → onNext → onError/onComplete

ทุก Operator ต้องปฏิบัติตามกฎ Reactive Streams:

  1. ส่ง onSubscribe ก่อนสัญญาณอื่น
  2. เคารพ request(n) เพื่อ Backpressure
  3. ปล่อยสัญญาณ onError/onComplete อย่างเดียวและครั้งเดียว

6. จัดการ Context และ Backpressure

ถ้า Operator สร้าง inner Publisher (เช่น flatMap) ควร propagate Context:

Mono.deferContextual(ctx -> {
        String reqId = ctx.get("reqId");
        return source.map(v -> "[" + reqId + "]" + v);
}).contextWrite(Context.of("reqId", "123"));

ส่วน Backpressure — ใช้ BaseSubscriber หรือรักษา requested counter manual

7. ทดสอบ Operator ด้วย StepVerifier

@Test
void delayUntilEmits_shouldSkipFirstN() {
    StepVerifier.create(Flux.range(1, 5).transform(delayUntilEmits(2)))
                .expectNext(3, 4, 5)
                .verifyComplete();
}

ใช้ VirtualTimeScheduler หาก Operator เกี่ยวกับเวลา

8. จูนประสิทธิภาพ & Troubleshooting

  • เปิด Hooks.onOperatorDebug() แสดง assembly ก่อน deploy
  • ใช้ .log() หรือ SignalLogger เฉพาะจุดที่สงสัย
  • ห้ามบล็อก Thread ใน Operator; ส่งไป Schedulers.boundedElastic()
  • วัด P99 latency ด้วย Micrometer + Grafana

9. Demo ครบวงจร

// Pipeline: custom delayFirst(3) + dedupe + window
Flux<Integer> demo =
        Flux.range(1, 20)
            .delayElements(Duration.ofMillis(50))
            .transform(delayUntilEmits(3))
            .compose(dedupe)
            .windowTimeout(5, Duration.ofSeconds(1))
            .flatMap(win -> win.collectList());

demo.subscribe(System.out::println);

Run demo แล้วสังเกตผลลัพธ์, Thread, และ Backpressure ผ่าน .log()


สรุป

การสร้าง Custom Operator ช่วยให้ Pipeline ของคุณ สั้น, อ่านง่าย, และ รีใช้ซ้ำ. เลือก lift เมื่อต้องการควบคุมระดับ Subscriber, ใช้ transform/compose สำหรับ Function re-use, และอย่าลืมทดสอบความถูกต้องด้วย StepVerifier ก่อนนำขึ้น Production

Leave a Reply

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *