WebFlux

์Šคํ”„๋ง ์ฝ”์–ด for Reactive

์Šคํ”„๋ง 5.0 ์—์„œ๋ถ€ํ„ฐ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ฝ”๋“œ๋ฅผ ์œ„ํ•œ ์—ฌ๋Ÿฌ๊ฐ€์ง€ ํด๋ž˜์Šค๋“ค์ด ์ˆ˜์ •, ์ถ”๊ฐ€๋˜์—ˆ๋‹ค.

ReactiveAdapter, ReactiveAdapterRegistry

RxJava, Reactor ์—์„œ ์‚ฌ์šฉํ•˜๋Š” ๋ฐœํ–‰์ž ํด๋ž˜์Šค๋ฅผ Publihser ๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ๋Š” Adapter ๊ฐ€ springframework.core ์— ์ถ”๊ฐ€๋˜์–ด ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•ด์กŒ๋‹ค.

์•„๋ž˜์ฒ˜๋Ÿผ ReactiveAdapter ๋ฅผ ์ƒ์†๋ฐ›์•„ RxJava Maybe ์™€ Publisher ๊ฐ„์˜ ๋ณ€ํ™˜ ์ž‘์—…์„ ํ•ด์ฃผ๋Š” Adapter ๋ฅผ ์ž‘์„ฑํ•ด์„œ ์‚ฌ์šฉํ•˜๊ฑฐ๋‚˜

@Component
public class MaybeReactiveAdapter extends ReactiveAdapter {

    public MaybeReactiveAdapter() {
        /**
         * Descriptor for a reactive type that can produce 0..1 values.
         * @param type the reactive type
         * @param emptySupplier a supplier of an empty-value instance of the reactive type
         */
        super(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
            maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
            publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
    }
}

ReactiveAdapterRegistry ๋ฅผ ์‚ฌ์šฉํ•ด ์‹ฑ๊ธ€ํ„ด Instance ๋ณ€์ˆ˜์— Adapter ์šฉ ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•ด ํ•„์š”ํ• ๋•Œ ๋งˆ๋‹ค ๊บผ๋‚ด์–ด ์“ธ ์ˆ˜ ์žˆ๋‹ค.

@PostConstruct
public void init() {
    ReactiveAdapterRegistry
        .getSharedInstance()
        .registerReactiveType(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
            maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
            publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
}

...

ReactiveAdapter adapter = ReactiveAdapterRegistry
    .getSharedInstance()
    .getAdapter(Maybe.class);
...

๋ฆฌ์•กํ‹ฐ๋ธŒ I/O, ์ฝ”๋ฑ

springframework.core.io ์— ์ €์žฅ๋œ DataBuffer, DataBufferUtils ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด I/O ์ž‘์—…์ด ํ•„์š”ํ•œ ํŒŒ์ผ, ๋„คํŠธ์›Œํฌ ์ž์›์œผ๋กœ ๋ถ€ํ„ฐ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ ํ˜•ํƒœ๋กœ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค. jav.nio.ByteBuffer ํด๋ž˜์Šค์˜ ํ˜•๋ณ€ํ™˜ ๊ธฐ๋Šฅ์„ ์ถ”๊ฐ€ํ•˜์—ฌ ๋ณด๋‹ค ์‰ฝ๊ฒŒ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋‹ค.

Flux<DataBuffer> reactiveHamlet = DataBufferUtils.read(
    new DefaultResourceLoader().getResource("hamlet.txt"),
    new DefaultDataBufferFactory(),
    1024);

springframework.core.codec ์— ์ •์˜๋œ ์ธํ„ฐํŽ˜์ด์Šค Encoder, Decoder ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด Non Blocking ๋ฐฉ์‹์œผ๋กœ ์ง๋ ฌํ™” ๋ฐ์ดํ„ฐ๋ฅผ ์ž๋ฐ”๊ฐ์ฒด, ์ž๋ฐ”๊ฐ์ฒด๋ฅผ ์ง๋ ฌํ™” ๋ฐ์ดํ„ฐ๋กœ ๋ณ€ํ™˜ ๊ฐ€๋Šฅํ•˜๋‹ค.

WebFlux

Sprinb Boot 2 ์— ๋ฆฌ์•กํ‹ฐ๋ธŒ ์›น์„œ๋ฒ„๋ฅผ ์œ„ํ•œ WebFlux ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก spring-boot-starter-webflux ๋ผ๋Š” ์ƒˆ๋กœ์šด ํŒจํ‚ค์ง€๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋˜์—ˆ๋‹ค.

ํ•ด๋‹น ๋ชจ๋“ˆ์€ Reactive Stream Adapter ์œ„์— ๊ตฌ์ถ•๋œ๋ฉฐ Servlet 3.1+ ์ง€์›์„œ๋ฒ„(Tomcat, Jetty ๋“ฑ), Netty, Undertow ์„œ๋ฒ„์—”์ง„์—์„œ ๋ชจ๋‘ ์ง€์›ํ•œ๋‹ค.

์œ„์˜ ์—”์ง„๋“ค์€ java 8 ์— ์ถ”๊ฐ€๋œ java NIO ๋กœ ๊ตฌํ˜„๋˜์–ด Http ์š”์ฒญ์„ ๋…ผ๋ธ”๋Ÿญํ‚น์œผ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.

์ผ๋ฐ˜์ ์€ WebMVC ๋ชจ๋“ˆ๋„ Spring 5.0 ์— ์ด๋ฅด๋Ÿฌ spring-boot-starter-web Servlet 3.1 ์„์ง€์›ํ•˜๋ฉด์„œ ์ผ๋ถ€๋ถ„์€ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์„ ์ง€์›ํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค.

ResponseBodyEmitterReturnValueHandler ํด๋ž˜์Šค๊ฐ€ ์—…๊ทธ๋ ˆ์ด๋“œ ๋˜๋ฉด์„œ ReactiveTypeHandler ํ•„๋“œ๋ฅผ ์‚ฌ์šฉํ•ด WebMVC ์˜ ์ธํ”„๋ผ ๊ตฌ์กฐ๋ฅผ ํฌ๊ฒŒ ํ•ด์น˜์ง€ ์•Š๊ณ  ์ปจํŠธ๋กค๋Ÿฌ ๋ฉ”์„œ๋“œ๊ฐ€ ๋ฐ˜ํ™˜ํ•˜๋Š” Flux, Mono, Flowable ๋“ฑ์˜ Publisher(๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ)์„ ์ฒ˜๋ฆฌํ•œ๋‹ค.

๋ฌผ๋ก  ์„œ๋ธ”๋ฆฟ API ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ์— ๋ธ”๋กํ‚น/์Šค๋ ˆ๋“œํ’€ ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•œ๋‹ค.

WebFlux ๊ฐœ์š”

๊ธฐ์กด WebMVC ๋ชจ๋ธ ๊ตฌ์กฐ๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

ViewResolver ๋Š” Rest ๋ฐฉ์‹์—์„  ์ƒ๋žต๋œ๋‹ค.

๊ฐ์ข… ์„œ๋ธ”๋ฆฟ ์ปจํ…Œ์ด๋„ˆ(Tomcat, JBoss ๋“ฑ) ์š”์ฒญ์„ ์„œ๋ธ”๋ฆฟ ํด๋ž˜์Šค๋ฅผ ์ƒ์†ํ•œ DispatcherServlet ์ด ์Šคํ”„๋ง ๋ถ€ํŠธ ์ปจํŠธ๋กค๋Ÿฌ ๋งคํ•‘์— ๋”ฐ๋ผ ์š”์ฒญ์„ ๋ถ„๋ฐฐํ•œ๋‹ค.

๊ทธ๋ฆผ์ฒ˜๋Ÿผ ๊ธฐ์กด WebMVC ๋ฐฉ์‹์€ ๋™๊ธฐ/๋ธ”๋กœํ‚น ๋ฐฉ์‹์œผ๋กœ ๋™์ž‘ํ•œ๋‹ค.

์Šคํ”„๋ง ๋ถ€ํŠธ์—์„œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•˜๋ ค๋ฉด Servlet 3.1+(Tomcat, Jetty ๋“ฑ), Netty, Undertow ์™€ ๊ฐ™์€ ์„œ๋ฒ„๋ฅผ ์‚ฌ์šฉํ•ด ๋ฆฌ์•กํ‹ฐ๋ธŒํ•˜๊ฒŒ ๊ตฌ์กฐ๊ฐ€ ๋ณ€๊ฒฝ๋˜์–ด์•ผ ํ•˜๋Š”๋ฐ

๋‹คํ–‰์ด๋„ ์Šคํ”„๋ง ํ”„๋กœ์ ํŠธํŒ€์ด ๋™์ผํ•œ ์–ด๋…ธํ…Œ์ด์…˜ ๊ธฐ๋ฐ˜ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•˜๋ฉด์„œ ๋น„๋™๊ธฐ/๋…ผ๋ธ”๋กํ‚น ์œผ๋กœ ๋™์ž‘ํ•˜๋„๋ก ์ด๋ฏธ ๊ฐœ๋ฐœํ•ด๋‘์—ˆ๋‹ค.

WebFlux with Flux

๋Œ€๋žต์ ์œผ๋กœ WebFlux ์—์„œ Http Request, Response ์–ด๋–ป๊ฒŒ ๋ฆฌ์•กํ‹ฐ๋ธŒ๋กœ ๊ตฌํ˜„ํ–ˆ๋Š”์ง€ ์•„๋ž˜ ์ธํ„ฐํŽ˜์ด์Šค๋กœ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

interface ServerHttpRequest { // Http Request ๋ฅผ ๊ฐ์ฒด๋กœ ํ‘œํ˜„
    Flux<DataBuffer> getBody();
    ...
}
interface ServerHttpResponse { // Http Response ๋ฅผ ๊ฐ์ฒด๋กœ ํ‘œํ˜„
    Mono<Void> writeWith(Publihser<? extends DataBuffer> body);
    ...
}
interface ServerWebExchange { // HTTP Request, Response ์ปจํ…Œ์ด๋„ˆ ์—ญํ• 
    ServerHttpRequest getRequest();
    ServerHttpResponse getResponse();
    Mono<WebSession> getSession();
    ...
}

์„œ๋ธ”๋ฆฟ์˜ ServletRequest, ServletResponse ์™€ ์—ฐ๊ด€์ง€์–ด์„œ ์ƒˆ๋กœ์šด Http ์š”์ฒญ, ๋ฐ˜ํ™˜์„ ๊ฐ์ฒด๋กœ ํ‘œํ˜„ํ•  ์ˆ˜ ์žˆ๋Š” ์ธํ„ฐํŽ˜์ด์Šค๋“ค์ด ์ •์˜๋˜์–ด์žˆ๊ณ 

DataBuffer ๋ฅผ ์‚ฌ์šฉํ•ด ๋ฆฌ์•กํ‹ฐ๋ธŒ ํƒ€์ž…๊ณผ์˜ ๊ฒฐํ•ฉ๋„๋ฅผ ๋‚ฎ์ถ˜๋‹ค.

interface WebHandler {
    Mono<Void> handle(ServerWebExchange exchanage);
}
interface WebFilterChain {
    Mono<Void> filter(ServerWebExchange exchanage);
}
interface WebFilter {
    Mono<Void> filter(ServerWebExchange exchanage, WebFilterChain chain);
}

WebHandler ๋Š” DispatcherServlet ์—ญํ• , ์‹คํ–‰๊ฒฐ๊ณผ๋ฅผ ๋ฐ›์ง€ ๋ชปํ•  ์ˆ˜ ์žˆ์Œ์œผ๋กœ handle ๋ฉ”์„œ๋“œ๋Š” Mono<Void> ๊ฐ€ ๋ฐ˜ํ™˜๋œ๋‹ค.

ํด๋ผ์ด์–ธํŠธ๋ฅผ ์œ„ํ•œ Http ๋ฐ˜ํ™˜์€ exchange ์•ˆ์˜ ServerHttpResponse ์—

WebFilter ๋Š” ์„œ๋ธ”๋ฆฟ์˜ ์š”์ฒญ, ๋ฐ˜ํ™˜ ํ•„ํ„ฐ์ฒ˜๋Ÿผ ๋ฆฌ์•กํ‹ฐ๋ธŒ์—์„œ๋„ ๋น„์ง€๋‹ˆ์Šค ๋กœ์ง์— ์ง‘์ค‘ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•„ํ„ฐ๊ธฐ๋Šฅ์ด ์ œ๊ณต๋œ๋‹ค.

public interface HttpHandler {
    Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}

๋งˆ์ง€๋ง‰์œผ๋กœ WebHandler ๋กœ๋ถ€ํ„ฐ ์ „๋‹ฌ๋ฐ›์€ exchange ๊ฐ์ฒด๋ฅผ url ๋งคํ•‘ํ•  ์ˆ˜ ์žˆ๋„๋ก HttpHandler ์˜ handle ๋กœ ์ „๋‹ฌ๋œ๋‹ค.

WebFlux - Functional Reactive Web Server

Vert.x ๋‚˜ Ratpack ๊ณผ ๊ฐ™์€ ํ”„๋ ˆ์ž„์›Œํฌ์˜ ์ธ๊ธฐ๋น„๊ฒฐ์€ ์Šคํ”„๋ง์˜ ๋ณต์žกํ•œ MVC ์„ค์ •์œผ๋กœ ๋ผ์šฐํŒ… ์„ค์ •๊ณผ ๋กœ์ง์ด ์—†์ด ๊ฐ„๊ฒฐํ•œ ์„ค์ •์œผ๋กœ ๋ผ์šฐํŒ… ๋กœ์ง์„ ์ž‘์„ฑํ•  ์ˆ˜ ์žˆ๋Š” API ๋“ค์ด ์ž˜ ์ •์˜๋˜์–ด ์žˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

์Šคํ”„๋ง ํ”„๋ ˆ์ž„์›Œํฌ๋„ ์œ„ ํ”„๋ ˆ์ž„์›Œํฌ์ฒ˜๋Ÿผ ๊ฐ„๊ฒฐํ•˜๊ฒŒ ๋ผ์šฐํŒ… ๋กœ์ง์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” API ๋ฅผ ๊ฐœ๋ฐœํ•˜์˜€๋‹ค.

spring-boot-starter-webflux ๋ชจ๋“ˆ์˜ org.springframework.web.reactive.function.server ํŒจํ‚ค์ง€์— ์ •์˜๋œ RouterFunction ํด๋ž˜์Šค ์‚ฌ์šฉํ•˜์—ฌ ๋ผ์šฐํŒ… ๋กœ์ง ์ •์˜๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
import static org.springframework.http.MediaType.APPLICATION_JSON;

@SpringBootApplication
public class DemoApplication {

    final ServerRedirectHandler serverRedirectHandler = new ServerRedirectHandler();

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(OrderHandler orderHandler) {
        return nest(path("/orders"), 
            nest(accept(APPLICATION_JSON),
                route(GET("/{id}"), orderHandler::get)
                    .andRoute(method(HttpMethod.GET), orderHandler::list))
                .andNest(contentType(APPLICATION_JSON),
                    route(POST("/"), orderHandler::create))
                .andNest((serverRequest) -> serverRequest.cookies().containsKey("Redirect-Traffic"),
                    route(all(), serverRedirectHandler))
        );
    }
}

RouterFunction ์„ ์Šคํ”„๋ง Bean ์œผ๋กœ ๋“ฑ๋กํ•˜๊ณ  /orders ์— ๋Œ€ํ•œ ๋ผ์šฐํŒ… ๋กœ์ง ์„ค์ •, ๊ธฐ๋ณธ์ ์ธ Path, Http method, cookie ํฌํ•จ์—ฌ๋ถ€ ๋“ฑ ์—ฌ๋Ÿฌ ๋กœ์ง์ฒ˜๋ฆฌ ๊ฐ€๋Šฅํ•˜๋‹ค.

ํ•จ์ˆ˜ํ˜•์œผ๋กœ ์›น์„œ๋ฒ„๋ฅผ ๊ตฌ์ถ•ํ• ๊ฒฝ์šฐ Netty ์„œ๋ฒ„๋ฅผ ๊ฐ™์ด ์‚ฌ์šฉํ•˜๋ฉด ๋ณ„๋„์˜ ์Šคํ”„๋ง ์„ค์ •์—†์ด ์„œ๋ฒ„ ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

์ด๋Ÿฐ์ ๋•Œ๋ฌธ์— ๋‹จ์ˆœ ํ…Œ์ŠคํŠธ์˜ ๊ฒฝ์šฐ @SpringBootApplication ์„ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ  ๋‹จ์ˆœ Netty ์„œ๋ฒ„๋ฅผ ์‚ฌ์šฉํ•ด ๋น ๋ฅด๊ฒŒ ์„œ๋ฒ„๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ๊ตฌํ˜„ํ•œ ๋ฉ”์„œ๋“œ๋“ค์„ ํ…Œ์ŠคํŠธ ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋งŒ์•ฝ ํŒจ์Šค์›Œ๋“œ ์•”ํ˜ธํ™” ๋ฐ ๋ณตํ˜ธํ™” ํ…Œ์ŠคํŠธ๋ฅผ ํ•œ๋‹ค๋ฉด ์„œ๋ฒ„๊ธฐ๋Šฅ์„ ํ•˜๋Š” ๊ฐ์ฒด ์™ธ์— ์ถ”๊ฐ€์ ์œผ๋กœ ํ•„์š”ํ•œ ๊ฐ์ฒด๋Š” ํ•ด์‹œ ๊ธฐ๋Šฅ์ด ์žˆ๋Š” spring-boot-starter-security ์˜ PasswordEncoder ๋ฟ์ด๋‹ค.

PasswordEncoder ๋งŒ ์‚ฌ์šฉํ•œ๋‹ค๋ฉด spring-security-core ๋งŒ ์˜์กด์„ฑ ์ฒ˜๋ฆฌํ•ด๋„ ๋œ๋‹ค.

๋ณ„๋„์˜ ์Šคํ”„๋ง ๊ด€๋ จ ์–ด๋…ธํ…Œ์ด์…˜, Bean ๋“ฑ๋ก๊ณผ์ • ์—†์ด RouterFunction, Netty, PasswordEncoder 3๊ฐœ ๊ฐ์ฒด๋งŒ ์ž˜ ์ •์˜ํ•ด์„œ ์•„๋ž˜์ฒ˜๋Ÿผ ์„œ๋ฒ„ ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;

public static void main(String... args) {
    long start = System.currentTimeMillis();
    BCryptPasswordEncoder passwordEncoder = new BCryptPasswordEncoder(18); // encoder ์ƒ์„ฑ

    HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler ๋ณ€๊ฒฝ
        route(POST("/password"), request -> request // RouterFunction ์œผ๋กœ ๋ผ์šฐํ„ฐ ๋กœ์ง ์ƒ์„ฑ 
            .bodyToMono(PasswordDTO.class)
            .map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
            .flatMap(isMatched -> isMatched
                ? ServerResponse.ok().build()
                : ServerResponse.status(HttpStatus.EXPECTATION_FAILED).build())
        )
    );
    ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler); 
    // HandlerAdapter ์— HttpHandler ์‚ฝ์ž…, BiFunction ๋ฅผ ๊ตฌํ˜„ํ•œ ํด๋ž˜์Šค์ž„  
    DisposableServer server = HttpServer.create() // Netty Server
        .host("localhost").port(8080)
        .handle(reactorHttpHandler) // BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> ์š”๊ตฌํ•จ
        .bindNow(); // ์„œ๋ฒ„ ์—”์ง„ ์‹œ์ž‘  
        
    LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms"); // Started in 703 ms
    server.onDispose().block(); // main ์Šค๋ ˆ๋“œ ์ฐจ๋‹จ  
}

์„œ๋ฒ„๊ฐ€ 0.7 ์ดˆ๋งŒ์— ์‹คํ–‰๋œ๋‹ค. ์Šคํ”„๋ง ์ปจํ…Œ์ด๋„ˆ, ์˜์กด์„ฑ ์ฃผ์ž…, ์–ด๋…ธํ…Œ์ด์…˜ ์ฒ˜๋ฆฌ๋ฅผ ํ•˜์ง€ ์•Š์Œ์œผ๋กœ ์†๋„๊ฐ€ ๊ต‰์žฅํžˆ ๋น ๋ฅด๋ฉฐ ๊ฐ„๋‹จํ•œ ํ…Œ์ŠคํŠธ์ง„ํ–‰์€ ์œ„์™€๊ฐ™์€ ๋ฐฉ์‹์œผ๋กœ ์ง„ํ–‰ํ•˜๋ฉด ํŽธํ•˜๋‹ค.

WebFlux - Annotated Controller

RouterFunctions๋ฅผ ์‚ฌ์šฉํ•ด๋„ ๋˜์ง€๋งŒ WebMVC ๋ชจ๋ธ์—์„œ ์‚ฌ์šฉํ•˜๋Š” @RestController, @RequestMapping ๋“ฑ์˜ ์–ด๋…ธํ…Œ์ด์…˜์„ WebFlux ์—์„œ๋„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

@RestController
@RequestMapping("/member")
@RequiredArgsConstructor
public class MemberController {

    private final MemberRepository memberRepository;

    @GetMapping
    public Flux<Member> getAll() {
        return memberRepository.findAll();
    }

    @GetMapping("/id/{id}")
    public Mono<Member> getById(@PathVariable Long id) {
        return memberRepository.findById(id);
    }
}

Flux ๋Š” ๋ฐฐ์—ด, Mono ๋Š” ๊ฐ์ฒด๋กœ ๋ฐ˜ํ™˜๋œ๋‹ค.

WebFlux - Filter

๋”์ด์ƒ javax.servlet.Filter ์„ ์‚ฌ์šฉํ•˜์ง€ ๋ชปํ•œ๋‹ค.

ํ•„ํ„ฐ๊ธฐ๋Šฅ์„ ํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ์—ฌ๋Ÿฌ๊ฐ€์ง€๋‹ค.

RouterFunctions

@SpringBootApplication
public class ReactApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReactR2dbcApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> filterFunction(MemberComponent memberComponent) {
        return RouterFunctions
                .route(GET("/member/{memberId}")
                .and(accept(MediaType.APPLICATION_JSON)), memberComponent::getById)
                .filter(new ExampleHandlerFilterFunction());
    }
}

@Component
@RequiredArgsConstructor
class MemberComponent {
    private final MemberRepository memberRepository;
    public Mono<ServerResponse> getById(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromValue(
                       memberRepository.findById(Long.valueOf(request.pathVariable("memberId")))));
    }
}

WebFilter

@Component
public class ExampleWebFilter implements WebFilter {
  
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, 
      WebFilterChain webFilterChain) {
        serverWebExchange.getResponse().getHeaders().add("web-filter", "web-filter-test");
        return webFilterChain.filter(serverWebExchange);
    }
}

๋ณ„๋„์˜ ๋งคํ•‘ ์กฐ๊ฑด์ด ์—†๊ธฐ๋•Œ๋ฌธ์— ์กฐ๊ฑด๋ฌธ ๋ถ„๊ธฐ๊ฐ€ ํ•„์š”ํ•จ

HandlerFilterFunction

public class ExampleHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

    @Override
    public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> handlerFunction) {
        if (request.pathVariable("name").equalsIgnoreCase("test")) {
            return ServerResponse.status(HttpStatus.FORBIDDEN).build();
        }
        return handlerFunction.handle(request);
    }
}

WebFlux - Exception Handler

๋ฉ”์„œ๋“œ ๋ ˆ๋ฒจ์—์„œ ์˜ค๋ฅ˜์ฒ˜๋ฆฌ๋Š” ServerResponse ์— status, body ๋“ฑ์„ ์„ค์ •ํ•˜๋ฉด ์‰ฝ๊ฒŒ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค. ๋˜ํ•œ ํด๋ž˜์Šค ๋‚ด๋ถ€์—์„œ ๊ธฐ์กด WebMVC ์—์„œ ์‚ฌ์šฉํ•˜๋˜ @ExceptionHandler ๋ฅผ ์‚ฌ์šฉํ•ด ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

@Controller public class SimpleController {
    @ExceptionHandler public ResponseEntity<String> handle(IOException ex) {
        // ... 
    }
}

๊ธ€๋กœ๋ฒŒ ๋ ˆ๋ฒจ์—์„œ ์˜ค๋ฅ˜์ฒ˜๋ฆฌ๋Š” WebExceptionHandler ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•ด ํ•„ํ„ฐ ๋ฐฉ์‹์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

DefaultErrorAttributes

DefaultErrorAttributes ๋Š” WebFlux ์—์„œ ์‚ฌ์šฉํ•˜๋Š” ์—๋Ÿฌ ํ•ธ๋“ค๋Ÿฌ๋กœ ๊ธฐ๋ณธ ํ•„ํ„ฐ๋กœ ๋“ฑ๋ก๋˜์–ด ์žˆ๋Š” ์—๋Ÿฌ ํ•ธ๋“ค๋Ÿฌ๊ฐ€ DefaultErrorAttributes ์•ˆ์˜ getErrorAttributes ๋ฅผ ํ˜ธ์ถœํ•ด ์•„๋ž˜์™€ ๊ฐ™์€ ๋ฐ˜ํ™˜๊ฐ’์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

{
"timestamp": "...",
"path": "...",
"status": 405,
"error": "Method Not Allowed",
"message": "",
"requestId": "ca35d584-1"
}

๊ธ€๋กœ๋ฒŒ ๋ ˆ๋ฒจ์—์„œ ์˜ค๋ฅ˜์ฒ˜๋ฆฌ๋ฅผ ํ†ตํ•ด ์ปค์Šคํ…€ํ•œ ๋ฐ˜ํ™˜๊ฐ’์„ ์„ค์ •ํ•˜๊ณ  ์‹ถ์œผ๋ฉด DefaultErrorAttributes ์˜ getErrorAttributes ๋ฉ”์„œ๋“œ๋ฅผ ์˜ค๋ฒ„๋ผ์ด๋”ฉ ํ•ด์•ผํ•œ๋‹ค.

@Slf4j
@Component
@RequiredArgsConstructor
public class GlobalErrorAttributes extends DefaultErrorAttributes {

    // private final MessageSource messageSource;

    @Override
    public Map<String, Object> getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
        Throwable throwable = getError(request);
        String acceptLanguage = request.headers().firstHeader("accept-language");
        Locale locale = acceptLanguage != null && acceptLanguage.startsWith("ko") ? Locale.KOREA : Locale.getDefault();
        log.error("unknown server error:{}, {}" + throwable.getClass().getCanonicalName(), throwable.getMessage());
        // throwable ์˜ ์ข…๋ฅ˜์— ๋งž๊ฒŒ ๋ฐ˜ํ™˜๊ฐ’์„ ์กฐ์ •
        Map<String, Object> map = new HashMap<>();
        map.put("code", UNKNOWN_ERROR_CODE);
        map.put("error", UNKNOWN_ERROR_TYPE);
        // map.put("description", messageSource.getMessage("fms.error.unknown_server_error", null, locale));
        return map;
    }
}

์—๋Ÿฌ ๋ฐ˜ํ™˜์‹œ ์•„๋ž˜์ฒ˜๋Ÿผ ์ถœ๋ ฅ๋˜๋„๋ก ์„ค์ •

{
"code": "500-00",
"error": "UnknownServerError"
}

์ด์ œ ํ•ธ๋“ค๋Ÿฌ์—์„œ DefaultErrorAttributes ์˜ getErrorAttributes ๊ฐ€ ์•„๋‹Œ ์ง์ ‘ ์ •์˜ํ•œ GlobalErrorAttributes ์˜ getErrorAttributes ๊ฐ€ ํ˜ธ์ถœ๋˜๋„๋ก ์„ค์ •ํ•˜๋ฉด ๋œ๋‹ค.

AbstractErrorWebExceptionHandler

AbstractErrorWebExceptionHandler ๋Š” ์—๋Ÿฌ๋ฐœ์ƒ์‹œ ํ•„ํ„ฐ๋กœ ๋“ฑ๋ก๋˜์–ด ์žˆ๋Š” ํ•ธ๋“ค๋Ÿฌ ํ•ด๋‹น ํ•ธ๋“ค๋Ÿฌ๋ณด๋‹ค ๋” ๋†’์€ ์šฐ์„ ์ˆœ์œ„๋ฅผ ๊ฐ€์ง„ ํ•ธ๋“ค๋Ÿฌ๋กœ ์—๋Ÿฌ์ฒ˜๋ฆฌํ•˜๋„๋ก ์„ค์ •

@Order(-2) // ๊ธฐ๋ณธ ์—๋Ÿฌ์ฒ˜๋ฆฌ๋Š” -1, ๋ณด๋‹ค ๋น ๋ฅด๊ฒŒ ์„ค์ •ํ•œ๋‹ค.
@Component
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
    // constructors
    public GlobalErrorWebExceptionHandler(
            GlobalErrorAttributes errorAttributes, WebProperties.Resources resources,
            ApplicationContext applicationContext, ServerCodecConfigurer configurer) {
        super(errorAttributes, resources, applicationContext);
        this.setMessageWriters(configurer.getWriters());
    }

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
        return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
    }

    private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
        Map<String, Object> errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
        Integer status = Integer.valueOf(((String) errorPropertiesMap.get("code")).substring(0, 3));
        return ServerResponse.status(status)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(errorPropertiesMap));
    }
}

WebFlux - WebSocket

์ด๋ฏธ spring-boot-starter-websocket ๋ชจ๋“ˆ์—์„œ ์ œ๊ณตํ•˜๋Š” ์Šคํ”„๋ง ์›น์†Œ์ผ“์„ ํ†ตํ•ด ๋…ผ๋ธ”๋กํ‚น์œผ๋กœ ๋ฉ”์„ธ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์„ ์ค„ ์•Œ์•˜์ง€๋งŒ ๋‚ด๋ถ€์—์„œ ๋ธ”๋กœํ‚น ๋ฐฉ์‹์œผ๋กœ ๋™์ž‘ํ•˜๋ฉฐ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์„œ๋ฒ„ ์„ฑ๋Šฅ์— ์˜ํ–ฅ์„ ๋ผ์นœ๋‹ค

WebFlux ์—์„  ๋น„๋™๊ธฐ/๋…ผ๋ธ”๋กํ‚น ๋ฐฉ์‹์˜ ์›น์†Œ์ผ“ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด org.springframework.web.reactive.socket ํŒจํ‚ค์ง€๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

ํŒจํ‚ค์ง€์— reactive ๊ฐ€ ๋ถ™์„๋ฟ ํด๋ž˜์Šค๋ช…์ด๋‚˜ ์‚ฌ์šฉ๋ฐฉ๋ฒ•์ด ์œ ์‚ฌํ•˜๋‹ค.

์›น์†Œ์ผ“ ์„œ๋ฒ„, ์›น์†Œ์ผ“ ํด๋ผ์ด์–ธํŠธ ๋ชจ๋‘ ์ œ๊ณตํ•œ๋‹ค.

์›น์†Œ์ผ“ ์„œ๋ฒ„๋Š” WebSocketHandler ๋ฅผ ์‚ฌ์šฉํ•ด ์†Œ์ผ“ ํ•ธ๋“ค๋Ÿฌ ์—ญํ• ์„ ํ•˜๋Š” ๊ฐ์ฒด์ธ Handler ๋ฅผ ๋“ฑ๋กํ•œ๋‹ค.

public class EchoWebSocketHandler implements WebSocketHandler {
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session
            .receive() // return Flux<WebSocketMessage> 
            .map(wsMessage -> wsMessage.getPayloadAsText()) // websocket ๋ฉ”์„ธ์ง€์˜ payload (string) ํญ๋“
            .map(tm -> "Echo: " + tm) // ๋ฌธ์ž์—ด ๋ณ€ํ™˜
            .map(tm -> session.textMessage(tm)) // WebsocketSession ์„ ์‚ฌ์šฉ, client ๋ณด๋‚ผ ๋ฉ”์„ธ์ง€(payload) ์ž‘์„ฑ
            .as(wsMessage -> session.send(wsMessage)); // client ์—๊ฒŒ ๋ฉ”์„ธ์ง€(payload) ์ „์†ก
    }
}

์›น์†Œ์ผ“์„ ์‚ฌ์šฉํ•˜๋ ค๋ฉด ๋จผ์ € ์›น์†Œ์ผ“ ์„ค์ • ๊ฐ์ฒด๋ฅผ Bean ์œผ๋กœ ๋“ฑ๋กํ•ด์•ผ ํ•œ๋‹ค. ์œ„์—์„œ ์ •์˜ํ•œ ํ•ธ๋“ค๋Ÿฌ๋ฅผ url ์— ๋งคํ•‘ํ•˜๊ณ , Request ์š”์ฒญ์„ Upgrade ํ•˜๋Š” ์–ด๋Žํ„ฐ๋ฅผ Bean ์œผ๋กœ ๋“ฑ๋กํ•œ๋‹ค.

@Configuration
public class WebSocketConfiguration {

    @Bean
    public HandlerMapping handlerMapping() {
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Collections.singletonMap("/ws/echo", new EchoWebSocketHandler())); // ๊ฒฝ๋กœ๊ธฐ๋ฐ˜ ๋งคํ•‘ ์„ค์ •
        mapping.setOrder(-1); // ์šฐ์„ ์ˆœ์œ„, ์ƒ๋žต์‹œ ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š” ๊ฒƒ์œผ๋กœ ์„ค์ •๋จ 
        return mapping;
    }
    @Bean
    public HandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(); 
        // WebSocket Handshake (upgrade request) ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” HandlerAdapter ์ƒ์„ฑ 
    }
}

WebSocketMessage ๋Š” payload ๋กœ DataBuffer ๋ฅผ ๊ตฌํ˜„ํ•˜์—ฌ ๋ฌธ์ž์—ด, ๋ฐ”์ดํŠธ์ฝ”๋“œ๋กœ ์‰ฝ๊ฒŒ ํ˜•๋ณ€ํ™˜ ๊ฐ€๋Šฅ

public class WebSocketMessage {
	private final Type type;
	private final DataBuffer payload;
    ...
}

WebSocketHandler ๋ฅผ ๊ตฌํ˜„ํ•˜๋ฉด ํ•ด๋‹น url ์— ํ•ด๋‹นํ•˜๋Š” WebSocketSession ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉํ•ด ๋ฉ”์„ธ์ง€๋ฅผ ๋ฐ›๊ณ  ๋ณด๋‚ธ๋‹ค. ์›น์†Œ์ผ“ ํ…Œ์ŠคํŠธ ํˆด์„ ์‚ฌ์šฉํ•ด ws://127.0.0.1:8080/ws/echo ๋กœ ์ ‘์†, ๋ฉ”์„ธ์ง€ ์ „์†ก์‹œ Echo: ... ๋ฉ”์„ธ์ง€ ์ˆ˜์‹  ํ™•์ธ

WebSocket Client

์›น์†Œ์ผ“ ํด๋ผ์ด์–ธํŠธ์˜ ๊ฒฝ์šฐ WebSocketClient ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ ReactorNettyWebSocketClient ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

import org.springframework.web.reactive.socket.client.WebSocketClient;
...

@Bean
public CommandLineRunner commandLineRunner() {
    return (args) -> {
        ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(URI.create("http://localhost:8080/ws/echo"),
            session -> Flux
                .interval(Duration.ofMillis(100))
                .map(String::valueOf)
                .map(session::textMessage)
                .as(session::send))
            .subscribe();
    };
}

์ •์˜ํ•ด๋‘” echo ์›น์†Œ์ผ“ ์„œ๋ฒ„์— 0.1 ์ดˆ๋งˆ๋‹ค interval ๋ฐ์ดํ„ฐ๋ฅผ ๋ฌธ์ž์—ด๋กœ ์ „์†ก

client ์—ญ์‹œ WebsocketHandler ๊ตฌํ˜„์ฒด๋ฅผ excute ํ•จ์ˆ˜์˜ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋ฐ›์œผ๋ฉฐ ํ•ธ๋“ค๋Ÿฌ ๋งคํ•‘๊ณผ ํ•ธ๋“ค๋Ÿฌ ์–ด๋Žํ„ฐ๊ฐ€ Bean ์œผ๋กœ ๋“ฑ๋ก๋˜์ง€ ์•Š์„ ๋ฟ ์›น์†Œ์ผ“ ์„œ๋ฒ„ ์ฝ”๋“œ์™€ ์œ ์‚ฌํ•˜๋‹ค.

์ฐธ๊ณ  ์ฝ”๋“œ: https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket-client ์•ˆํƒ€๊น๊ฒŒ๋„ WebFlux ์—์„œ ์ œ๊ณตํ•˜๋Š” ์›น์†Œ์ผ“ ๋‚ด์šฉ์€ ์œ„์˜ ๊ธฐ๋Šฅ์ด ์ „๋ถ€์ด๋ฉฐ STOMP ๋ฅผ ์‚ฌ์šฉํ•œ ๋ฉ”์„ธ์ง€ ๋งคํ•‘ ๋“ฑ์˜ ๊ธฐ๋Šฅ์€ ์ œ๊ณตํ•˜์ง€ ์•Š๋Š”๋‹ค.

๋‹ค์ค‘ ํ†ต์‹  Sinks.Many

์ฝ”๋“œ ์ฐธ๊ณ : https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket

๋ฆฌ์•กํ‹ฐ๋ธŒ์˜ ์›น์†Œ์ผ“์€ WebMVC ์—์„œ ์‚ฌ์šฉํ•œ ์›น์†Œ์ผ“๊ณผ ๋‹ค๋ฅธ์ ์ด ์žˆ๋Š”๋ฐ ๋ชจ๋“  ์š”์ฒญ์— ๋Œ€ํ•œ ์‘๋‹ต์„ ์ˆ˜ํ–‰ํ•  ๋•Œ ์œ„์™€๊ฐ™์ด ๋žŒ๋‹ค์‹์ด๋‚˜ ํ•จ์ˆ˜๋ฅผ ๋ฏธ๋ฆฌ ๋“ฑ๋กํ•ด๋‘์–ด์•ผ ํ•˜๊ณ  ๋‘๊ฐœ ์ด์ƒ์˜ ํด๋ผ์ด์–ธํŠธ๋“ค๊ฐ„ ํ†ต์‹ ์„ ์œ„ํ•ด์„œ ๊ฐ ํด๋ผ์ด์–ธํŠธ์˜ session ์„ ์ฐพ์•„ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœํ–‰ํ•˜๋Š”๋ฐ Sinks.Many ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

์›น์†Œ์ผ“ ์„ค์ • ๊ฐ์ฒด๋ฅผ Bean ์œผ๋กœ ๋“ฑ๋กํ•˜๊ณ  ํ•ธ๋“ค๋Ÿฌ ๋งคํ•‘ํ•˜๋Š” ๊ฒƒ์€ ๋™์ผํ•˜๋‹ค.

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping handlerMapping(ChatSocketHandler chatSocketHandler) { // url, handler ๋งคํ•‘
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Collections.singletonMap("/ws/chat", chatSocketHandler));
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatSocketHandler implements WebSocketHandler { // handler ์ •์˜ 

    private final ObjectMapper mapper;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(session);
        return session.receive()
                .map(this::toDto)
                .doOnNext(subscriber::onNext) // ์ˆ˜์‹  ์ฝœ๋ฐฑ ํ•จ์ˆ˜ ๋“ฑ๋ก
                .doOnError(subscriber::onError) // ์—๋Ÿฌ ์ฝœ๋ฐฑ ํ•จ์ˆ˜ ๋“ฑ๋ก
                .doOnCancel(subscriber::onCancel) // ์—ฐ๊ฒฐ๋Š๊น€ ์ฝœ๋ฐฑ ํ•จ์ˆ˜ ๋“ฑ๋ก  
                .zipWith(session.send(subscriber.getMany().asFlux().map(webSocketToClientDto -> // ๋ฉ”์„ธ์ง€ ๋ฐœ์‹  ์ฝœ๋ฐฑ ํ•จ์ˆ˜ ๋“ฑ๋ก
                        session.textMessage(webSocketToClientDto.getFrom() + ":" + webSocketToClientDto.getMessage()))))
                .then();
    }

    private WebSocketFromClientDto toDto(WebSocketMessage message) {
        try {
            WebSocketFromClientDto WsDto = mapper.readValue(message.getPayloadAsText(), WebSocketFromClientDto.class);
            return WsDto;
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}

session.send ์— ๋“ฑ๋ก๋˜๋Š” Flux ๋ฐœํ–‰์ž๋Š” Sinks.Many ๋กœ๋ถ€ํ„ฐ ์ƒ์„ฑ๋˜๋Š”๋ฐ ์ด๋Š” ์•„๋ž˜์—์„œ ์‚ฌ์šฉํ•˜๋Š” ์ฝ”๋“œ์™€ ๊ฐ™์ด ๋ฉ”์„ธ์ง€๋ฅผ ์ง‘์–ด๋„ฃ์„ ์ˆ˜ ์žˆ๋Š” ๋ฐœํ–‰์ž๋‹ค. Websocket ์™ธ์—๋„ SSE (Server Send Events) ์—์„œ๋„ ์‚ฌ์šฉ๋œ๋‹ค.

@Slf4j
class WebSocketMessageSubscriber {
    // ๋ณธ์ธ์„ ํฌํ•จํ•œ ๋‹ค๋ฅธ ํด๋ผ์ด์–ธํŠธ์˜ ์›น์†Œ์ผ“ ๋ฉ”์„ธ์ง€ ๋ฐœํ–‰์ž ๋งต
    public static Map<String, Sinks.Many<WebSocketToClientDto>> userMap = new HashMap<>(); //sessionId, sink
    private final String id;
    @Getter
    private final Sinks.Many<WebSocketToClientDto> many; // ๋ณธ์ธ์˜ ์›น์†Œ์ผ“ ๋ฉ”์„ธ์ง€ ๋ฐœํ–‰์ž

    public WebSocketMessageSubscriber(WebSocketSession session) {
        many = Sinks.many().unicast().onBackpressureBuffer();
        id = session.getId();
        many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("welcome, " + id).build());
        userMap.put(id, many);
    }

    public void onNext(WebSocketFromClientDto msg) {
        log.info("onNext invoked, to:{}, msg:{}", msg.getTo(), msg.getMessage());
        Sinks.Many<WebSocketToClientDto> to = userMap.get(msg.getTo());
        if (to == null)
            many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("no user:" + msg.getTo()).build());
        else
            to.tryEmitNext(WebSocketToClientDto.builder().from(id).message(msg.getMessage()).build());
    }

    public void onError(Throwable error) {
        //TODO log error
        log.error("onError invoked, error:{}, {}", error.getClass().getSimpleName(), error.getMessage());
        many.tryEmitNext(WebSocketToClientDto.builder()
                .from("system")
                .message(id + " on error, error:" + error.getMessage())
                .build());
    }

    public void onCancel() {
        log.info("onCancel invoked, id:{}", id);
        userMap.remove(id);
        for (Map.Entry<String, Sinks.Many<WebSocketToClientDto>> entry : userMap.entrySet()) {
            if (!entry.getKey().equals(id))
                entry.getValue().tryEmitNext(WebSocketToClientDto.builder()
                        .from("system")
                        .message(id + " is exit")
                        .build());
        }
    }
}

@Getter
@Setter
@Builder
class WebSocketToClientDto {
    private String from;
    private String message;
}

@Getter
@Setter
class WebSocketFromClientDto {
    private String to;
    private String message;
}

WebClient

๋…ผ๋ธ”๋กํ‚น Http Client๋กœ ๊ธฐ์กด ์Šคํ”„๋ง ๋ถ€ํŠธ์—์„œ ๋Œ€ํ‘œ์ ์ธ Http Client ๋กœ RestTemplate(๋ธ”๋กํ‚น) ์ด ์žˆ๋‹ค. ๋‚ด๋ถ€์— Flux, Mono ๋ฆฌ์•กํ„ฐ ๊ฐ์ฒด๋ฅผ ์ง€์›ํ•˜๋Š” ๋งคํ•‘์ด ๋‚ด์žฅ๋˜์–ด ์žˆ์–ด ๋ฆฌ์•กํ‹ฐ๋ธŒ ์„œ๋ฒ„์— ์ž˜ ์–ด์šธ๋ฆฐ๋‹ค.

http://localhost:8080/api/user/{id} url ์„ ์ง€์›ํ•˜๋Š” ๊ฐ„๋‹จํ•œ ์›น์„œ๋ฒ„ ์ƒ์„ฑ

@Slf4j
public class TestWebServer {
    public static void main(String[] args) {
        HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler ๋ณ€๊ฒฝ
            nest(path("/api"), route(GET("/users/{id}"),
                request -> {
                    String id = request.pathVariable("id");
                    return ServerResponse.ok().syncBody("hello " + id + " user!"); // ๋ฐ˜ํ™˜๋ฐ์ดํ„ฐ ๋™๊ธฐ์ ์œผ๋กœ ์ƒ์„ฑ
                }) // end route
            ) // end nest
        );
        ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
        DisposableServer server = HttpServer.create()
            .host("localhost").port(8080)
            .handle(reactorHttpHandler)
            .bindNow();
        server.onDispose().block();
    }
}

WebClient ๋ฅผ ์‚ฌ์šฉํ•ด ์œ„ url ์— Http GET Request ์š”์ฒญ

public class TestWebClient {
    public static void main(String[] args) throws InterruptedException {
        WebClient.create("http://localhost:8080/api") // WebClient ๊ฐ์ฒด ์ƒ์„ฑ + baseUrl ์„ค์ •
            .get().uri("/users/{id}", 10) // method, uri ์„ค์ •
            .retrieve() // ์‘๋‹ต ๋‚ด์šฉ ์„ค์ •. ResponseSpec ๋ฐ˜ํ™˜
            .bodyToMono(String.class) // ์‘๋‹ต body ๋ฅผ Mono ๋กœ ๋ณ€ํ™˜
            .subscribe(s -> System.out.println(s)); // Mono ์— ๋Œ€ํ•œ ๊ตฌ๋… ์„ค์ •
            // hello 10 user!
        Thread.sleep(1000); // main thread ์ข…๋ฃŒ ๋ฐฉ์ง€ 
    }
}

์œ„์˜ WebClient ๋Š” GET ๋ฐฉ์‹์ด๋ผ uri ๋งŒ ์„ค์ •ํ–ˆ์ง€๋งŒ API ์— ๋”ฐ๋ผ cookie, header, body ๋ชจ๋‘ ์„ค์ • ๊ฐ€๋Šฅํ•˜๋‹ค.

HTTP ์‘๋‹ต์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ๋ฉ”์„œ๋“œ๊ฐ€ retrieve() ์™€ exchage() ๊ฐ€ ์žˆ๋Š”๋ฐ

retrieve() ๋Š” ResponseSpec ์„ ๋ฐ˜ํ™˜ํ•˜๊ณ  exchage() ๋Š” Mono<ClientResponse> ๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

๋งŒ์•ฝ exchange() ๋ฅผ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ ์•„๋ž˜์™€ ๊ฐ™์ด ์ฝ”๋“œ์ž‘์„ฑ

public static void main(String[] args) throws InterruptedException {
    WebClient.create("http://localhost:8080/api") // WebClient ๊ฐ์ฒด ์ƒ์„ฑ
        .get().uri("/users/{id}", 10) // method, uri ์„ค์ •
        .exchange() // Mono<ClientResponse> ๋ฐ˜ํ™˜
        .flatMap(response -> response.bodyToMono(String.class)) // ์‘๋‹ต body ๋ฅผ Mono ๋กœ ๋ณ€ํ™˜
        .subscribe(s -> System.out.println(s)); // Mono ์— ๋Œ€ํ•œ ๊ตฌ๋… ์„ค์ •
    Thread.sleep(1000);
}

exchage ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ClientResponse ์—์„œ ์ œ๊ณตํ•˜๋Š” Http Response ์˜ ๊ฐ์ข… ์ •๋ณด๋ฅผ ์กฐ์ž‘ํ•  ์ˆ˜ ์žˆ๋Š” ์—ฌ๋Ÿฌ ๋ฉ”์„œ๋“œ๋กœ ๋ณต์žกํ•œ ๋ฐ˜ํ™˜ ๋กœ์ง ๊ตฌ์„ฑ์ด ๊ฐ€๋Šฅํ•˜๋‹ค. retrieve ์˜ ๊ฒฝ์šฐ Http status ๋งŒ ๊ฒจ์šฐ ์กฐ์ž‘ํ•˜์—ฌ DSL ํ˜•์‹์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

WebClient ๋Š” ์ธํ„ฐํŽ˜์ด์Šค์ด๊ณ  DefaultWebClient ๊ฐ€ WebClient ์˜ ์œ ์ผํ•œ ๊ตฌํ˜„์ฒด์ด๋‹ค. ์‹ค์ œ DefaultWebClient ๋‚ด๋ถ€์—์„ 

WebClient Serialize config

WebClient ์—์„œ ์ง๋ ฌํ™”, ๋น„์ง๋ ฌํ™”๋ฅผ ์ˆ˜ํ–‰ํ• ๋•Œ ๊ธฐ์กด์ƒ์„ฑํ•œ ObjectMapper ๋ฅผ ํ†ตํ•ด ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์ž‡๋‹ค.

@Bean
public ObjectMapper objectMapper() {
    JavaTimeModule module = new JavaTimeModule();
    LocalDateTimeSerializer localDateTimeSerializer = new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
    LocalDateTimeDeserializer localDateTimeDeserializer = new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
    module.addSerializer(LocalDateTime.class, localDateTimeSerializer);
    module.addDeserializer(LocalDateTime.class, localDateTimeDeserializer);

    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(module);
    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    // UnrecognizedPropertyException ์ฒ˜๋ฆฌ
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    // json -> clas ์—์„œ unknown ์†์„ฑ์ด ์žˆ์–ด๋„ ์ฒ˜๋ฆฌ
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // null ์ด ์•„๋‹Œ๊ฒƒ๋งŒ ๋ณ€ํ™˜
    objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); // snake case ๋กœ ๋ณ€ํ™˜
    return objectMapper;
}

@Bean
public WebClient webClient(ObjectMapper objectMapper) {
    ExchangeStrategies jacksonStrategy = ExchangeStrategies.builder()
            .codecs(config -> {
                config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON));
                config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON));
                config.defaultCodecs().maxInMemorySize(1024 * 1024 * 50);
            }).build();
    return WebClient.builder().exchangeStrategies(jacksonStrategy).build();
}

์ฃผ์˜์‚ฌํ•ญ์œผ๋กœ uri(uriBuilder->..) ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•ด query parameter ๋ฅผ ์ง€์ •ํ•  ๊ฒฝ์šฐ ๋ฌธ์ž์—ด์— / ๊ฐ€ ๋“ค์–ด๊ฐˆ escape ๋ฌธ์ž๋กœ ์ธ์‹ํ•˜๊ธฐ ๋•Œ๋ฌธ์— base64 ๋ฌธ์ž์—ด๋กœ ๋ณ€ํ™˜ํ•  ์ˆ˜ ์—†๋‹ค, url encoding ์„ ์ง„ํ–‰ํ•˜์ง€ ์•Š๋Š”๋‹ค. ๋˜ํ•œ WebClient ์ƒ์„ฑ์‹œ baseUrl ์„ ์„ค์ •ํ•˜์ง€ ์•Š์œผ๋ฉด uribuilder ๋ฅผ ํ†ตํ•ด scheme, host, port, path ๋นŒ๋“œํ•จ์ˆ˜๋ฅผ ๋ชจ๋‘ ํ˜ธ์ถœํ•ด์•ผ ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ฒˆ๊ฑฐ๋กญ๋‹ค.

WebClient ๋ฅผ bean ์œผ๋กœ ์ƒ์„ฑํ•ด singleton ๋ฐฉ์‹์œผ๋กœ ์‚ฌ์šฉํ•œ๋‹ค๋ฉด StringBuilder ๋ฅผ ํ†ตํ•ด uri ๋ฅผ ์ง์ ‘์ƒ์„ฑํ•˜๋Š” ๊ฒƒ์„ ๊ถŒ์žฅ.

StringBuilder urlBuilder = new StringBuilder(WEATHER_GET_ULTRA_SRT_NCST); /*URL*/
urlBuilder.append("?" + URLEncoder.encode("ServiceKey", "UTF-8") + "=" + URLEncoder.encode(dataGovApiKey, "UTF-8")); /*๊ณต๊ณต๋ฐ์ดํ„ฐํฌํ„ธ์—์„œ ๋ฐ›์€ ์ธ์ฆํ‚ค*/
urlBuilder.append("&" + URLEncoder.encode("pageNo", "UTF-8") + "=" + URLEncoder.encode("1", "UTF-8")); /*ํŽ˜์ด์ง€๋ฒˆํ˜ธ*/
urlBuilder.append("&" + URLEncoder.encode("numOfRows", "UTF-8") + "=" + URLEncoder.encode("10", "UTF-8")); /*ํ•œ ํŽ˜์ด์ง€ ๊ฒฐ๊ณผ ์ˆ˜*/
urlBuilder.append("&" + URLEncoder.encode("dataType", "UTF-8") + "=" + URLEncoder.encode("JSON", "UTF-8")); /*์š”์ฒญ์ž๋ฃŒํ˜•์‹(XML/JSON)Default: XML*/
urlBuilder.append("&" + URLEncoder.encode("base_date", "UTF-8") + "=" + URLEncoder.encode(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE), "UTF-8")); /*15๋…„ 12์›” 1์ผ๋ฐœํ‘œ*/
urlBuilder.append("&" + URLEncoder.encode("base_time", "UTF-8") + "=" + URLEncoder.encode(String.format("%02d00", curHour), "UTF-8")); /*05์‹œ ๋ฐœํ‘œ*/
urlBuilder.append("&" + URLEncoder.encode("nx", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(nx), "UTF-8")); /*์˜ˆ๋ณด์ง€์  X ์ขŒํ‘œ๊ฐ’*/
urlBuilder.append("&" + URLEncoder.encode("ny", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(ny), "UTF-8")); /*์˜ˆ๋ณด์ง€์ ์˜ Y ์ขŒํ‘œ๊ฐ’*/
URI uri = new URL(urlBuilder.toString()).toURI();

SSE(Server-Sent Event)

์„œ๋ฒ„ ๋‹จ๋ฐฉํ–ฅ ํ†ต์‹ ์ด๋ผ ์›น์†Œ์ผ“์ด ๋น„ํ•ด ์†๋„๋‚˜ ์˜ค๋ฒ„ํ—ค๋“œ ์ธก๋ฉด์—์„œ SSE ๊ฐ€ ํšจ์œจ์ ์ด์ง€๋งŒ ์–‘๋ฐฉํ–ฅ์ด ์•ˆ๋˜๋Š” ์ด์œ ๋กœ ์›น์†Œ์ผ“์ด ์ฃผ๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

WebFlux ๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ  spring-boot-starter-web ์—์„œ SSE ํ”„๋กœํ† ์ฝœ ์‚ฌ์šฉ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

@Component
@RequiredArgsConstructor
public class TemperatureSensor {
   // ์Šคํ”„๋ง ์ œ๊ณต ์ด๋ฒคํŠธ ๋ฐœํ–‰ ๊ตฌ๋… ์ง€์› ํด๋ž˜์Šค
   private final ApplicationEventPublisher publisher;
   private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
   private final Random rnd = new Random();

   @PostConstruct
   public void startProcessing() {
      this.executor.schedule(this::probe, 1, SECONDS); // ๋ฐ์ดํ„ฐ ๋ฐœํ–‰ ์‹œ์ž‘ 
   }

   private void probe() {
      double temperature = 16 + rnd.nextGaussian() * 10;
      publisher.publishEvent(new Temperature(temperature)); // ์ด๋ฒคํŠธ ๋ฐ์ดํ„ฐ ๋ฐœํ–‰ 
      executor.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS); // 5์ดˆํ›„ ์žฌ๋ฐœํ–‰
   }
}

ScheduledExecutorService ์™€ ApplicationEventPublisher ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•ด Temperature ๋ฐ์ดํ„ฐ๋ฅผ ๊ณ„์† ๋ฐœํ–‰ํ•œ๋‹ค. @EventListener ๋ฅผ ์‚ฌ์šฉํ•ด ApplicationEventPublisher ์— ๋ฐœํ–‰๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

@Slf4j
@RestController
public class TemperatureController {
    static final long SSE_SESSION_TIMEOUT = 5 * 1000L;
    // ์—ฐ๊ฒฐ ํด๋ผ์ด์–ธํŠธ ๊ด€๋ฆฌ list
    private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();

    // TemperatureSensor ์˜ ApplicationEventPublisher ์—์„œ ๋ฐœํ–‰๋˜๋Š” ๋ฐ์ดํ„ฐ ๋Œ€์‘
    @Async
    @EventListener
    public void handleMessage(Temperature temperature) {
        log.info(format("Temperature: %4.2f C, active subscribers: %d", temperature.getValue(), clients.size()));
        // ๊ด€๋ฆฌ๋˜๋Š” ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋ฐœํ–‰๋œ Temperature ๋ฐ์ดํ„ฐ ์ „๋‹ฌ ๋ฐ ์˜ˆ์™ธ๋ฐœ์ƒ ํด๋ผ์ด์–ธํŠธ ์‚ญ์ œ์ฒ˜๋ฆฌ
        List<SseEmitter> deadEmitters = new ArrayList<>();
        clients.forEach(emitter -> {
            try {
                Instant start = Instant.now();
                emitter.send(temperature, MediaType.APPLICATION_JSON);
                log.info("Sent to client, took: {}", Duration.between(start, Instant.now()));
            } catch (Exception ignore) {
                deadEmitters.add(emitter);
            }
        });
        clients.removeAll(deadEmitters);
    }

    @RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
    public SseEmitter events(HttpServletRequest request) {
        log.info("SSE stream opened for client: " + request.getRemoteAddr());
        SseEmitter emitter = new SseEmitter(SSE_SESSION_TIMEOUT); // 5 ์ดˆ๊ฐ„ ์—ฐ๊ฒฐ
        clients.add(emitter); // ๊ด€๋ฆฌ emitter ๋ชฉ๋ก์— ์ถ”๊ฐ€

        // Remove SseEmitter from active clients on error or client disconnect
        emitter.onTimeout(() -> clients.remove(emitter));
        emitter.onCompletion(() -> clients.remove(emitter));

        return emitter;
    }
    // ์œ„์— ์„ค์ •๋œ 5์ดˆ๊ฐ€ ์ง€๋‚˜๋ฉด AsyncRequestTimeoutException ์ด ๋ฐœ์ƒํ•˜๊ณ  ํ˜ธ์ถœ๋จ.  
    @ExceptionHandler(value = AsyncRequestTimeoutException.class)
    public ModelAndView handleTimeout(HttpServletResponse rsp) throws IOException {
        log.warn("handle timeout");
        if (!rsp.isCommitted()) {
            rsp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
        }
        return new ModelAndView();
    }
}

SseEmitter ๋ฅผ ์ง€์†์ ์œผ๋กœ ์œ ์ง€, ๊ด€๋ฆฌํ•œ๋‹ค. ๋ฐ˜ํ™˜ ํƒ€์ž…์ด ResponseEntity, Map ๊ณผ ๊ฐ™์€ ๊ฐ์ฒด๊ฐ€ ์•„๋‹Œ SseEmitter ์ธ ๊ฒƒ์ด ์–ด์ƒ‰ํ•˜๋‹ค.

WebFlux ์—์„  Flux ์™€ Spring 5.0 ์— ์ถ”๊ฐ€๋œ ServerSentEvent ๋ฅผ ์‚ฌ์šฉํ•ด SSE ํ”„๋กœํ† ์ฝœ์„ ์ง€์›ํ•œ๋‹ค.

@RestController
@RequiredArgsConstructor
public class ServerSentController {

    private final StocksService stocksService;

    @GetMapping("/sse/stocks")
    public Flux<ServerSentEvent<?>> streamStocks() {
        return stocksService.stream() // Flux<StockItem> ๋ฐ˜ํ™˜
                .map(item -> ServerSentEvent
                        .builder(item)
                        .event("StockItem")
                        .id(item.getId())
                        .build());
    }

    @GetMapping("/sse/stocks2")
    public Flux<StockItem> streamStocks2() {
        return stocksService.stream();
    }
}

WebFlux ๋‚ด๋ถ€์—์„œ ๋ฐœํ–‰์›์†Œ๋ฅผ ServerSentEvent ๋กœ ๋ž˜ํ•‘ ํ•˜๊ธฐ์— ๋‹จ์ˆœ Flux<> ๋งŒ ์ปจํŠธ๋กค๋Ÿฌ ๋ฉ”์„œ๋“œ์—์„œ ๋ฐ˜ํ™˜ํ•ด๋„ ๋œ๋‹ค.

์Šคํ”„๋ง ์‹œํ๋ฆฌํ‹ฐ with WebFlux

๊ธฐ์กด ์„œ๋ธ”๋ฆฟ ๊ธฐ๋ฐ˜ ์Šคํ”„๋ง ๋ถ€ํŠธ๋Š” ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ์— ํ•˜๋‚˜์˜ ์—ฐ๊ฒฐ์ด ์ฒ˜๋ฆฌ๋˜์–ด ThreadLocal ์— SecurityContext ๋ฅผ ์ €์žฅํ•ด ์—ฐ๊ฒฐ๋™์•ˆ ๋ณด์•ˆ์ฒ˜๋ฆฌ๋ฅผ ์ง„ํ–‰ํ–ˆ์ง€๋งŒ

๋ฆฌ์•กํ‹ฐ๋ธŒ๋Š” ํ•˜๋‚˜์˜ ์—ฐ๊ฒฐ์— ์—ฌ๋Ÿฌ๊ฐœ์˜ ์Šค๋ ˆ๋“œ๊ฐ€ ๊ผฌ์—ฌ์žˆ์„ ์ˆ˜ ์žˆ์–ด Reactor Context ๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค. spring-boot-starter-security ๋ชจ๋“ˆ ์—ญ์‹œ ๊ธฐ์กด ์„œ๋ธ”๋ฆฟ ๊ธฐ๋ฐ˜ WebMVC ์—์„œ WebFlux ๋ฅผ ์ง€์›ํ•  ์ˆ˜ ์žˆ๋„๋ก ์—…๋ฐ์ดํŠธ ๋˜์—ˆ๋‹ค.

ReactiveSecurityContextHolder

WebMVC ์—์„  SecurityContextHolder ์—์„œ SecurityContext ๋ฅผ ๊ฐ€์ ธ์™”๋‹ค๋ฉด WebFlux ์—์„  ReactiveSecurityContextHolder ์—์„œ SecurityContext ๋ฅผ ๊ฐ€์ ธ์˜จ๋‹ค.

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class SecuredProfileController {

    private final ProfileService profileService;

    @GetMapping("/profiles")
    public Mono<Profile> getProfile() {
        return ReactiveSecurityContextHolder.getContext() // Mono<SecurityContext> ๋ฐ˜ํ™˜
            .map(SecurityContext::getAuthentication)
            .flatMap(auth -> profileService.getByUser(auth.getName()));
    }
}

๋กœ๊ทธ์ธํ•œ ์œ ์ €์˜ ์ •๋ณด๋ฅผ SecurityContext ๊ฐ€์ ธ์™€ Profile ์—์„œ ๊ฒ€์ƒ‰ ํ›„ ์ถœ๋ ฅํ•œ๋‹ค.

๋‹น์—ฐํžˆ SecurityContext::getAuthentication ๋ฉ”์„œ๋“œ๋Š” ๋ฆฌ์•กํ„ฐ ์ปจํ…์ŠคํŠธ ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

// ReactiveSecurityContextHolder.java

public static Mono<SecurityContext> getContext() {
    return Mono.subscriberContext()
        .filter( c -> c.hasKey(SECURITY_CONTEXT_KEY))
        .flatMap( c-> c.<Mono<SecurityContext>>get(SECURITY_CONTEXT_KEY));
}

SecurityWebFilterChain

์ธ์ฆ๊ณผ์ •์„ ๊ฑฐ์น˜๋ ค๋ฉด ๋‚ด๋ถ€ ์ปจํ…์ŠคํŠธ์— ์—‘์„ธ์Šค ํ•˜๋ ค๊ณ  ํ•˜๋ฉด SecurityContext ๊ฐ€ ํ•ด๋‹น ๋ฆฌ์•กํ„ฐ ์ปจํ…์ŠคํŠธ ์— ์กด์žฌํ•ด์•ผํ•˜๊ณ  Authentication ๊ฐ์ฒด๊ฐ€ SecurityContext ์•ˆ์— ํ• ๋‹น๋˜์–ด์•ผ ํ•œ๋‹ค.

์ด ๋ชจ๋“  ๊ณผ์ •์„ ReactorContextWebFilter ์— SecurityWebFilterChain ์ ์šฉํ•˜๊ณ  ์ด๋ฅผ ํ†ตํ•ด ๋ฆฌ์•กํ„ฐ ์ปจํ…์ŠคํŠธ ์•ˆ์— SecurityContext ๊ฐ์ฒด์™€ Authentication ๊ฐ์ฒด๋ฅผ ์ง‘์–ด๋„ฃ๋Š”๋‹ค.

SecurityContext ๋ฅผ ์ง‘์–ด ๋„ฃ๋Š” ํ•จ์ˆ˜๋Š” ServerSecurityContextRepository ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

package org.springframework.security.web.server.context;

public interface ServerSecurityContextRepository {
	Mono<Void> save(ServerWebExchange exchange, SecurityContext context);
	Mono<SecurityContext> load(ServerWebExchange exchange);
}

SecurityContext ๋ฅผ ํŠน์ • ServerWebExchange์— ์ €์žฅ, ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

@Configuration
@EnableReactiveMethodSecurity // ๋ณ„๋„์˜ MethodInterceptor ์‚ฌ์šฉ์‹œ ํ•„์š”ํ•จ
public class SecurityConfiguration {

    @Bean // ReactorContextWebFilter ์— ์ ์šฉํ•  ์‹œํ๋ฆฌํ‹ฐ ํ•„ํ„ฐ
    public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
        // ๊ธฐ์กด mvc ์—์„œ ์‚ฌ์šฉํ•˜๋˜ HttpSecurity ์—์„œ webflux ์šฉ์œผ๋กœ ์ •์˜๋œ ServerHttpSecurity
        // ๊ธฐ์กด spring security ์‚ฌ์šฉ ๋ฐฉ์‹๊ณผ ํฌ๊ฒŒ ๋‹ค๋ฅด์ง€ ์•Š๋‹ค.  
        return httpSecurity
            .authorizeExchange()
            .anyExchange().permitAll().and()
            .httpBasic().and()
            .formLogin().and()
            .build();
    }

    private static final Pattern PASSWORD_ALGORITHM_PATTERN = Pattern.compile("^\\{.+}.*$");
    private static final String NOOP_PASSWORD_PREFIX = "{noop}";

    @Bean // ๋กœ๊ทธ์ธ์— ์‚ฌ์šฉํ•  ํ…Œ์ŠคํŠธ ์‚ฌ์šฉ์ž ์ƒ์„ฑ
    public MapReactiveUserDetailsService reactiveUserDetailsService(ObjectProvider<PasswordEncoder> passwordEncoder) {
        return new MapReactiveUserDetailsService(
            User.withUsername("user")
                .password("user")
                .passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
                .roles("USER")
                .build(),
            User.withUsername("admin")
                .password("admin")
                .passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
                .roles("USER", "ADMIN")
                .build());
    }

    private String getOrDeducePassword(String password, PasswordEncoder encoder) {
        if (encoder != null || PASSWORD_ALGORITHM_PATTERN.matcher(password).matches()) {
            return password;
        }
        return NOOP_PASSWORD_PREFIX + password;
    }
}

WebFlux with JWT

Custom SecurityContextRepository, AuthenticationManager

์Šคํ”„๋ง ์‹œํ๋ฆฌํ‹ฐ๋Š” SecurityContextRepository ๋ฅผ ํ†ตํ•ด SecurityContext ๋ฅผ ๋ฆฌ์•กํ„ฐ ์ปจํ…์ŠคํŠธ์— ์ €์žฅํ•˜๊ณ  ์‚ญ์ œํ•œ๋‹ค.

default ์Šคํ”„๋ง ์‹œํ๋ฆฌํ‹ฐ์˜ ๊ฒฝ์šฐ DB ๋กœ๋ถ€ํ„ฐ UserDetails ๋ฅผ ๊ฐ€์ ธ์™€ ๋“ฑ๋กํ•ด๋‘๊ณ  ์‚ฌ์šฉํ•˜๊ฒ ์ง€๋งŒ ์šฐ๋ฆฌ๋Š” JWT ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ์— ServerSecurityContextRepository ์ƒ์†๋ฐ›์•„ ์ปค์Šคํ„ฐ๋งˆ์ด์ง• ํ•ด์•ผํ•œ๋‹ค.

@Component
@RequiredArgsConstructor
public class SecurityContextRepository implements ServerSecurityContextRepository {

    private final AuthenticationManager authenticationManager;

    @Override
    public Mono<Void> save(ServerWebExchange swe, SecurityContext sc) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Mono<SecurityContext> load(ServerWebExchange swe) {
        ServerHttpRequest request = swe.getRequest();
        String authToken = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
        if (authToken != null) {
            Authentication auth = new UsernamePasswordAuthenticationToken(authToken, authToken);
            return authenticationManager
                    .authenticate(auth)
                    .map(authentication -> new SecurityContextImpl(authentication));
        } else {
            return Mono.empty();
        }
    }
}

SecurityContext ๋ฅผ ์ƒ์„ฑ ๋ฐ ์ €์žฅํ•˜๊ธฐ request ํ—ค๋”๋กœ ๋ถ€ํ„ฐ JWT ํ† ํฐ์„ ๊ฐ€์ ธ์™€ AuthenticationManager๋กœ ๋„˜๊ธฐ๋Š” ์ฝ”๋“œ๊ฐ€ load ์— ์ •์˜๋˜์–ด ์žˆ๋‹ค.

@Component
@RequiredArgsConstructor
public class AuthenticationManager implements ReactiveAuthenticationManager {

    private final JwtTokenUtil jwtUtil;

    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
        String authToken = authentication.getCredentials().toString();
        if (!jwtUtil.validateToken(authToken)) {
            return Mono.empty();
        }
        Claims claims = jwtUtil.getAllClaimsFromToken(authToken);
        String role = claims.get("role", String.class);
        List<GrantedAuthority> authorities = Collections.singletonList(new SimpleGrantedAuthority(role));
        return Mono.just(new UsernamePasswordAuthenticationToken(claims.getSubject(), null, authorities));
    }
}

AuthenticationManager ๋Š” ์ „๋‹ฌ๋ฐ›์€ ํ† ํฐ์œผ๋กœ role ์„ ๊บผ๋‚ด์–ด Authority ๋ฅผ ์ง€์ •ํ•˜๊ณ  Authentication ์ธ์ฆ ๊ฐ์ฒด๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

SecurityContextPath ๊ฐ€ ๋ฆฌ์•กํ„ฐ ์ปจํ…์ŠคํŠธ๋กœ ๋ณ€๊ฒฝ๋˜์—ˆ์„ ๋ฟ ๊ธฐ์กด WebMVC ๋ชจ๋ธ์˜ ์Šคํ”„๋ง ์‹œํ๋ฆฌํ‹ฐ ๋ฐฉ์‹๊ณผ ๋น„์Šทํ•˜๋‹ค.

@Bean // ReactorContextWebFilter ์— ์ ์šฉํ•  ์‹œํ๋ฆฌํ‹ฐ ํ•„ํ„ฐ
public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
    // ๊ธฐ์กด mvc ์—์„œ ์‚ฌ์šฉํ•˜๋˜ HttpSecurity ์—์„œ webflux ์šฉ์œผ๋กœ ์ •์˜๋œ ServerHttpSecurity
    // ๊ธฐ์กด spring security ์‚ฌ์šฉ ๋ฐฉ์‹๊ณผ ํฌ๊ฒŒ ๋‹ค๋ฅด์ง€ ์•Š๋‹ค.
    return httpSecurity
        .exceptionHandling()
        //.authenticationEntryPoint((swe, e) -> Mono.fromRunnable(() -> // ๋ฏธ ๋กœ๊ทธ์ธ
        //        swe.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED)))
        //.accessDeniedHandler((swe, e) -> Mono.fromRunnable(() -> // ๋ฏธ ๋กœ๊ทธ์ธ
        //        swe.getResponse().setStatusCode(HttpStatus.FORBIDDEN)))
        .authenticationEntryPoint((swd, e) -> Mono.error(new AuthenticationCredentialsNotFoundException("")))
        .accessDeniedHandler((swe, e) -> Mono.error(new AccessDeniedException("")))
        .and()
        .authenticationManager(authenticationManager)
        .securityContextRepository(securityContextRepository)
        .csrf().disable()
        .cors().disable()
        .httpBasic().disable() // Basic Authentication disable
        .formLogin().disable()
        .authorizeExchange()
        .pathMatchers("/member/join", "/member/login").permitAll()
        .pathMatchers("/member/**", "/rent/**").authenticated()
        .pathMatchers("/admin/**").hasAnyRole("ROLE_ADMIN")
        .anyExchange().permitAll()
        .and()
        .build();
}

๋งˆ์ง€๋ง‰์œผ๋กœ ServerHttpSecurity ์— ์ปค์Šคํ„ฐ๋งˆ์ด์ง•ํ•œ SecurityContextRepository, AuthenticationManager ๋ฅผ ๋“ฑ๋กํ•˜๊ณ  ๋ณ„๋„์˜ ์—๋Ÿฌ ํ•ธ๋“ค๋ง ์ฒ˜๋ฆฌ๋ฅผ ํ•œ๋‹ค๋ฉด ์—๋Ÿฌ๋ฅผ ๋ฐ˜ํ™˜ํ•ด ํ•ธ๋“ค๋ง, ์—†๋‹ค๋ฉด HttpStatus.UNAUTHORIZED ๋กœ ๋‹จ์ˆœ HTTP Status ๋งŒ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

์Šคํ”„๋ง ๋ฐ์ดํ„ฐ with WebFlux

๊ธฐ์กด์— WebMVC ๋ฐฉ์‹์˜ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ ‘๊ทผ์‹œ JDBC ๋ฅผ ๊ตฌํ˜„ํ•œ ๋“œ๋ผ์ด๋ฒ„ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ๋ฅผ ์‚ฌ์šฉํ•ด ์ ‘๊ทผํ•ด์™”๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒํ•œ DB ํ†ต์‹ ๋„ HTTP ์™€ ๋‹ค๋ฅด์ง€ ์•Š๋‹ค. ์ด๋ก ์ ์œผ๋กœ DB ์ ‘๊ทผ์šฉ ์„œ๋น„์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ  WebClient ๋ฅผ ์‚ฌ์šฉํ•ด DB ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜จ๋‹ค๋ฉด ๋น„๋™๊ธฐ DB ์ ‘๊ทผ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ๊ตฌํ˜„ํ•œ ๊ฒƒ ๊ณผ ๋‹ค๋ฆ„์—†๋‹ค.

๋‹คํ–‰์ด๋„ ๋‹ค์–‘ํ•œ DB ๋ฒค๋”์‚ฌ์—์„œ ์ž๋ฐ” ๋น„๋™๊ธฐ DB ์—ฐ๊ฒฐ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์ธ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋“œ๋ผ์ด๋ฒ„๋ฅผ ์ œ๊ณตํ•จ์œผ๋กœ ๋‹จ์ˆœํžˆ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋งŒ ์ถ”๊ฐ€ํ•˜๋ฉด ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ ˆ์ด์–ด์— ๋Œ€ํ•œ ๋…ผ ๋ธ”๋กํ‚น ์—‘์„ธ์Šค๋ฅผ ํ•  ์ˆ˜ ์žˆ๋‹ค.

spring-boot-starter-data-mongodb-reactive spring-boot-starter-data-cassandra-reactive spring-boot-starter-data-redis-reactive spring-boot-starter-data-r2dbc

์Šคํ”„๋ง ๋ฐ์ดํ„ฐ ํŒ€์—์„œ ๊ธฐ์กด์— ์‚ฌ์šฉํ•œ๋˜ Repository ํŒจํ„ด์„ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋ฐฉ์‹์—๋„ ๋˜‘๊ฐ™์ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก ์ถ”์ƒํ™”๋ฅผ ํ†ตํ•ด ๊ตฌํ˜„ํ•ด๋‘์—ˆ๋‹ค.

๊ฐ ๋ชจ๋“ˆ๋“ค์ด ReactiveCurdRepository ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์‚ฌ์šฉํ•ด Reactor ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์™€ ํ†ตํ•ฉ๋˜์–ด ์ž์—ฐ์Šค๋Ÿฝ๊ฒŒ ๋ฆฌ์•กํ‹ฐ๋ธŒํ•˜๊ฒŒ ์ฝ”๋“œ์ž‘์„ฑ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

์Šคํ”„๋ง ๋ฐ์ดํ„ฐ ๋ชฝ๊ณ DB ๋ฆฌ์•กํ‹ฐ๋ธŒ

NoSQL ์˜ ๊ฒฝ์šฐ ๊ฐ ๋ฒค๋”์‚ฌ์—์„œ ํ†ตํ•ฉ๋œ ๊ทœ์•ฝ์ด ์—†๋‹ค. ๊ฐ ๋ฒค๋”์‚ฌ์—์„œ ์ž๊ธฐ๋“ค๋งŒ์˜ ๋“œ๋ผ์ด๋ฒ„ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์ œ๊ณตํ•˜๊ณ  ์Šคํ”„๋ง ๋ฐ์ดํ„ฐ ํŒ€์€ ์Šคํ”„๋ง์—์„œ ํ•ด๋‹น ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋“ค์„ ์‰ฝ๊ฒŒ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก ๊ฐ์ข… ๋ชจ๋“ˆ์„ ๊ฐœ๋ฐœํ•˜๊ณ  ์žˆ๋‹ค

NoSQL DB ๋Š” ์ตœ๊ทผ์— ๋งŒ๋“ค์–ด ์ ธ์„œ ๋Œ€๋ถ€๋ถ„ ๋ฒค๋”์‚ฌ๊ฐ€ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋“œ๋ผ์ด๋ฒ„ ๋ฅผ ์ œ๊ณตํ•˜๊ณ  ์žˆ์œผ๋ฉฐ ์Šคํ”„๋ง ๋ฐ์ดํ„ฐ ํŒ€์€ ๋ชฝ๊ณ DB ์—์„œ ์ œ๊ณตํ•˜๋Š” ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋“œ๋ผ์ด๋ฒ„ ๋ฅผ ์‰ฝ๊ณ  ํŽธํ•˜๊ฒŒ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก spring-boot-starter-data-mongodb-reactive ๋ชจ๋“ˆ์„ ์ž‘์„ฑํ•ด๋‘์—ˆ๋‹ค.

ํ•ด๋‹น ๋ชจ๋“ˆ์„ ์‚ฌ์šฉํ•˜๋ฉด ์Šคํ”„๋ง ํŒ€์—์„œ ๋งŒ๋“  Repository ํŒจํ„ด์„ ์‚ฌ์šฉํ•ด ๋ฉ”์„œ๋“œ๋ช… ๊ธฐ๋ฐ˜์œผ๋กœ ์ฟผ๋ฆฌ๋ฌธ์ด ์ž๋™ ์ƒ์„ฑ/์‚ฌ์šฉ ํ•  ์ˆ˜ ์žˆ๋‹ค.

ReactiveMongoRepository

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

// ReactiveCurdRepository ๊ตฌํ˜„์ฒด
@Repository
public interface BookReactiveMongoRepository extends ReactiveMongoRepository<Book, ObjectId> {
    Mono<Book> findOneByTitle(Mono<String> title);

    Flux<Book> findManyByTitleRegex(String regexp);

    @Meta(maxScanDocuments = 3)
    Flux<Book> findByAuthorsOrderByPublishingYearDesc(Publisher<String> authors);

    @Query("{ 'authors.1': { $exists: true } }")
    Flux<Book> booksWithFewAuthors();

    Flux<Book> findByPublishingYearBetweenOrderByPublishingYear(
            Integer from,
            Integer to,
            Pageable pageable
    );
}

ReactiveMongoTemplate

ReactiveMongoRepository ์™ธ์—๋„ ReactiveMongoTemplate ๋ฅผ ์‚ฌ์šฉํ•ด ์ฟผ๋ฆฌ ์กฐ์ž‘์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

@Service
@RequiredArgsConstructor
public class RxMongoTemplateQueryService {
    private static final String BOOK_COLLECTION = "book";

    private final ReactiveMongoTemplate mongoTemplate; // ReactiveMongoTemplate implements ReactiveMongoOperations

    public Flux<Book> findBooksByTitle(String title) {
        Query query = Query.query(new Criteria("title")
            .regex(".*" + title + ".*"))
            .limit(100);
        return mongoTemplate.find(query, Book.class, BOOK_COLLECTION);
    }
}

MongoClient

๋ชฝ๊ณ DB ์—์„œ ์ œ๊ณตํ•˜๋Š” ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋“œ๋ผ์ด๋ฒ„ ๊ตฌํ˜„์ฒด๊ฐ€ com.mongodb.reactivestreams.client.MongoClient ํด๋ž˜์Šค์ด๋‹ค.

https://mongodb.github.io/mongo-java-driver-reactivestreams/

org.mongodb:mongodb-driver-reactivestreams ๋ชจ๋“ˆ์—์„œ ์ œ๊ณตํ•˜๋ฉฐ spring-boot-starter-data-mongodb-reactive ์—์„œ ๋‚ด๋ถ€์ ์œผ๋กœ ์‚ฌ์šฉํ•œ๋‹ค.

MongoClient ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•ด๋„ ์ฟผ๋ฆฌ์กฐ์ž‘์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

@Service
@RequiredArgsConstructor
public class RxMongoDriverQueryService {

    private final MongoClient mongoClient;
    
    public Flux<Book> findBooksByTitle(String title, boolean negate) {
        return Flux.defer(() -> {
            Bson query = Filters.regex("title", ".*" + title + ".*");
            if (negate) query = Filters.not(query);
            return mongoClient
                .getDatabase("test-db")
                .getCollection("book")
                .find(query);
        }).map(doc -> new Book(
            doc.getObjectId("id"), // Document
            doc.getString("title"),
            doc.getInteger("pubYear")));
    }
}

ํŠธ๋žœ์žญ์…˜(ReactiveMongoTemplate.inTransaction)

MongoDB 4.0 ๋ฒ„์ „ ์ด์ „๊นŒ์ง€ ํ•˜๋‚˜์˜ ๋ฌธ์„œ ์— ๋Œ€ํ•ด์„œ๋งŒ ํŠธ๋žœ์žญ์…˜์„ ์ œ๊ณตํ•˜๋Š” Single-Document Transaction ๊ธฐ๋Šฅ๋งŒ ์žˆ์—ˆ๋‹ค. ํ•˜๋‚˜์˜ ๋ฌธ์„œ์— ๋ชจ๋“  ์ •๋ณด๋ฅผ ์‚ฝ์ž…ํ•˜์—ฌ ์‚ฌ์šฉํ•˜๊ธฐ์— ํ•˜๋‚˜์˜ ํŠธ๋žœ์žญ์…˜์€ ํ•˜๋‚˜์˜ ๋ฌธ์„œ๋งŒ ๊ฑด๋“ค์—ฌ์„œ Single-Document Transaction ์œผ๋กœ๋„ ์ถฉ๋ถ„ํ•ด์•ผ ํ•˜์ง€๋งŒ ํ•ญ์ƒ ์˜ˆ์™ธ๊ฐ€ ์žˆ๋Š”๋ฒ•, ๊ฒฐ๊ตญ ์—ฌ๋Ÿฌ ๋ฌธ์„œ์— ๋Œ€ํ•œ ํŠธ๋žœ์žญ์…˜ Multi-Document Transaction ๊ธฐ๋Šฅ์„ MongoDB 4.0 ๋ถ€ํ„ฐ ์ง€์›ํ•œ๋‹ค.

WiredTiger ์Šคํ† ๋ฆฌ์ง€ ์—”์ง„์˜ ์ƒค๋”ฉ์„ค์ •์ด ๋˜์–ด ์žˆ์ง€ ์•Š๊ณ  ๋ณต์ œ์„ค์ •์ผ ๊ฒฝ์šฐ์—๋งŒ Multi-Document Transaction ์„ ์ง€์›ํ•œ๋‹ค.

ReactiveMongoTemplate ์˜ inTransaction ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด

private Mono<TxResult> doTransferMoney(String from, String to, Integer amount) {
    return mongoTemplate.inTransaction().execute(session -> session
        .findOne(queryForOwner(from), Wallet.class)
        .flatMap(fromWallet -> session
            .findOne(queryForOwner(to), Wallet.class)
            .flatMap(toWallet -> {
                if (fromWallet.hasEnoughFunds(amount)) {
                    fromWallet.withdraw(amount);
                    toWallet.deposit(amount);

                    return session.save(fromWallet)
                        .then(session.save(toWallet))
                        .then(ReactiveMongoContext.getSession())
                        // An example how to resolve the current session
                        .doOnNext(tx -> log.info("Current session: {}", tx))
                        .then(Mono.just(TxResult.SUCCESS));
                } else {
                    return Mono.just(TxResult.NOT_ENOUGH_FUNDS);
                }
            })))
        .onErrorResume(e -> Mono.error(new RuntimeException("Conflict")))
        .last();
}

์Šคํ”„๋ง ๋ฐ์ดํ„ฐ R2DBC

R2DBC: Reactive Relational Database Connectivity

https://r2dbc.io/ https://spring.io/projects/spring-data-r2dbc https://spring.io/projects/spring-data-r2dbc

์•„๋ž˜์™€ ๊ฐ™์€ DBMS ์— ๋Œ€ํ•˜์—ฌ r2dbc ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์ œ๊ณต

H2 (io.r2dbc:r2dbc-h2)
MariaDB (org.mariadb:r2dbc-mariadb)
Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
MySQL (dev.miku:r2dbc-mysql)
jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
Postgres (io.r2dbc:r2dbc-postgresql)
Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

์ง€๊ธˆ๊นŒ์ง€ ์Šคํ”„๋ง JDBC ํ˜น์€ ์Šคํ”„๋ง ๋ฐ์ดํ„ฐ JDBC ํ˜น์€ JPA ๋ฅผ ์‚ฌ์šฉํ•ด ์ƒ์„ฑ๋œ Hikari CP ์•ˆ์˜ ์—ฐ๊ฒฐ๊ฐ์ฒด๊ฐ€ JDBC ๋“œ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•ด ๊ด€๊ณ„ํ˜• DB ๋ฅผ ์‚ฌ์šฉํ•ด ์™”๋‹ค.

@Repository
public interface BookSpringDataJdbcRepository extends CrudRepository<Book, Integer> {
    
    @Query("SELECT * FROM book b WHERE b.title = :title")
    CompletableFuture<Book> findBookByTitleAsync(@Param("title") String title);

}

JDBC, JPA ๋“ฑ์˜ ๊ด€๊ณ„ํ˜• DB ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋“ค์€ ๋ชจ๋‘ ๋™๊ธฐ/๋ธ”๋Ÿญํ‚น ๋ฐฉ์‹์œผ๋กœ ๋™์ž‘ํ•œ๋‹ค.

๋‹คํ–‰์ด๋„ spring data jdbc ๋ฅผ ๊ฐœ๋ฐœํ•œ ์Šคํ”„๋ง ๋ฐ์ดํ„ฐ Relational ํ”„๋กœ์ ํŠธ ํŒ€์—์„œ ๋ฆฌ์•กํ‹ฐ๋ธŒ์— ์ ํ•ฉํ•œ ์ž๋ฐ” DB ๋“œ๋ผ์ด๋ฒ„์ธ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋“œ๋ผ์ด๋ฒ„๋ฅผ ๊ฐœ๋ฐœ์ค‘์ด๋‹ค. ์ด ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋“œ๋ผ์ด๋ฒ„ ๋ฅผ ์‚ฌ์šฉํ•œ ํ”„๋กœ์ ํŠธ๊ฐ€ R2DBC ํ”„๋กœ์ ํŠธ์ด๋‹ค.

๋”์ด์ƒ JDBC ๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ  ๋ฆฌ์•กํ‹ฐ๋ธŒ ์Šคํƒ์— ์ ํ•ฉํ•œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋“œ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•ด DB ์— ์ ‘๊ทผ, ๋ฐ์ดํ„ฐ๋ฅผ ์กฐ์ž‘ํ•œ๋‹ค.

์•ˆํƒ€๊น๊ฒŒ๋„ JPA ๋Š” ๊ธฐ์กด ์ฝ”๋“œ๊ฐ€ ๋„ˆ๋ฌด ๋ณต์žกํ–ˆ๋Š”์ง€ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ง€์›์„ ํ•˜์ง€ ์•Š์„๊ฒƒ์œผ๋กœ ๋ณด์ธ๋‹ค.

ReactiveCrudRepository

@Repository
public interface MemberRepository extends ReactiveCrudRepository<Member, Long> {
    Mono<Member> findByName(String name);

    Mono<Member> findByUserName(String name);

    @Query("SELECT * FROM member WHERE name = :name AND user_name = :userName")
    Mono<Member> findByNameAndUserName(String name, String userName);
}

R2dbcEntityTemplate

@Service
@RequiredArgsConstructor
public class MemberDynamicRepository {
    private final R2dbcEntityTemplate r2dbcEntityTemplate;

    public Flux<Member> findTest(String userName) {
        Query query = Query.query(where("user_name").like("%" + userName + "%"))
                .limit(10)
                .offset(0);
        return r2dbcEntityTemplate.select(Member.class)
                .matching(query)
                .all();
    }
}

์Šคํ”„๋ง ๋ฐ์ดํ„ฐ Redis

spring-boot-starter-data-redis-reactive ๋ชจ๋“ˆ์„ ์‚ฌ์šฉ ReactiveRedisTemplate ํด๋ž˜์Šค๊ฐ€ Redis ์ปค๋„ฅ์…˜์˜ ํ•ต์‹ฌํด๋ž˜์Šค์ด๋‹ค.

๋‹ค๋ฅธ ์Šคํ”„๋ง ๋ฐ์ดํ„ฐ ํ”„๋กœ์ ํŠธ์™€ ๋‹ฌ๋ฆฌ Repository ๊ฐ€ ์กด์žฌํ•˜์ง€ ์•Š์Œ ์ผ๋ฐ˜์ ์ธ ๋ฐ์ดํ„ฐ ๊ด€๋ฆฌ ์™ธ์—๋„ ๊ตฌ๋…/๋ฐœํ–‰ ๊ตฌ์กฐ์˜ ๋ฉ”์‹œ์ง€ ๊ธฐ๋Šฅ๋„ ์ง€์›ํ•œ๋‹ค.

spring-boot-starter-data-redis-reactive ๋ชจ๋“ˆ์€ ๋‚ด๋ถ€์ ์œผ๋กœ Lettuce ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

https://lettuce.io/, ํ˜„์žฌ non blokcing ์„ ์ง€์›ํ•˜๋Š” redis ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ Lettuce ๊ฐ€ ์œ ์ผํ•˜๋‹ค. ๋˜ํ•œ Lettuce ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ๋‚ด์—์„œ Reactor ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

public class Sample {
    private String name;
    private String description;
}


@Configuration
public class RedisConfig {
    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        return new LettuceConnectionFactory(host, port);
    }
    
    @Bean
    public ReactiveRedisTemplate<String, Sample> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Sample> valueSerializer = new Jackson2JsonRedisSerializer<>(Sample.class);

        RedisSerializationContext.RedisSerializationContextBuilder<String, Sample> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);

        RedisSerializationContext<String, Sample> context = builder.value(valueSerializer).build();

        return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, context);
    }
}
@Service
@RequiredArgsConstructor
public class SampleService {
    private final ReactiveRedisTemplate<String, Sample> redisTemplate;

    public Mono<Boolean> put(String key, Sample sample) {
        return redisTemplate.opsForValue().set(key, sample);
    }

    public Mono<Sample> get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    public Flux<Sample> getAll(String keyPattern){
        return redisTemplate.keys(keyPattern)
                .flatMap(key-> redisTemplate.opsForValue().get(key));
    }

    public Mono<Boolean> delete(String key) {
        return redisTemplate.opsForValue().delete(key);
    }
}

๊ธฐํƒ€

ListenableFuture

์Šคํ”„๋ง์—์„œ ์ œ๊ณตํ•˜๋Š” Future ๊ตฌํ˜„ ํด๋ž˜์Šค

public interface ListenableFuture<T> extends Future<T> {
	void addCallback(ListenableFutureCallback<? super T> callback);
	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
	default CompletableFuture<T> completable() {
		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
		addCallback(completable::complete, completable::completeExceptionally);
		return completable;
	}
}

์œ„์˜ SseEmitter ์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ์Šคํ”„๋ง ๋ฆฌ์•กํ‹ฐ๋ธŒ์˜ ๋ฐ˜ํ™˜๋ฐ์ดํ„ฐ๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

// ๋น„๋™๊ธฐ RestTemplate
AsyncRestTemplate httpClient = new AsyncRestTemplate();

@GetMapping
public ListenableFuture<?> requestData() {
    AsyncDatabaseClient databaseClient = new FakeAsyncDatabaseClient();
    // /hello ์˜ ํ˜ธ์ถœ๊ฒฐ๊ณผ๋ฅผ CompletableFuture ์œผ๋กœ ๋ฐ˜ํ™˜ํ•˜๋Š” ์–ด๋Žํ„ฐ
    CompletionStage<String> completionStage = AsyncAdapters.toCompletion(httpClient.execute(
        "http://localhost:8080/hello",
        HttpMethod.GET, null,
        new HttpMessageConverterExtractor<>(String.class, messageConverters) // http ์˜ body ๋ถ€๋ถ„ ์ปจ๋ฒ„ํ„ฐ๋“ค ์ง€์ •
    ));
    // CompletionStage(CompletableFuture ์˜ ์ธํ„ฐํŽ˜์ด์Šค) ๋ฅผ ListenableFuture ๋กœ ๋ณ€ํ™˜
    return AsyncAdapters.toListenable(databaseClient.store(completionStage));
}

AsyncRestTemplate.execute ๊ฐ€ ๋ฐ˜ํ™˜ํ•˜๋Š” ListenableFuture ๋ฅผ CompletionStage ๋กœ ๋ณ€ํ™˜ ๋ฐ˜ํ™˜๋œ CompletionStage ๋ฅผ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ €์žฅํ›„ ๋‹ค์‹œ ๋ฐ˜ํ™˜๋œ CompletionStage ๋ฅผ ListenableFuture ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.

์ผ๋ฐ˜์ ์œผ๋กœ ๊ฐ„๋‹จํ•œ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๋Š” Future ๋ฅผ ๊ตฌํ˜„ํ•œ CompletableFuture(CompletionStage ๊ตฌํ˜„์ฒด) ๋ฅผ ์ฃผ๋กœ ์‚ฌ์šฉํ•œ๋‹ค.

๋ฉ”์„œ๋“œ ์ •์˜์‹œ ๋ฐ˜ํ™˜๊ฐ’์„ ์Šคํ”„๋ง์—์„œ ๊ธฐ๋ณธ ์ œ๊ณตํ•˜๋Š” ListenableFuture ๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ๊ฐ์ฒด๋กœ ๋ฐ˜ํ™˜ํ•˜๊ธฐ ํž˜๋“ค๊ธฐ์— ์œ„์™€๊ฐ™์€ AsyncAdapters ๋ฅผ ์‚ฌ์šฉํ•ด ๋น„๋™๊ธฐ ๊ฒฐ๊ณผ๋ฅผ ListenableFuture ๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ๋Š” ์–ด๋Žํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ํŽธํ•˜๋‹ค.

public final class AsyncAdapters {
    // ListenableFuture -> completionStage
    public static <T> CompletionStage<T> toCompletion(ListenableFuture<T> future) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        future.addCallback(completableFuture::complete, completableFuture::completeExceptionally);
        return completableFuture;
    }
    // completionStage -> ListenableFuture
    public static <T> ListenableFuture<T> toListenable(CompletionStage<T> stage) {
        SettableListenableFuture<T> future = new SettableListenableFuture<>();
        stage.whenComplete((v, t) -> {
            if (t == null) future.set(v);
            else future.setException(t);
        });
        return future;
    }
}

AsyncRestTemplate

์Šคํ”„๋ง ๋ฆฌ์•กํ‹ฐ๋ธŒ์—์„  ๋™๊ธฐ๋ฐฉ์‹์ธ ์ผ๋ฐ˜ RestTemplate ์„ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ  AsyncRestTemplate ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. ํ”ํžˆ์‚ฌ์šฉํ•˜๋Š” execute ๋ฉ”์„œ๋“œ์˜ ๋ฐ˜ํ™˜๊ฐ’์ด ListenableFuture ๊ฐ์ฒด์ด๋‹ค.

@Override
public <T> ListenableFuture<T> execute(String url, HttpMethod method, AsyncRequestCallback requestCallback,
        ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

    URI expanded = getUriTemplateHandler().expand(url, uriVariables);
    return doExecute(expanded, method, requestCallback, responseExtractor);
}

์ถœ์ฒ˜ : https://kouzie.github.io/spring/Spring-Boot-%EC%8A%A4%ED%94%84%EB%A7%81-%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EA%B0%9C%EC%9A%94/#webflux

Last updated