การใช้งาน Spring Reactive ร่วมกับ WebClient พร้อมทำ Request, Response Logging

Sharing is caring!

จากบทความที่แล้ว “สร้าง Reactive RESTful Web Service ด้วย Spring Boot 3 WebFlux” ได้อธิบายเกี่ยวกับ Reactive (Webflux) และแนะนำวิธีการสร้างโปรเจคเบื้องต้นของ Reactive RESTful Web Service แบบง่าย ๆ สำหรับบทความนี้จะมาเพิ่มความสามารถของโปรเจคในกรณีต้องการที่จะเรียกข้อมูลภายนอก เราจำเป็นต้องมี http client เพื่อใช้เป็นตัวเรียกข้อมูลที่สามารถกำหนด GET, POST, PUT, DELETE และข้อมูลร้องขอ WebClient จะทำหน้าที่นี้ บทความนี้จะพามาตั้งค่าเพิ่มเติมให้โปรเจคตั้งต้นของเรา

สร้าง Mock Data ด้วย Mockoon

ก่อนที่จะไปเขียนโค้ดเพิ่มในโปรเจค เราจำเป็นต้องจำลองข้อมูลเพื่อให้ Reactive Service ของเราเรียกใช้งาน ซึ่ง Mockoon จะมาช่วยทำเรื่องนี้

[
"Project Reactor",
"RxJava",
"RxJS",
"Rx.NET",
"RxScala",
"RxClojure",
"RxKotlin",
"RxSwift",
"RxGo",
"RxPHP"
]

Configuration WebClient เพิ่มในโปรเจค

– src/main/java/com/poolsawat/reactivewebflux/configs/WebClientConfiguration.java

package com.poolsawat.reactivewebflux.configs;

import com.poolsawat.reactivewebflux.loggers.ClientCallLogger;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.val;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

import java.time.Duration;

/**
 * Spring configuration that builds the {@link WebClient} used for outbound calls,
 * together with the Reactor Netty {@link HttpClient} backing it.
 *
 * <p>The base URL is read from the {@code clients.endpoints.reactive.host} property.
 *
 * <p>NOTE(review): in a Spring Boot application {@code @EnableWebFlux} is documented as
 * taking full control of the WebFlux setup (Boot's WebFlux auto-configuration backs off).
 * Confirm that this is intended for this project.
 */
@Configuration
@EnableWebFlux
public class WebClientConfiguration implements WebFluxConfigurer {

    /** Maximum number of pooled connections. */
    private static final int MAX_CONNECTIONS = 10;

    /** How long a caller may wait for a pooled connection (15 sec). */
    private static final Duration PENDING_ACQUIRE_TIMEOUT = Duration.ofMillis(15000);

    /** How long an idle pooled connection is kept (30 sec). */
    private static final Duration MAX_IDLE_TIME = Duration.ofMillis(30000);

    /** TCP connect timeout in milliseconds (15 sec). */
    private static final int CONNECT_TIMEOUT_MILLIS = 15000;

    /**
     * Read/write inactivity timeouts, in SECONDS.
     * The single-int constructors of {@link ReadTimeoutHandler} and
     * {@link WriteTimeoutHandler} interpret their argument as seconds, not milliseconds.
     */
    private static final int READ_TIMEOUT_SECONDS = 30;
    private static final int WRITE_TIMEOUT_SECONDS = 30;

    @Value("${clients.endpoints.reactive.host}")
    private String reactiveHost;

    /**
     * Builds the application-wide {@link WebClient}.
     *
     * @param httpClient the Reactor Netty client used as the underlying connector
     * @return a client with {@code Content-Type: application/json} as default header and
     *         the configured host as base URL
     */
    @Bean
    public WebClient createWebClient(HttpClient httpClient){
        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .baseUrl(reactiveHost)
                .build();
    }

    /**
     * Builds the Reactor Netty {@link HttpClient} with a dedicated connection pool,
     * connect/read/write timeouts and request/response logging.
     *
     * @param clientLogger channel handler that logs requests and responses
     * @return the configured HTTP client
     */
    @Bean
    public HttpClient createHttpClient(ClientCallLogger clientLogger){
        val connectionProvider = ConnectionProvider.builder("reactive-tcp-connection-pool")
                .maxConnections(MAX_CONNECTIONS)
                .pendingAcquireTimeout(PENDING_ACQUIRE_TIMEOUT)
                .maxIdleTime(MAX_IDLE_TIME)
                .build();

        return HttpClient.create(connectionProvider)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
                .doOnConnected( it -> {
                    // Previously 30000 was passed here, which Netty reads as 30000 seconds.
                    it.addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS))
                            .addHandlerLast(new WriteTimeoutHandler(WRITE_TIMEOUT_SECONDS));
                })
                // NOTE(review): the same logger instance is registered from both callbacks;
                // presumably Reactor Netty skips a handler that is already in the pipeline — confirm.
                .doOnRequest( (x,conn) -> conn.addHandlerFirst(clientLogger))
                .doOnResponse( (x, conn) -> conn.addHandlerFirst(clientLogger));
    }
}

– src/main/java/com/poolsawat/reactivewebflux/clients/proxy/ReactiveClientProxy.java

package com.poolsawat.reactivewebflux.clients.proxy;

import reactor.core.publisher.Mono;

import java.util.List;

/**
 * Contract for a client that fetches the list of reactive items.
 */
public interface ReactiveClientProxy {
    /**
     * Requests the reactive items.
     *
     * @return a {@link Mono} that emits the items as a list of strings
     */
    Mono<List<String>> getReactiveItems();
}

– src/main/java/com/poolsawat/reactivewebflux/clients/ReactiveClient.java

package com.poolsawat.reactivewebflux.clients;

import com.poolsawat.reactivewebflux.clients.proxy.ReactiveClientProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Repository;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.util.List;

@Repository
public class ReactiveClient implements ReactiveClientProxy {

    @Value("${clients.endpoints.reactive.path}")
    private String reactivePath;

    @Autowired
    private WebClient webClient;

    @Override
    public Mono<List<String>> getReactiveItems() {
        return webClient.get()
                .uri(reactivePath)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<List<String>>() {});
    }
}

– src/main/java/com/poolsawat/reactivewebflux/loggers/ClientCallLogger.java

package com.poolsawat.reactivewebflux.loggers;

import com.poolsawat.reactivewebflux.constants.HeaderKeys;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.stereotype.Component;

import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;

/**
 * Netty {@link LoggingHandler} that writes a compact, single-entry description of
 * HTTP messages passing through the channel to the class logger at INFO level.
 *
 * <p>Requests, response meta data and body chunks each get their own format; every
 * other message falls back to the parent's formatting.
 *
 * <p>NOTE(review): all headers are logged verbatim. If requests may carry credentials
 * (e.g. {@code Authorization}), consider masking them before logging.
 */
@Component
@Slf4j
public class ClientCallLogger extends LoggingHandler {

    /** Event name passed to {@link #format} by {@link #write}. */
    private static final String WRITE_EVENT = "WRITE";

    /** Event name passed to {@link #format} by {@link #channelRead}. */
    private static final String READ_EVENT = "READ";

    /**
     * Logs the given text at INFO level; a {@code null} text means "nothing to log".
     *
     * @param content formatted message, or {@code null} to skip logging
     */
    private void log(String content) {
        if (content != null) {
            log.info(content);
        }
    }

    /**
     * Logs the inbound message, then hands it to the parent handler.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log(format(ctx, READ_EVENT, msg));
        super.channelRead(ctx, msg);
    }

    /**
     * Logs the outbound message, then hands it to the parent handler.
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log(format(ctx, WRITE_EVENT, msg));
        super.write(ctx, msg, promise);
    }

    /**
     * Chooses the textual representation for a message.
     *
     * @param ctx       channel context, forwarded to the parent formatter
     * @param eventName name of the channel event, e.g. {@code "READ"} or {@code "WRITE"}
     * @param arg       the message being formatted
     * @return the formatted text, or {@code null} when the message is the shared
     *         empty last-content marker
     */
    @Override
    protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {
        if (arg instanceof ByteBuf) {
            return super.format(ctx, eventName, arg);
        }
        // Order matters: a full request is both an HttpRequest and a ByteBufHolder,
        // so the request check has to come before the generic body check.
        if (arg instanceof HttpRequest request) {
            return formatRequestHeadersAndBody(request);
        }
        if (arg instanceof HttpResponse response) {
            return formatResponseMetaData(response);
        }
        if (arg instanceof ByteBufHolder holder) {
            return formatBody(eventName, holder);
        }
        return super.format(ctx, eventName, arg);
    }

    /**
     * Decodes the holder's content as UTF-8 text.
     *
     * @param holder message carrying the body bytes
     * @return the decoded body, or an empty string when decoding fails
     */
    private String tryExtractBody(ByteBufHolder holder) {
        try {
            return holder.content().toString(StandardCharsets.UTF_8);
        } catch (Exception ex) {
            // Best effort: a logging failure must never break the actual HTTP exchange.
            log.warn("error occurred while extracting body from http, ignored with empty", ex);
            return "";
        }
    }

    /**
     * Formats method, URI and headers of a request; the body is appended only when
     * the request is a {@link DefaultFullHttpRequest}.
     */
    private String formatRequestHeadersAndBody(HttpRequest req) {
        val builder = new StringBuilder("HTTPClient Request    --> method=[")
                .append(req.method())
                .append(HeaderKeys.URI.getKey()).append(req.uri())
                .append(HeaderKeys.HEADERS.getKey()).append(req.headers())
                .append("]");
        if (req instanceof DefaultFullHttpRequest fullRequest) {
            builder.append(" body=[").append(tryExtractBody(fullRequest)).append("]");
        }
        return builder.toString();
    }

    /**
     * Formats status and headers of a response.
     */
    private String formatResponseMetaData(HttpResponse res) {
        return new StringBuilder("HTTPClient Response Meta Data  <-- status=[")
                .append(res.status())
                .append(HeaderKeys.HEADERS.getKey()).append(res.headers())
                .append("]")
                .toString();
    }

    /**
     * Formats a body chunk. Chunks seen on the {@code WRITE} event are outbound and
     * therefore labelled as request body; everything else keeps the response label.
     *
     * @return the formatted text, or {@code null} for the shared empty last-content marker
     */
    private String formatBody(String eventName, ByteBufHolder holder) {
        if (holder == EMPTY_LAST_CONTENT) {
            return null;
        }
        final String prefix = WRITE_EVENT.equals(eventName)
                ? "HTTPClient Request Body   --> body=["
                : "HTTPClient Response Body  <-- body=[";
        return new StringBuilder(prefix).append(tryExtractBody(holder)).append("]").toString();
    }
}

– src/main/resources/application.properties

server.port=8443

spring.webflux.base-path=/reactive

clients.endpoints.reactive.host=http://localhost:3000
clients.endpoints.reactive.path=/reactive/items

– src/main/java/com/poolsawat/reactivewebflux/services/impl/ReactiveServiceImpl.java

package com.poolsawat.reactivewebflux.services.impl;

import com.poolsawat.reactivewebflux.clients.ReactiveClient;
import com.poolsawat.reactivewebflux.services.ReactiveService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.util.List;

/**
 * {@link ReactiveService} implementation that obtains its data from {@link ReactiveClient}.
 */
@Service
public class ReactiveServiceImpl implements ReactiveService {

    // NOTE(review): this depends on the concrete ReactiveClient class; the
    // ReactiveClientProxy interface it implements may be the intended injection type — confirm.
    @Autowired
    private ReactiveClient reactiveClient;

    /**
     * Returns the reactive items by delegating to the client without further processing.
     *
     * @return the {@link Mono} produced by {@code reactiveClient.getReactiveItems()}
     */
    @Override
    public Mono<List<String>> getReactiveListItems() {
        return reactiveClient.getReactiveItems();
    }
}

ทำการ start application server ด้วยคำสั่ง `mvn spring-boot:run`

ทดสอบเรียก http://localhost:8443/reactive/welcome

curl --silent --location 'http://localhost:8443/reactive/welcome'

สังเกต logging ของโปรเจค

2023-08-21T23:11:15.128+07:00  INFO 94946 --- [ctor-http-nio-3] c.p.r.loggers.ClientCallLogger           : HTTPClient Request    --> method=[GET] uri=[/reactive/items] headers=[DefaultHttpHeaders[user-agent: ReactorNetty/1.1.9, host: localhost:3000, Content-Type: application/json, Accept: application/json]]
2023-08-21T23:11:15.138+07:00  INFO 94946 --- [ctor-http-nio-3] c.p.r.loggers.ClientCallLogger           : HTTPClient Response Meta Data  <-- status=[200 OK] headers=[DefaultHttpHeaders[Content-Type: application/json; charset=utf-8, Content-Length: 113, Date: Mon, 21 Aug 2023 16:11:15 GMT, Connection: keep-alive, Keep-Alive: timeout=5]]
2023-08-21T23:11:15.172+07:00  INFO 94946 --- [ctor-http-nio-3] c.p.r.loggers.ClientCallLogger           : HTTPClient Response Body  <-- body=[[
"Project Reactor",
"RxJava",
"RxJS",
"Rx.NET",
"RxScala",
"RxClojure",
"RxKotlin",
"RxSwift",
"RxGo",
"RxPHP"
]]

สรุปท้ายบทความ

หวังว่าบทความนี้จะเป็นประโยชน์กับผู้อ่าน รอติดตามบทความหน้าจะมีอะไรมาแบ่งปัน คอยติดตามด้วยนะครับ

Github Source code -> https://github.com/pool13433/spring-reactive-webflux

ใส่ความเห็น

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *