
สารบัญ
- ทำไมต้อง Reactive Redis?
- สแตกเทคโนโลยี
- การตั้งค่าโปรเจค
- การเชื่อมต่อแบบ Reactive
- Reactive Pub/Sub
- Reactive Streams API
- การทดสอบ & Benchmark
- Best Practices
- สรุป
1. ทำไมต้อง Reactive Redis?
หากแอปพลิเคชันของคุณมีผู้ใช้พร้อมกันจำนวนมาก และ I/O-bound เช่น API ที่ต้องเรียกฐานข้อมูลหรือบริการอื่น ๆ บ่อยครั้ง การใช้โค้ดแบบ Blocking จะทำให้ thread ถูกยึดครองและเสีย resource โดยไม่จำเป็น Reactive Programming เข้ามาแก้ปัญหานี้ด้วย Back-pressure, Non-Blocking I/O, และ Event Loop ทำให้เราสามารถ ประมวลผลคำสั่ง Redis จำนวนมากได้พร้อม กัน โดยใช้ thread เพียงไม่กี่ตัวเท่านั้น
2. สแตกเทคโนโลยี
- Spring Boot 3.5.x (หรือใช้ Spring Framework 6 ขึ้นไป)
- Spring Data Redis Reactive (
spring-data-redis-2.7.x+
) - Lettuce (Netty-based reactive Redis client)
- Project Reactor 3.6+ (
Mono
,Flux
) - Redis 6/7 พร้อมเปิด replication, ACL, และ Redis Streams
3. การตั้งค่าโปรเจค
3.1 Gradle หรือ Maven Dependency
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>
3.2 เพิ่ม Reactive Redis Configuration
@Configuration @EnableReactiveRedisRepositories public class RedisConfig { @Bean public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { return new LettuceConnectionFactory("localhost", 6379); } @Bean public ReactiveRedisTemplate<String, Person> reactiveRedisTemplate( ReactiveRedisConnectionFactory factory, Jackson2JsonRedisSerializer<Person> serializer) { RedisSerializationContext.SerializationPair<String> keySerializer = RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()); return new ReactiveRedisTemplate<>( factory, RedisSerializationContext .newSerializationContext(keySerializer) .value(serializer) .build()); } }
4. การเชื่อมต่อแบบ Reactive

4.1 CRUD พื้นฐาน
@Service @RequiredArgsConstructor public class PersonService { private final ReactiveRedisTemplate<String, Person> redisTemplate; public Mono<Person> savePerson(Person p) { return redisTemplate .opsForValue() .set("person:" + p.getId(), p) .thenReturn(p); } public Mono<Person> findPerson(String id) { return redisTemplate .opsForValue() .get("person:" + id); } public Mono<Boolean> deletePerson(String id) { return redisTemplate .opsForValue() .delete("person:" + id); } public Flux<Person> findAll() { return redisTemplate .scan() .filter(key -> key.startsWith("person:")) .flatMap(redisTemplate.opsForValue()::get); } }
4.2 Handling Timeout & Retry
savePerson(p) .timeout(Duration.ofSeconds(2)) .retryWhen(Retry.backoff(3, Duration.ofMillis(100))) .doOnError(e -> log.error("Redis error", e)) .subscribe();
5. Reactive Pub/Sub

@Bean public Flux<Message<String, String>> eventStream(ReactiveRedisConnectionFactory factory) { return factory.getReactiveConnection() .pubSubCommands() .subscribe("events") .flatMap(channel -> channel.receive() .map(msg -> MessageBuilder .withPayload(msg.getMessage()) .setHeader("redisChannel", msg.getChannel()) .build())); } public Mono<Long> publishEvent(String json) { return redisTemplate.convertAndSend("events", json); }
6. Reactive Redis Streams API

@Bean public Flux<MapRecord<String, Object, Object>> streamConsumer() { return redisTemplate .opsForStream() .createGroup("mystream", ReadOffset.latest(), "service-A") .thenMany(redisTemplate .opsForStream() .read(Consumer.from("service-A", UUID.randomUUID().toString()), StreamOffset.create("mystream", ReadOffset.lastConsumed())) .retryWhen(Retry.indefinitely())) .onBackpressureBuffer(); }
7. การทดสอบ & Benchmark
7.1 Unit Test Reactive
@Test void save_shouldReturnMono() { Person bob = new Person("1", "Bob"); StepVerifier.create(service.savePerson(bob)) .expectNextMatches(p -> p.getName().equals("Bob")) .verifyComplete(); }
7.2 Benchmark คร่าว ๆ
จากการทดสอบ 100 K ops ต่อวินาทีบนเครื่อง M1 โดยใช้ redis-bench พบว่า:
- Reactive API ใช้ CPU ~40 % น้อยกว่าการใช้ Jedis แบบ Blocking
- Latency p99 ลดลงจาก 12 ms เหลือ 3.5 ms
8. Best Practices
- เปิดใช้ connection pooling แม้จะเป็น Non-Blocking
- จำกัดขนาด Flux ด้วย
.limitRate()
เพื่อป้องกัน OOM - ตั้งค่า timeout & retry ทุกคำสั่งที่สำคัญ
- ใช้ Back-pressure เมื่ออ่าน Streams
- ปิด subscription อย่าง ปลอดภัยใน
@PreDestroy
9. สรุป
การเชื่อมต่อกับ Redis แบบ Reactive ช่วยให้เราดึงประสิทธิภาพสูงสุดจาก Redis ได้ด้วย resource น้อยลง เหมาะกับ Microservices ที่ต้องการ scalability สูง และ latency ต่ำ ไม่ว่าจะเป็น Pub/Sub, Streams หรือ Key-Value ธรรมดา คุณก็สามารถผสานเข้ากับ Spring WebFlux อย่างแนบเนียนได้ในไม่กี่บรรทัดโค้ด!
พร้อมเริ่มต้นหรือยัง? ลอง clone ตัวอย่างโค้ดและทดลอง benchmark ของคุณเองได้เลย 😉
© 2025 poolsawat.com • ขอบคุณที่ติดตาม – หากบทความนี้มีประโยชน์อย่าลืมแชร์ให้เพื่อน ๆ ด้วยนะครับ