WebFlux

스프링 코어 for Reactive

스프링 5.0 에서부터 리액티브 코드를 위한 여러가지 클래스들이 수정, 추가되었다.

ReactiveAdapter, ReactiveAdapterRegistry

RxJava, Reactor 에서 사용하는 발행자 클래스를 Publihser 로 변환해주는 Adapterspringframework.core 에 추가되어 사용 가능해졌다.

아래처럼 ReactiveAdapter 를 상속받아 RxJava Maybe 와 Publisher 간의 변환 작업을 해주는 Adapter 를 작성해서 사용하거나

@Component
public class MaybeReactiveAdapter extends ReactiveAdapter {

    public MaybeReactiveAdapter() {
        /**
         * Descriptor for a reactive type that can produce 0..1 values.
         * @param type the reactive type
         * @param emptySupplier a supplier of an empty-value instance of the reactive type
         */
        super(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
            maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
            publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
    }
}

ReactiveAdapterRegistry 를 사용해 싱글턴 Instance 변수에 Adapter 용 코드를 작성해 필요할때 마다 꺼내어 쓸 수 있다.

@PostConstruct
public void init() {
    ReactiveAdapterRegistry
        .getSharedInstance()
        .registerReactiveType(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
            maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
            publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
}

...

ReactiveAdapter adapter = ReactiveAdapterRegistry
    .getSharedInstance()
    .getAdapter(Maybe.class);
...

리액티브 I/O, 코덱

springframework.core.io 에 저장된 DataBuffer, DataBufferUtils 를 사용하면 I/O 작업이 필요한 파일, 네트워크 자원으로 부터 리액티브 스트림 형태로 작업을 처리할 수 있다. jav.nio.ByteBuffer 클래스의 형변환 기능을 추가하여 보다 쉽게 사용 가능하다.

Flux<DataBuffer> reactiveHamlet = DataBufferUtils.read(
    new DefaultResourceLoader().getResource("hamlet.txt"),
    new DefaultDataBufferFactory(),
    1024);

springframework.core.codec 에 정의된 인터페이스 Encoder, Decoder 를 사용하면 Non Blocking 방식으로 직렬화 데이터를 자바객체, 자바객체를 직렬화 데이터로 변환 가능하다.

WebFlux

Sprinb Boot 2 에 리액티브 웹서버를 위한 WebFlux 모델을 사용할 수 있도록 spring-boot-starter-webflux 라는 새로운 패키지를 추가할 수 있게 되었다.

해당 모듈은 Reactive Stream Adapter 위에 구축된며 Servlet 3.1+ 지원서버(Tomcat, Jetty 등), Netty, Undertow 서버엔진에서 모두 지원한다.

위의 엔진들은 java 8 에 추가된 java NIO 로 구현되어 Http 요청을 논블럭킹으로 처리한다.

springboot_react2

일반적은 WebMVC 모듈도 Spring 5.0 에 이르러 spring-boot-starter-web Servlet 3.1 을지원하면서 일부분은 리액티브 스트림을 지원하게 되었다.

ResponseBodyEmitterReturnValueHandler 클래스가 업그레이드 되면서 ReactiveTypeHandler 필드를 사용해 WebMVC 의 인프라 구조를 크게 해치지 않고 컨트롤러 메서드가 반환하는 Flux, Mono, Flowable 등의 Publisher(리액티브 스트림)을 처리한다.

springboot_react1

물론 서블릿 API 를 사용하기에 블록킹/스레드풀 방식을 사용한다.

WebFlux 개요

기존 WebMVC 모델 구조는 아래와 같다.

springboot_react3

ViewResolverRest 방식에선 생략된다.

각종 서블릿 컨테이너(Tomcat, JBoss 등) 요청을 서블릿 클래스를 상속한 DispatcherServlet 이 스프링 부트 컨트롤러 매핑에 따라 요청을 분배한다.

그림처럼 기존 WebMVC 방식은 동기/블로킹 방식으로 동작한다.

스프링 부트에서 리액티브 방식을 사용하려면 Servlet 3.1+(Tomcat, Jetty 등), Netty, Undertow 와 같은 서버를 사용해 리액티브하게 구조가 변경되어야 하는데

다행이도 스프링 프로젝트팀이 동일한 어노테이션 기반 프로그래밍 모델을 사용하면서 비동기/논블록킹 으로 동작하도록 이미 개발해두었다.

WebFlux with Flux

대략적으로 WebFlux 에서 Http Request, Response 어떻게 리액티브로 구현했는지 아래 인터페이스로 확인할 수 있다.

interface ServerHttpRequest { // Http Request 를 객체로 표현
    Flux<DataBuffer> getBody();
    ...
}
interface ServerHttpResponse { // Http Response 를 객체로 표현
    Mono<Void> writeWith(Publihser<? extends DataBuffer> body);
    ...
}
interface ServerWebExchange { // HTTP Request, Response 컨테이너 역할
    ServerHttpRequest getRequest();
    ServerHttpResponse getResponse();
    Mono<WebSession> getSession();
    ...
}

서블릿의 ServletRequest, ServletResponse 와 연관지어서 새로운 Http 요청, 반환을 객체로 표현할 수 있는 인터페이스들이 정의되어있고

DataBuffer 를 사용해 리액티브 타입과의 결합도를 낮춘다.

interface WebHandler {
    Mono<Void> handle(ServerWebExchange exchanage);
}
interface WebFilterChain {
    Mono<Void> filter(ServerWebExchange exchanage);
}
interface WebFilter {
    Mono<Void> filter(ServerWebExchange exchanage, WebFilterChain chain);
}

WebHandlerDispatcherServlet 역할, 실행결과를 받지 못할 수 있음으로 handle 메서드는 Mono<Void> 가 반환된다.

클라이언트를 위한 Http 반환은 exchange 안의 ServerHttpResponse

WebFilter 는 서블릿의 요청, 반환 필터처럼 리액티브에서도 비지니스 로직에 집중할 수 있도록 필터기능이 제공된다.

public interface HttpHandler {
    Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}

마지막으로 WebHandler 로부터 전달받은 exchange 객체를 url 매핑할 수 있도록 HttpHandlerhandle 로 전달된다.

WebFlux - Functional Reactive Web Server

Vert.xRatpack 과 같은 프레임워크의 인기비결은 스프링의 복잡한 MVC 설정으로 라우팅 설정과 로직이 없이 간결한 설정으로 라우팅 로직을 작성할 수 있는 API 들이 잘 정의되어 있기 때문이다.

스프링 프레임워크도 위 프레임워크처럼 간결하게 라우팅 로직을 처리할 수 있는 API 를 개발하였다.

spring-boot-starter-webflux 모듈의 org.springframework.web.reactive.function.server 패키지에 정의된 RouterFunction 클래스 사용하여 라우팅 로직 정의가 가능하다.

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
import static org.springframework.http.MediaType.APPLICATION_JSON;

@SpringBootApplication
public class DemoApplication {

    final ServerRedirectHandler serverRedirectHandler = new ServerRedirectHandler();

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(OrderHandler orderHandler) {
        return nest(path("/orders"), 
            nest(accept(APPLICATION_JSON),
                route(GET("/{id}"), orderHandler::get)
                    .andRoute(method(HttpMethod.GET), orderHandler::list))
                .andNest(contentType(APPLICATION_JSON),
                    route(POST("/"), orderHandler::create))
                .andNest((serverRequest) -> serverRequest.cookies().containsKey("Redirect-Traffic"),
                    route(all(), serverRedirectHandler))
        );
    }
}

RouterFunction 을 스프링 Bean 으로 등록하고 /orders 에 대한 라우팅 로직 설정, 기본적인 Path, Http method, cookie 포함여부 등 여러 로직처리 가능하다.

함수형으로 웹서버를 구축할경우 Netty 서버를 같이 사용하면 별도의 스프링 설정없이 서버 실행이 가능하다.

이런점때문에 단순 테스트의 경우 @SpringBootApplication 을 사용하지 않고 단순 Netty 서버를 사용해 빠르게 서버를 실행하고 구현한 메서드들을 테스트 할 수 있다.

만약 패스워드 암호화 및 복호화 테스트를 한다면 서버기능을 하는 객체 외에 추가적으로 필요한 객체는 해시 기능이 있는 spring-boot-starter-securityPasswordEncoder 뿐이다.

PasswordEncoder 만 사용한다면 spring-security-core 만 의존성 처리해도 된다.

별도의 스프링 관련 어노테이션, Bean 등록과정 없이 RouterFunction, Netty, PasswordEncoder 3개 객체만 잘 정의해서 아래처럼 서버 실행이 가능하다.

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;

public static void main(String... args) {
    long start = System.currentTimeMillis();
    BCryptPasswordEncoder passwordEncoder = new BCryptPasswordEncoder(18); // encoder 생성

    HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler 변경
        route(POST("/password"), request -> request // RouterFunction 으로 라우터 로직 생성 
            .bodyToMono(PasswordDTO.class)
            .map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
            .flatMap(isMatched -> isMatched
                ? ServerResponse.ok().build()
                : ServerResponse.status(HttpStatus.EXPECTATION_FAILED).build())
        )
    );
    ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler); 
    // HandlerAdapter 에 HttpHandler 삽입, BiFunction 를 구현한 클래스임  
    DisposableServer server = HttpServer.create() // Netty Server
        .host("localhost").port(8080)
        .handle(reactorHttpHandler) // BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> 요구함
        .bindNow(); // 서버 엔진 시작  
        
    LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms"); // Started in 703 ms
    server.onDispose().block(); // main 스레드 차단  
}

서버가 0.7 초만에 실행된다. 스프링 컨테이너, 의존성 주입, 어노테이션 처리를 하지 않음으로 속도가 굉장히 빠르며 간단한 테스트진행은 위와같은 방식으로 진행하면 편하다.

WebFlux - Annotated Controller

RouterFunctions를 사용해도 되지만 WebMVC 모델에서 사용하는 @RestController, @RequestMapping 등의 어노테이션을 WebFlux 에서도 사용할 수 있다.

@RestController
@RequestMapping("/member")
@RequiredArgsConstructor
public class MemberController {

    private final MemberRepository memberRepository;

    @GetMapping
    public Flux<Member> getAll() {
        return memberRepository.findAll();
    }

    @GetMapping("/id/{id}")
    public Mono<Member> getById(@PathVariable Long id) {
        return memberRepository.findById(id);
    }
}

Flux 는 배열, Mono 는 객체로 반환된다.

WebFlux - Filter

더이상 javax.servlet.Filter 을 사용하지 못한다.

필터기능을 하는 방법은 여러가지다.

RouterFunctions

@SpringBootApplication
public class ReactApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReactR2dbcApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> filterFunction(MemberComponent memberComponent) {
        return RouterFunctions
                .route(GET("/member/{memberId}")
                .and(accept(MediaType.APPLICATION_JSON)), memberComponent::getById)
                .filter(new ExampleHandlerFilterFunction());
    }
}

@Component
@RequiredArgsConstructor
class MemberComponent {
    private final MemberRepository memberRepository;
    public Mono<ServerResponse> getById(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromValue(
                       memberRepository.findById(Long.valueOf(request.pathVariable("memberId")))));
    }
}

WebFilter

@Component
public class ExampleWebFilter implements WebFilter {
  
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, 
      WebFilterChain webFilterChain) {
        serverWebExchange.getResponse().getHeaders().add("web-filter", "web-filter-test");
        return webFilterChain.filter(serverWebExchange);
    }
}

별도의 매핑 조건이 없기때문에 조건문 분기가 필요함

HandlerFilterFunction

public class ExampleHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

    @Override
    public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> handlerFunction) {
        if (request.pathVariable("name").equalsIgnoreCase("test")) {
            return ServerResponse.status(HttpStatus.FORBIDDEN).build();
        }
        return handlerFunction.handle(request);
    }
}

WebFlux - Exception Handler

메서드 레벨에서 오류처리는 ServerResponse 에 status, body 등을 설정하면 쉽게 처리할 수 있다. 또한 클래스 내부에서 기존 WebMVC 에서 사용하던 @ExceptionHandler 를 사용해 처리할 수 있다.

@Controller public class SimpleController {
    @ExceptionHandler public ResponseEntity<String> handle(IOException ex) {
        // ... 
    }
}

글로벌 레벨에서 오류처리는 WebExceptionHandler 인터페이스를 구현해 필터 방식으로 처리할 수 있다.

DefaultErrorAttributes

DefaultErrorAttributesWebFlux 에서 사용하는 에러 핸들러로 기본 필터로 등록되어 있는 에러 핸들러가 DefaultErrorAttributes 안의 getErrorAttributes 를 호출해 아래와 같은 반환값을 반환한다.

{
"timestamp": "...",
"path": "...",
"status": 405,
"error": "Method Not Allowed",
"message": "",
"requestId": "ca35d584-1"
}

글로벌 레벨에서 오류처리를 통해 커스텀한 반환값을 설정하고 싶으면 DefaultErrorAttributesgetErrorAttributes 메서드를 오버라이딩 해야한다.

@Slf4j
@Component
@RequiredArgsConstructor
public class GlobalErrorAttributes extends DefaultErrorAttributes {

    // private final MessageSource messageSource;

    @Override
    public Map<String, Object> getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
        Throwable throwable = getError(request);
        String acceptLanguage = request.headers().firstHeader("accept-language");
        Locale locale = acceptLanguage != null && acceptLanguage.startsWith("ko") ? Locale.KOREA : Locale.getDefault();
        log.error("unknown server error:{}, {}" + throwable.getClass().getCanonicalName(), throwable.getMessage());
        // throwable 의 종류에 맞게 반환값을 조정
        Map<String, Object> map = new HashMap<>();
        map.put("code", UNKNOWN_ERROR_CODE);
        map.put("error", UNKNOWN_ERROR_TYPE);
        // map.put("description", messageSource.getMessage("fms.error.unknown_server_error", null, locale));
        return map;
    }
}

에러 반환시 아래처럼 출력되도록 설정

{
"code": "500-00",
"error": "UnknownServerError"
}

이제 핸들러에서 DefaultErrorAttributesgetErrorAttributes 가 아닌 직접 정의한 GlobalErrorAttributesgetErrorAttributes 가 호출되도록 설정하면 된다.

AbstractErrorWebExceptionHandler

AbstractErrorWebExceptionHandler 는 에러발생시 필터로 등록되어 있는 핸들러 해당 핸들러보다 더 높은 우선순위를 가진 핸들러로 에러처리하도록 설정

@Order(-2) // 기본 에러처리는 -1, 보다 빠르게 설정한다.
@Component
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
    // constructors
    public GlobalErrorWebExceptionHandler(
            GlobalErrorAttributes errorAttributes, WebProperties.Resources resources,
            ApplicationContext applicationContext, ServerCodecConfigurer configurer) {
        super(errorAttributes, resources, applicationContext);
        this.setMessageWriters(configurer.getWriters());
    }

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
        return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
    }

    private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
        Map<String, Object> errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
        Integer status = Integer.valueOf(((String) errorPropertiesMap.get("code")).substring(0, 3));
        return ServerResponse.status(status)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(errorPropertiesMap));
    }
}

WebFlux - WebSocket

이미 spring-boot-starter-websocket 모듈에서 제공하는 스프링 웹소켓을 통해 논블록킹으로 메세지를 처리할 수 있을 줄 알았지만 내부에서 블로킹 방식으로 동작하며 리액티브 서버 성능에 영향을 끼친다

WebFlux 에선 비동기/논블록킹 방식의 웹소켓 처리를 위해 org.springframework.web.reactive.socket 패키지를 제공한다.

패키지에 reactive 가 붙을뿐 클래스명이나 사용방법이 유사하다.

웹소켓 서버, 웹소켓 클라이언트 모두 제공한다.

웹소켓 서버는 WebSocketHandler 를 사용해 소켓 핸들러 역할을 하는 객체인 Handler 를 등록한다.

public class EchoWebSocketHandler implements WebSocketHandler {
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session
            .receive() // return Flux<WebSocketMessage> 
            .map(wsMessage -> wsMessage.getPayloadAsText()) // websocket 메세지의 payload (string) 흭득
            .map(tm -> "Echo: " + tm) // 문자열 변환
            .map(tm -> session.textMessage(tm)) // WebsocketSession 을 사용, client 보낼 메세지(payload) 작성
            .as(wsMessage -> session.send(wsMessage)); // client 에게 메세지(payload) 전송
    }
}

웹소켓을 사용하려면 먼저 웹소켓 설정 객체를 Bean 으로 등록해야 한다. 위에서 정의한 핸들러를 url 에 매핑하고, Request 요청을 Upgrade 하는 어뎁터를 Bean 으로 등록한다.

@Configuration
public class WebSocketConfiguration {

    @Bean
    public HandlerMapping handlerMapping() {
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Collections.singletonMap("/ws/echo", new EchoWebSocketHandler())); // 경로기반 매핑 설정
        mapping.setOrder(-1); // 우선순위, 생략시 사용하지 않는 것으로 설정됨 
        return mapping;
    }
    @Bean
    public HandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(); 
        // WebSocket Handshake (upgrade request) 를 처리하는 HandlerAdapter 생성 
    }
}

WebSocketMessagepayloadDataBuffer 를 구현하여 문자열, 바이트코드로 쉽게 형변환 가능

public class WebSocketMessage {
	private final Type type;
	private final DataBuffer payload;
    ...
}

WebSocketHandler 를 구현하면 해당 url 에 해당하는 WebSocketSession 객체를 사용해 메세지를 받고 보낸다. 웹소켓 테스트 툴을 사용해 ws://127.0.0.1:8080/ws/echo 로 접속, 메세지 전송시 Echo: ... 메세지 수신 확인

WebSocket Client

웹소켓 클라이언트의 경우 WebSocketClient 인터페이스를 구현한 ReactorNettyWebSocketClient 를 사용한다.

import org.springframework.web.reactive.socket.client.WebSocketClient;
...

@Bean
public CommandLineRunner commandLineRunner() {
    return (args) -> {
        ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(URI.create("http://localhost:8080/ws/echo"),
            session -> Flux
                .interval(Duration.ofMillis(100))
                .map(String::valueOf)
                .map(session::textMessage)
                .as(session::send))
            .subscribe();
    };
}

정의해둔 echo 웹소켓 서버에 0.1 초마다 interval 데이터를 문자열로 전송

client 역시 WebsocketHandler 구현체를 excute 함수의 매개변수로 받으며 핸들러 매핑과 핸들러 어뎁터가 Bean 으로 등록되지 않을 뿐 웹소켓 서버 코드와 유사하다.

참고 코드: https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket-client 안타깝게도 WebFlux 에서 제공하는 웹소켓 내용은 위의 기능이 전부이며 STOMP 를 사용한 메세지 매핑 등의 기능은 제공하지 않는다.

다중 통신 Sinks.Many

코드 참고: https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket

리액티브의 웹소켓은 WebMVC 에서 사용한 웹소켓과 다른점이 있는데 모든 요청에 대한 응답을 수행할 때 위와같이 람다식이나 함수를 미리 등록해두어야 하고 두개 이상의 클라이언트들간 통신을 위해서 각 클라이언트의 session 을 찾아 데이터를 발행하는데 Sinks.Many 클래스를 사용한다.

웹소켓 설정 객체를 Bean 으로 등록하고 핸들러 매핑하는 것은 동일하다.

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping handlerMapping(ChatSocketHandler chatSocketHandler) { // url, handler 매핑
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Collections.singletonMap("/ws/chat", chatSocketHandler));
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatSocketHandler implements WebSocketHandler { // handler 정의 

    private final ObjectMapper mapper;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(session);
        return session.receive()
                .map(this::toDto)
                .doOnNext(subscriber::onNext) // 수신 콜백 함수 등록
                .doOnError(subscriber::onError) // 에러 콜백 함수 등록
                .doOnCancel(subscriber::onCancel) // 연결끊김 콜백 함수 등록  
                .zipWith(session.send(subscriber.getMany().asFlux().map(webSocketToClientDto -> // 메세지 발신 콜백 함수 등록
                        session.textMessage(webSocketToClientDto.getFrom() + ":" + webSocketToClientDto.getMessage()))))
                .then();
    }

    private WebSocketFromClientDto toDto(WebSocketMessage message) {
        try {
            WebSocketFromClientDto WsDto = mapper.readValue(message.getPayloadAsText(), WebSocketFromClientDto.class);
            return WsDto;
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}

session.send 에 등록되는 Flux 발행자는 Sinks.Many 로부터 생성되는데 이는 아래에서 사용하는 코드와 같이 메세지를 집어넣을 수 있는 발행자다. Websocket 외에도 SSE (Server Send Events) 에서도 사용된다.

@Slf4j
class WebSocketMessageSubscriber {
    // 본인을 포함한 다른 클라이언트의 웹소켓 메세지 발행자 맵
    public static Map<String, Sinks.Many<WebSocketToClientDto>> userMap = new HashMap<>(); //sessionId, sink
    private final String id;
    @Getter
    private final Sinks.Many<WebSocketToClientDto> many; // 본인의 웹소켓 메세지 발행자

    public WebSocketMessageSubscriber(WebSocketSession session) {
        many = Sinks.many().unicast().onBackpressureBuffer();
        id = session.getId();
        many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("welcome, " + id).build());
        userMap.put(id, many);
    }

    public void onNext(WebSocketFromClientDto msg) {
        log.info("onNext invoked, to:{}, msg:{}", msg.getTo(), msg.getMessage());
        Sinks.Many<WebSocketToClientDto> to = userMap.get(msg.getTo());
        if (to == null)
            many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("no user:" + msg.getTo()).build());
        else
            to.tryEmitNext(WebSocketToClientDto.builder().from(id).message(msg.getMessage()).build());
    }

    public void onError(Throwable error) {
        //TODO log error
        log.error("onError invoked, error:{}, {}", error.getClass().getSimpleName(), error.getMessage());
        many.tryEmitNext(WebSocketToClientDto.builder()
                .from("system")
                .message(id + " on error, error:" + error.getMessage())
                .build());
    }

    public void onCancel() {
        log.info("onCancel invoked, id:{}", id);
        userMap.remove(id);
        for (Map.Entry<String, Sinks.Many<WebSocketToClientDto>> entry : userMap.entrySet()) {
            if (!entry.getKey().equals(id))
                entry.getValue().tryEmitNext(WebSocketToClientDto.builder()
                        .from("system")
                        .message(id + " is exit")
                        .build());
        }
    }
}

@Getter
@Setter
@Builder
class WebSocketToClientDto {
    private String from;
    private String message;
}

@Getter
@Setter
class WebSocketFromClientDto {
    private String to;
    private String message;
}
springboot_react4

WebClient

논블록킹 Http Client로 기존 스프링 부트에서 대표적인 Http ClientRestTemplate(블록킹) 이 있다. 내부에 Flux, Mono 리액터 객체를 지원하는 매핑이 내장되어 있어 리액티브 서버에 잘 어울린다.

http://localhost:8080/api/user/{id} url 을 지원하는 간단한 웹서버 생성

@Slf4j
public class TestWebServer {
    public static void main(String[] args) {
        HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler 변경
            nest(path("/api"), route(GET("/users/{id}"),
                request -> {
                    String id = request.pathVariable("id");
                    return ServerResponse.ok().syncBody("hello " + id + " user!"); // 반환데이터 동기적으로 생성
                }) // end route
            ) // end nest
        );
        ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
        DisposableServer server = HttpServer.create()
            .host("localhost").port(8080)
            .handle(reactorHttpHandler)
            .bindNow();
        server.onDispose().block();
    }
}

WebClient 를 사용해 위 urlHttp GET Request 요청

public class TestWebClient {
    public static void main(String[] args) throws InterruptedException {
        WebClient.create("http://localhost:8080/api") // WebClient 객체 생성 + baseUrl 설정
            .get().uri("/users/{id}", 10) // method, uri 설정
            .retrieve() // 응답 내용 설정. ResponseSpec 반환
            .bodyToMono(String.class) // 응답 body 를 Mono 로 변환
            .subscribe(s -> System.out.println(s)); // Mono 에 대한 구독 설정
            // hello 10 user!
        Thread.sleep(1000); // main thread 종료 방지 
    }
}

위의 WebClientGET 방식이라 uri 만 설정했지만 API 에 따라 cookie, header, body 모두 설정 가능하다.

HTTP 응답을 처리할 수 있는 메서드가 retrieve()exchage() 가 있는데

retrieve()ResponseSpec 을 반환하고 exchage()Mono<ClientResponse> 를 반환한다.

만약 exchange() 를 사용할 경우 아래와 같이 코드작성

public static void main(String[] args) throws InterruptedException {
    WebClient.create("http://localhost:8080/api") // WebClient 객체 생성
        .get().uri("/users/{id}", 10) // method, uri 설정
        .exchange() // Mono<ClientResponse> 반환
        .flatMap(response -> response.bodyToMono(String.class)) // 응답 body 를 Mono 로 변환
        .subscribe(s -> System.out.println(s)); // Mono 에 대한 구독 설정
    Thread.sleep(1000);
}

exchage 를 사용하면 ClientResponse 에서 제공하는 Http Response 의 각종 정보를 조작할 수 있는 여러 메서드로 복잡한 반환 로직 구성이 가능하다. retrieve 의 경우 Http status 만 겨우 조작하여 DSL 형식으로 처리할 수 있다.

WebClient 는 인터페이스이고 DefaultWebClientWebClient 의 유일한 구현체이다. 실제 DefaultWebClient 내부에선

WebClient Serialize config

WebClient 에서 직렬화, 비직렬화를 수행할때 기존생성한 ObjectMapper 를 통해 처리할 수 잇다.

@Bean
public ObjectMapper objectMapper() {
    JavaTimeModule module = new JavaTimeModule();
    LocalDateTimeSerializer localDateTimeSerializer = new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
    LocalDateTimeDeserializer localDateTimeDeserializer = new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
    module.addSerializer(LocalDateTime.class, localDateTimeSerializer);
    module.addDeserializer(LocalDateTime.class, localDateTimeDeserializer);

    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(module);
    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    // UnrecognizedPropertyException 처리
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    // json -> clas 에서 unknown 속성이 있어도 처리
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // null 이 아닌것만 변환
    objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); // snake case 로 변환
    return objectMapper;
}

@Bean
public WebClient webClient(ObjectMapper objectMapper) {
    ExchangeStrategies jacksonStrategy = ExchangeStrategies.builder()
            .codecs(config -> {
                config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON));
                config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON));
                config.defaultCodecs().maxInMemorySize(1024 * 1024 * 50);
            }).build();
    return WebClient.builder().exchangeStrategies(jacksonStrategy).build();
}

주의사항으로 uri(uriBuilder->..) 메서드를 사용해 query parameter 를 지정할 경우 문자열에 / 가 들어갈 escape 문자로 인식하기 때문에 base64 문자열로 변환할 수 없다, url encoding 을 진행하지 않는다. 또한 WebClient 생성시 baseUrl 을 설정하지 않으면 uribuilder 를 통해 scheme, host, port, path 빌드함수를 모두 호출해야 하기 때문에 번거롭다.

WebClientbean 으로 생성해 singleton 방식으로 사용한다면 StringBuilder 를 통해 uri 를 직접생성하는 것을 권장.

StringBuilder urlBuilder = new StringBuilder(WEATHER_GET_ULTRA_SRT_NCST); /*URL*/
urlBuilder.append("?" + URLEncoder.encode("ServiceKey", "UTF-8") + "=" + URLEncoder.encode(dataGovApiKey, "UTF-8")); /*공공데이터포털에서 받은 인증키*/
urlBuilder.append("&" + URLEncoder.encode("pageNo", "UTF-8") + "=" + URLEncoder.encode("1", "UTF-8")); /*페이지번호*/
urlBuilder.append("&" + URLEncoder.encode("numOfRows", "UTF-8") + "=" + URLEncoder.encode("10", "UTF-8")); /*한 페이지 결과 수*/
urlBuilder.append("&" + URLEncoder.encode("dataType", "UTF-8") + "=" + URLEncoder.encode("JSON", "UTF-8")); /*요청자료형식(XML/JSON)Default: XML*/
urlBuilder.append("&" + URLEncoder.encode("base_date", "UTF-8") + "=" + URLEncoder.encode(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE), "UTF-8")); /*15년 12월 1일발표*/
urlBuilder.append("&" + URLEncoder.encode("base_time", "UTF-8") + "=" + URLEncoder.encode(String.format("%02d00", curHour), "UTF-8")); /*05시 발표*/
urlBuilder.append("&" + URLEncoder.encode("nx", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(nx), "UTF-8")); /*예보지점 X 좌표값*/
urlBuilder.append("&" + URLEncoder.encode("ny", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(ny), "UTF-8")); /*예보지점의 Y 좌표값*/
URI uri = new URL(urlBuilder.toString()).toURI();

SSE(Server-Sent Event)

참고: https://www.youtube.com/watch?v=4HlNv1qpZFY&t=1283s 평범한 HTTP, Websocket, SSE 프로토콜의 시퀀스 비교이다. springboot_websocket2

서버 단방향 통신이라 웹소켓이 비해 속도나 오버헤드 측면에서 SSE 가 효율적이지만 양방향이 안되는 이유로 웹소켓이 주로 사용된다.

WebFlux 를 사용하지 않고 spring-boot-starter-web 에서 SSE 프로토콜 사용이 가능하다.

@Component
@RequiredArgsConstructor
public class TemperatureSensor {
   // 스프링 제공 이벤트 발행 구독 지원 클래스
   private final ApplicationEventPublisher publisher;
   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
   private final Random rnd = new Random();

   @PostConstruct
   public void startProcessing() {
      this.executor.schedule(this::probe, 1, SECONDS); // 데이터 발행 시작 
   }

   private void probe() {
      double temperature = 16 + rnd.nextGaussian() * 10;
      publisher.publishEvent(new Temperature(temperature)); // 이벤트 데이터 발행 
      executor.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS); // 5초후 재발행
   }
}

ScheduledExecutorServiceApplicationEventPublisher 클래스를 사용해 Temperature 데이터를 계속 발행한다. @EventListener 를 사용해 ApplicationEventPublisher 에 발행된 데이터를 받을 수 있다.

@Slf4j
@RestController
public class TemperatureController {
    static final long SSE_SESSION_TIMEOUT = 5 * 1000L;
    // 연결 클라이언트 관리 list
    private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();

    // TemperatureSensor 의 ApplicationEventPublisher 에서 발행되는 데이터 대응
    @Async
    @EventListener
    public void handleMessage(Temperature temperature) {
        log.info(format("Temperature: %4.2f C, active subscribers: %d", temperature.getValue(), clients.size()));
        // 관리되는 클라이언트에게 발행된 Temperature 데이터 전달 및 예외발생 클라이언트 삭제처리
        List<SseEmitter> deadEmitters = new ArrayList<>();
        clients.forEach(emitter -> {
            try {
                Instant start = Instant.now();
                emitter.send(temperature, MediaType.APPLICATION_JSON);
                log.info("Sent to client, took: {}", Duration.between(start, Instant.now()));
            } catch (Exception ignore) {
                deadEmitters.add(emitter);
            }
        });
        clients.removeAll(deadEmitters);
    }

    @RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
    public SseEmitter events(HttpServletRequest request) {
        log.info("SSE stream opened for client: " + request.getRemoteAddr());
        SseEmitter emitter = new SseEmitter(SSE_SESSION_TIMEOUT); // 5 초간 연결
        clients.add(emitter); // 관리 emitter 목록에 추가

        // Remove SseEmitter from active clients on error or client disconnect
        emitter.onTimeout(() -> clients.remove(emitter));
        emitter.onCompletion(() -> clients.remove(emitter));

        return emitter;
    }
    // 위에 설정된 5초가 지나면 AsyncRequestTimeoutException 이 발생하고 호출됨.  
    @ExceptionHandler(value = AsyncRequestTimeoutException.class)
    public ModelAndView handleTimeout(HttpServletResponse rsp) throws IOException {
        log.warn("handle timeout");
        if (!rsp.isCommitted()) {
            rsp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
        }
        return new ModelAndView();
    }
}

SseEmitter 를 지속적으로 유지, 관리한다. 반환 타입이 ResponseEntity, Map 과 같은 객체가 아닌 SseEmitter 인 것이 어색하다.

WebFlux 에선 FluxSpring 5.0 에 추가된 ServerSentEvent 를 사용해 SSE 프로토콜을 지원한다.

@RestController
@RequiredArgsConstructor
public class ServerSentController {

    private final StocksService stocksService;

    @GetMapping("/sse/stocks")
    public Flux<ServerSentEvent<?>> streamStocks() {
        return stocksService.stream() // Flux<StockItem> 반환
                .map(item -> ServerSentEvent
                        .builder(item)
                        .event("StockItem")
                        .id(item.getId())
                        .build());
    }

    @GetMapping("/sse/stocks2")
    public Flux<StockItem> streamStocks2() {
        return stocksService.stream();
    }
}

WebFlux 내부에서 발행원소를 ServerSentEvent 로 래핑 하기에 단순 Flux<> 만 컨트롤러 메서드에서 반환해도 된다.

스프링 시큐리티 with WebFlux

기존 서블릿 기반 스프링 부트는 하나의 스레드에 하나의 연결이 처리되어 ThreadLocalSecurityContext 를 저장해 연결동안 보안처리를 진행했지만

리액티브는 하나의 연결에 여러개의 스레드가 꼬여있을 수 있어 Reactor Context 를 사용해야 한다. spring-boot-starter-security 모듈 역시 기존 서블릿 기반 WebMVC 에서 WebFlux 를 지원할 수 있도록 업데이트 되었다.

ReactiveSecurityContextHolder

WebMVC 에선 SecurityContextHolder 에서 SecurityContext 를 가져왔다면 WebFlux 에선 ReactiveSecurityContextHolder 에서 SecurityContext 를 가져온다.

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class SecuredProfileController {

    private final ProfileService profileService;

    @GetMapping("/profiles")
    public Mono<Profile> getProfile() {
        return ReactiveSecurityContextHolder.getContext() // Mono<SecurityContext> 반환
            .map(SecurityContext::getAuthentication)
            .flatMap(auth -> profileService.getByUser(auth.getName()));
    }
}

로그인한 유저의 정보를 SecurityContext 가져와 Profile 에서 검색 후 출력한다.

당연히 SecurityContext::getAuthentication 메서드는 리액터 컨텍스트 를 사용한다.

// ReactiveSecurityContextHolder.java

public static Mono<SecurityContext> getContext() {
    return Mono.subscriberContext()
        .filter( c -> c.hasKey(SECURITY_CONTEXT_KEY))
        .flatMap( c-> c.<Mono<SecurityContext>>get(SECURITY_CONTEXT_KEY));
}

SecurityWebFilterChain

인증과정을 거치려면 내부 컨텍스트에 엑세스 하려고 하면 SecurityContext 가 해당 리액터 컨텍스트 에 존재해야하고 Authentication 객체가 SecurityContext 안에 할당되어야 한다.

이 모든 과정을 ReactorContextWebFilterSecurityWebFilterChain 적용하고 이를 통해 리액터 컨텍스트 안에 SecurityContext 객체와 Authentication 객체를 집어넣는다.

SecurityContext 를 집어 넣는 함수는 ServerSecurityContextRepository 를 사용한다.

package org.springframework.security.web.server.context;

public interface ServerSecurityContextRepository {
	Mono<Void> save(ServerWebExchange exchange, SecurityContext context);
	Mono<SecurityContext> load(ServerWebExchange exchange);
}

SecurityContext 를 특정 ServerWebExchange에 저장, 사용할 수 있다.

@Configuration
@EnableReactiveMethodSecurity // 별도의 MethodInterceptor 사용시 필요함
public class SecurityConfiguration {

    @Bean // ReactorContextWebFilter 에 적용할 시큐리티 필터
    public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
        // 기존 mvc 에서 사용하던 HttpSecurity 에서 webflux 용으로 정의된 ServerHttpSecurity
        // 기존 spring security 사용 방식과 크게 다르지 않다.  
        return httpSecurity
            .authorizeExchange()
            .anyExchange().permitAll().and()
            .httpBasic().and()
            .formLogin().and()
            .build();
    }

    private static final Pattern PASSWORD_ALGORITHM_PATTERN = Pattern.compile("^\\{.+}.*$");
    private static final String NOOP_PASSWORD_PREFIX = "{noop}";

    @Bean // 로그인에 사용할 테스트 사용자 생성
    public MapReactiveUserDetailsService reactiveUserDetailsService(ObjectProvider<PasswordEncoder> passwordEncoder) {
        return new MapReactiveUserDetailsService(
            User.withUsername("user")
                .password("user")
                .passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
                .roles("USER")
                .build(),
            User.withUsername("admin")
                .password("admin")
                .passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
                .roles("USER", "ADMIN")
                .build());
    }

    private String getOrDeducePassword(String password, PasswordEncoder encoder) {
        if (encoder != null || PASSWORD_ALGORITHM_PATTERN.matcher(password).matches()) {
            return password;
        }
        return NOOP_PASSWORD_PREFIX + password;
    }
}

WebFlux with JWT

Custom SecurityContextRepository, AuthenticationManager

스프링 시큐리티는 SecurityContextRepository 를 통해 SecurityContext리액터 컨텍스트에 저장하고 삭제한다.

default 스프링 시큐리티의 경우 DB 로부터 UserDetails 를 가져와 등록해두고 사용하겠지만 우리는 JWT 를 사용하기에 ServerSecurityContextRepository 상속받아 커스터마이징 해야한다.

@Component
@RequiredArgsConstructor
public class SecurityContextRepository implements ServerSecurityContextRepository {

    private final AuthenticationManager authenticationManager;

    @Override
    public Mono<Void> save(ServerWebExchange swe, SecurityContext sc) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Mono<SecurityContext> load(ServerWebExchange swe) {
        ServerHttpRequest request = swe.getRequest();
        String authToken = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
        if (authToken != null) {
            Authentication auth = new UsernamePasswordAuthenticationToken(authToken, authToken);
            return authenticationManager
                    .authenticate(auth)
                    .map(authentication -> new SecurityContextImpl(authentication));
        } else {
            return Mono.empty();
        }
    }
}

SecurityContext 를 생성 및 저장하기 request 헤더로 부터 JWT 토큰을 가져와 AuthenticationManager로 넘기는 코드가 load 에 정의되어 있다.

@Component
@RequiredArgsConstructor
public class AuthenticationManager implements ReactiveAuthenticationManager {

    private final JwtTokenUtil jwtUtil;

    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
        String authToken = authentication.getCredentials().toString();
        if (!jwtUtil.validateToken(authToken)) {
            return Mono.empty();
        }
        Claims claims = jwtUtil.getAllClaimsFromToken(authToken);
        String role = claims.get("role", String.class);
        List<GrantedAuthority> authorities = Collections.singletonList(new SimpleGrantedAuthority(role));
        return Mono.just(new UsernamePasswordAuthenticationToken(claims.getSubject(), null, authorities));
    }
}

AuthenticationManager 는 전달받은 토큰으로 role 을 꺼내어 Authority 를 지정하고 Authentication 인증 객체를 반환한다.

SecurityContextPath 가 리액터 컨텍스트로 변경되었을 뿐 기존 WebMVC 모델의 스프링 시큐리티 방식과 비슷하다.

@Bean // ReactorContextWebFilter 에 적용할 시큐리티 필터
public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
    // 기존 mvc 에서 사용하던 HttpSecurity 에서 webflux 용으로 정의된 ServerHttpSecurity
    // 기존 spring security 사용 방식과 크게 다르지 않다.
    return httpSecurity
        .exceptionHandling()
        //.authenticationEntryPoint((swe, e) -> Mono.fromRunnable(() -> // 미 로그인
        //        swe.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED)))
        //.accessDeniedHandler((swe, e) -> Mono.fromRunnable(() -> // 미 로그인
        //        swe.getResponse().setStatusCode(HttpStatus.FORBIDDEN)))
        .authenticationEntryPoint((swd, e) -> Mono.error(new AuthenticationCredentialsNotFoundException("")))
        .accessDeniedHandler((swe, e) -> Mono.error(new AccessDeniedException("")))
        .and()
        .authenticationManager(authenticationManager)
        .securityContextRepository(securityContextRepository)
        .csrf().disable()
        .cors().disable()
        .httpBasic().disable() // Basic Authentication disable
        .formLogin().disable()
        .authorizeExchange()
        .pathMatchers("/member/join", "/member/login").permitAll()
        .pathMatchers("/member/**", "/rent/**").authenticated()
        .pathMatchers("/admin/**").hasAnyRole("ROLE_ADMIN")
        .anyExchange().permitAll()
        .and()
        .build();
}

마지막으로 ServerHttpSecurity 에 커스터마이징한 SecurityContextRepository, AuthenticationManager 를 등록하고 별도의 에러 핸들링 처리를 한다면 에러를 반환해 핸들링, 없다면 HttpStatus.UNAUTHORIZED 로 단순 HTTP Status 만 반환할 수 있다.

스프링 데이터 with WebFlux

기존에 WebMVC 방식의 데이터베이스 접근시 JDBC 를 구현한 드라이버 라이브러리 를 사용해 접근해왔다.

리액티브한 DB 통신도 HTTP 와 다르지 않다. 이론적으로 DB 접근용 서비스를 생성하고 WebClient 를 사용해 DB 데이터를 가져온다면 비동기 DB 접근 라이브러리를 구현한 것 과 다름없다.

다행이도 다양한 DB 벤더사에서 자바 비동기 DB 연결 라이브러리인 리액티브 드라이버를 제공함으로 단순히 라이브러리만 추가하면 데이터베이스 레이어에 대한 논 블록킹 엑세스를 할 수 있다.

spring-boot-starter-data-mongodb-reactive spring-boot-starter-data-cassandra-reactive spring-boot-starter-data-redis-reactive spring-boot-starter-data-r2dbc

스프링 데이터 팀에서 기존에 사용한던 Repository 패턴을 리액티브 방식에도 똑같이 사용할 수 있도록 추상화를 통해 구현해두었다.

각 모듈들이 ReactiveCurdRepository 인터페이스를 사용해 Reactor 라이브러리와 통합되어 자연스럽게 리액티브하게 코드작성이 가능하다.

스프링 데이터 몽고DB 리액티브

NoSQL 의 경우 각 벤더사에서 통합된 규약이 없다. 각 벤더사에서 자기들만의 드라이버 라이브러리를 제공하고 스프링 데이터 팀은 스프링에서 해당 라이브러리들을 쉽게 사용할 수 있도록 각종 모듈을 개발하고 있다

NoSQL DB 는 최근에 만들어 져서 대부분 벤더사가 리액티브 드라이버 를 제공하고 있으며 스프링 데이터 팀은 몽고DB 에서 제공하는 리액티브 드라이버 를 쉽고 편하게 사용할 수 있도록 spring-boot-starter-data-mongodb-reactive 모듈을 작성해두었다.

해당 모듈을 사용하면 스프링 팀에서 만든 Repository 패턴을 사용해 메서드명 기반으로 쿼리문이 자동 생성/사용 할 수 있다.

ReactiveMongoRepository

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

// ReactiveCurdRepository 구현체
@Repository
public interface BookReactiveMongoRepository extends ReactiveMongoRepository<Book, ObjectId> {
    Mono<Book> findOneByTitle(Mono<String> title);

    Flux<Book> findManyByTitleRegex(String regexp);

    @Meta(maxScanDocuments = 3)
    Flux<Book> findByAuthorsOrderByPublishingYearDesc(Publisher<String> authors);

    @Query("{ 'authors.1': { $exists: true } }")
    Flux<Book> booksWithFewAuthors();

    Flux<Book> findByPublishingYearBetweenOrderByPublishingYear(
            Integer from,
            Integer to,
            Pageable pageable
    );
}

ReactiveMongoTemplate

ReactiveMongoRepository 외에도 ReactiveMongoTemplate 를 사용해 쿼리 조작이 가능하다.

@Service
@RequiredArgsConstructor
public class RxMongoTemplateQueryService {
    private static final String BOOK_COLLECTION = "book";

    private final ReactiveMongoTemplate mongoTemplate; // ReactiveMongoTemplate implements ReactiveMongoOperations

    public Flux<Book> findBooksByTitle(String title) {
        Query query = Query.query(new Criteria("title")
            .regex(".*" + title + ".*"))
            .limit(100);
        return mongoTemplate.find(query, Book.class, BOOK_COLLECTION);
    }
}

MongoClient

몽고DB 에서 제공하는 리액티브 드라이버 구현체가 com.mongodb.reactivestreams.client.MongoClient 클래스이다.

https://mongodb.github.io/mongo-java-driver-reactivestreams/

org.mongodb:mongodb-driver-reactivestreams 모듈에서 제공하며 spring-boot-starter-data-mongodb-reactive 에서 내부적으로 사용한다.

MongoClient 클래스를 사용해도 쿼리조작이 가능하다.

@Service
@RequiredArgsConstructor
public class RxMongoDriverQueryService {

    private final MongoClient mongoClient;
    
    public Flux<Book> findBooksByTitle(String title, boolean negate) {
        return Flux.defer(() -> {
            Bson query = Filters.regex("title", ".*" + title + ".*");
            if (negate) query = Filters.not(query);
            return mongoClient
                .getDatabase("test-db")
                .getCollection("book")
                .find(query);
        }).map(doc -> new Book(
            doc.getObjectId("id"), // Document
            doc.getString("title"),
            doc.getInteger("pubYear")));
    }
}

트랜잭션(ReactiveMongoTemplate.inTransaction)

MongoDB 4.0 버전 이전까지 하나의 문서 에 대해서만 트랜잭션을 제공하는 Single-Document Transaction 기능만 있었다. 하나의 문서에 모든 정보를 삽입하여 사용하기에 하나의 트랜잭션은 하나의 문서만 건들여서 Single-Document Transaction 으로도 충분해야 하지만 항상 예외가 있는법, 결국 여러 문서에 대한 트랜잭션 Multi-Document Transaction 기능을 MongoDB 4.0 부터 지원한다.

WiredTiger 스토리지 엔진의 샤딩설정이 되어 있지 않고 복제설정일 경우에만 Multi-Document Transaction 을 지원한다.

ReactiveMongoTemplateinTransaction 메서드를 사용하면

private Mono<TxResult> doTransferMoney(String from, String to, Integer amount) {
    return mongoTemplate.inTransaction().execute(session -> session
        .findOne(queryForOwner(from), Wallet.class)
        .flatMap(fromWallet -> session
            .findOne(queryForOwner(to), Wallet.class)
            .flatMap(toWallet -> {
                if (fromWallet.hasEnoughFunds(amount)) {
                    fromWallet.withdraw(amount);
                    toWallet.deposit(amount);

                    return session.save(fromWallet)
                        .then(session.save(toWallet))
                        .then(ReactiveMongoContext.getSession())
                        // An example how to resolve the current session
                        .doOnNext(tx -> log.info("Current session: {}", tx))
                        .then(Mono.just(TxResult.SUCCESS));
                } else {
                    return Mono.just(TxResult.NOT_ENOUGH_FUNDS);
                }
            })))
        .onErrorResume(e -> Mono.error(new RuntimeException("Conflict")))
        .last();
}

스프링 데이터 R2DBC

R2DBC: Reactive Relational Database Connectivity

https://r2dbc.io/ https://spring.io/projects/spring-data-r2dbc https://spring.io/projects/spring-data-r2dbc

아래와 같은 DBMS 에 대하여 r2dbc 라이브러리를 제공

H2 (io.r2dbc:r2dbc-h2)
MariaDB (org.mariadb:r2dbc-mariadb)
Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
MySQL (dev.miku:r2dbc-mysql)
jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
Postgres (io.r2dbc:r2dbc-postgresql)
Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

지금까지 스프링 JDBC 혹은 스프링 데이터 JDBC 혹은 JPA 를 사용해 생성된 Hikari CP 안의 연결객체가 JDBC 드라이버를 사용해 관계형 DB 를 사용해 왔다.

@Repository
public interface BookSpringDataJdbcRepository extends CrudRepository<Book, Integer> {
    
    @Query("SELECT * FROM book b WHERE b.title = :title")
    CompletableFuture<Book> findBookByTitleAsync(@Param("title") String title);

}

JDBC, JPA 등의 관계형 DB 라이브러리들은 모두 동기/블럭킹 방식으로 동작한다.

다행이도 spring data jdbc 를 개발한 스프링 데이터 Relational 프로젝트 팀에서 리액티브에 적합자바 DB 드라이버리액티브 드라이버를 개발중이다. 이 리액티브 드라이버 를 사용한 프로젝트가 R2DBC 프로젝트이다.

더이상 JDBC 를 사용하지 않고 리액티브 스택에 적합한 리액티브 드라이버를 사용해 DB 에 접근, 데이터를 조작한다.

안타깝게도 JPA 는 기존 코드가 너무 복잡했는지 리액티브 지원을 하지 않을것으로 보인다.

ReactiveCrudRepository

@Repository
public interface MemberRepository extends ReactiveCrudRepository<Member, Long> {
    Mono<Member> findByName(String name);

    Mono<Member> findByUserName(String name);

    @Query("SELECT * FROM member WHERE name = :name AND user_name = :userName")
    Mono<Member> findByNameAndUserName(String name, String userName);
}

R2dbcEntityTemplate

@Service
@RequiredArgsConstructor
public class MemberDynamicRepository {
    private final R2dbcEntityTemplate r2dbcEntityTemplate;

    public Flux<Member> findTest(String userName) {
        Query query = Query.query(where("user_name").like("%" + userName + "%"))
                .limit(10)
                .offset(0);
        return r2dbcEntityTemplate.select(Member.class)
                .matching(query)
                .all();
    }
}

스프링 데이터 Redis

spring-boot-starter-data-redis-reactive 모듈을 사용 ReactiveRedisTemplate 클래스가 Redis 커넥션의 핵심클래스이다.

다른 스프링 데이터 프로젝트와 달리 Repository 가 존재하지 않음 일반적인 데이터 관리 외에도 구독/발행 구조의 메시지 기능도 지원한다.

spring-boot-starter-data-redis-reactive 모듈은 내부적으로 Lettuce 라이브러리를 사용한다.

https://lettuce.io/, 현재 non blokcing 을 지원하는 redis 라이브러리가 Lettuce 가 유일하다. 또한 Lettuce 라이브러리 내에서 Reactor 라이브러리를 사용한다.

public class Sample {
    private String name;
    private String description;
}


@Configuration
public class RedisConfig {
    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        return new LettuceConnectionFactory(host, port);
    }
    
    @Bean
    public ReactiveRedisTemplate<String, Sample> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Sample> valueSerializer = new Jackson2JsonRedisSerializer<>(Sample.class);

        RedisSerializationContext.RedisSerializationContextBuilder<String, Sample> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);

        RedisSerializationContext<String, Sample> context = builder.value(valueSerializer).build();

        return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, context);
    }
}
@Service
@RequiredArgsConstructor
public class SampleService {
    private final ReactiveRedisTemplate<String, Sample> redisTemplate;

    public Mono<Boolean> put(String key, Sample sample) {
        return redisTemplate.opsForValue().set(key, sample);
    }

    public Mono<Sample> get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    public Flux<Sample> getAll(String keyPattern){
        return redisTemplate.keys(keyPattern)
                .flatMap(key-> redisTemplate.opsForValue().get(key));
    }

    public Mono<Boolean> delete(String key) {
        return redisTemplate.opsForValue().delete(key);
    }
}

기타

ListenableFuture

스프링에서 제공하는 Future 구현 클래스

public interface ListenableFuture<T> extends Future<T> {
	void addCallback(ListenableFutureCallback<? super T> callback);
	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
	default CompletableFuture<T> completable() {
		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
		addCallback(completable::complete, completable::completeExceptionally);
		return completable;
	}
}

위의 SseEmitter 와 마찬가지로 스프링 리액티브의 반환데이터로 사용된다.

// 비동기 RestTemplate
AsyncRestTemplate httpClient = new AsyncRestTemplate();

@GetMapping
public ListenableFuture<?> requestData() {
    AsyncDatabaseClient databaseClient = new FakeAsyncDatabaseClient();
    // /hello 의 호출결과를 CompletableFuture 으로 반환하는 어뎁터
    CompletionStage<String> completionStage = AsyncAdapters.toCompletion(httpClient.execute(
        "http://localhost:8080/hello",
        HttpMethod.GET, null,
        new HttpMessageConverterExtractor<>(String.class, messageConverters) // http 의 body 부분 컨버터들 지정
    ));
    // CompletionStage(CompletableFuture 의 인터페이스) 를 ListenableFuture 로 변환
    return AsyncAdapters.toListenable(databaseClient.store(completionStage));
}

AsyncRestTemplate.execute 가 반환하는 ListenableFutureCompletionStage 로 변환 반환된 CompletionStage 를 데이터베이스에 저장후 다시 반환된 CompletionStageListenableFuture 로 변환한다.

일반적으로 간단한 비동기 처리는 Future 를 구현한 CompletableFuture(CompletionStage 구현체) 를 주로 사용한다.

메서드 정의시 반환값을 스프링에서 기본 제공하는 ListenableFuture 를 구현하는 객체로 반환하기 힘들기에 위와같은 AsyncAdapters 를 사용해 비동기 결과를 ListenableFuture 로 변환해주는 어뎁터를 사용하는 것이 편하다.

public final class AsyncAdapters {
    // ListenableFuture -> completionStage
    public static <T> CompletionStage<T> toCompletion(ListenableFuture<T> future) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        future.addCallback(completableFuture::complete, completableFuture::completeExceptionally);
        return completableFuture;
    }
    // completionStage -> ListenableFuture
    public static <T> ListenableFuture<T> toListenable(CompletionStage<T> stage) {
        SettableListenableFuture<T> future = new SettableListenableFuture<>();
        stage.whenComplete((v, t) -> {
            if (t == null) future.set(v);
            else future.setException(t);
        });
        return future;
    }
}

AsyncRestTemplate

스프링 리액티브에선 동기방식인 일반 RestTemplate 을 사용하지 않고 AsyncRestTemplate 를 사용한다. 흔히사용하는 execute 메서드의 반환값이 ListenableFuture 객체이다.

@Override
public <T> ListenableFuture<T> execute(String url, HttpMethod method, AsyncRequestCallback requestCallback,
        ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

    URI expanded = getUriTemplateHandler().expand(url, uriVariables);
    return doExecute(expanded, method, requestCallback, responseExtractor);
}

출처 : https://kouzie.github.io/spring/Spring-Boot-%EC%8A%A4%ED%94%84%EB%A7%81-%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EA%B0%9C%EC%9A%94/#webflux

Last updated