
เขียน WebSocket แบบ Reactive ด้วย WebFlux
เวลาอ่าน ≈ 20 นาที — บทความนี้จะพาคุณลงลึกตั้งแต่ขั้นตอน Handshake จนถึงการจัดการ Backpressure และ Session Lifecycle บน Spring WebFlux WebSocket ครบจบในที่เดียว พร้อมตัวอย่างโค้ด, Diagram และ Best Practice สำหรับ Production ระบบ Realtime ความเร็วสูง
สารบัญ
- WebSocket + Reactive คืออะไร?
- Handshake Flow
- ตั้งค่าฝั่ง Server ด้วย WebFlux
- Reactive WebSocketHandler
- Reactive Message Pipeline
- สร้าง WebSocket Client แบบ Flux
- Session Lifecycle และ Error Handling
- Backpressure & Flow Control
- Security – JWT & CORS
- ทดสอบด้วย WebTestClient + SockJS
- Checklist ก่อนขึ้น Production
1. WebSocket + Reactive คืออะไร?
ปกติ HTTP เป็น request / response แต่ WebSocket ให้ full-duplex ช่องทางเดียว บน TCP พอร์ต 80/443. เมื่อจับคู่กับ Project Reactor เราจะได้ non-blocking stream เช่น
Flux<WebSocketMessage> in = session.receive(); // รับไม่บล็อก Mono<Void> out = session.send(outFlux); // ส่งไม่บล็อก
ทำให้แอปรับ / ส่งหลัก หมื่น ข้อความต่อวินาทีได้สบาย ๆ โดยใช้ Thread น้อย
2. Handshake Flow

- Client ส่ง
Upgrade: websocketกับSec-WebSocket-Key - Server ตอบ
101 Switching Protocols - เริ่ม Frame ระดับ WebSocket (Opcode 0x1 = Text, 0x2 = Binary)
3. ตั้งค่าฝั่ง Server ด้วย WebFlux
@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry r) {
r.addHandler(chatHandler(), "/ws/chat")
.setAllowedOrigins("https://example.com")
.withSockJS(); // optional fallback
}
@Bean
public WebSocketHandler chatHandler() { return new ChatHandler(); }
}
WebSocketHandlerคือ Callback หลัก (non-blocking)- ถ้าใช้ Spring Boot 3.5 ไม่ต้อง @EnableWebFlux; แต่ใส่ @EnableWebSocket เพียว ๆ ก็ได้
4. Reactive WebSocketHandler
public class ChatHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<String> input =
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(msg -> log.info("IN {}", msg));
Flux<WebSocketMessage> output =
input.map(String::toUpperCase)
.map(session::textMessage);
return session.send(output);
}
@Override public List<WebSocketExtension> getSupportedExtensions() { return List.of(); }
}
ทุกอย่างเป็น Flux/Mono จึงไม่บล็อก Netty event-loop
5. Reactive Message Pipeline

เราสามารถเพิ่ม Operator เช่น limitRate, buffer, retryWhen ได้ตามต้องการ
6. สร้าง WebSocket Client แบบ Flux
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(URI.create("ws://localhost:8080/ws/chat"),
session -> {
Mono<Void> send = session.send(Flux.interval(Duration.ofSeconds(1))
.map(i -> "ping-" + i)
.map(session::textMessage));
Mono<Void> receive = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println)
.then();
return Mono.zip(send, receive).then();
}).block();
7. Session Lifecycle & Error Handling

- ตรวจ
WebSocketCloseStatusในsession.closeStatus() - ใช้
.doOnErrorและretryWhenบน Client - Server ควร
session.close(CLOSE_TOO_BIG)ถ้า Payload เกินกำหนด
8. Backpressure & Flow Control
ถ้า Client ส่งเร็วกว่า Server ประมวลผล Buffer จะโตจนอาจ OOM. แก้ไขด้วย limitRate หรือ onBackpressureDrop
session.receive()
.onBackpressureBuffer(1024,
msg -> log.warn("Drop {}", msg.getPayloadAsText()),
BufferOverflowStrategy.DROP_OLDEST)
.publishOn(Schedulers.parallel())
.subscribe(handler::handle);
9. Security – JWT & CORS
@Configuration
public class WsSecurityConfig {
@Bean
public SecurityWebFilterChain chain(ServerHttpSecurity http) {
return http
.authorizeExchange(ex -> ex
.pathMatchers("/ws/**").authenticated()
.anyExchange().permitAll())
.addFilterAt(new JwtAuthWebFilter(jwtDecoder),
SecurityWebFiltersOrder.AUTHENTICATION)
.csrf(ServerHttpSecurity.CsrfSpec::disable) // WebSocket ≠ CSRF
.build();
}
}
ส่วน CORS ให้เปิด Origin ที่จำเป็นใน WebSocketHandlerRegistry
10. ทดสอบด้วย WebTestClient + SockJS
@WebFluxTest
class WsTest {
@Autowired WebTestClient client;
@Test
void handshakeShouldWork() {
client.get()
.uri("/ws/chat")
.header(HttpHeaders.UPGRADE, "websocket")
.exchange()
.expectStatus().isSwitchingProtocols();
}
}
Tip: ใช้ okhttp-ws หรือ Spring STOMP SockJS Test ยิงข้อความหลาย Thread เพื่อ Stress Test
11. Checklist ก่อน Production
- ตั้ง
spring.codec.max-in-memory-sizeป้องกัน OOM - จำกัด
WebSocketSession.getReceive().limitRateที่ Server - เปิด Prometheus metric
reactor.netty.websocket.connections - ใช้ ping/pong Frame เช็ก Idle แล้ว ปิด Session อัตโนมัติ
- เขียน Retry & Reconnect ที่ Client พร้อม Exponential Back-off
- แยก EventLoopGroup ถ้ามี Static File Serve ร่วม Port
สรุป
เมื่อรวมพลัง WebSocket เข้ากับ Spring WebFlux คุณจะได้ช่องทาง full-duplex ที่ เร็ว, ไม่บล็อก, และ ดูแลง่าย. จำไว้ว่าหัวใจอยู่ที่ Handshake, Backpressure และ Session Lifecycle. ทำตาม Best Practice ข้างต้น แอปของคุณพร้อมรองรับ Realtime หลัก หมื่น ผู้ใช้โดยใช้ CPU และ RAM อย่างคุ้มค่า!