# WebFlux

## 스프링 코어 for Reactive <a href="#for-reactive" id="for-reactive"></a>

스프링 5.0 에서부터 리액티브 코드를 위한 여러가지 클래스들이 수정, 추가되었다.

### ReactiveAdapter, ReactiveAdapterRegistry <a href="#reactiveadapter-reactiveadapterregistry" id="reactiveadapter-reactiveadapterregistry"></a>

`RxJava, Reactor` 에서 사용하는 발행자 클래스를 `Publihser` 로 변환해주는 `Adapter` 가 `springframework.core` 에 추가되어 사용 가능해졌다.

아래처럼 `ReactiveAdapter` 를 상속받아 RxJava Maybe 와 Publisher 간의 변환 작업을 해주는 Adapter 를 작성해서 사용하거나

```
@Component
public class MaybeReactiveAdapter extends ReactiveAdapter {

    public MaybeReactiveAdapter() {
        /**
         * Descriptor for a reactive type that can produce 0..1 values.
         * @param type the reactive type
         * @param emptySupplier a supplier of an empty-value instance of the reactive type
         */
        super(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
            maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
            publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
    }
}
```

`ReactiveAdapterRegistry` 를 사용해 싱글턴 `Instance` 변수에 `Adapter` 용 코드를 작성해 필요할때 마다 꺼내어 쓸 수 있다.

```
@PostConstruct
public void init() {
    ReactiveAdapterRegistry
        .getSharedInstance()
        .registerReactiveType(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
            maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
            publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
}

...

ReactiveAdapter adapter = ReactiveAdapterRegistry
    .getSharedInstance()
    .getAdapter(Maybe.class);
...
```

### 리액티브 I/O, 코덱 <a href="#io" id="io"></a>

`springframework.core.io` 에 저장된 `DataBuffer`, `DataBufferUtils` 를 사용하면 I/O 작업이 필요한 파일, 네트워크 자원으로 부터 리액티브 스트림 형태로 작업을 처리할 수 있다. `jav.nio.ByteBuffer` 클래스의 형변환 기능을 추가하여 보다 쉽게 사용 가능하다.

```
Flux<DataBuffer> reactiveHamlet = DataBufferUtils.read(
    new DefaultResourceLoader().getResource("hamlet.txt"),
    new DefaultDataBufferFactory(),
    1024);
```

`springframework.core.codec` 에 정의된 인터페이스 `Encoder`, `Decoder` 를 사용하면 `Non Blocking` 방식으로 직렬화 데이터를 자바객체, 자바객체를 직렬화 데이터로 변환 가능하다.

## WebFlux <a href="#webflux" id="webflux"></a>

`Sprinb Boot 2` 에 리액티브 웹서버를 위한 `WebFlux` 모델을 사용할 수 있도록 `spring-boot-starter-webflux` 라는 새로운 패키지를 추가할 수 있게 되었다.

해당 모듈은 `Reactive Stream Adapter` 위에 구축된며 `Servlet 3.1+ 지원서버(Tomcat, Jetty 등)`, `Netty`, `Undertow` 서버엔진에서 모두 지원한다.

> 위의 엔진들은 `java 8` 에 추가된 `java NIO` 로 구현되어 Http 요청을 논블럭킹으로 처리한다.

![springboot\_react2](https://kouzie.github.io/assets/springboot/springboot_react2.png)

일반적은 `WebMVC` 모듈도 `Spring 5.0` 에 이르러 `spring-boot-starter-web` `Servlet 3.1` 을지원하면서 일부분은 리액티브 스트림을 지원하게 되었다.

`ResponseBodyEmitterReturnValueHandler` 클래스가 업그레이드 되면서 `ReactiveTypeHandler` 필드를 사용해 `WebMVC` 의 인프라 구조를 크게 해치지 않고 `컨트롤러 메서드`가 반환하는 `Flux, Mono, Flowable` 등의 `Publisher`(리액티브 스트림)을 처리한다.

![springboot\_react1](https://kouzie.github.io/assets/springboot/springboot_react1.png)

물론 서블릿 API 를 사용하기에 `블록킹/스레드풀` 방식을 사용한다.

### WebFlux 개요 <a href="#webflux" id="webflux"></a>

기존 `WebMVC` 모델 구조는 아래와 같다.

![springboot\_react3](https://kouzie.github.io/assets/springboot/springboot_react3.png)

> `ViewResolver` 는 `Rest` 방식에선 생략된다.

각종 서블릿 컨테이너(`Tomcat, JBoss` 등) 요청을 서블릿 클래스를 상속한 `DispatcherServlet` 이 스프링 부트 컨트롤러 매핑에 따라 요청을 분배한다.

그림처럼 기존 `WebMVC` 방식은 `동기/블로킹` 방식으로 동작한다.

스프링 부트에서 리액티브 방식을 사용하려면 `Servlet 3.1+(Tomcat, Jetty 등)`, `Netty`, `Undertow` 와 같은 서버를 사용해 리액티브하게 구조가 변경되어야 하는데

다행이도 스프링 프로젝트팀이 동일한 어노테이션 기반 프로그래밍 모델을 사용하면서 `비동기/논블록킹` 으로 동작하도록 이미 개발해두었다.

#### WebFlux with Flux <a href="#webflux-with-flux" id="webflux-with-flux"></a>

대략적으로 `WebFlux` 에서 Http Request, Response 어떻게 리액티브로 구현했는지 아래 인터페이스로 확인할 수 있다.

```
interface ServerHttpRequest { // Http Request 를 객체로 표현
    Flux<DataBuffer> getBody();
    ...
}
interface ServerHttpResponse { // Http Response 를 객체로 표현
    Mono<Void> writeWith(Publihser<? extends DataBuffer> body);
    ...
}
interface ServerWebExchange { // HTTP Request, Response 컨테이너 역할
    ServerHttpRequest getRequest();
    ServerHttpResponse getResponse();
    Mono<WebSession> getSession();
    ...
}
```

서블릿의 `ServletRequest`, `ServletResponse` 와 연관지어서 새로운 Http 요청, 반환을 객체로 표현할 수 있는 인터페이스들이 정의되어있고

`DataBuffer` 를 사용해 리액티브 타입과의 결합도를 낮춘다.

```
interface WebHandler {
    Mono<Void> handle(ServerWebExchange exchanage);
}
interface WebFilterChain {
    Mono<Void> filter(ServerWebExchange exchanage);
}
interface WebFilter {
    Mono<Void> filter(ServerWebExchange exchanage, WebFilterChain chain);
}
```

`WebHandler` 는 `DispatcherServlet` 역할, 실행결과를 받지 못할 수 있음으로 `handle` 메서드는 `Mono<Void>` 가 반환된다.

> 클라이언트를 위한 Http 반환은 `exchange` 안의 `ServerHttpResponse` 에

`WebFilter` 는 서블릿의 요청, 반환 필터처럼 리액티브에서도 비지니스 로직에 집중할 수 있도록 필터기능이 제공된다.

```
public interface HttpHandler {
    Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}
```

마지막으로 `WebHandler` 로부터 전달받은 `exchange` 객체를 `url` 매핑할 수 있도록 `HttpHandler` 의 `handle` 로 전달된다.

### WebFlux - Functional Reactive Web Server <a href="#webflux---functional-reactive-web-server" id="webflux---functional-reactive-web-server"></a>

`Vert.x` 나 `Ratpack` 과 같은 프레임워크의 인기비결은 스프링의 복잡한 MVC 설정으로 라우팅 설정과 로직이 없이\
간결한 설정으로 라우팅 로직을 작성할 수 있는 API 들이 잘 정의되어 있기 때문이다.

스프링 프레임워크도 위 프레임워크처럼 간결하게 라우팅 로직을 처리할 수 있는 API 를 개발하였다.

`spring-boot-starter-webflux` 모듈의 `org.springframework.web.reactive.function.server` 패키지에 정의된 `RouterFunction` 클래스 사용하여 라우팅 로직 정의가 가능하다.

```
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
import static org.springframework.http.MediaType.APPLICATION_JSON;

@SpringBootApplication
public class DemoApplication {

    final ServerRedirectHandler serverRedirectHandler = new ServerRedirectHandler();

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(OrderHandler orderHandler) {
        return nest(path("/orders"), 
            nest(accept(APPLICATION_JSON),
                route(GET("/{id}"), orderHandler::get)
                    .andRoute(method(HttpMethod.GET), orderHandler::list))
                .andNest(contentType(APPLICATION_JSON),
                    route(POST("/"), orderHandler::create))
                .andNest((serverRequest) -> serverRequest.cookies().containsKey("Redirect-Traffic"),
                    route(all(), serverRedirectHandler))
        );
    }
}
```

`RouterFunction` 을 스프링 `Bean` 으로 등록하고 `/orders` 에 대한 라우팅 로직 설정, 기본적인 `Path, Http method, cookie` 포함여부 등 여러 로직처리 가능하다.

함수형으로 웹서버를 구축할경우 `Netty` 서버를 같이 사용하면 별도의 스프링 설정없이 서버 실행이 가능하다.

이런점때문에 단순 테스트의 경우 `@SpringBootApplication` 을 사용하지 않고 단순 `Netty` 서버를 사용해 빠르게 서버를 실행하고 구현한 메서드들을 테스트 할 수 있다.

만약 패스워드 암호화 및 복호화 테스트를 한다면 서버기능을 하는 객체 외에 추가적으로 필요한 객체는 해시 기능이 있는 `spring-boot-starter-security` 의 `PasswordEncoder` 뿐이다.

> `PasswordEncoder` 만 사용한다면 `spring-security-core` 만 의존성 처리해도 된다.

별도의 스프링 관련 어노테이션, Bean 등록과정 없이 `RouterFunction, Netty, PasswordEncoder` 3개 객체만 잘 정의해서 아래처럼 서버 실행이 가능하다.

```
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;

public static void main(String... args) {
    long start = System.currentTimeMillis();
    BCryptPasswordEncoder passwordEncoder = new BCryptPasswordEncoder(18); // encoder 생성

    HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler 변경
        route(POST("/password"), request -> request // RouterFunction 으로 라우터 로직 생성 
            .bodyToMono(PasswordDTO.class)
            .map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
            .flatMap(isMatched -> isMatched
                ? ServerResponse.ok().build()
                : ServerResponse.status(HttpStatus.EXPECTATION_FAILED).build())
        )
    );
    ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler); 
    // HandlerAdapter 에 HttpHandler 삽입, BiFunction 를 구현한 클래스임  
    DisposableServer server = HttpServer.create() // Netty Server
        .host("localhost").port(8080)
        .handle(reactorHttpHandler) // BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> 요구함
        .bindNow(); // 서버 엔진 시작  
        
    LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms"); // Started in 703 ms
    server.onDispose().block(); // main 스레드 차단  
}
```

서버가 0.7 초만에 실행된다.\
스프링 컨테이너, 의존성 주입, 어노테이션 처리를 하지 않음으로 속도가 굉장히 빠르며\
간단한 테스트진행은 위와같은 방식으로 진행하면 편하다.

### WebFlux - Annotated Controller <a href="#webflux---annotated-controller" id="webflux---annotated-controller"></a>

`RouterFunctions`를 사용해도 되지만 `WebMVC` 모델에서 사용하는 `@RestController, @RequestMapping` 등의 어노테이션을 `WebFlux` 에서도 사용할 수 있다.

```
@RestController
@RequestMapping("/member")
@RequiredArgsConstructor
public class MemberController {

    private final MemberRepository memberRepository;

    @GetMapping
    public Flux<Member> getAll() {
        return memberRepository.findAll();
    }

    @GetMapping("/id/{id}")
    public Mono<Member> getById(@PathVariable Long id) {
        return memberRepository.findById(id);
    }
}
```

`Flux` 는 배열, `Mono` 는 객체로 반환된다.

### WebFlux - Filter <a href="#webflux---filter" id="webflux---filter"></a>

더이상 `javax.servlet.Filter` 을 사용하지 못한다.

필터기능을 하는 방법은 여러가지다.

#### RouterFunctions <a href="#routerfunctions" id="routerfunctions"></a>

```
@SpringBootApplication
public class ReactApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReactR2dbcApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> filterFunction(MemberComponent memberComponent) {
        return RouterFunctions
                .route(GET("/member/{memberId}")
                .and(accept(MediaType.APPLICATION_JSON)), memberComponent::getById)
                .filter(new ExampleHandlerFilterFunction());
    }
}

@Component
@RequiredArgsConstructor
class MemberComponent {
    private final MemberRepository memberRepository;
    public Mono<ServerResponse> getById(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromValue(
                       memberRepository.findById(Long.valueOf(request.pathVariable("memberId")))));
    }
}
```

#### WebFilter <a href="#webfilter" id="webfilter"></a>

```
@Component
public class ExampleWebFilter implements WebFilter {
  
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, 
      WebFilterChain webFilterChain) {
        serverWebExchange.getResponse().getHeaders().add("web-filter", "web-filter-test");
        return webFilterChain.filter(serverWebExchange);
    }
}
```

별도의 매핑 조건이 없기때문에 조건문 분기가 필요함

#### HandlerFilterFunction <a href="#handlerfilterfunction" id="handlerfilterfunction"></a>

```
public class ExampleHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

    @Override
    public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> handlerFunction) {
        if (request.pathVariable("name").equalsIgnoreCase("test")) {
            return ServerResponse.status(HttpStatus.FORBIDDEN).build();
        }
        return handlerFunction.handle(request);
    }
}
```

### WebFlux - Exception Handler <a href="#webflux---exception-handler" id="webflux---exception-handler"></a>

**메서드 레벨**에서 오류처리는 `ServerResponse` 에 status, body 등을 설정하면 쉽게 처리할 수 있다.\
또한 클래스 내부에서 기존 WebMVC 에서 사용하던 `@ExceptionHandler` 를 사용해 처리할 수 있다.

```
@Controller public class SimpleController {
    @ExceptionHandler public ResponseEntity<String> handle(IOException ex) {
        // ... 
    }
}
```

**글로벌 레벨**에서 오류처리는 `WebExceptionHandler` 인터페이스를 구현해 필터 방식으로 처리할 수 있다.

#### DefaultErrorAttributes <a href="#defaulterrorattributes" id="defaulterrorattributes"></a>

`DefaultErrorAttributes` 는 `WebFlux` 에서 사용하는 에러 핸들러로 기본 필터로 등록되어 있는 에러 핸들러가 `DefaultErrorAttributes` 안의 `getErrorAttributes` 를 호출해 아래와 같은 반환값을 반환한다.

```
{
"timestamp": "...",
"path": "...",
"status": 405,
"error": "Method Not Allowed",
"message": "",
"requestId": "ca35d584-1"
}
```

글로벌 레벨에서 오류처리를 통해 커스텀한 반환값을 설정하고 싶으면 `DefaultErrorAttributes` 의 `getErrorAttributes` 메서드를 오버라이딩 해야한다.

```
@Slf4j
@Component
@RequiredArgsConstructor
public class GlobalErrorAttributes extends DefaultErrorAttributes {

    // private final MessageSource messageSource;

    @Override
    public Map<String, Object> getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
        Throwable throwable = getError(request);
        String acceptLanguage = request.headers().firstHeader("accept-language");
        Locale locale = acceptLanguage != null && acceptLanguage.startsWith("ko") ? Locale.KOREA : Locale.getDefault();
        log.error("unknown server error:{}, {}" + throwable.getClass().getCanonicalName(), throwable.getMessage());
        // throwable 의 종류에 맞게 반환값을 조정
        Map<String, Object> map = new HashMap<>();
        map.put("code", UNKNOWN_ERROR_CODE);
        map.put("error", UNKNOWN_ERROR_TYPE);
        // map.put("description", messageSource.getMessage("fms.error.unknown_server_error", null, locale));
        return map;
    }
}
```

에러 반환시 아래처럼 출력되도록 설정

```
{
"code": "500-00",
"error": "UnknownServerError"
}
```

이제 핸들러에서 `DefaultErrorAttributes` 의 `getErrorAttributes` 가 아닌\
직접 정의한 `GlobalErrorAttributes` 의 `getErrorAttributes` 가 호출되도록 설정하면 된다.

#### AbstractErrorWebExceptionHandler <a href="#abstracterrorwebexceptionhandler" id="abstracterrorwebexceptionhandler"></a>

`AbstractErrorWebExceptionHandler` 는 에러발생시 필터로 등록되어 있는 핸들러\
해당 핸들러보다 더 높은 우선순위를 가진 핸들러로 에러처리하도록 설정

```
@Order(-2) // 기본 에러처리는 -1, 보다 빠르게 설정한다.
@Component
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
    // constructors
    public GlobalErrorWebExceptionHandler(
            GlobalErrorAttributes errorAttributes, WebProperties.Resources resources,
            ApplicationContext applicationContext, ServerCodecConfigurer configurer) {
        super(errorAttributes, resources, applicationContext);
        this.setMessageWriters(configurer.getWriters());
    }

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
        return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
    }

    private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
        Map<String, Object> errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
        Integer status = Integer.valueOf(((String) errorPropertiesMap.get("code")).substring(0, 3));
        return ServerResponse.status(status)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(errorPropertiesMap));
    }
}
```

### WebFlux - WebSocket <a href="#webflux---websocket" id="webflux---websocket"></a>

이미 `spring-boot-starter-websocket` 모듈에서 제공하는 스프링 웹소켓을 통해 논블록킹으로 메세지를 처리할 수 있을 줄 알았지만 내부에서 블로킹 방식으로 동작하며 리액티브 서버 성능에 영향을 끼친다

`WebFlux` 에선 비동기/논블록킹 방식의 웹소켓 처리를 위해 `org.springframework.web.reactive.socket` 패키지를 제공한다.

> 패키지에 `reactive` 가 붙을뿐 클래스명이나 사용방법이 유사하다.

웹소켓 서버, 웹소켓 클라이언트 모두 제공한다.

웹소켓 서버는 `WebSocketHandler` 를 사용해 소켓 핸들러 역할을 하는 객체인 `Handler` 를 등록한다.

```
public class EchoWebSocketHandler implements WebSocketHandler {
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session
            .receive() // return Flux<WebSocketMessage> 
            .map(wsMessage -> wsMessage.getPayloadAsText()) // websocket 메세지의 payload (string) 흭득
            .map(tm -> "Echo: " + tm) // 문자열 변환
            .map(tm -> session.textMessage(tm)) // WebsocketSession 을 사용, client 보낼 메세지(payload) 작성
            .as(wsMessage -> session.send(wsMessage)); // client 에게 메세지(payload) 전송
    }
}
```

웹소켓을 사용하려면 먼저 웹소켓 설정 객체를 `Bean` 으로 등록해야 한다.\
위에서 정의한 핸들러를 `url` 에 매핑하고, Request 요청을 Upgrade 하는 어뎁터를 `Bean` 으로 등록한다.

```
@Configuration
public class WebSocketConfiguration {

    @Bean
    public HandlerMapping handlerMapping() {
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Collections.singletonMap("/ws/echo", new EchoWebSocketHandler())); // 경로기반 매핑 설정
        mapping.setOrder(-1); // 우선순위, 생략시 사용하지 않는 것으로 설정됨 
        return mapping;
    }
    @Bean
    public HandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(); 
        // WebSocket Handshake (upgrade request) 를 처리하는 HandlerAdapter 생성 
    }
}
```

> `WebSocketMessage` 는 `payload` 로 `DataBuffer` 를 구현하여 문자열, 바이트코드로 쉽게 형변환 가능
>
> ```
> public class WebSocketMessage {
> 	private final Type type;
> 	private final DataBuffer payload;
>     ...
> }
> ```

`WebSocketHandler` 를 구현하면 해당 url 에 해당하는 `WebSocketSession` 객체를 사용해 메세지를 받고 보낸다.\
웹소켓 테스트 툴을 사용해 `ws://127.0.0.1:8080/ws/echo` 로 접속, 메세지 전송시 `Echo: ...` 메세지 수신 확인

#### WebSocket Client <a href="#websocket-client" id="websocket-client"></a>

웹소켓 클라이언트의 경우 `WebSocketClient` 인터페이스를 구현한 `ReactorNettyWebSocketClient` 를 사용한다.

```
import org.springframework.web.reactive.socket.client.WebSocketClient;
...

@Bean
public CommandLineRunner commandLineRunner() {
    return (args) -> {
        ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(URI.create("http://localhost:8080/ws/echo"),
            session -> Flux
                .interval(Duration.ofMillis(100))
                .map(String::valueOf)
                .map(session::textMessage)
                .as(session::send))
            .subscribe();
    };
}
```

정의해둔 `echo` 웹소켓 서버에 0.1 초마다 `interval` 데이터를 문자열로 전송

`client` 역시 `WebsocketHandler` 구현체를 `excute` 함수의 매개변수로 받으며 핸들러 매핑과 핸들러 어뎁터가 `Bean` 으로 등록되지 않을 뿐 웹소켓 서버 코드와 유사하다.

> 참고 코드: <https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket-client\\>
> 안타깝게도 `WebFlux` 에서 제공하는 웹소켓 내용은 위의 기능이 전부이며 STOMP 를 사용한 메세지 매핑 등의 기능은 제공하지 않는다.

#### 다중 통신 Sinks.Many <a href="#sinksmany" id="sinksmany"></a>

> 코드 참고: <https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket>

리액티브의 웹소켓은 WebMVC 에서 사용한 웹소켓과 다른점이 있는데 모든 요청에 대한 응답을 수행할 때 위와같이 람다식이나 함수를 미리 등록해두어야 하고\
두개 이상의 클라이언트들간 통신을 위해서 각 클라이언트의 `session` 을 찾아 데이터를 발행하는데 `Sinks.Many` 클래스를 사용한다.

웹소켓 설정 객체를 `Bean` 으로 등록하고 핸들러 매핑하는 것은 동일하다.

```
@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping handlerMapping(ChatSocketHandler chatSocketHandler) { // url, handler 매핑
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Collections.singletonMap("/ws/chat", chatSocketHandler));
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatSocketHandler implements WebSocketHandler { // handler 정의 

    private final ObjectMapper mapper;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(session);
        return session.receive()
                .map(this::toDto)
                .doOnNext(subscriber::onNext) // 수신 콜백 함수 등록
                .doOnError(subscriber::onError) // 에러 콜백 함수 등록
                .doOnCancel(subscriber::onCancel) // 연결끊김 콜백 함수 등록  
                .zipWith(session.send(subscriber.getMany().asFlux().map(webSocketToClientDto -> // 메세지 발신 콜백 함수 등록
                        session.textMessage(webSocketToClientDto.getFrom() + ":" + webSocketToClientDto.getMessage()))))
                .then();
    }

    private WebSocketFromClientDto toDto(WebSocketMessage message) {
        try {
            WebSocketFromClientDto WsDto = mapper.readValue(message.getPayloadAsText(), WebSocketFromClientDto.class);
            return WsDto;
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}
```

`session.send` 에 등록되는 `Flux` 발행자는 `Sinks.Many` 로부터 생성되는데\
이는 아래에서 사용하는 코드와 같이 메세지를 집어넣을 수 있는 발행자다.\
Websocket 외에도 SSE (Server Send Events) 에서도 사용된다.

```
@Slf4j
class WebSocketMessageSubscriber {
    // 본인을 포함한 다른 클라이언트의 웹소켓 메세지 발행자 맵
    public static Map<String, Sinks.Many<WebSocketToClientDto>> userMap = new HashMap<>(); //sessionId, sink
    private final String id;
    @Getter
    private final Sinks.Many<WebSocketToClientDto> many; // 본인의 웹소켓 메세지 발행자

    public WebSocketMessageSubscriber(WebSocketSession session) {
        many = Sinks.many().unicast().onBackpressureBuffer();
        id = session.getId();
        many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("welcome, " + id).build());
        userMap.put(id, many);
    }

    public void onNext(WebSocketFromClientDto msg) {
        log.info("onNext invoked, to:{}, msg:{}", msg.getTo(), msg.getMessage());
        Sinks.Many<WebSocketToClientDto> to = userMap.get(msg.getTo());
        if (to == null)
            many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("no user:" + msg.getTo()).build());
        else
            to.tryEmitNext(WebSocketToClientDto.builder().from(id).message(msg.getMessage()).build());
    }

    public void onError(Throwable error) {
        //TODO log error
        log.error("onError invoked, error:{}, {}", error.getClass().getSimpleName(), error.getMessage());
        many.tryEmitNext(WebSocketToClientDto.builder()
                .from("system")
                .message(id + " on error, error:" + error.getMessage())
                .build());
    }

    public void onCancel() {
        log.info("onCancel invoked, id:{}", id);
        userMap.remove(id);
        for (Map.Entry<String, Sinks.Many<WebSocketToClientDto>> entry : userMap.entrySet()) {
            if (!entry.getKey().equals(id))
                entry.getValue().tryEmitNext(WebSocketToClientDto.builder()
                        .from("system")
                        .message(id + " is exit")
                        .build());
        }
    }
}

@Getter
@Setter
@Builder
class WebSocketToClientDto {
    private String from;
    private String message;
}

@Getter
@Setter
class WebSocketFromClientDto {
    private String to;
    private String message;
}
```

![springboot\_react4](https://kouzie.github.io/assets/springboot/springboot_react4.png)

### WebClient <a href="#webclient" id="webclient"></a>

논블록킹 `Http Client`로 기존 스프링 부트에서 대표적인 `Http Client` 로 `RestTemplate`(블록킹) 이 있다.\
내부에 `Flux, Mono` 리액터 객체를 지원하는 매핑이 내장되어 있어 리액티브 서버에 잘 어울린다.

`http://localhost:8080/api/user/{id}` url 을 지원하는 간단한 웹서버 생성

```
@Slf4j
public class TestWebServer {
    public static void main(String[] args) {
        HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler 변경
            nest(path("/api"), route(GET("/users/{id}"),
                request -> {
                    String id = request.pathVariable("id");
                    return ServerResponse.ok().syncBody("hello " + id + " user!"); // 반환데이터 동기적으로 생성
                }) // end route
            ) // end nest
        );
        ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
        DisposableServer server = HttpServer.create()
            .host("localhost").port(8080)
            .handle(reactorHttpHandler)
            .bindNow();
        server.onDispose().block();
    }
}
```

`WebClient` 를 사용해 위 `url` 에 `Http GET Request` 요청

```
public class TestWebClient {
    public static void main(String[] args) throws InterruptedException {
        WebClient.create("http://localhost:8080/api") // WebClient 객체 생성 + baseUrl 설정
            .get().uri("/users/{id}", 10) // method, uri 설정
            .retrieve() // 응답 내용 설정. ResponseSpec 반환
            .bodyToMono(String.class) // 응답 body 를 Mono 로 변환
            .subscribe(s -> System.out.println(s)); // Mono 에 대한 구독 설정
            // hello 10 user!
        Thread.sleep(1000); // main thread 종료 방지 
    }
}
```

위의 `WebClient` 는 `GET` 방식이라 `uri` 만 설정했지만 `API` 에 따라 `cookie, header, body` 모두 설정 가능하다.

HTTP 응답을 처리할 수 있는 메서드가 `retrieve()` 와 `exchage()` 가 있는데

`retrieve()` 는 `ResponseSpec` 을 반환하고 `exchage()` 는 `Mono<ClientResponse>` 를 반환한다.

만약 `exchange()` 를 사용할 경우 아래와 같이 코드작성

```
public static void main(String[] args) throws InterruptedException {
    WebClient.create("http://localhost:8080/api") // WebClient 객체 생성
        .get().uri("/users/{id}", 10) // method, uri 설정
        .exchange() // Mono<ClientResponse> 반환
        .flatMap(response -> response.bodyToMono(String.class)) // 응답 body 를 Mono 로 변환
        .subscribe(s -> System.out.println(s)); // Mono 에 대한 구독 설정
    Thread.sleep(1000);
}
```

`exchage` 를 사용하면 `ClientResponse` 에서 제공하는 `Http Response` 의 각종 정보를 조작할 수 있는 여러 메서드로 복잡한 반환 로직 구성이 가능하다.\
`retrieve` 의 경우 `Http status` 만 겨우 조작하여 `DSL` 형식으로 처리할 수 있다.

`WebClient` 는 인터페이스이고 `DefaultWebClient` 가 `WebClient` 의 유일한 구현체이다.\
실제 `DefaultWebClient` 내부에선

#### WebClient Serialize config <a href="#webclient-serialize-config" id="webclient-serialize-config"></a>

`WebClient` 에서 직렬화, 비직렬화를 수행할때 기존생성한 `ObjectMapper` 를 통해 처리할 수 잇다.

```
@Bean
public ObjectMapper objectMapper() {
    JavaTimeModule module = new JavaTimeModule();
    LocalDateTimeSerializer localDateTimeSerializer = new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
    LocalDateTimeDeserializer localDateTimeDeserializer = new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
    module.addSerializer(LocalDateTime.class, localDateTimeSerializer);
    module.addDeserializer(LocalDateTime.class, localDateTimeDeserializer);

    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(module);
    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    // UnrecognizedPropertyException 처리
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    // json -> clas 에서 unknown 속성이 있어도 처리
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // null 이 아닌것만 변환
    objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); // snake case 로 변환
    return objectMapper;
}

@Bean
public WebClient webClient(ObjectMapper objectMapper) {
    ExchangeStrategies jacksonStrategy = ExchangeStrategies.builder()
            .codecs(config -> {
                config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON));
                config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON));
                config.defaultCodecs().maxInMemorySize(1024 * 1024 * 50);
            }).build();
    return WebClient.builder().exchangeStrategies(jacksonStrategy).build();
}
```

주의사항으로 `uri(uriBuilder->..)` 메서드를 사용해 `query parameter` 를 지정할 경우 문자열에 `/` 가 들어갈 `escape` 문자로 인식하기 때문에 base64 문자열로 변환할 수 없다, url encoding 을 진행하지 않는다. 또한 `WebClient` 생성시 `baseUrl` 을 설정하지 않으면 `uribuilder` 를 통해 `scheme, host, port, path` 빌드함수를 모두 호출해야 하기 때문에 번거롭다.

`WebClient` 를 `bean` 으로 생성해 `singleton` 방식으로 사용한다면 `StringBuilder` 를 통해 `uri` 를 직접생성하는 것을 권장.

```
StringBuilder urlBuilder = new StringBuilder(WEATHER_GET_ULTRA_SRT_NCST); /*URL*/
urlBuilder.append("?" + URLEncoder.encode("ServiceKey", "UTF-8") + "=" + URLEncoder.encode(dataGovApiKey, "UTF-8")); /*공공데이터포털에서 받은 인증키*/
urlBuilder.append("&" + URLEncoder.encode("pageNo", "UTF-8") + "=" + URLEncoder.encode("1", "UTF-8")); /*페이지번호*/
urlBuilder.append("&" + URLEncoder.encode("numOfRows", "UTF-8") + "=" + URLEncoder.encode("10", "UTF-8")); /*한 페이지 결과 수*/
urlBuilder.append("&" + URLEncoder.encode("dataType", "UTF-8") + "=" + URLEncoder.encode("JSON", "UTF-8")); /*요청자료형식(XML/JSON)Default: XML*/
urlBuilder.append("&" + URLEncoder.encode("base_date", "UTF-8") + "=" + URLEncoder.encode(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE), "UTF-8")); /*15년 12월 1일발표*/
urlBuilder.append("&" + URLEncoder.encode("base_time", "UTF-8") + "=" + URLEncoder.encode(String.format("%02d00", curHour), "UTF-8")); /*05시 발표*/
urlBuilder.append("&" + URLEncoder.encode("nx", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(nx), "UTF-8")); /*예보지점 X 좌표값*/
urlBuilder.append("&" + URLEncoder.encode("ny", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(ny), "UTF-8")); /*예보지점의 Y 좌표값*/
URI uri = new URL(urlBuilder.toString()).toURI();
```

### SSE(Server-Sent Event) <a href="#sseserver-sent-event" id="sseserver-sent-event"></a>

> 참고: <https://www.youtube.com/watch?v=4HlNv1qpZFY\\&t=1283s\\>
> 평범한 `HTTP, Websocket, SSE 프로토콜`의 시퀀스 비교이다.\
> ![springboot\_websocket2](https://kouzie.github.io/assets/springboot/springboot_websocket2.png)

서버 단방향 통신이라 웹소켓이 비해 속도나 오버헤드 측면에서 SSE 가 효율적이지만 양방향이 안되는 이유로 웹소켓이 주로 사용된다.

`WebFlux` 를 사용하지 않고 `spring-boot-starter-web` 에서 SSE 프로토콜 사용이 가능하다.

```
@Component
@RequiredArgsConstructor
public class TemperatureSensor {
   // 스프링 제공 이벤트 발행 구독 지원 클래스
   private final ApplicationEventPublisher publisher;
   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
   private final Random rnd = new Random();

   @PostConstruct
   public void startProcessing() {
      this.executor.schedule(this::probe, 1, SECONDS); // 데이터 발행 시작 
   }

   private void probe() {
      double temperature = 16 + rnd.nextGaussian() * 10;
      publisher.publishEvent(new Temperature(temperature)); // 이벤트 데이터 발행 
      executor.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS); // 5초후 재발행
   }
}
```

`ScheduledExecutorService` 와 `ApplicationEventPublisher` 클래스를 사용해 `Temperature` 데이터를 계속 발행한다.\
`@EventListener` 를 사용해 `ApplicationEventPublisher` 에 발행된 데이터를 받을 수 있다.

```
@Slf4j
@RestController
public class TemperatureController {
    static final long SSE_SESSION_TIMEOUT = 5 * 1000L;
    // 연결 클라이언트 관리 list
    private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();

    // TemperatureSensor 의 ApplicationEventPublisher 에서 발행되는 데이터 대응
    @Async
    @EventListener
    public void handleMessage(Temperature temperature) {
        log.info(format("Temperature: %4.2f C, active subscribers: %d", temperature.getValue(), clients.size()));
        // 관리되는 클라이언트에게 발행된 Temperature 데이터 전달 및 예외발생 클라이언트 삭제처리
        List<SseEmitter> deadEmitters = new ArrayList<>();
        clients.forEach(emitter -> {
            try {
                Instant start = Instant.now();
                emitter.send(temperature, MediaType.APPLICATION_JSON);
                log.info("Sent to client, took: {}", Duration.between(start, Instant.now()));
            } catch (Exception ignore) {
                deadEmitters.add(emitter);
            }
        });
        clients.removeAll(deadEmitters);
    }

    @RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
    public SseEmitter events(HttpServletRequest request) {
        log.info("SSE stream opened for client: " + request.getRemoteAddr());
        SseEmitter emitter = new SseEmitter(SSE_SESSION_TIMEOUT); // 5 초간 연결
        clients.add(emitter); // 관리 emitter 목록에 추가

        // Remove SseEmitter from active clients on error or client disconnect
        emitter.onTimeout(() -> clients.remove(emitter));
        emitter.onCompletion(() -> clients.remove(emitter));

        return emitter;
    }
    // 위에 설정된 5초가 지나면 AsyncRequestTimeoutException 이 발생하고 호출됨.  
    @ExceptionHandler(value = AsyncRequestTimeoutException.class)
    public ModelAndView handleTimeout(HttpServletResponse rsp) throws IOException {
        log.warn("handle timeout");
        if (!rsp.isCommitted()) {
            rsp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
        }
        return new ModelAndView();
    }
}
```

`SseEmitter` 를 지속적으로 유지, 관리한다. 반환 타입이 `ResponseEntity, Map` 과 같은 객체가 아닌 `SseEmitter` 인 것이 어색하다.

`WebFlux` 에선 `Flux` 와 `Spring 5.0` 에 추가된 `ServerSentEvent` 를 사용해 SSE 프로토콜을 지원한다.

```
@RestController
@RequiredArgsConstructor
public class ServerSentController {

    private final StocksService stocksService;

    @GetMapping("/sse/stocks")
    public Flux<ServerSentEvent<?>> streamStocks() {
        return stocksService.stream() // Flux<StockItem> 반환
                .map(item -> ServerSentEvent
                        .builder(item)
                        .event("StockItem")
                        .id(item.getId())
                        .build());
    }

    @GetMapping("/sse/stocks2")
    public Flux<StockItem> streamStocks2() {
        return stocksService.stream();
    }
}
```

`WebFlux` 내부에서 발행원소를 `ServerSentEvent` 로 래핑 하기에 단순 `Flux<>` 만 컨트롤러 메서드에서 반환해도 된다.

## 스프링 시큐리티 with WebFlux <a href="#with-webflux" id="with-webflux"></a>

기존 서블릿 기반 스프링 부트는 하나의 스레드에 하나의 연결이 처리되어\
`ThreadLocal` 에 `SecurityContext` 를 저장해 연결동안 보안처리를 진행했지만

리액티브는 하나의 연결에 여러개의 스레드가 꼬여있을 수 있어 `Reactor Context` 를 사용해야 한다.\
`spring-boot-starter-security` 모듈 역시 기존 서블릿 기반 `WebMVC` 에서 `WebFlux` 를 지원할 수 있도록 업데이트 되었다.

### ReactiveSecurityContextHolder <a href="#reactivesecuritycontextholder" id="reactivesecuritycontextholder"></a>

`WebMVC` 에선 `SecurityContextHolder` 에서 `SecurityContext` 를 가져왔다면\
`WebFlux` 에선 `ReactiveSecurityContextHolder` 에서 `SecurityContext` 를 가져온다.

```
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class SecuredProfileController {

    private final ProfileService profileService;

    @GetMapping("/profiles")
    public Mono<Profile> getProfile() {
        return ReactiveSecurityContextHolder.getContext() // Mono<SecurityContext> 반환
            .map(SecurityContext::getAuthentication)
            .flatMap(auth -> profileService.getByUser(auth.getName()));
    }
}
```

로그인한 유저의 정보를 `SecurityContext` 가져와 `Profile` 에서 검색 후 출력한다.

당연히 `SecurityContext::getAuthentication` 메서드는 `리액터 컨텍스트` 를 사용한다.

```
// ReactiveSecurityContextHolder.java

public static Mono<SecurityContext> getContext() {
    return Mono.subscriberContext()
        .filter( c -> c.hasKey(SECURITY_CONTEXT_KEY))
        .flatMap( c-> c.<Mono<SecurityContext>>get(SECURITY_CONTEXT_KEY));
}
```

### SecurityWebFilterChain <a href="#securitywebfilterchain" id="securitywebfilterchain"></a>

인증과정을 거치려면 내부 컨텍스트에 엑세스 하려고 하면 `SecurityContext` 가 해당 `리액터 컨텍스트` 에 존재해야하고 `Authentication` 객체가 `SecurityContext` 안에 할당되어야 한다.

이 모든 과정을 `ReactorContextWebFilter` 에 `SecurityWebFilterChain` 적용하고 이를 통해 `리액터 컨텍스트` 안에 `SecurityContext` 객체와 `Authentication` 객체를 집어넣는다.

`SecurityContext` 를 집어 넣는 함수는 `ServerSecurityContextRepository` 를 사용한다.

```
package org.springframework.security.web.server.context;

public interface ServerSecurityContextRepository {
	Mono<Void> save(ServerWebExchange exchange, SecurityContext context);
	Mono<SecurityContext> load(ServerWebExchange exchange);
}
```

`SecurityContext` 를 특정 `ServerWebExchange`에 저장, 사용할 수 있다.

```
@Configuration
@EnableReactiveMethodSecurity // 별도의 MethodInterceptor 사용시 필요함
public class SecurityConfiguration {

    @Bean // ReactorContextWebFilter 에 적용할 시큐리티 필터
    public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
        // 기존 mvc 에서 사용하던 HttpSecurity 에서 webflux 용으로 정의된 ServerHttpSecurity
        // 기존 spring security 사용 방식과 크게 다르지 않다.  
        return httpSecurity
            .authorizeExchange()
            .anyExchange().permitAll().and()
            .httpBasic().and()
            .formLogin().and()
            .build();
    }

    private static final Pattern PASSWORD_ALGORITHM_PATTERN = Pattern.compile("^\\{.+}.*$");
    private static final String NOOP_PASSWORD_PREFIX = "{noop}";

    @Bean // 로그인에 사용할 테스트 사용자 생성
    public MapReactiveUserDetailsService reactiveUserDetailsService(ObjectProvider<PasswordEncoder> passwordEncoder) {
        return new MapReactiveUserDetailsService(
            User.withUsername("user")
                .password("user")
                .passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
                .roles("USER")
                .build(),
            User.withUsername("admin")
                .password("admin")
                .passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
                .roles("USER", "ADMIN")
                .build());
    }

    private String getOrDeducePassword(String password, PasswordEncoder encoder) {
        if (encoder != null || PASSWORD_ALGORITHM_PATTERN.matcher(password).matches()) {
            return password;
        }
        return NOOP_PASSWORD_PREFIX + password;
    }
}
```

### WebFlux with JWT <a href="#webflux-with-jwt" id="webflux-with-jwt"></a>

#### Custom SecurityContextRepository, AuthenticationManager <a href="#custom-securitycontextrepository-authenticationmanager" id="custom-securitycontextrepository-authenticationmanager"></a>

스프링 시큐리티는 `SecurityContextRepository` 를 통해 `SecurityContext` 를 `리액터 컨텍스트`에 저장하고 삭제한다.

default 스프링 시큐리티의 경우 DB 로부터 UserDetails 를 가져와 등록해두고 사용하겠지만 우리는 JWT 를 사용하기에 `ServerSecurityContextRepository` 상속받아 커스터마이징 해야한다.

```
@Component
@RequiredArgsConstructor
public class SecurityContextRepository implements ServerSecurityContextRepository {

    private final AuthenticationManager authenticationManager;

    @Override
    public Mono<Void> save(ServerWebExchange swe, SecurityContext sc) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Mono<SecurityContext> load(ServerWebExchange swe) {
        ServerHttpRequest request = swe.getRequest();
        String authToken = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
        if (authToken != null) {
            Authentication auth = new UsernamePasswordAuthenticationToken(authToken, authToken);
            return authenticationManager
                    .authenticate(auth)
                    .map(authentication -> new SecurityContextImpl(authentication));
        } else {
            return Mono.empty();
        }
    }
}
```

`SecurityContext` 를 생성 및 저장하기 request 헤더로 부터 JWT 토큰을 가져와 `AuthenticationManager`로 넘기는 코드가 `load` 에 정의되어 있다.

```
@Component
@RequiredArgsConstructor
public class AuthenticationManager implements ReactiveAuthenticationManager {

    private final JwtTokenUtil jwtUtil;

    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
        String authToken = authentication.getCredentials().toString();
        if (!jwtUtil.validateToken(authToken)) {
            return Mono.empty();
        }
        Claims claims = jwtUtil.getAllClaimsFromToken(authToken);
        String role = claims.get("role", String.class);
        List<GrantedAuthority> authorities = Collections.singletonList(new SimpleGrantedAuthority(role));
        return Mono.just(new UsernamePasswordAuthenticationToken(claims.getSubject(), null, authorities));
    }
}
```

`AuthenticationManager` 는 전달받은 토큰으로 `role` 을 꺼내어 `Authority` 를 지정하고 `Authentication` 인증 객체를 반환한다.

`SecurityContextPath` 가 리액터 컨텍스트로 변경되었을 뿐 기존 `WebMVC` 모델의 스프링 시큐리티 방식과 비슷하다.

```
@Bean // ReactorContextWebFilter 에 적용할 시큐리티 필터
public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
    // 기존 mvc 에서 사용하던 HttpSecurity 에서 webflux 용으로 정의된 ServerHttpSecurity
    // 기존 spring security 사용 방식과 크게 다르지 않다.
    return httpSecurity
        .exceptionHandling()
        //.authenticationEntryPoint((swe, e) -> Mono.fromRunnable(() -> // 미 로그인
        //        swe.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED)))
        //.accessDeniedHandler((swe, e) -> Mono.fromRunnable(() -> // 미 로그인
        //        swe.getResponse().setStatusCode(HttpStatus.FORBIDDEN)))
        .authenticationEntryPoint((swd, e) -> Mono.error(new AuthenticationCredentialsNotFoundException("")))
        .accessDeniedHandler((swe, e) -> Mono.error(new AccessDeniedException("")))
        .and()
        .authenticationManager(authenticationManager)
        .securityContextRepository(securityContextRepository)
        .csrf().disable()
        .cors().disable()
        .httpBasic().disable() // Basic Authentication disable
        .formLogin().disable()
        .authorizeExchange()
        .pathMatchers("/member/join", "/member/login").permitAll()
        .pathMatchers("/member/**", "/rent/**").authenticated()
        .pathMatchers("/admin/**").hasAnyRole("ROLE_ADMIN")
        .anyExchange().permitAll()
        .and()
        .build();
}
```

마지막으로 `ServerHttpSecurity` 에 커스터마이징한 `SecurityContextRepository, AuthenticationManager` 를 등록하고 별도의 에러 핸들링 처리를 한다면 에러를 반환해 핸들링,\
없다면 `HttpStatus.UNAUTHORIZED` 로 단순 `HTTP Status` 만 반환할 수 있다.

## 스프링 데이터 with WebFlux <a href="#with-webflux" id="with-webflux"></a>

기존에 `WebMVC` 방식의 데이터베이스 접근시 `JDBC` 를 구현한 `드라이버 라이브러리` 를 사용해 접근해왔다.

리액티브한 DB 통신도 HTTP 와 다르지 않다.\
이론적으로 DB 접근용 서비스를 생성하고 `WebClient` 를 사용해 DB 데이터를 가져온다면 비동기 DB 접근 라이브러리를 구현한 것 과 다름없다.

다행이도 다양한 DB 벤더사에서 자바 비동기 DB 연결 라이브러리인 `리액티브 드라이버`를 제공함으로 단순히 라이브러리만 추가하면 **데이터베이스 레이어**에 대한 논 블록킹 엑세스를 할 수 있다.

`spring-boot-starter-data-mongodb-reactive`\
`spring-boot-starter-data-cassandra-reactive`\
`spring-boot-starter-data-redis-reactive`\
`spring-boot-starter-data-r2dbc`

스프링 데이터 팀에서 기존에 사용한던 `Repository` 패턴을 리액티브 방식에도 똑같이 사용할 수 있도록 추상화를 통해 구현해두었다.

각 모듈들이 `ReactiveCurdRepository` 인터페이스를 사용해 `Reactor` 라이브러리와 통합되어 자연스럽게 리액티브하게 코드작성이 가능하다.

### 스프링 데이터 몽고DB 리액티브 <a href="#db" id="db"></a>

`NoSQL` 의 경우 각 벤더사에서 통합된 규약이 없다.\
각 벤더사에서 자기들만의 드라이버 라이브러리를 제공하고 스프링 데이터 팀은 스프링에서 해당 라이브러리들을 쉽게 사용할 수 있도록 각종 모듈을 개발하고 있다

`NoSQL DB` 는 최근에 만들어 져서 대부분 벤더사가 `리액티브 드라이버` 를 제공하고 있으며\
스프링 데이터 팀은 몽고DB 에서 제공하는 `리액티브 드라이버` 를 쉽고 편하게 사용할 수 있도록 `spring-boot-starter-data-mongodb-reactive` 모듈을 작성해두었다.

해당 모듈을 사용하면 스프링 팀에서 만든 `Repository` 패턴을 사용해 메서드명 기반으로 쿼리문이 자동 생성/사용 할 수 있다.

#### ReactiveMongoRepository <a href="#reactivemongorepository" id="reactivemongorepository"></a>

```
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

// ReactiveCurdRepository 구현체
@Repository
public interface BookReactiveMongoRepository extends ReactiveMongoRepository<Book, ObjectId> {
    Mono<Book> findOneByTitle(Mono<String> title);

    Flux<Book> findManyByTitleRegex(String regexp);

    @Meta(maxScanDocuments = 3)
    Flux<Book> findByAuthorsOrderByPublishingYearDesc(Publisher<String> authors);

    @Query("{ 'authors.1': { $exists: true } }")
    Flux<Book> booksWithFewAuthors();

    Flux<Book> findByPublishingYearBetweenOrderByPublishingYear(
            Integer from,
            Integer to,
            Pageable pageable
    );
}
```

#### ReactiveMongoTemplate <a href="#reactivemongotemplate" id="reactivemongotemplate"></a>

`ReactiveMongoRepository` 외에도 `ReactiveMongoTemplate` 를 사용해 쿼리 조작이 가능하다.

```
@Service
@RequiredArgsConstructor
public class RxMongoTemplateQueryService {
    private static final String BOOK_COLLECTION = "book";

    private final ReactiveMongoTemplate mongoTemplate; // ReactiveMongoTemplate implements ReactiveMongoOperations

    public Flux<Book> findBooksByTitle(String title) {
        Query query = Query.query(new Criteria("title")
            .regex(".*" + title + ".*"))
            .limit(100);
        return mongoTemplate.find(query, Book.class, BOOK_COLLECTION);
    }
}
```

#### MongoClient <a href="#mongoclient" id="mongoclient"></a>

몽고DB 에서 제공하는 리액티브 드라이버 구현체가 `com.mongodb.reactivestreams.client.MongoClient` 클래스이다.

> <https://mongodb.github.io/mongo-java-driver-reactivestreams/>

`org.mongodb:mongodb-driver-reactivestreams` 모듈에서 제공하며 `spring-boot-starter-data-mongodb-reactive` 에서 내부적으로 사용한다.

`MongoClient` 클래스를 사용해도 쿼리조작이 가능하다.

```
@Service
@RequiredArgsConstructor
public class RxMongoDriverQueryService {

    private final MongoClient mongoClient;
    
    public Flux<Book> findBooksByTitle(String title, boolean negate) {
        return Flux.defer(() -> {
            Bson query = Filters.regex("title", ".*" + title + ".*");
            if (negate) query = Filters.not(query);
            return mongoClient
                .getDatabase("test-db")
                .getCollection("book")
                .find(query);
        }).map(doc -> new Book(
            doc.getObjectId("id"), // Document
            doc.getString("title"),
            doc.getInteger("pubYear")));
    }
}
```

#### 트랜잭션(ReactiveMongoTemplate.inTransaction) <a href="#reactivemongotemplateintransaction" id="reactivemongotemplateintransaction"></a>

`MongoDB 4.0` 버전 이전까지 하나의 문서 에 대해서만 트랜잭션을 제공하는 `Single-Document Transaction` 기능만 있었다.\
하나의 문서에 모든 정보를 삽입하여 사용하기에 하나의 트랜잭션은 하나의 문서만 건들여서 `Single-Document Transaction` 으로도 충분해야 하지만 항상 예외가 있는법,\
결국 여러 문서에 대한 트랜잭션 `Multi-Document Transaction` 기능을 `MongoDB 4.0` 부터 지원한다.

> WiredTiger 스토리지 엔진의 샤딩설정이 되어 있지 않고 복제설정일 경우에만 `Multi-Document Transaction` 을 지원한다.

`ReactiveMongoTemplate` 의 `inTransaction` 메서드를 사용하면

```
private Mono<TxResult> doTransferMoney(String from, String to, Integer amount) {
    return mongoTemplate.inTransaction().execute(session -> session
        .findOne(queryForOwner(from), Wallet.class)
        .flatMap(fromWallet -> session
            .findOne(queryForOwner(to), Wallet.class)
            .flatMap(toWallet -> {
                if (fromWallet.hasEnoughFunds(amount)) {
                    fromWallet.withdraw(amount);
                    toWallet.deposit(amount);

                    return session.save(fromWallet)
                        .then(session.save(toWallet))
                        .then(ReactiveMongoContext.getSession())
                        // An example how to resolve the current session
                        .doOnNext(tx -> log.info("Current session: {}", tx))
                        .then(Mono.just(TxResult.SUCCESS));
                } else {
                    return Mono.just(TxResult.NOT_ENOUGH_FUNDS);
                }
            })))
        .onErrorResume(e -> Mono.error(new RuntimeException("Conflict")))
        .last();
}
```

### 스프링 데이터 R2DBC <a href="#r2dbc" id="r2dbc"></a>

> R2DBC: Reactive Relational Database Connectivity

> <https://r2dbc.io/> <https://spring.io/projects/spring-data-r2dbc> <https://spring.io/projects/spring-data-r2dbc>

아래와 같은 DBMS 에 대하여 r2dbc 라이브러리를 제공

```
H2 (io.r2dbc:r2dbc-h2)
MariaDB (org.mariadb:r2dbc-mariadb)
Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
MySQL (dev.miku:r2dbc-mysql)
jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
Postgres (io.r2dbc:r2dbc-postgresql)
Oracle (com.oracle.database.r2dbc:oracle-r2dbc)
```

지금까지 `스프링 JDBC` 혹은 `스프링 데이터 JDBC` 혹은 `JPA` 를 사용해 생성된 `Hikari CP` 안의 연결객체가 `JDBC` 드라이버를 사용해 관계형 DB 를 사용해 왔다.

```
@Repository
public interface BookSpringDataJdbcRepository extends CrudRepository<Book, Integer> {
    
    @Query("SELECT * FROM book b WHERE b.title = :title")
    CompletableFuture<Book> findBookByTitleAsync(@Param("title") String title);

}
```

`JDBC, JPA` 등의 `관계형 DB 라이브러리`들은 모두 `동기/블럭킹` 방식으로 동작한다.

다행이도 `spring data jdbc` 를 개발한 `스프링 데이터 Relational` 프로젝트 팀에서\
**리액티브에 적합**한 `자바 DB 드라이버`인 `리액티브 드라이버`를 개발중이다.\
이 `리액티브 드라이버` 를 사용한 프로젝트가 `R2DBC` 프로젝트이다.

더이상 `JDBC` 를 사용하지 않고 리액티브 스택에 적합한 `리액티브 드라이버`를 사용해 DB 에 접근, 데이터를 조작한다.

> 안타깝게도 `JPA` 는 기존 코드가 너무 복잡했는지 리액티브 지원을 하지 않을것으로 보인다.

#### ReactiveCrudRepository <a href="#reactivecrudrepository" id="reactivecrudrepository"></a>

```
@Repository
public interface MemberRepository extends ReactiveCrudRepository<Member, Long> {
    Mono<Member> findByName(String name);

    Mono<Member> findByUserName(String name);

    @Query("SELECT * FROM member WHERE name = :name AND user_name = :userName")
    Mono<Member> findByNameAndUserName(String name, String userName);
}
```

#### R2dbcEntityTemplate <a href="#r2dbcentitytemplate" id="r2dbcentitytemplate"></a>

```
@Service
@RequiredArgsConstructor
public class MemberDynamicRepository {
    private final R2dbcEntityTemplate r2dbcEntityTemplate;

    public Flux<Member> findTest(String userName) {
        Query query = Query.query(where("user_name").like("%" + userName + "%"))
                .limit(10)
                .offset(0);
        return r2dbcEntityTemplate.select(Member.class)
                .matching(query)
                .all();
    }
}
```

### 스프링 데이터 Redis <a href="#redis" id="redis"></a>

`spring-boot-starter-data-redis-reactive` 모듈을 사용 `ReactiveRedisTemplate` 클래스가 `Redis` 커넥션의 핵심클래스이다.

> 다른 스프링 데이터 프로젝트와 달리 `Repository` 가 존재하지 않음\
> 일반적인 데이터 관리 외에도 구독/발행 구조의 메시지 기능도 지원한다.

> `spring-boot-starter-data-redis-reactive` 모듈은 내부적으로 `Lettuce` 라이브러리를 사용한다.

> <https://lettuce.io/>, 현재 non blokcing 을 지원하는 redis 라이브러리가 Lettuce 가 유일하다.\
> 또한 `Lettuce` 라이브러리 내에서 `Reactor` 라이브러리를 사용한다.

```
public class Sample {
    private String name;
    private String description;
}


@Configuration
public class RedisConfig {
    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        return new LettuceConnectionFactory(host, port);
    }
    
    @Bean
    public ReactiveRedisTemplate<String, Sample> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Sample> valueSerializer = new Jackson2JsonRedisSerializer<>(Sample.class);

        RedisSerializationContext.RedisSerializationContextBuilder<String, Sample> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);

        RedisSerializationContext<String, Sample> context = builder.value(valueSerializer).build();

        return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, context);
    }
}
```

```
@Service
@RequiredArgsConstructor
public class SampleService {
    private final ReactiveRedisTemplate<String, Sample> redisTemplate;

    public Mono<Boolean> put(String key, Sample sample) {
        return redisTemplate.opsForValue().set(key, sample);
    }

    public Mono<Sample> get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    public Flux<Sample> getAll(String keyPattern){
        return redisTemplate.keys(keyPattern)
                .flatMap(key-> redisTemplate.opsForValue().get(key));
    }

    public Mono<Boolean> delete(String key) {
        return redisTemplate.opsForValue().delete(key);
    }
}
```

## 기타 <a href="#undefined" id="undefined"></a>

### ListenableFuture <a href="#listenablefuture" id="listenablefuture"></a>

스프링에서 제공하는 `Future` 구현 클래스

```
public interface ListenableFuture<T> extends Future<T> {
	void addCallback(ListenableFutureCallback<? super T> callback);
	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
	default CompletableFuture<T> completable() {
		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
		addCallback(completable::complete, completable::completeExceptionally);
		return completable;
	}
}
```

위의 `SseEmitter` 와 마찬가지로 스프링 리액티브의 반환데이터로 사용된다.

```
// 비동기 RestTemplate
AsyncRestTemplate httpClient = new AsyncRestTemplate();

@GetMapping
public ListenableFuture<?> requestData() {
    AsyncDatabaseClient databaseClient = new FakeAsyncDatabaseClient();
    // /hello 의 호출결과를 CompletableFuture 으로 반환하는 어뎁터
    CompletionStage<String> completionStage = AsyncAdapters.toCompletion(httpClient.execute(
        "http://localhost:8080/hello",
        HttpMethod.GET, null,
        new HttpMessageConverterExtractor<>(String.class, messageConverters) // http 의 body 부분 컨버터들 지정
    ));
    // CompletionStage(CompletableFuture 의 인터페이스) 를 ListenableFuture 로 변환
    return AsyncAdapters.toListenable(databaseClient.store(completionStage));
}
```

`AsyncRestTemplate.execute` 가 반환하는 `ListenableFuture` 를 `CompletionStage` 로 변환 반환된 `CompletionStage` 를 데이터베이스에 저장후 다시 반환된 `CompletionStage` 를 `ListenableFuture` 로 변환한다.

일반적으로 간단한 비동기 처리는 `Future` 를 구현한 `CompletableFuture`(`CompletionStage` 구현체) 를 주로 사용한다.

메서드 정의시 반환값을 스프링에서 기본 제공하는 `ListenableFuture` 를 구현하는 객체로 반환하기 힘들기에\
위와같은 `AsyncAdapters` 를 사용해 비동기 결과를 `ListenableFuture` 로 변환해주는 어뎁터를 사용하는 것이 편하다.

```
public final class AsyncAdapters {
    // ListenableFuture -> completionStage
    public static <T> CompletionStage<T> toCompletion(ListenableFuture<T> future) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        future.addCallback(completableFuture::complete, completableFuture::completeExceptionally);
        return completableFuture;
    }
    // completionStage -> ListenableFuture
    public static <T> ListenableFuture<T> toListenable(CompletionStage<T> stage) {
        SettableListenableFuture<T> future = new SettableListenableFuture<>();
        stage.whenComplete((v, t) -> {
            if (t == null) future.set(v);
            else future.setException(t);
        });
        return future;
    }
}
```

### AsyncRestTemplate <a href="#asyncresttemplate" id="asyncresttemplate"></a>

스프링 리액티브에선 동기방식인 일반 `RestTemplate` 을 사용하지 않고 `AsyncRestTemplate` 를 사용한다.\
흔히사용하는 `execute` 메서드의 반환값이 `ListenableFuture` 객체이다.

```
@Override
public <T> ListenableFuture<T> execute(String url, HttpMethod method, AsyncRequestCallback requestCallback,
        ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

    URI expanded = getUriTemplateHandler().expand(url, uriVariables);
    return doExecute(expanded, method, requestCallback, responseExtractor);
}
```

출처 : <https://kouzie.github.io/spring/Spring-Boot-%EC%8A%A4%ED%94%84%EB%A7%81-%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EA%B0%9C%EC%9A%94/#webflux>&#x20;


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://taejun.gitbook.io/tech/2-srping/webflux.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
