
เขียน WebSocket แบบ Reactive ด้วย WebFlux
เวลาอ่าน ≈ 20 นาที — บทความนี้จะพาคุณลงลึกตั้งแต่ขั้นตอน Handshake จนถึงการจัดการ Backpressure และ Session Lifecycle บน Spring WebFlux WebSocket ครบจบในที่เดียว พร้อมตัวอย่างโค้ด, Diagram และ Best Practice สำหรับ Production ระบบ Realtime ความเร็วสูง
สารบัญ
- WebSocket + Reactive คืออะไร?
- Handshake Flow
- ตั้งค่าฝั่ง Server ด้วย WebFlux
- Reactive WebSocketHandler
- Reactive Message Pipeline
- สร้าง WebSocket Client แบบ Flux
- Session Lifecycle และ Error Handling
- Backpressure & Flow Control
- Security – JWT & CORS
- ทดสอบด้วย WebTestClient + SockJS
- Checklist ก่อนขึ้น Production
1. WebSocket + Reactive คืออะไร?
ปกติ HTTP เป็น request / response แต่ WebSocket ให้ full-duplex ช่องทางเดียว บน TCP พอร์ต 80/443. เมื่อจับคู่กับ Project Reactor เราจะได้ non-blocking stream เช่น
Flux<WebSocketMessage> in = session.receive(); // รับไม่บล็อก Mono<Void> out = session.send(outFlux); // ส่งไม่บล็อก
ทำให้แอปรับ / ส่งหลัก หมื่น ข้อความต่อวินาทีได้สบาย ๆ โดยใช้ Thread น้อย
2. Handshake Flow

- Client ส่ง
Upgrade: websocket
กับSec-WebSocket-Key
- Server ตอบ
101 Switching Protocols
- เริ่ม Frame ระดับ WebSocket (Opcode 0x1 = Text, 0x2 = Binary)
3. ตั้งค่าฝั่ง Server ด้วย WebFlux
@Configuration @EnableWebFlux public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry r) { r.addHandler(chatHandler(), "/ws/chat") .setAllowedOrigins("https://example.com") .withSockJS(); // optional fallback } @Bean public WebSocketHandler chatHandler() { return new ChatHandler(); } }
WebSocketHandler
คือ Callback หลัก (non-blocking)- ถ้าใช้ Spring Boot 3.5 ไม่ต้อง @EnableWebFlux; แต่ใส่ @EnableWebSocket เพียว ๆ ก็ได้
4. Reactive WebSocketHandler
public class ChatHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { Flux<String> input = session.receive() .map(WebSocketMessage::getPayloadAsText) .doOnNext(msg -> log.info("IN {}", msg)); Flux<WebSocketMessage> output = input.map(String::toUpperCase) .map(session::textMessage); return session.send(output); } @Override public List<WebSocketExtension> getSupportedExtensions() { return List.of(); } }
ทุกอย่างเป็น Flux/Mono จึงไม่บล็อก Netty event-loop
5. Reactive Message Pipeline

เราสามารถเพิ่ม Operator เช่น limitRate
, buffer
, retryWhen
ได้ตามต้องการ
6. สร้าง WebSocket Client แบบ Flux
WebSocketClient client = new ReactorNettyWebSocketClient(); client.execute(URI.create("ws://localhost:8080/ws/chat"), session -> { Mono<Void> send = session.send(Flux.interval(Duration.ofSeconds(1)) .map(i -> "ping-" + i) .map(session::textMessage)); Mono<Void> receive = session.receive() .map(WebSocketMessage::getPayloadAsText) .doOnNext(System.out::println) .then(); return Mono.zip(send, receive).then(); }).block();
7. Session Lifecycle & Error Handling

- ตรวจ
WebSocketCloseStatus
ในsession.closeStatus()
- ใช้
.doOnError
และretryWhen
บน Client - Server ควร
session.close(CLOSE_TOO_BIG)
ถ้า Payload เกินกำหนด
8. Backpressure & Flow Control
ถ้า Client ส่งเร็วกว่า Server ประมวลผล Buffer จะโตจนอาจ OOM. แก้ไขด้วย limitRate
หรือ onBackpressureDrop
session.receive() .onBackpressureBuffer(1024, msg -> log.warn("Drop {}", msg.getPayloadAsText()), BufferOverflowStrategy.DROP_OLDEST) .publishOn(Schedulers.parallel()) .subscribe(handler::handle);
9. Security – JWT & CORS
@Configuration public class WsSecurityConfig { @Bean public SecurityWebFilterChain chain(ServerHttpSecurity http) { return http .authorizeExchange(ex -> ex .pathMatchers("/ws/**").authenticated() .anyExchange().permitAll()) .addFilterAt(new JwtAuthWebFilter(jwtDecoder), SecurityWebFiltersOrder.AUTHENTICATION) .csrf(ServerHttpSecurity.CsrfSpec::disable) // WebSocket ≠ CSRF .build(); } }
ส่วน CORS ให้เปิด Origin ที่จำเป็นใน WebSocketHandlerRegistry
10. ทดสอบด้วย WebTestClient + SockJS
@WebFluxTest class WsTest { @Autowired WebTestClient client; @Test void handshakeShouldWork() { client.get() .uri("/ws/chat") .header(HttpHeaders.UPGRADE, "websocket") .exchange() .expectStatus().isSwitchingProtocols(); } }
Tip: ใช้ okhttp-ws
หรือ Spring STOMP SockJS Test
ยิงข้อความหลาย Thread เพื่อ Stress Test
11. Checklist ก่อน Production
- ตั้ง
spring.codec.max-in-memory-size
ป้องกัน OOM - จำกัด
WebSocketSession.getReceive().limitRate
ที่ Server - เปิด Prometheus metric
reactor.netty.websocket.connections
- ใช้ ping/pong Frame เช็ก Idle แล้ว ปิด Session อัตโนมัติ
- เขียน Retry & Reconnect ที่ Client พร้อม Exponential Back-off
- แยก EventLoopGroup ถ้ามี Static File Serve ร่วม Port
สรุป
เมื่อรวมพลัง WebSocket เข้ากับ Spring WebFlux คุณจะได้ช่องทาง full-duplex ที่ เร็ว, ไม่บล็อก, และ ดูแลง่าย. จำไว้ว่าหัวใจอยู่ที่ Handshake, Backpressure และ Session Lifecycle. ทำตาม Best Practice ข้างต้น แอปของคุณพร้อมรองรับ Realtime หลัก หมื่น ผู้ใช้โดยใช้ CPU และ RAM อย่างคุ้มค่า!