เชื่อมต่อกับ Redis แบบ Reactive

Sharing is caring!

Redis เป็น In-Memory Data Store ที่ได้รับความนิยมสูงมาก เมื่อผสานกับแนวคิด Reactive Programming เราจะได้ระบบที่ รวดเร็ว, Non-Blocking, และขยายตัวได้ บทความนี้จะพาคุณตั้งแต่พื้นฐาน จนถึงตัวอย่างใช้งาน ReactiveRedisTemplate, Pub/Sub, และ Redis Streams ด้วย Spring WebFlux โดยใช้เวลาอ่านราว 20 นาที

1. ทำความเข้าใจแนวคิด Reactive กับ Redis

แนวทาง Reactive ยึดหลัก Asynchronous + Non-Blocking I/O. ใน Spring เราจะทำงานผ่าน Project Reactor ที่ประกอบด้วย Mono (ค่าเดียว) และ Flux (หลายค่า).
เมื่อจับคู่กับ Redis (ซึ่งตอบสนองเร็วในระดับ < 1 ms) จึงเหมาะสำหรับงาน Realtime, Cache, Ranking, Counter ฯลฯ

2. เตรียมโปรเจกต์ Spring Boot WebFlux

<!-- pom.xml -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<!-- ใช้ Lombok & Embedded Redis สำหรับทดสอบ -->

สำหรับ Gradle ให้ใช้ implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'.

3. ตั้งค่า RedisConnectionFactory

@Configuration
public class RedisConfig {

  @Value("${redis.host:localhost}")
  private String host;

  @Value("${redis.port:6379}")
  private int port;

  @Bean
  public LettuceConnectionFactory lettuceConnectionFactory() {
      return new LettuceConnectionFactory(host, port);
  }

  @Bean
  public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(
          LettuceConnectionFactory connectionFactory) {

      RedisSerializationContext<String, String> context =
          RedisSerializationContext.string();
      return new ReactiveRedisTemplate<>(connectionFactory, context);
  }
}

4. ตัวอย่าง CRUD ด้วย ReactiveRedisTemplate

@RestController
@RequestMapping("/api/products")
@RequiredArgsConstructor
public class ProductController {

  private final ReactiveRedisTemplate<String, Product> redisTemplate;
  private static final String KEY = "products"; // Hash Key

  @PostMapping
  public Mono<Product> create(@RequestBody Product product) {
      return redisTemplate.opsForHash()
                          .put(KEY, product.getId(), product)
                          .thenReturn(product);
  }

  @GetMapping("/{id}")
  public Mono<Product> findById(@PathVariable String id) {
      return redisTemplate.opsForHash()
                          .get(KEY, id)
                          .cast(Product.class);
  }

  @DeleteMapping("/{id}")
  public Mono<Void> delete(@PathVariable String id) {
      return redisTemplate.opsForHash()
                          .remove(KEY, id)
                          .then();
  }
}

5. ทำ Layer Caching แบบ Reactive

@Service
@RequiredArgsConstructor
public class UserCacheService {

  private final ReactiveRedisTemplate<String, User> redis;
  private final UserRepository db; // ReactiveCrudRepository

  public Mono<User> getUser(String id) {
      String key = "user:" + id;
      return redis.opsForValue().get(key)
                 .switchIfEmpty(
                    db.findById(id)
                      .flatMap(u -> redis.opsForValue()
                                         .set(key, u, Duration.ofMinutes(10))
                                         .thenReturn(u))
                 );
  }
}

กลยุทธ์ Cache-Aside ที่ไม่บล็อกเธรด เหมาะกับงานอ่านเป็นส่วนใหญ่.

6. Pub/Sub แบบ Reactive

Redis รองรับข้อความแบบ PUBLISH / SUBSCRIBE. ใน Spring เราสามารถใช้ ReactiveRedisConnection.pubSubCommands() เพื่อรับ-ส่งข้อความโดยตรง.

@Component
@RequiredArgsConstructor
public class ChatService {

  private final ReactiveRedisConnectionFactory factory;
  private final Flux<String> sink; // กระจาย message ภายใน JVM

  @PostConstruct
  public void init() {
    ReactiveRedisConnection conn = factory.getReactiveConnection();
    conn.pubSubCommands().subscribe("chat");

    conn.receive().subscribe(msg -> sinkSinkNext(msg.getMessage()));
  }

  public Mono<Long> publish(String message) {
    return factory.getReactiveConnection()
                  .pubSubCommands()
                  .publish("chat", message);
  }
}

7. การใช้ Redis Streams (Consumer Group)

ตั้งแต่ Redis 5.0 มี Streams สำหรับ Event-Log. เราจะใช้คำสั่ง XADD, XREADGROUP และจัดการ Offset แบบ Reactive.

@Service
@RequiredArgsConstructor
public class OrderStreamService {

  private final ReactiveRedisConnectionFactory factory;
  private static final String STREAM = "orders";
  private static final String GROUP  = "order-worker";

  @PostConstruct
  public void createGroup() {
      factory.getReactiveConnection().streamCommands()
             .xGroupCreate(STREAM, GROUP, ReadOffset.latest())
             .onErrorResume(e -> Mono.empty())
             .subscribe();
  }

  public Mono<String> addOrder(OrderEvent evt) {
      Map<String, String> map = Map.of("id", evt.getId(),
                                       "status", evt.getStatus());
      return factory.getReactiveConnection()
                    .streamCommands()
                    .xAdd(STREAM, map);
  }

  public Flux<StreamMessage>String, String&gt;&gt; receive() {
      return factory.getReactiveConnection()
                    .streamCommands()
                    .xReadGroup(GROUP, "worker-1",
                       StreamOffset.from(STREAM, ReadOffset.lastConsumed()));
  }
}

8. การทดสอบด้วย Embedded Redis

@TestConfiguration
public class EmbeddedRedisConfig {

  private RedisServer server;

  @PostConstruct
  public void start() throws IOException {
      server = RedisServer.builder()
               .port(6379).setting("maxmemory 128M").build();
      server.start();
  }

  @PreDestroy
  public void stop() {
      server.stop();
  }
}

ใช้ embedded-redis ช่วยให้ Integration Test สะดวก ไม่ต้องพึ่ง Redis จริง.

9. Metrics & Monitoring

  • Micrometer + Prometheus สำหรับรวบรวม Metrics จาก Lettuce
  • คำสั่ง INFO MEMORY ตรวจสอบการใช้งาน RAM
  • Enable io.lettuce.core.event.EventBus จับ Slow Command

10. ทริค & ข้อควรระวัง

  1. กำหนด Timeout ใน LettuceClientConfiguration ป้องกัน Slow Network
  2. แบ่ง Key-Space ตาม Prefix ลดการชนกันของข้อมูล
  3. ใช้ SCAN แทน KEYS * เพื่อลดผลกระทบต่อ Production
  4. หลีกเลี่ยง Blocking Command (เช่น BLPOP) ใน WebFlux; ให้ใช้ BRPOP ผ่าน Worker แยกต่างหาก
  5. เพิ่ม Replica / Cluster กรณีต้องการ High Availability

สรุป

การเชื่อมต่อ Redis แบบ Reactive ช่วยให้ระบบของคุณรองรับ High Throughput และ Low Latency ได้อย่างมีประสิทธิภาพ พร้อมความยืดหยุ่นในการจัดการ Cache, Pub/Sub, และ Event Stream แบบสมัยใหม่. ด้วย Spring WebFlux + Lettuce คุณสามารถสร้างบริการที่ตอบสนองรวดเร็ว และใช้ทรัพยากรต่ำกว่าแบบ Thread-per-Request ได้หลายเท่า

SEO Keywords

Redis Reactive, Spring WebFlux Redis, ReactiveRedisTemplate, Redis Pub/Sub, Redis Stream, Lettuce Reactive, Redis Cache Non Blocking, Spring Boot Redis Reactive, Reactive Programming Redis, Redis Reactive Architecture

Leave a Reply

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *