เชื่อมต่อกับ Redis แบบ Reactive

Sharing is caring!

สารบัญ

  1. ทำไมต้อง Reactive Redis?
  2. สแตกเทคโนโลยี
  3. การตั้งค่าโปรเจค
  4. การเชื่อมต่อแบบ Reactive
  5. Reactive Pub/Sub
  6. Reactive Streams API
  7. การทดสอบ & Benchmark
  8. Best Practices
  9. สรุป

1. ทำไมต้อง Reactive Redis?

หากแอปพลิเคชันของคุณมีผู้ใช้พร้อมกันจำนวนมาก และ I/O-bound เช่น API ที่ต้องเรียกฐานข้อมูลหรือบริการอื่น ๆ บ่อยครั้ง การใช้โค้ดแบบ Blocking จะทำให้ thread ถูกยึดครองและเสีย resource โดยไม่จำเป็น Reactive Programming เข้ามาแก้ปัญหานี้ด้วย Back-pressure, Non-Blocking I/O, และ Event Loop ทำให้เราสามารถ ประมวลผลคำสั่ง Redis จำนวนมากได้พร้อม กัน โดยใช้ thread เพียงไม่กี่ตัวเท่านั้น

2. สแตกเทคโนโลยี

  • Spring Boot 3.5.x (หรือใช้ Spring Framework 6 ขึ้นไป)
  • Spring Data Redis Reactive (spring-data-redis-2.7.x+)
  • Lettuce (Netty-based reactive Redis client)
  • Project Reactor 3.6+ (Mono, Flux)
  • Redis 6/7 พร้อมเปิด replication, ACL, และ Redis Streams

3. การตั้งค่าโปรเจค

3.1 Gradle หรือ Maven Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

3.2 เพิ่ม Reactive Redis Configuration

@Configuration
@EnableReactiveRedisRepositories
public class RedisConfig {

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        return new LettuceConnectionFactory("localhost", 6379);
    }

    @Bean
    public ReactiveRedisTemplate<String, Person> reactiveRedisTemplate(
            ReactiveRedisConnectionFactory factory,
            Jackson2JsonRedisSerializer<Person> serializer) {

        RedisSerializationContext.SerializationPair<String> keySerializer =
            RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer());

        return new ReactiveRedisTemplate<>(
                factory,
                RedisSerializationContext
                    .newSerializationContext(keySerializer)
                    .value(serializer)
                    .build());
    }
}
  

4. การเชื่อมต่อแบบ Reactive

แผนผังการไหลของ Mono/Flux ไปยัง Redis ผ่าน Lettuce Non-Blocking I/O

4.1 CRUD พื้นฐาน

@Service
@RequiredArgsConstructor
public class PersonService {

    private final ReactiveRedisTemplate<String, Person> redisTemplate;

    public Mono<Person> savePerson(Person p) {
        return redisTemplate
                .opsForValue()
                .set("person:" + p.getId(), p)
                .thenReturn(p);
    }

    public Mono<Person> findPerson(String id) {
        return redisTemplate
                .opsForValue()
                .get("person:" + id);
    }

    public Mono<Boolean> deletePerson(String id) {
        return redisTemplate
                .opsForValue()
                .delete("person:" + id);
    }

    public Flux<Person> findAll() {
        return redisTemplate
                .scan()
                .filter(key -> key.startsWith("person:"))
                .flatMap(redisTemplate.opsForValue()::get);
    }
}
  

4.2 Handling Timeout & Retry

savePerson(p)
    .timeout(Duration.ofSeconds(2))
    .retryWhen(Retry.backoff(3, Duration.ofMillis(100)))
    .doOnError(e -> log.error("Redis error", e))
    .subscribe();
  

5. Reactive Pub/Sub

ตัวอย่าง Publish/Subscribe ด้วย Flux และ Channel ชัดเจน
@Bean
public Flux<Message<String, String>> eventStream(ReactiveRedisConnectionFactory factory) {
    return factory.getReactiveConnection()
                  .pubSubCommands()
                  .subscribe("events")
                  .flatMap(channel -> channel.receive()
                                              .map(msg -> MessageBuilder
                                                  .withPayload(msg.getMessage())
                                                  .setHeader("redisChannel", msg.getChannel())
                                                  .build()));
}

public Mono<Long> publishEvent(String json) {
    return redisTemplate.convertAndSend("events", json);
}
  

6. Reactive Redis Streams API

โฟลว์ Consumer Group กับ Redis Streams แบบ Reactive
@Bean
public Flux<MapRecord<String, Object, Object>> streamConsumer() {
    return redisTemplate
            .opsForStream()
            .createGroup("mystream", ReadOffset.latest(), "service-A")
            .thenMany(redisTemplate
                .opsForStream()
                .read(Consumer.from("service-A", UUID.randomUUID().toString()),
                      StreamOffset.create("mystream", ReadOffset.lastConsumed()))
                .retryWhen(Retry.indefinitely()))
            .onBackpressureBuffer();
}
  

7. การทดสอบ & Benchmark

7.1 Unit Test Reactive

@Test
void save_shouldReturnMono() {
    Person bob = new Person("1", "Bob");
    StepVerifier.create(service.savePerson(bob))
                .expectNextMatches(p -> p.getName().equals("Bob"))
                .verifyComplete();
}
  

7.2 Benchmark คร่าว ๆ

จากการทดสอบ 100 K ops ต่อวินาทีบนเครื่อง M1 โดยใช้ redis-bench พบว่า:

  • Reactive API ใช้ CPU ~40 % น้อยกว่าการใช้ Jedis แบบ Blocking
  • Latency p99 ลดลงจาก 12 ms เหลือ 3.5 ms

8. Best Practices

  1. เปิดใช้ connection pooling แม้จะเป็น Non-Blocking
  2. จำกัดขนาด Flux ด้วย .limitRate() เพื่อป้องกัน OOM
  3. ตั้งค่า timeout & retry ทุกคำสั่งที่สำคัญ
  4. ใช้ Back-pressure เมื่ออ่าน Streams
  5. ปิด subscription อย่าง ปลอดภัยใน @PreDestroy

9. สรุป

การเชื่อมต่อกับ Redis แบบ Reactive ช่วยให้เราดึงประสิทธิภาพสูงสุดจาก Redis ได้ด้วย resource น้อยลง เหมาะกับ Microservices ที่ต้องการ scalability สูง และ latency ต่ำ ไม่ว่าจะเป็น Pub/Sub, Streams หรือ Key-Value ธรรมดา คุณก็สามารถผสานเข้ากับ Spring WebFlux อย่างแนบเนียนได้ในไม่กี่บรรทัดโค้ด!

พร้อมเริ่มต้นหรือยัง? ลอง clone ตัวอย่างโค้ดและทดลอง benchmark ของคุณเองได้เลย 😉


© 2025 poolsawat.com • ขอบคุณที่ติดตาม – หากบทความนี้มีประโยชน์อย่าลืมแชร์ให้เพื่อน ๆ ด้วยนะครับ

Leave a Reply

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *