เขียน WebSocket แบบ Reactive ด้วย WebFlux

Sharing is caring!

ภาพหน้าปก: เขียน WebSocket แบบ Reactive ด้วย Spring WebFlux

เขียน WebSocket แบบ Reactive ด้วย WebFlux

เวลาอ่าน ≈ 20 นาที — บทความนี้จะพาคุณลงลึกตั้งแต่ขั้นตอน Handshake จนถึงการจัดการ Backpressure และ Session Lifecycle บน Spring WebFlux WebSocket ครบจบในที่เดียว พร้อมตัวอย่างโค้ด, Diagram และ Best Practice สำหรับ Production ระบบ Realtime ความเร็วสูง


สารบัญ

  1. WebSocket + Reactive คืออะไร?
  2. Handshake Flow
  3. ตั้งค่าฝั่ง Server ด้วย WebFlux
  4. Reactive WebSocketHandler
  5. Reactive Message Pipeline
  6. สร้าง WebSocket Client แบบ Flux
  7. Session Lifecycle และ Error Handling
  8. Backpressure & Flow Control
  9. Security – JWT & CORS
  10. ทดสอบด้วย WebTestClient + SockJS
  11. Checklist ก่อนขึ้น Production

1. WebSocket + Reactive คืออะไร?

ปกติ HTTP เป็น request / response แต่ WebSocket ให้ full-duplex ช่องทางเดียว บน TCP พอร์ต 80/443. เมื่อจับคู่กับ Project Reactor เราจะได้ non-blocking stream เช่น

Flux<WebSocketMessage> in = session.receive();   // รับไม่บล็อก
Mono<Void>          out = session.send(outFlux); // ส่งไม่บล็อก

ทำให้แอปรับ / ส่งหลัก หมื่น ข้อความต่อวินาทีได้สบาย ๆ โดยใช้ Thread น้อย

2. Handshake Flow

ภาพที่ 1 : ขั้นตอน Handshake ระหว่าง Client ↔ Server
  1. Client ส่ง Upgrade: websocket กับ Sec-WebSocket-Key
  2. Server ตอบ 101 Switching Protocols
  3. เริ่ม Frame ระดับ WebSocket (Opcode 0x1 = Text, 0x2 = Binary)

3. ตั้งค่าฝั่ง Server ด้วย WebFlux

@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebSocketConfigurer {
  @Override
  public void registerWebSocketHandlers(WebSocketHandlerRegistry r) {
      r.addHandler(chatHandler(), "/ws/chat")
       .setAllowedOrigins("https://example.com")
       .withSockJS(); // optional fallback
  }

  @Bean
  public WebSocketHandler chatHandler() { return new ChatHandler(); }
}
  • WebSocketHandler คือ Callback หลัก (non-blocking)
  • ถ้าใช้ Spring Boot 3.5 ไม่ต้อง @EnableWebFlux; แต่ใส่ @EnableWebSocket เพียว ๆ ก็ได้

4. Reactive WebSocketHandler

public class ChatHandler implements WebSocketHandler {
  @Override
  public Mono<Void> handle(WebSocketSession session) {

      Flux<String> input =
          session.receive()
                 .map(WebSocketMessage::getPayloadAsText)
                 .doOnNext(msg -> log.info("IN&nbsp;{}", msg));

      Flux<WebSocketMessage> output =
          input.map(String::toUpperCase)
               .map(session::textMessage);

      return session.send(output);
  }
  @Override public List<WebSocketExtension> getSupportedExtensions() { return List.of(); }
}

ทุกอย่างเป็น Flux/Mono จึงไม่บล็อก Netty event-loop

5. Reactive Message Pipeline

ภาพที่ 2 : ข้อความไหลผ่าน Subscription → Custom Operator → Handler

เราสามารถเพิ่ม Operator เช่น limitRate, buffer, retryWhen ได้ตามต้องการ

6. สร้าง WebSocket Client แบบ Flux

WebSocketClient client = new ReactorNettyWebSocketClient();

client.execute(URI.create("ws://localhost:8080/ws/chat"),
        session -> {
            Mono<Void> send = session.send(Flux.interval(Duration.ofSeconds(1))
                                             .map(i -> "ping-" + i)
                                             .map(session::textMessage));

            Mono<Void> receive = session.receive()
                                        .map(WebSocketMessage::getPayloadAsText)
                                        .doOnNext(System.out::println)
                                        .then();

            return Mono.zip(send, receive).then();
        }).block();

7. Session Lifecycle & Error Handling

ภาพที่ 3 : Opening → Active → Closing → Closed (+ Abnormal Termination)
  • ตรวจ WebSocketCloseStatus ใน session.closeStatus()
  • ใช้ .doOnError และ retryWhen บน Client
  • Server ควร session.close(CLOSE_TOO_BIG) ถ้า Payload เกินกำหนด

8. Backpressure & Flow Control

ถ้า Client ส่งเร็วกว่า Server ประมวลผล Buffer จะโตจนอาจ OOM. แก้ไขด้วย limitRate หรือ onBackpressureDrop

session.receive()
       .onBackpressureBuffer(1024,
             msg -> log.warn("Drop {}", msg.getPayloadAsText()),
             BufferOverflowStrategy.DROP_OLDEST)
       .publishOn(Schedulers.parallel())
       .subscribe(handler::handle);

9. Security – JWT & CORS

@Configuration
public class WsSecurityConfig {

 @Bean
 public SecurityWebFilterChain chain(ServerHttpSecurity http) {
     return http
       .authorizeExchange(ex -> ex
             .pathMatchers("/ws/**").authenticated()
             .anyExchange().permitAll())
       .addFilterAt(new JwtAuthWebFilter(jwtDecoder),
             SecurityWebFiltersOrder.AUTHENTICATION)
       .csrf(ServerHttpSecurity.CsrfSpec::disable) // WebSocket ≠ CSRF
       .build();
 }
}

ส่วน CORS ให้เปิด Origin ที่จำเป็นใน WebSocketHandlerRegistry

10. ทดสอบด้วย WebTestClient + SockJS

@WebFluxTest
class WsTest {

 @Autowired WebTestClient client;

 @Test
 void handshakeShouldWork() {
     client.get()
           .uri("/ws/chat")
           .header(HttpHeaders.UPGRADE, "websocket")
           .exchange()
           .expectStatus().isSwitchingProtocols();
 }
}

Tip: ใช้ okhttp-ws หรือ Spring STOMP SockJS Test ยิงข้อความหลาย Thread เพื่อ Stress Test

11. Checklist ก่อน Production

  • ตั้ง spring.codec.max-in-memory-size ป้องกัน OOM
  • จำกัด WebSocketSession.getReceive().limitRate ที่ Server
  • เปิด Prometheus metric reactor.netty.websocket.connections
  • ใช้ ping/pong Frame เช็ก Idle แล้ว ปิด Session อัตโนมัติ
  • เขียน Retry & Reconnect ที่ Client พร้อม Exponential Back-off
  • แยก EventLoopGroup ถ้ามี Static File Serve ร่วม Port

สรุป

เมื่อรวมพลัง WebSocket เข้ากับ Spring WebFlux คุณจะได้ช่องทาง full-duplex ที่ เร็ว, ไม่บล็อก, และ ดูแลง่าย. จำไว้ว่าหัวใจอยู่ที่ Handshake, Backpressure และ Session Lifecycle. ทำตาม Best Practice ข้างต้น แอปของคุณพร้อมรองรับ Realtime หลัก หมื่น ผู้ใช้โดยใช้ CPU และ RAM อย่างคุ้มค่า!

Leave a Reply

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *