์คํ๋ง ์ฝ์ด for Reactive
์คํ๋ง 5.0 ์์๋ถํฐ ๋ฆฌ์กํฐ๋ธ ์ฝ๋๋ฅผ ์ํ ์ฌ๋ฌ๊ฐ์ง ํด๋์ค๋ค์ด ์์ , ์ถ๊ฐ๋์๋ค.
ReactiveAdapter, ReactiveAdapterRegistry
RxJava, Reactor
์์ ์ฌ์ฉํ๋ ๋ฐํ์ ํด๋์ค๋ฅผ Publihser
๋ก ๋ณํํด์ฃผ๋ Adapter
๊ฐ springframework.core
์ ์ถ๊ฐ๋์ด ์ฌ์ฉ ๊ฐ๋ฅํด์ก๋ค.
์๋์ฒ๋ผ ReactiveAdapter
๋ฅผ ์์๋ฐ์ RxJava Maybe ์ Publisher ๊ฐ์ ๋ณํ ์์
์ ํด์ฃผ๋ Adapter ๋ฅผ ์์ฑํด์ ์ฌ์ฉํ๊ฑฐ๋
Copy @Component
public class MaybeReactiveAdapter extends ReactiveAdapter {
public MaybeReactiveAdapter() {
/**
* Descriptor for a reactive type that can produce 0..1 values.
* @param type the reactive type
* @param emptySupplier a supplier of an empty-value instance of the reactive type
*/
super(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
}
}
ReactiveAdapterRegistry
๋ฅผ ์ฌ์ฉํด ์ฑ๊ธํด Instance
๋ณ์์ Adapter
์ฉ ์ฝ๋๋ฅผ ์์ฑํด ํ์ํ ๋ ๋ง๋ค ๊บผ๋ด์ด ์ธ ์ ์๋ค.
Copy @PostConstruct
public void init() {
ReactiveAdapterRegistry
.getSharedInstance()
.registerReactiveType(ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
maybe -> ((Maybe<?>) maybe).toFlowable(), // Maybe->Publisher
publisher -> Flowable.fromPublisher(publisher).singleElement()); // Publisher->Maybe
}
...
ReactiveAdapter adapter = ReactiveAdapterRegistry
.getSharedInstance()
.getAdapter(Maybe.class);
...
๋ฆฌ์กํฐ๋ธ I/O, ์ฝ๋ฑ
springframework.core.io
์ ์ ์ฅ๋ DataBuffer
, DataBufferUtils
๋ฅผ ์ฌ์ฉํ๋ฉด I/O ์์
์ด ํ์ํ ํ์ผ, ๋คํธ์ํฌ ์์์ผ๋ก ๋ถํฐ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ ํํ๋ก ์์
์ ์ฒ๋ฆฌํ ์ ์๋ค. jav.nio.ByteBuffer
ํด๋์ค์ ํ๋ณํ ๊ธฐ๋ฅ์ ์ถ๊ฐํ์ฌ ๋ณด๋ค ์ฝ๊ฒ ์ฌ์ฉ ๊ฐ๋ฅํ๋ค.
Copy Flux<DataBuffer> reactiveHamlet = DataBufferUtils.read(
new DefaultResourceLoader().getResource("hamlet.txt"),
new DefaultDataBufferFactory(),
1024);
springframework.core.codec
์ ์ ์๋ ์ธํฐํ์ด์ค Encoder
, Decoder
๋ฅผ ์ฌ์ฉํ๋ฉด Non Blocking
๋ฐฉ์์ผ๋ก ์ง๋ ฌํ ๋ฐ์ดํฐ๋ฅผ ์๋ฐ๊ฐ์ฒด, ์๋ฐ๊ฐ์ฒด๋ฅผ ์ง๋ ฌํ ๋ฐ์ดํฐ๋ก ๋ณํ ๊ฐ๋ฅํ๋ค.
WebFlux
Sprinb Boot 2
์ ๋ฆฌ์กํฐ๋ธ ์น์๋ฒ๋ฅผ ์ํ WebFlux
๋ชจ๋ธ์ ์ฌ์ฉํ ์ ์๋๋ก spring-boot-starter-webflux
๋ผ๋ ์๋ก์ด ํจํค์ง๋ฅผ ์ถ๊ฐํ ์ ์๊ฒ ๋์๋ค.
ํด๋น ๋ชจ๋์ Reactive Stream Adapter
์์ ๊ตฌ์ถ๋๋ฉฐ Servlet 3.1+ ์ง์์๋ฒ(Tomcat, Jetty ๋ฑ)
, Netty
, Undertow
์๋ฒ์์ง์์ ๋ชจ๋ ์ง์ํ๋ค.
์์ ์์ง๋ค์ java 8
์ ์ถ๊ฐ๋ java NIO
๋ก ๊ตฌํ๋์ด Http ์์ฒญ์ ๋
ผ๋ธ๋ญํน์ผ๋ก ์ฒ๋ฆฌํ๋ค.
์ผ๋ฐ์ ์ WebMVC
๋ชจ๋๋ Spring 5.0
์ ์ด๋ฅด๋ฌ spring-boot-starter-web
Servlet 3.1
์์ง์ํ๋ฉด์ ์ผ๋ถ๋ถ์ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ์ง์ํ๊ฒ ๋์๋ค.
ResponseBodyEmitterReturnValueHandler
ํด๋์ค๊ฐ ์
๊ทธ๋ ์ด๋ ๋๋ฉด์ ReactiveTypeHandler
ํ๋๋ฅผ ์ฌ์ฉํด WebMVC
์ ์ธํ๋ผ ๊ตฌ์กฐ๋ฅผ ํฌ๊ฒ ํด์น์ง ์๊ณ ์ปจํธ๋กค๋ฌ ๋ฉ์๋
๊ฐ ๋ฐํํ๋ Flux, Mono, Flowable
๋ฑ์ Publisher
(๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ)์ ์ฒ๋ฆฌํ๋ค.
๋ฌผ๋ก ์๋ธ๋ฆฟ API ๋ฅผ ์ฌ์ฉํ๊ธฐ์ ๋ธ๋กํน/์ค๋ ๋ํ
๋ฐฉ์์ ์ฌ์ฉํ๋ค.
WebFlux ๊ฐ์
๊ธฐ์กด WebMVC
๋ชจ๋ธ ๊ตฌ์กฐ๋ ์๋์ ๊ฐ๋ค.
ViewResolver
๋ Rest
๋ฐฉ์์์ ์๋ต๋๋ค.
๊ฐ์ข
์๋ธ๋ฆฟ ์ปจํ
์ด๋(Tomcat, JBoss
๋ฑ) ์์ฒญ์ ์๋ธ๋ฆฟ ํด๋์ค๋ฅผ ์์ํ DispatcherServlet
์ด ์คํ๋ง ๋ถํธ ์ปจํธ๋กค๋ฌ ๋งคํ์ ๋ฐ๋ผ ์์ฒญ์ ๋ถ๋ฐฐํ๋ค.
๊ทธ๋ฆผ์ฒ๋ผ ๊ธฐ์กด WebMVC
๋ฐฉ์์ ๋๊ธฐ/๋ธ๋กํน
๋ฐฉ์์ผ๋ก ๋์ํ๋ค.
์คํ๋ง ๋ถํธ์์ ๋ฆฌ์กํฐ๋ธ ๋ฐฉ์์ ์ฌ์ฉํ๋ ค๋ฉด Servlet 3.1+(Tomcat, Jetty ๋ฑ)
, Netty
, Undertow
์ ๊ฐ์ ์๋ฒ๋ฅผ ์ฌ์ฉํด ๋ฆฌ์กํฐ๋ธํ๊ฒ ๊ตฌ์กฐ๊ฐ ๋ณ๊ฒฝ๋์ด์ผ ํ๋๋ฐ
๋คํ์ด๋ ์คํ๋ง ํ๋ก์ ํธํ์ด ๋์ผํ ์ด๋
ธํ
์ด์
๊ธฐ๋ฐ ํ๋ก๊ทธ๋๋ฐ ๋ชจ๋ธ์ ์ฌ์ฉํ๋ฉด์ ๋น๋๊ธฐ/๋
ผ๋ธ๋กํน
์ผ๋ก ๋์ํ๋๋ก ์ด๋ฏธ ๊ฐ๋ฐํด๋์๋ค.
WebFlux with Flux
๋๋ต์ ์ผ๋ก WebFlux
์์ Http Request, Response ์ด๋ป๊ฒ ๋ฆฌ์กํฐ๋ธ๋ก ๊ตฌํํ๋์ง ์๋ ์ธํฐํ์ด์ค๋ก ํ์ธํ ์ ์๋ค.
Copy interface ServerHttpRequest { // Http Request ๋ฅผ ๊ฐ์ฒด๋ก ํํ
Flux<DataBuffer> getBody();
...
}
interface ServerHttpResponse { // Http Response ๋ฅผ ๊ฐ์ฒด๋ก ํํ
Mono<Void> writeWith(Publihser<? extends DataBuffer> body);
...
}
interface ServerWebExchange { // HTTP Request, Response ์ปจํ
์ด๋ ์ญํ
ServerHttpRequest getRequest();
ServerHttpResponse getResponse();
Mono<WebSession> getSession();
...
}
์๋ธ๋ฆฟ์ ServletRequest
, ServletResponse
์ ์ฐ๊ด์ง์ด์ ์๋ก์ด Http ์์ฒญ, ๋ฐํ์ ๊ฐ์ฒด๋ก ํํํ ์ ์๋ ์ธํฐํ์ด์ค๋ค์ด ์ ์๋์ด์๊ณ
DataBuffer
๋ฅผ ์ฌ์ฉํด ๋ฆฌ์กํฐ๋ธ ํ์
๊ณผ์ ๊ฒฐํฉ๋๋ฅผ ๋ฎ์ถ๋ค.
Copy interface WebHandler {
Mono<Void> handle(ServerWebExchange exchanage);
}
interface WebFilterChain {
Mono<Void> filter(ServerWebExchange exchanage);
}
interface WebFilter {
Mono<Void> filter(ServerWebExchange exchanage, WebFilterChain chain);
}
WebHandler
๋ DispatcherServlet
์ญํ , ์คํ๊ฒฐ๊ณผ๋ฅผ ๋ฐ์ง ๋ชปํ ์ ์์์ผ๋ก handle
๋ฉ์๋๋ Mono<Void>
๊ฐ ๋ฐํ๋๋ค.
ํด๋ผ์ด์ธํธ๋ฅผ ์ํ Http ๋ฐํ์ exchange
์์ ServerHttpResponse
์
WebFilter
๋ ์๋ธ๋ฆฟ์ ์์ฒญ, ๋ฐํ ํํฐ์ฒ๋ผ ๋ฆฌ์กํฐ๋ธ์์๋ ๋น์ง๋์ค ๋ก์ง์ ์ง์คํ ์ ์๋๋ก ํํฐ๊ธฐ๋ฅ์ด ์ ๊ณต๋๋ค.
Copy public interface HttpHandler {
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}
๋ง์ง๋ง์ผ๋ก WebHandler
๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ exchange
๊ฐ์ฒด๋ฅผ url
๋งคํํ ์ ์๋๋ก HttpHandler
์ handle
๋ก ์ ๋ฌ๋๋ค.
WebFlux - Functional Reactive Web Server
Vert.x
๋ Ratpack
๊ณผ ๊ฐ์ ํ๋ ์์ํฌ์ ์ธ๊ธฐ๋น๊ฒฐ์ ์คํ๋ง์ ๋ณต์กํ MVC ์ค์ ์ผ๋ก ๋ผ์ฐํ
์ค์ ๊ณผ ๋ก์ง์ด ์์ด
๊ฐ๊ฒฐํ ์ค์ ์ผ๋ก ๋ผ์ฐํ
๋ก์ง์ ์์ฑํ ์ ์๋ API ๋ค์ด ์ ์ ์๋์ด ์๊ธฐ ๋๋ฌธ์ด๋ค.
์คํ๋ง ํ๋ ์์ํฌ๋ ์ ํ๋ ์์ํฌ์ฒ๋ผ ๊ฐ๊ฒฐํ๊ฒ ๋ผ์ฐํ
๋ก์ง์ ์ฒ๋ฆฌํ ์ ์๋ API ๋ฅผ ๊ฐ๋ฐํ์๋ค.
spring-boot-starter-webflux
๋ชจ๋์ org.springframework.web.reactive.function.server
ํจํค์ง์ ์ ์๋ RouterFunction
ํด๋์ค ์ฌ์ฉํ์ฌ ๋ผ์ฐํ
๋ก์ง ์ ์๊ฐ ๊ฐ๋ฅํ๋ค.
Copy import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
import static org.springframework.http.MediaType.APPLICATION_JSON;
@SpringBootApplication
public class DemoApplication {
final ServerRedirectHandler serverRedirectHandler = new ServerRedirectHandler();
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes(OrderHandler orderHandler) {
return nest(path("/orders"),
nest(accept(APPLICATION_JSON),
route(GET("/{id}"), orderHandler::get)
.andRoute(method(HttpMethod.GET), orderHandler::list))
.andNest(contentType(APPLICATION_JSON),
route(POST("/"), orderHandler::create))
.andNest((serverRequest) -> serverRequest.cookies().containsKey("Redirect-Traffic"),
route(all(), serverRedirectHandler))
);
}
}
RouterFunction
์ ์คํ๋ง Bean
์ผ๋ก ๋ฑ๋กํ๊ณ /orders
์ ๋ํ ๋ผ์ฐํ
๋ก์ง ์ค์ , ๊ธฐ๋ณธ์ ์ธ Path, Http method, cookie
ํฌํจ์ฌ๋ถ ๋ฑ ์ฌ๋ฌ ๋ก์ง์ฒ๋ฆฌ ๊ฐ๋ฅํ๋ค.
ํจ์ํ์ผ๋ก ์น์๋ฒ๋ฅผ ๊ตฌ์ถํ ๊ฒฝ์ฐ Netty
์๋ฒ๋ฅผ ๊ฐ์ด ์ฌ์ฉํ๋ฉด ๋ณ๋์ ์คํ๋ง ์ค์ ์์ด ์๋ฒ ์คํ์ด ๊ฐ๋ฅํ๋ค.
์ด๋ฐ์ ๋๋ฌธ์ ๋จ์ ํ
์คํธ์ ๊ฒฝ์ฐ @SpringBootApplication
์ ์ฌ์ฉํ์ง ์๊ณ ๋จ์ Netty
์๋ฒ๋ฅผ ์ฌ์ฉํด ๋น ๋ฅด๊ฒ ์๋ฒ๋ฅผ ์คํํ๊ณ ๊ตฌํํ ๋ฉ์๋๋ค์ ํ
์คํธ ํ ์ ์๋ค.
๋ง์ฝ ํจ์ค์๋ ์ํธํ ๋ฐ ๋ณตํธํ ํ
์คํธ๋ฅผ ํ๋ค๋ฉด ์๋ฒ๊ธฐ๋ฅ์ ํ๋ ๊ฐ์ฒด ์ธ์ ์ถ๊ฐ์ ์ผ๋ก ํ์ํ ๊ฐ์ฒด๋ ํด์ ๊ธฐ๋ฅ์ด ์๋ spring-boot-starter-security
์ PasswordEncoder
๋ฟ์ด๋ค.
PasswordEncoder
๋ง ์ฌ์ฉํ๋ค๋ฉด spring-security-core
๋ง ์์กด์ฑ ์ฒ๋ฆฌํด๋ ๋๋ค.
๋ณ๋์ ์คํ๋ง ๊ด๋ จ ์ด๋
ธํ
์ด์
, Bean ๋ฑ๋ก๊ณผ์ ์์ด RouterFunction, Netty, PasswordEncoder
3๊ฐ ๊ฐ์ฒด๋ง ์ ์ ์ํด์ ์๋์ฒ๋ผ ์๋ฒ ์คํ์ด ๊ฐ๋ฅํ๋ค.
Copy import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
public static void main(String... args) {
long start = System.currentTimeMillis();
BCryptPasswordEncoder passwordEncoder = new BCryptPasswordEncoder(18); // encoder ์์ฑ
HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler ๋ณ๊ฒฝ
route(POST("/password"), request -> request // RouterFunction ์ผ๋ก ๋ผ์ฐํฐ ๋ก์ง ์์ฑ
.bodyToMono(PasswordDTO.class)
.map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
.flatMap(isMatched -> isMatched
? ServerResponse.ok().build()
: ServerResponse.status(HttpStatus.EXPECTATION_FAILED).build())
)
);
ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
// HandlerAdapter ์ HttpHandler ์ฝ์
, BiFunction ๋ฅผ ๊ตฌํํ ํด๋์ค์
DisposableServer server = HttpServer.create() // Netty Server
.host("localhost").port(8080)
.handle(reactorHttpHandler) // BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> ์๊ตฌํจ
.bindNow(); // ์๋ฒ ์์ง ์์
LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms"); // Started in 703 ms
server.onDispose().block(); // main ์ค๋ ๋ ์ฐจ๋จ
}
์๋ฒ๊ฐ 0.7 ์ด๋ง์ ์คํ๋๋ค.
์คํ๋ง ์ปจํ
์ด๋, ์์กด์ฑ ์ฃผ์
, ์ด๋
ธํ
์ด์
์ฒ๋ฆฌ๋ฅผ ํ์ง ์์์ผ๋ก ์๋๊ฐ ๊ต์ฅํ ๋น ๋ฅด๋ฉฐ
๊ฐ๋จํ ํ
์คํธ์งํ์ ์์๊ฐ์ ๋ฐฉ์์ผ๋ก ์งํํ๋ฉด ํธํ๋ค.
WebFlux - Annotated Controller
RouterFunctions
๋ฅผ ์ฌ์ฉํด๋ ๋์ง๋ง WebMVC
๋ชจ๋ธ์์ ์ฌ์ฉํ๋ @RestController, @RequestMapping
๋ฑ์ ์ด๋
ธํ
์ด์
์ WebFlux
์์๋ ์ฌ์ฉํ ์ ์๋ค.
Copy @RestController
@RequestMapping("/member")
@RequiredArgsConstructor
public class MemberController {
private final MemberRepository memberRepository;
@GetMapping
public Flux<Member> getAll() {
return memberRepository.findAll();
}
@GetMapping("/id/{id}")
public Mono<Member> getById(@PathVariable Long id) {
return memberRepository.findById(id);
}
}
Flux
๋ ๋ฐฐ์ด, Mono
๋ ๊ฐ์ฒด๋ก ๋ฐํ๋๋ค.
WebFlux - Filter
๋์ด์ javax.servlet.Filter
์ ์ฌ์ฉํ์ง ๋ชปํ๋ค.
ํํฐ๊ธฐ๋ฅ์ ํ๋ ๋ฐฉ๋ฒ์ ์ฌ๋ฌ๊ฐ์ง๋ค.
RouterFunctions
Copy @SpringBootApplication
public class ReactApplication {
public static void main(String[] args) {
SpringApplication.run(ReactR2dbcApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> filterFunction(MemberComponent memberComponent) {
return RouterFunctions
.route(GET("/member/{memberId}")
.and(accept(MediaType.APPLICATION_JSON)), memberComponent::getById)
.filter(new ExampleHandlerFilterFunction());
}
}
@Component
@RequiredArgsConstructor
class MemberComponent {
private final MemberRepository memberRepository;
public Mono<ServerResponse> getById(ServerRequest request) {
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue(
memberRepository.findById(Long.valueOf(request.pathVariable("memberId")))));
}
}
WebFilter
Copy @Component
public class ExampleWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange,
WebFilterChain webFilterChain) {
serverWebExchange.getResponse().getHeaders().add("web-filter", "web-filter-test");
return webFilterChain.filter(serverWebExchange);
}
}
๋ณ๋์ ๋งคํ ์กฐ๊ฑด์ด ์๊ธฐ๋๋ฌธ์ ์กฐ๊ฑด๋ฌธ ๋ถ๊ธฐ๊ฐ ํ์ํจ
HandlerFilterFunction
Copy public class ExampleHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> handlerFunction) {
if (request.pathVariable("name").equalsIgnoreCase("test")) {
return ServerResponse.status(HttpStatus.FORBIDDEN).build();
}
return handlerFunction.handle(request);
}
}
WebFlux - Exception Handler
๋ฉ์๋ ๋ ๋ฒจ ์์ ์ค๋ฅ์ฒ๋ฆฌ๋ ServerResponse
์ status, body ๋ฑ์ ์ค์ ํ๋ฉด ์ฝ๊ฒ ์ฒ๋ฆฌํ ์ ์๋ค.
๋ํ ํด๋์ค ๋ด๋ถ์์ ๊ธฐ์กด WebMVC ์์ ์ฌ์ฉํ๋ @ExceptionHandler
๋ฅผ ์ฌ์ฉํด ์ฒ๋ฆฌํ ์ ์๋ค.
Copy @Controller public class SimpleController {
@ExceptionHandler public ResponseEntity<String> handle(IOException ex) {
// ...
}
}
๊ธ๋ก๋ฒ ๋ ๋ฒจ ์์ ์ค๋ฅ์ฒ๋ฆฌ๋ WebExceptionHandler
์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํด ํํฐ ๋ฐฉ์์ผ๋ก ์ฒ๋ฆฌํ ์ ์๋ค.
DefaultErrorAttributes
DefaultErrorAttributes
๋ WebFlux
์์ ์ฌ์ฉํ๋ ์๋ฌ ํธ๋ค๋ฌ๋ก ๊ธฐ๋ณธ ํํฐ๋ก ๋ฑ๋ก๋์ด ์๋ ์๋ฌ ํธ๋ค๋ฌ๊ฐ DefaultErrorAttributes
์์ getErrorAttributes
๋ฅผ ํธ์ถํด ์๋์ ๊ฐ์ ๋ฐํ๊ฐ์ ๋ฐํํ๋ค.
Copy {
"timestamp": "...",
"path": "...",
"status": 405,
"error": "Method Not Allowed",
"message": "",
"requestId": "ca35d584-1"
}
๊ธ๋ก๋ฒ ๋ ๋ฒจ์์ ์ค๋ฅ์ฒ๋ฆฌ๋ฅผ ํตํด ์ปค์คํ
ํ ๋ฐํ๊ฐ์ ์ค์ ํ๊ณ ์ถ์ผ๋ฉด DefaultErrorAttributes
์ getErrorAttributes
๋ฉ์๋๋ฅผ ์ค๋ฒ๋ผ์ด๋ฉ ํด์ผํ๋ค.
Copy @Slf4j
@Component
@RequiredArgsConstructor
public class GlobalErrorAttributes extends DefaultErrorAttributes {
// private final MessageSource messageSource;
@Override
public Map<String, Object> getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
Throwable throwable = getError(request);
String acceptLanguage = request.headers().firstHeader("accept-language");
Locale locale = acceptLanguage != null && acceptLanguage.startsWith("ko") ? Locale.KOREA : Locale.getDefault();
log.error("unknown server error:{}, {}" + throwable.getClass().getCanonicalName(), throwable.getMessage());
// throwable ์ ์ข
๋ฅ์ ๋ง๊ฒ ๋ฐํ๊ฐ์ ์กฐ์
Map<String, Object> map = new HashMap<>();
map.put("code", UNKNOWN_ERROR_CODE);
map.put("error", UNKNOWN_ERROR_TYPE);
// map.put("description", messageSource.getMessage("fms.error.unknown_server_error", null, locale));
return map;
}
}
์๋ฌ ๋ฐํ์ ์๋์ฒ๋ผ ์ถ๋ ฅ๋๋๋ก ์ค์
Copy {
"code": "500-00",
"error": "UnknownServerError"
}
์ด์ ํธ๋ค๋ฌ์์ DefaultErrorAttributes
์ getErrorAttributes
๊ฐ ์๋
์ง์ ์ ์ํ GlobalErrorAttributes
์ getErrorAttributes
๊ฐ ํธ์ถ๋๋๋ก ์ค์ ํ๋ฉด ๋๋ค.
AbstractErrorWebExceptionHandler
AbstractErrorWebExceptionHandler
๋ ์๋ฌ๋ฐ์์ ํํฐ๋ก ๋ฑ๋ก๋์ด ์๋ ํธ๋ค๋ฌ
ํด๋น ํธ๋ค๋ฌ๋ณด๋ค ๋ ๋์ ์ฐ์ ์์๋ฅผ ๊ฐ์ง ํธ๋ค๋ฌ๋ก ์๋ฌ์ฒ๋ฆฌํ๋๋ก ์ค์
Copy @Order(-2) // ๊ธฐ๋ณธ ์๋ฌ์ฒ๋ฆฌ๋ -1, ๋ณด๋ค ๋น ๋ฅด๊ฒ ์ค์ ํ๋ค.
@Component
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
// constructors
public GlobalErrorWebExceptionHandler(
GlobalErrorAttributes errorAttributes, WebProperties.Resources resources,
ApplicationContext applicationContext, ServerCodecConfigurer configurer) {
super(errorAttributes, resources, applicationContext);
this.setMessageWriters(configurer.getWriters());
}
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
}
private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
Map<String, Object> errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
Integer status = Integer.valueOf(((String) errorPropertiesMap.get("code")).substring(0, 3));
return ServerResponse.status(status)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(errorPropertiesMap));
}
}
WebFlux - WebSocket
์ด๋ฏธ spring-boot-starter-websocket
๋ชจ๋์์ ์ ๊ณตํ๋ ์คํ๋ง ์น์์ผ์ ํตํด ๋
ผ๋ธ๋กํน์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์์ ์ค ์์์ง๋ง ๋ด๋ถ์์ ๋ธ๋กํน ๋ฐฉ์์ผ๋ก ๋์ํ๋ฉฐ ๋ฆฌ์กํฐ๋ธ ์๋ฒ ์ฑ๋ฅ์ ์ํฅ์ ๋ผ์น๋ค
WebFlux
์์ ๋น๋๊ธฐ/๋
ผ๋ธ๋กํน ๋ฐฉ์์ ์น์์ผ ์ฒ๋ฆฌ๋ฅผ ์ํด org.springframework.web.reactive.socket
ํจํค์ง๋ฅผ ์ ๊ณตํ๋ค.
ํจํค์ง์ reactive
๊ฐ ๋ถ์๋ฟ ํด๋์ค๋ช
์ด๋ ์ฌ์ฉ๋ฐฉ๋ฒ์ด ์ ์ฌํ๋ค.
์น์์ผ ์๋ฒ, ์น์์ผ ํด๋ผ์ด์ธํธ ๋ชจ๋ ์ ๊ณตํ๋ค.
์น์์ผ ์๋ฒ๋ WebSocketHandler
๋ฅผ ์ฌ์ฉํด ์์ผ ํธ๋ค๋ฌ ์ญํ ์ ํ๋ ๊ฐ์ฒด์ธ Handler
๋ฅผ ๋ฑ๋กํ๋ค.
Copy public class EchoWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session
.receive() // return Flux<WebSocketMessage>
.map(wsMessage -> wsMessage.getPayloadAsText()) // websocket ๋ฉ์ธ์ง์ payload (string) ํญ๋
.map(tm -> "Echo: " + tm) // ๋ฌธ์์ด ๋ณํ
.map(tm -> session.textMessage(tm)) // WebsocketSession ์ ์ฌ์ฉ, client ๋ณด๋ผ ๋ฉ์ธ์ง(payload) ์์ฑ
.as(wsMessage -> session.send(wsMessage)); // client ์๊ฒ ๋ฉ์ธ์ง(payload) ์ ์ก
}
}
์น์์ผ์ ์ฌ์ฉํ๋ ค๋ฉด ๋จผ์ ์น์์ผ ์ค์ ๊ฐ์ฒด๋ฅผ Bean
์ผ๋ก ๋ฑ๋กํด์ผ ํ๋ค.
์์์ ์ ์ํ ํธ๋ค๋ฌ๋ฅผ url
์ ๋งคํํ๊ณ , Request ์์ฒญ์ Upgrade ํ๋ ์ด๋ํฐ๋ฅผ Bean
์ผ๋ก ๋ฑ๋กํ๋ค.
Copy @Configuration
public class WebSocketConfiguration {
@Bean
public HandlerMapping handlerMapping() {
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(Collections.singletonMap("/ws/echo", new EchoWebSocketHandler())); // ๊ฒฝ๋ก๊ธฐ๋ฐ ๋งคํ ์ค์
mapping.setOrder(-1); // ์ฐ์ ์์, ์๋ต์ ์ฌ์ฉํ์ง ์๋ ๊ฒ์ผ๋ก ์ค์ ๋จ
return mapping;
}
@Bean
public HandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
// WebSocket Handshake (upgrade request) ๋ฅผ ์ฒ๋ฆฌํ๋ HandlerAdapter ์์ฑ
}
}
WebSocketMessage
๋ payload
๋ก DataBuffer
๋ฅผ ๊ตฌํํ์ฌ ๋ฌธ์์ด, ๋ฐ์ดํธ์ฝ๋๋ก ์ฝ๊ฒ ํ๋ณํ ๊ฐ๋ฅ
Copy public class WebSocketMessage {
private final Type type;
private final DataBuffer payload;
...
}
WebSocketHandler
๋ฅผ ๊ตฌํํ๋ฉด ํด๋น url ์ ํด๋นํ๋ WebSocketSession
๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด ๋ฉ์ธ์ง๋ฅผ ๋ฐ๊ณ ๋ณด๋ธ๋ค.
์น์์ผ ํ
์คํธ ํด์ ์ฌ์ฉํด ws://127.0.0.1:8080/ws/echo
๋ก ์ ์, ๋ฉ์ธ์ง ์ ์ก์ Echo: ...
๋ฉ์ธ์ง ์์ ํ์ธ
WebSocket Client
์น์์ผ ํด๋ผ์ด์ธํธ์ ๊ฒฝ์ฐ WebSocketClient
์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ ReactorNettyWebSocketClient
๋ฅผ ์ฌ์ฉํ๋ค.
Copy import org.springframework.web.reactive.socket.client.WebSocketClient;
...
@Bean
public CommandLineRunner commandLineRunner() {
return (args) -> {
ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(URI.create("http://localhost:8080/ws/echo"),
session -> Flux
.interval(Duration.ofMillis(100))
.map(String::valueOf)
.map(session::textMessage)
.as(session::send))
.subscribe();
};
}
์ ์ํด๋ echo
์น์์ผ ์๋ฒ์ 0.1 ์ด๋ง๋ค interval
๋ฐ์ดํฐ๋ฅผ ๋ฌธ์์ด๋ก ์ ์ก
client
์ญ์ WebsocketHandler
๊ตฌํ์ฒด๋ฅผ excute
ํจ์์ ๋งค๊ฐ๋ณ์๋ก ๋ฐ์ผ๋ฉฐ ํธ๋ค๋ฌ ๋งคํ๊ณผ ํธ๋ค๋ฌ ์ด๋ํฐ๊ฐ Bean
์ผ๋ก ๋ฑ๋ก๋์ง ์์ ๋ฟ ์น์์ผ ์๋ฒ ์ฝ๋์ ์ ์ฌํ๋ค.
์ฐธ๊ณ ์ฝ๋: https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket-client
์ํ๊น๊ฒ๋ WebFlux
์์ ์ ๊ณตํ๋ ์น์์ผ ๋ด์ฉ์ ์์ ๊ธฐ๋ฅ์ด ์ ๋ถ์ด๋ฉฐ STOMP ๋ฅผ ์ฌ์ฉํ ๋ฉ์ธ์ง ๋งคํ ๋ฑ์ ๊ธฐ๋ฅ์ ์ ๊ณตํ์ง ์๋๋ค.
๋ค์ค ํต์ Sinks.Many
์ฝ๋ ์ฐธ๊ณ : https://github.com/Kouzie/spring-reactive/tree/master/spring-reactive-websocket
๋ฆฌ์กํฐ๋ธ์ ์น์์ผ์ WebMVC ์์ ์ฌ์ฉํ ์น์์ผ๊ณผ ๋ค๋ฅธ์ ์ด ์๋๋ฐ ๋ชจ๋ ์์ฒญ์ ๋ํ ์๋ต์ ์ํํ ๋ ์์๊ฐ์ด ๋๋ค์์ด๋ ํจ์๋ฅผ ๋ฏธ๋ฆฌ ๋ฑ๋กํด๋์ด์ผ ํ๊ณ
๋๊ฐ ์ด์์ ํด๋ผ์ด์ธํธ๋ค๊ฐ ํต์ ์ ์ํด์ ๊ฐ ํด๋ผ์ด์ธํธ์ session
์ ์ฐพ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๋๋ฐ Sinks.Many
ํด๋์ค๋ฅผ ์ฌ์ฉํ๋ค.
์น์์ผ ์ค์ ๊ฐ์ฒด๋ฅผ Bean
์ผ๋ก ๋ฑ๋กํ๊ณ ํธ๋ค๋ฌ ๋งคํํ๋ ๊ฒ์ ๋์ผํ๋ค.
Copy @Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping handlerMapping(ChatSocketHandler chatSocketHandler) { // url, handler ๋งคํ
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(Collections.singletonMap("/ws/chat", chatSocketHandler));
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatSocketHandler implements WebSocketHandler { // handler ์ ์
private final ObjectMapper mapper;
@Override
public Mono<Void> handle(WebSocketSession session) {
WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(session);
return session.receive()
.map(this::toDto)
.doOnNext(subscriber::onNext) // ์์ ์ฝ๋ฐฑ ํจ์ ๋ฑ๋ก
.doOnError(subscriber::onError) // ์๋ฌ ์ฝ๋ฐฑ ํจ์ ๋ฑ๋ก
.doOnCancel(subscriber::onCancel) // ์ฐ๊ฒฐ๋๊น ์ฝ๋ฐฑ ํจ์ ๋ฑ๋ก
.zipWith(session.send(subscriber.getMany().asFlux().map(webSocketToClientDto -> // ๋ฉ์ธ์ง ๋ฐ์ ์ฝ๋ฐฑ ํจ์ ๋ฑ๋ก
session.textMessage(webSocketToClientDto.getFrom() + ":" + webSocketToClientDto.getMessage()))))
.then();
}
private WebSocketFromClientDto toDto(WebSocketMessage message) {
try {
WebSocketFromClientDto WsDto = mapper.readValue(message.getPayloadAsText(), WebSocketFromClientDto.class);
return WsDto;
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}
}
session.send
์ ๋ฑ๋ก๋๋ Flux
๋ฐํ์๋ Sinks.Many
๋ก๋ถํฐ ์์ฑ๋๋๋ฐ
์ด๋ ์๋์์ ์ฌ์ฉํ๋ ์ฝ๋์ ๊ฐ์ด ๋ฉ์ธ์ง๋ฅผ ์ง์ด๋ฃ์ ์ ์๋ ๋ฐํ์๋ค.
Websocket ์ธ์๋ SSE (Server Send Events) ์์๋ ์ฌ์ฉ๋๋ค.
Copy @Slf4j
class WebSocketMessageSubscriber {
// ๋ณธ์ธ์ ํฌํจํ ๋ค๋ฅธ ํด๋ผ์ด์ธํธ์ ์น์์ผ ๋ฉ์ธ์ง ๋ฐํ์ ๋งต
public static Map<String, Sinks.Many<WebSocketToClientDto>> userMap = new HashMap<>(); //sessionId, sink
private final String id;
@Getter
private final Sinks.Many<WebSocketToClientDto> many; // ๋ณธ์ธ์ ์น์์ผ ๋ฉ์ธ์ง ๋ฐํ์
public WebSocketMessageSubscriber(WebSocketSession session) {
many = Sinks.many().unicast().onBackpressureBuffer();
id = session.getId();
many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("welcome, " + id).build());
userMap.put(id, many);
}
public void onNext(WebSocketFromClientDto msg) {
log.info("onNext invoked, to:{}, msg:{}", msg.getTo(), msg.getMessage());
Sinks.Many<WebSocketToClientDto> to = userMap.get(msg.getTo());
if (to == null)
many.tryEmitNext(WebSocketToClientDto.builder().from("system").message("no user:" + msg.getTo()).build());
else
to.tryEmitNext(WebSocketToClientDto.builder().from(id).message(msg.getMessage()).build());
}
public void onError(Throwable error) {
//TODO log error
log.error("onError invoked, error:{}, {}", error.getClass().getSimpleName(), error.getMessage());
many.tryEmitNext(WebSocketToClientDto.builder()
.from("system")
.message(id + " on error, error:" + error.getMessage())
.build());
}
public void onCancel() {
log.info("onCancel invoked, id:{}", id);
userMap.remove(id);
for (Map.Entry<String, Sinks.Many<WebSocketToClientDto>> entry : userMap.entrySet()) {
if (!entry.getKey().equals(id))
entry.getValue().tryEmitNext(WebSocketToClientDto.builder()
.from("system")
.message(id + " is exit")
.build());
}
}
}
@Getter
@Setter
@Builder
class WebSocketToClientDto {
private String from;
private String message;
}
@Getter
@Setter
class WebSocketFromClientDto {
private String to;
private String message;
}
WebClient
๋
ผ๋ธ๋กํน Http Client
๋ก ๊ธฐ์กด ์คํ๋ง ๋ถํธ์์ ๋ํ์ ์ธ Http Client
๋ก RestTemplate
(๋ธ๋กํน) ์ด ์๋ค.
๋ด๋ถ์ Flux, Mono
๋ฆฌ์กํฐ ๊ฐ์ฒด๋ฅผ ์ง์ํ๋ ๋งคํ์ด ๋ด์ฅ๋์ด ์์ด ๋ฆฌ์กํฐ๋ธ ์๋ฒ์ ์ ์ด์ธ๋ฆฐ๋ค.
http://localhost:8080/api/user/{id}
url ์ ์ง์ํ๋ ๊ฐ๋จํ ์น์๋ฒ ์์ฑ
Copy @Slf4j
public class TestWebServer {
public static void main(String[] args) {
HttpHandler httpHandler = RouterFunctions.toHttpHandler( // RouterFunction -> HttpHandler ๋ณ๊ฒฝ
nest(path("/api"), route(GET("/users/{id}"),
request -> {
String id = request.pathVariable("id");
return ServerResponse.ok().syncBody("hello " + id + " user!"); // ๋ฐํ๋ฐ์ดํฐ ๋๊ธฐ์ ์ผ๋ก ์์ฑ
}) // end route
) // end nest
);
ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
DisposableServer server = HttpServer.create()
.host("localhost").port(8080)
.handle(reactorHttpHandler)
.bindNow();
server.onDispose().block();
}
}
WebClient
๋ฅผ ์ฌ์ฉํด ์ url
์ Http GET Request
์์ฒญ
Copy public class TestWebClient {
public static void main(String[] args) throws InterruptedException {
WebClient.create("http://localhost:8080/api") // WebClient ๊ฐ์ฒด ์์ฑ + baseUrl ์ค์
.get().uri("/users/{id}", 10) // method, uri ์ค์
.retrieve() // ์๋ต ๋ด์ฉ ์ค์ . ResponseSpec ๋ฐํ
.bodyToMono(String.class) // ์๋ต body ๋ฅผ Mono ๋ก ๋ณํ
.subscribe(s -> System.out.println(s)); // Mono ์ ๋ํ ๊ตฌ๋
์ค์
// hello 10 user!
Thread.sleep(1000); // main thread ์ข
๋ฃ ๋ฐฉ์ง
}
}
์์ WebClient
๋ GET
๋ฐฉ์์ด๋ผ uri
๋ง ์ค์ ํ์ง๋ง API
์ ๋ฐ๋ผ cookie, header, body
๋ชจ๋ ์ค์ ๊ฐ๋ฅํ๋ค.
HTTP ์๋ต์ ์ฒ๋ฆฌํ ์ ์๋ ๋ฉ์๋๊ฐ retrieve()
์ exchage()
๊ฐ ์๋๋ฐ
retrieve()
๋ ResponseSpec
์ ๋ฐํํ๊ณ exchage()
๋ Mono<ClientResponse>
๋ฅผ ๋ฐํํ๋ค.
๋ง์ฝ exchange()
๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ ์๋์ ๊ฐ์ด ์ฝ๋์์ฑ
Copy public static void main(String[] args) throws InterruptedException {
WebClient.create("http://localhost:8080/api") // WebClient ๊ฐ์ฒด ์์ฑ
.get().uri("/users/{id}", 10) // method, uri ์ค์
.exchange() // Mono<ClientResponse> ๋ฐํ
.flatMap(response -> response.bodyToMono(String.class)) // ์๋ต body ๋ฅผ Mono ๋ก ๋ณํ
.subscribe(s -> System.out.println(s)); // Mono ์ ๋ํ ๊ตฌ๋
์ค์
Thread.sleep(1000);
}
exchage
๋ฅผ ์ฌ์ฉํ๋ฉด ClientResponse
์์ ์ ๊ณตํ๋ Http Response
์ ๊ฐ์ข
์ ๋ณด๋ฅผ ์กฐ์ํ ์ ์๋ ์ฌ๋ฌ ๋ฉ์๋๋ก ๋ณต์กํ ๋ฐํ ๋ก์ง ๊ตฌ์ฑ์ด ๊ฐ๋ฅํ๋ค.
retrieve
์ ๊ฒฝ์ฐ Http status
๋ง ๊ฒจ์ฐ ์กฐ์ํ์ฌ DSL
ํ์์ผ๋ก ์ฒ๋ฆฌํ ์ ์๋ค.
WebClient
๋ ์ธํฐํ์ด์ค์ด๊ณ DefaultWebClient
๊ฐ WebClient
์ ์ ์ผํ ๊ตฌํ์ฒด์ด๋ค.
์ค์ DefaultWebClient
๋ด๋ถ์์
WebClient Serialize config
WebClient
์์ ์ง๋ ฌํ, ๋น์ง๋ ฌํ๋ฅผ ์ํํ ๋ ๊ธฐ์กด์์ฑํ ObjectMapper
๋ฅผ ํตํด ์ฒ๋ฆฌํ ์ ์๋ค.
Copy @Bean
public ObjectMapper objectMapper() {
JavaTimeModule module = new JavaTimeModule();
LocalDateTimeSerializer localDateTimeSerializer = new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
LocalDateTimeDeserializer localDateTimeDeserializer = new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
module.addSerializer(LocalDateTime.class, localDateTimeSerializer);
module.addDeserializer(LocalDateTime.class, localDateTimeDeserializer);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(module);
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
// UnrecognizedPropertyException ์ฒ๋ฆฌ
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// json -> clas ์์ unknown ์์ฑ์ด ์์ด๋ ์ฒ๋ฆฌ
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // null ์ด ์๋๊ฒ๋ง ๋ณํ
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); // snake case ๋ก ๋ณํ
return objectMapper;
}
@Bean
public WebClient webClient(ObjectMapper objectMapper) {
ExchangeStrategies jacksonStrategy = ExchangeStrategies.builder()
.codecs(config -> {
config.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON));
config.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON));
config.defaultCodecs().maxInMemorySize(1024 * 1024 * 50);
}).build();
return WebClient.builder().exchangeStrategies(jacksonStrategy).build();
}
์ฃผ์์ฌํญ์ผ๋ก uri(uriBuilder->..)
๋ฉ์๋๋ฅผ ์ฌ์ฉํด query parameter
๋ฅผ ์ง์ ํ ๊ฒฝ์ฐ ๋ฌธ์์ด์ /
๊ฐ ๋ค์ด๊ฐ escape
๋ฌธ์๋ก ์ธ์ํ๊ธฐ ๋๋ฌธ์ base64 ๋ฌธ์์ด๋ก ๋ณํํ ์ ์๋ค, url encoding ์ ์งํํ์ง ์๋๋ค. ๋ํ WebClient
์์ฑ์ baseUrl
์ ์ค์ ํ์ง ์์ผ๋ฉด uribuilder
๋ฅผ ํตํด scheme, host, port, path
๋น๋ํจ์๋ฅผ ๋ชจ๋ ํธ์ถํด์ผ ํ๊ธฐ ๋๋ฌธ์ ๋ฒ๊ฑฐ๋กญ๋ค.
WebClient
๋ฅผ bean
์ผ๋ก ์์ฑํด singleton
๋ฐฉ์์ผ๋ก ์ฌ์ฉํ๋ค๋ฉด StringBuilder
๋ฅผ ํตํด uri
๋ฅผ ์ง์ ์์ฑํ๋ ๊ฒ์ ๊ถ์ฅ.
Copy StringBuilder urlBuilder = new StringBuilder(WEATHER_GET_ULTRA_SRT_NCST); /*URL*/
urlBuilder.append("?" + URLEncoder.encode("ServiceKey", "UTF-8") + "=" + URLEncoder.encode(dataGovApiKey, "UTF-8")); /*๊ณต๊ณต๋ฐ์ดํฐํฌํธ์์ ๋ฐ์ ์ธ์ฆํค*/
urlBuilder.append("&" + URLEncoder.encode("pageNo", "UTF-8") + "=" + URLEncoder.encode("1", "UTF-8")); /*ํ์ด์ง๋ฒํธ*/
urlBuilder.append("&" + URLEncoder.encode("numOfRows", "UTF-8") + "=" + URLEncoder.encode("10", "UTF-8")); /*ํ ํ์ด์ง ๊ฒฐ๊ณผ ์*/
urlBuilder.append("&" + URLEncoder.encode("dataType", "UTF-8") + "=" + URLEncoder.encode("JSON", "UTF-8")); /*์์ฒญ์๋ฃํ์(XML/JSON)Default: XML*/
urlBuilder.append("&" + URLEncoder.encode("base_date", "UTF-8") + "=" + URLEncoder.encode(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE), "UTF-8")); /*15๋
12์ 1์ผ๋ฐํ*/
urlBuilder.append("&" + URLEncoder.encode("base_time", "UTF-8") + "=" + URLEncoder.encode(String.format("%02d00", curHour), "UTF-8")); /*05์ ๋ฐํ*/
urlBuilder.append("&" + URLEncoder.encode("nx", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(nx), "UTF-8")); /*์๋ณด์ง์ X ์ขํ๊ฐ*/
urlBuilder.append("&" + URLEncoder.encode("ny", "UTF-8") + "=" + URLEncoder.encode(String.valueOf(ny), "UTF-8")); /*์๋ณด์ง์ ์ Y ์ขํ๊ฐ*/
URI uri = new URL(urlBuilder.toString()).toURI();
SSE(Server-Sent Event)
์๋ฒ ๋จ๋ฐฉํฅ ํต์ ์ด๋ผ ์น์์ผ์ด ๋นํด ์๋๋ ์ค๋ฒํค๋ ์ธก๋ฉด์์ SSE ๊ฐ ํจ์จ์ ์ด์ง๋ง ์๋ฐฉํฅ์ด ์๋๋ ์ด์ ๋ก ์น์์ผ์ด ์ฃผ๋ก ์ฌ์ฉ๋๋ค.
WebFlux
๋ฅผ ์ฌ์ฉํ์ง ์๊ณ spring-boot-starter-web
์์ SSE ํ๋กํ ์ฝ ์ฌ์ฉ์ด ๊ฐ๋ฅํ๋ค.
Copy @Component
@RequiredArgsConstructor
public class TemperatureSensor {
// ์คํ๋ง ์ ๊ณต ์ด๋ฒคํธ ๋ฐํ ๊ตฌ๋
์ง์ ํด๋์ค
private final ApplicationEventPublisher publisher;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final Random rnd = new Random();
@PostConstruct
public void startProcessing() {
this.executor.schedule(this::probe, 1, SECONDS); // ๋ฐ์ดํฐ ๋ฐํ ์์
}
private void probe() {
double temperature = 16 + rnd.nextGaussian() * 10;
publisher.publishEvent(new Temperature(temperature)); // ์ด๋ฒคํธ ๋ฐ์ดํฐ ๋ฐํ
executor.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS); // 5์ดํ ์ฌ๋ฐํ
}
}
ScheduledExecutorService
์ ApplicationEventPublisher
ํด๋์ค๋ฅผ ์ฌ์ฉํด Temperature
๋ฐ์ดํฐ๋ฅผ ๊ณ์ ๋ฐํํ๋ค.
@EventListener
๋ฅผ ์ฌ์ฉํด ApplicationEventPublisher
์ ๋ฐํ๋ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์ ์๋ค.
Copy @Slf4j
@RestController
public class TemperatureController {
static final long SSE_SESSION_TIMEOUT = 5 * 1000L;
// ์ฐ๊ฒฐ ํด๋ผ์ด์ธํธ ๊ด๋ฆฌ list
private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();
// TemperatureSensor ์ ApplicationEventPublisher ์์ ๋ฐํ๋๋ ๋ฐ์ดํฐ ๋์
@Async
@EventListener
public void handleMessage(Temperature temperature) {
log.info(format("Temperature: %4.2f C, active subscribers: %d", temperature.getValue(), clients.size()));
// ๊ด๋ฆฌ๋๋ ํด๋ผ์ด์ธํธ์๊ฒ ๋ฐํ๋ Temperature ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐ ์์ธ๋ฐ์ ํด๋ผ์ด์ธํธ ์ญ์ ์ฒ๋ฆฌ
List<SseEmitter> deadEmitters = new ArrayList<>();
clients.forEach(emitter -> {
try {
Instant start = Instant.now();
emitter.send(temperature, MediaType.APPLICATION_JSON);
log.info("Sent to client, took: {}", Duration.between(start, Instant.now()));
} catch (Exception ignore) {
deadEmitters.add(emitter);
}
});
clients.removeAll(deadEmitters);
}
@RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) {
log.info("SSE stream opened for client: " + request.getRemoteAddr());
SseEmitter emitter = new SseEmitter(SSE_SESSION_TIMEOUT); // 5 ์ด๊ฐ ์ฐ๊ฒฐ
clients.add(emitter); // ๊ด๋ฆฌ emitter ๋ชฉ๋ก์ ์ถ๊ฐ
// Remove SseEmitter from active clients on error or client disconnect
emitter.onTimeout(() -> clients.remove(emitter));
emitter.onCompletion(() -> clients.remove(emitter));
return emitter;
}
// ์์ ์ค์ ๋ 5์ด๊ฐ ์ง๋๋ฉด AsyncRequestTimeoutException ์ด ๋ฐ์ํ๊ณ ํธ์ถ๋จ.
@ExceptionHandler(value = AsyncRequestTimeoutException.class)
public ModelAndView handleTimeout(HttpServletResponse rsp) throws IOException {
log.warn("handle timeout");
if (!rsp.isCommitted()) {
rsp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}
return new ModelAndView();
}
}
SseEmitter
๋ฅผ ์ง์์ ์ผ๋ก ์ ์ง, ๊ด๋ฆฌํ๋ค. ๋ฐํ ํ์
์ด ResponseEntity, Map
๊ณผ ๊ฐ์ ๊ฐ์ฒด๊ฐ ์๋ SseEmitter
์ธ ๊ฒ์ด ์ด์ํ๋ค.
WebFlux
์์ Flux
์ Spring 5.0
์ ์ถ๊ฐ๋ ServerSentEvent
๋ฅผ ์ฌ์ฉํด SSE ํ๋กํ ์ฝ์ ์ง์ํ๋ค.
Copy @RestController
@RequiredArgsConstructor
public class ServerSentController {
private final StocksService stocksService;
@GetMapping("/sse/stocks")
public Flux<ServerSentEvent<?>> streamStocks() {
return stocksService.stream() // Flux<StockItem> ๋ฐํ
.map(item -> ServerSentEvent
.builder(item)
.event("StockItem")
.id(item.getId())
.build());
}
@GetMapping("/sse/stocks2")
public Flux<StockItem> streamStocks2() {
return stocksService.stream();
}
}
WebFlux
๋ด๋ถ์์ ๋ฐํ์์๋ฅผ ServerSentEvent
๋ก ๋ํ ํ๊ธฐ์ ๋จ์ Flux<>
๋ง ์ปจํธ๋กค๋ฌ ๋ฉ์๋์์ ๋ฐํํด๋ ๋๋ค.
์คํ๋ง ์ํ๋ฆฌํฐ with WebFlux
๊ธฐ์กด ์๋ธ๋ฆฟ ๊ธฐ๋ฐ ์คํ๋ง ๋ถํธ๋ ํ๋์ ์ค๋ ๋์ ํ๋์ ์ฐ๊ฒฐ์ด ์ฒ๋ฆฌ๋์ด
ThreadLocal
์ SecurityContext
๋ฅผ ์ ์ฅํด ์ฐ๊ฒฐ๋์ ๋ณด์์ฒ๋ฆฌ๋ฅผ ์งํํ์ง๋ง
๋ฆฌ์กํฐ๋ธ๋ ํ๋์ ์ฐ๊ฒฐ์ ์ฌ๋ฌ๊ฐ์ ์ค๋ ๋๊ฐ ๊ผฌ์ฌ์์ ์ ์์ด Reactor Context
๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค.
spring-boot-starter-security
๋ชจ๋ ์ญ์ ๊ธฐ์กด ์๋ธ๋ฆฟ ๊ธฐ๋ฐ WebMVC
์์ WebFlux
๋ฅผ ์ง์ํ ์ ์๋๋ก ์
๋ฐ์ดํธ ๋์๋ค.
ReactiveSecurityContextHolder
WebMVC
์์ SecurityContextHolder
์์ SecurityContext
๋ฅผ ๊ฐ์ ธ์๋ค๋ฉด
WebFlux
์์ ReactiveSecurityContextHolder
์์ SecurityContext
๋ฅผ ๊ฐ์ ธ์จ๋ค.
Copy @RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class SecuredProfileController {
private final ProfileService profileService;
@GetMapping("/profiles")
public Mono<Profile> getProfile() {
return ReactiveSecurityContextHolder.getContext() // Mono<SecurityContext> ๋ฐํ
.map(SecurityContext::getAuthentication)
.flatMap(auth -> profileService.getByUser(auth.getName()));
}
}
๋ก๊ทธ์ธํ ์ ์ ์ ์ ๋ณด๋ฅผ SecurityContext
๊ฐ์ ธ์ Profile
์์ ๊ฒ์ ํ ์ถ๋ ฅํ๋ค.
๋น์ฐํ SecurityContext::getAuthentication
๋ฉ์๋๋ ๋ฆฌ์กํฐ ์ปจํ
์คํธ
๋ฅผ ์ฌ์ฉํ๋ค.
Copy // ReactiveSecurityContextHolder.java
public static Mono<SecurityContext> getContext() {
return Mono.subscriberContext()
.filter( c -> c.hasKey(SECURITY_CONTEXT_KEY))
.flatMap( c-> c.<Mono<SecurityContext>>get(SECURITY_CONTEXT_KEY));
}
SecurityWebFilterChain
์ธ์ฆ๊ณผ์ ์ ๊ฑฐ์น๋ ค๋ฉด ๋ด๋ถ ์ปจํ
์คํธ์ ์์ธ์ค ํ๋ ค๊ณ ํ๋ฉด SecurityContext
๊ฐ ํด๋น ๋ฆฌ์กํฐ ์ปจํ
์คํธ
์ ์กด์ฌํด์ผํ๊ณ Authentication
๊ฐ์ฒด๊ฐ SecurityContext
์์ ํ ๋น๋์ด์ผ ํ๋ค.
์ด ๋ชจ๋ ๊ณผ์ ์ ReactorContextWebFilter
์ SecurityWebFilterChain
์ ์ฉํ๊ณ ์ด๋ฅผ ํตํด ๋ฆฌ์กํฐ ์ปจํ
์คํธ
์์ SecurityContext
๊ฐ์ฒด์ Authentication
๊ฐ์ฒด๋ฅผ ์ง์ด๋ฃ๋๋ค.
SecurityContext
๋ฅผ ์ง์ด ๋ฃ๋ ํจ์๋ ServerSecurityContextRepository
๋ฅผ ์ฌ์ฉํ๋ค.
Copy package org.springframework.security.web.server.context;
public interface ServerSecurityContextRepository {
Mono<Void> save(ServerWebExchange exchange, SecurityContext context);
Mono<SecurityContext> load(ServerWebExchange exchange);
}
SecurityContext
๋ฅผ ํน์ ServerWebExchange
์ ์ ์ฅ, ์ฌ์ฉํ ์ ์๋ค.
Copy @Configuration
@EnableReactiveMethodSecurity // ๋ณ๋์ MethodInterceptor ์ฌ์ฉ์ ํ์ํจ
public class SecurityConfiguration {
@Bean // ReactorContextWebFilter ์ ์ ์ฉํ ์ํ๋ฆฌํฐ ํํฐ
public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
// ๊ธฐ์กด mvc ์์ ์ฌ์ฉํ๋ HttpSecurity ์์ webflux ์ฉ์ผ๋ก ์ ์๋ ServerHttpSecurity
// ๊ธฐ์กด spring security ์ฌ์ฉ ๋ฐฉ์๊ณผ ํฌ๊ฒ ๋ค๋ฅด์ง ์๋ค.
return httpSecurity
.authorizeExchange()
.anyExchange().permitAll().and()
.httpBasic().and()
.formLogin().and()
.build();
}
private static final Pattern PASSWORD_ALGORITHM_PATTERN = Pattern.compile("^\\{.+}.*$");
private static final String NOOP_PASSWORD_PREFIX = "{noop}";
@Bean // ๋ก๊ทธ์ธ์ ์ฌ์ฉํ ํ
์คํธ ์ฌ์ฉ์ ์์ฑ
public MapReactiveUserDetailsService reactiveUserDetailsService(ObjectProvider<PasswordEncoder> passwordEncoder) {
return new MapReactiveUserDetailsService(
User.withUsername("user")
.password("user")
.passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
.roles("USER")
.build(),
User.withUsername("admin")
.password("admin")
.passwordEncoder(p -> getOrDeducePassword(p, passwordEncoder.getIfAvailable()))
.roles("USER", "ADMIN")
.build());
}
private String getOrDeducePassword(String password, PasswordEncoder encoder) {
if (encoder != null || PASSWORD_ALGORITHM_PATTERN.matcher(password).matches()) {
return password;
}
return NOOP_PASSWORD_PREFIX + password;
}
}
WebFlux with JWT
Custom SecurityContextRepository, AuthenticationManager
์คํ๋ง ์ํ๋ฆฌํฐ๋ SecurityContextRepository
๋ฅผ ํตํด SecurityContext
๋ฅผ ๋ฆฌ์กํฐ ์ปจํ
์คํธ
์ ์ ์ฅํ๊ณ ์ญ์ ํ๋ค.
default ์คํ๋ง ์ํ๋ฆฌํฐ์ ๊ฒฝ์ฐ DB ๋ก๋ถํฐ UserDetails ๋ฅผ ๊ฐ์ ธ์ ๋ฑ๋กํด๋๊ณ ์ฌ์ฉํ๊ฒ ์ง๋ง ์ฐ๋ฆฌ๋ JWT ๋ฅผ ์ฌ์ฉํ๊ธฐ์ ServerSecurityContextRepository
์์๋ฐ์ ์ปค์คํฐ๋ง์ด์ง ํด์ผํ๋ค.
Copy @Component
@RequiredArgsConstructor
public class SecurityContextRepository implements ServerSecurityContextRepository {
private final AuthenticationManager authenticationManager;
@Override
public Mono<Void> save(ServerWebExchange swe, SecurityContext sc) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Mono<SecurityContext> load(ServerWebExchange swe) {
ServerHttpRequest request = swe.getRequest();
String authToken = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
if (authToken != null) {
Authentication auth = new UsernamePasswordAuthenticationToken(authToken, authToken);
return authenticationManager
.authenticate(auth)
.map(authentication -> new SecurityContextImpl(authentication));
} else {
return Mono.empty();
}
}
}
SecurityContext
๋ฅผ ์์ฑ ๋ฐ ์ ์ฅํ๊ธฐ request ํค๋๋ก ๋ถํฐ JWT ํ ํฐ์ ๊ฐ์ ธ์ AuthenticationManager
๋ก ๋๊ธฐ๋ ์ฝ๋๊ฐ load
์ ์ ์๋์ด ์๋ค.
Copy @Component
@RequiredArgsConstructor
public class AuthenticationManager implements ReactiveAuthenticationManager {
private final JwtTokenUtil jwtUtil;
@Override
public Mono<Authentication> authenticate(Authentication authentication) {
String authToken = authentication.getCredentials().toString();
if (!jwtUtil.validateToken(authToken)) {
return Mono.empty();
}
Claims claims = jwtUtil.getAllClaimsFromToken(authToken);
String role = claims.get("role", String.class);
List<GrantedAuthority> authorities = Collections.singletonList(new SimpleGrantedAuthority(role));
return Mono.just(new UsernamePasswordAuthenticationToken(claims.getSubject(), null, authorities));
}
}
AuthenticationManager
๋ ์ ๋ฌ๋ฐ์ ํ ํฐ์ผ๋ก role
์ ๊บผ๋ด์ด Authority
๋ฅผ ์ง์ ํ๊ณ Authentication
์ธ์ฆ ๊ฐ์ฒด๋ฅผ ๋ฐํํ๋ค.
SecurityContextPath
๊ฐ ๋ฆฌ์กํฐ ์ปจํ
์คํธ๋ก ๋ณ๊ฒฝ๋์์ ๋ฟ ๊ธฐ์กด WebMVC
๋ชจ๋ธ์ ์คํ๋ง ์ํ๋ฆฌํฐ ๋ฐฉ์๊ณผ ๋น์ทํ๋ค.
Copy @Bean // ReactorContextWebFilter ์ ์ ์ฉํ ์ํ๋ฆฌํฐ ํํฐ
public SecurityWebFilterChain securityFilterChainConfigurer(ServerHttpSecurity httpSecurity) {
// ๊ธฐ์กด mvc ์์ ์ฌ์ฉํ๋ HttpSecurity ์์ webflux ์ฉ์ผ๋ก ์ ์๋ ServerHttpSecurity
// ๊ธฐ์กด spring security ์ฌ์ฉ ๋ฐฉ์๊ณผ ํฌ๊ฒ ๋ค๋ฅด์ง ์๋ค.
return httpSecurity
.exceptionHandling()
//.authenticationEntryPoint((swe, e) -> Mono.fromRunnable(() -> // ๋ฏธ ๋ก๊ทธ์ธ
// swe.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED)))
//.accessDeniedHandler((swe, e) -> Mono.fromRunnable(() -> // ๋ฏธ ๋ก๊ทธ์ธ
// swe.getResponse().setStatusCode(HttpStatus.FORBIDDEN)))
.authenticationEntryPoint((swd, e) -> Mono.error(new AuthenticationCredentialsNotFoundException("")))
.accessDeniedHandler((swe, e) -> Mono.error(new AccessDeniedException("")))
.and()
.authenticationManager(authenticationManager)
.securityContextRepository(securityContextRepository)
.csrf().disable()
.cors().disable()
.httpBasic().disable() // Basic Authentication disable
.formLogin().disable()
.authorizeExchange()
.pathMatchers("/member/join", "/member/login").permitAll()
.pathMatchers("/member/**", "/rent/**").authenticated()
.pathMatchers("/admin/**").hasAnyRole("ROLE_ADMIN")
.anyExchange().permitAll()
.and()
.build();
}
๋ง์ง๋ง์ผ๋ก ServerHttpSecurity
์ ์ปค์คํฐ๋ง์ด์งํ SecurityContextRepository, AuthenticationManager
๋ฅผ ๋ฑ๋กํ๊ณ ๋ณ๋์ ์๋ฌ ํธ๋ค๋ง ์ฒ๋ฆฌ๋ฅผ ํ๋ค๋ฉด ์๋ฌ๋ฅผ ๋ฐํํด ํธ๋ค๋ง,
์๋ค๋ฉด HttpStatus.UNAUTHORIZED
๋ก ๋จ์ HTTP Status
๋ง ๋ฐํํ ์ ์๋ค.
์คํ๋ง ๋ฐ์ดํฐ with WebFlux
๊ธฐ์กด์ WebMVC
๋ฐฉ์์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๊ทผ์ JDBC
๋ฅผ ๊ตฌํํ ๋๋ผ์ด๋ฒ ๋ผ์ด๋ธ๋ฌ๋ฆฌ
๋ฅผ ์ฌ์ฉํด ์ ๊ทผํด์๋ค.
๋ฆฌ์กํฐ๋ธํ DB ํต์ ๋ HTTP ์ ๋ค๋ฅด์ง ์๋ค.
์ด๋ก ์ ์ผ๋ก DB ์ ๊ทผ์ฉ ์๋น์ค๋ฅผ ์์ฑํ๊ณ WebClient
๋ฅผ ์ฌ์ฉํด DB ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์จ๋ค๋ฉด ๋น๋๊ธฐ DB ์ ๊ทผ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ๊ตฌํํ ๊ฒ ๊ณผ ๋ค๋ฆ์๋ค.
๋คํ์ด๋ ๋ค์ํ DB ๋ฒค๋์ฌ์์ ์๋ฐ ๋น๋๊ธฐ DB ์ฐ๊ฒฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ธ ๋ฆฌ์กํฐ๋ธ ๋๋ผ์ด๋ฒ
๋ฅผ ์ ๊ณตํจ์ผ๋ก ๋จ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ง ์ถ๊ฐํ๋ฉด ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ ์ด์ด ์ ๋ํ ๋
ผ ๋ธ๋กํน ์์ธ์ค๋ฅผ ํ ์ ์๋ค.
spring-boot-starter-data-mongodb-reactive
spring-boot-starter-data-cassandra-reactive
spring-boot-starter-data-redis-reactive
spring-boot-starter-data-r2dbc
์คํ๋ง ๋ฐ์ดํฐ ํ์์ ๊ธฐ์กด์ ์ฌ์ฉํ๋ Repository
ํจํด์ ๋ฆฌ์กํฐ๋ธ ๋ฐฉ์์๋ ๋๊ฐ์ด ์ฌ์ฉํ ์ ์๋๋ก ์ถ์ํ๋ฅผ ํตํด ๊ตฌํํด๋์๋ค.
๊ฐ ๋ชจ๋๋ค์ด ReactiveCurdRepository
์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํด Reactor
๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ํตํฉ๋์ด ์์ฐ์ค๋ฝ๊ฒ ๋ฆฌ์กํฐ๋ธํ๊ฒ ์ฝ๋์์ฑ์ด ๊ฐ๋ฅํ๋ค.
์คํ๋ง ๋ฐ์ดํฐ ๋ชฝ๊ณ DB ๋ฆฌ์กํฐ๋ธ
NoSQL
์ ๊ฒฝ์ฐ ๊ฐ ๋ฒค๋์ฌ์์ ํตํฉ๋ ๊ท์ฝ์ด ์๋ค.
๊ฐ ๋ฒค๋์ฌ์์ ์๊ธฐ๋ค๋ง์ ๋๋ผ์ด๋ฒ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ ๊ณตํ๊ณ ์คํ๋ง ๋ฐ์ดํฐ ํ์ ์คํ๋ง์์ ํด๋น ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ค์ ์ฝ๊ฒ ์ฌ์ฉํ ์ ์๋๋ก ๊ฐ์ข
๋ชจ๋์ ๊ฐ๋ฐํ๊ณ ์๋ค
NoSQL DB
๋ ์ต๊ทผ์ ๋ง๋ค์ด ์ ธ์ ๋๋ถ๋ถ ๋ฒค๋์ฌ๊ฐ ๋ฆฌ์กํฐ๋ธ ๋๋ผ์ด๋ฒ
๋ฅผ ์ ๊ณตํ๊ณ ์์ผ๋ฉฐ
์คํ๋ง ๋ฐ์ดํฐ ํ์ ๋ชฝ๊ณ DB ์์ ์ ๊ณตํ๋ ๋ฆฌ์กํฐ๋ธ ๋๋ผ์ด๋ฒ
๋ฅผ ์ฝ๊ณ ํธํ๊ฒ ์ฌ์ฉํ ์ ์๋๋ก spring-boot-starter-data-mongodb-reactive
๋ชจ๋์ ์์ฑํด๋์๋ค.
ํด๋น ๋ชจ๋์ ์ฌ์ฉํ๋ฉด ์คํ๋ง ํ์์ ๋ง๋ Repository
ํจํด์ ์ฌ์ฉํด ๋ฉ์๋๋ช
๊ธฐ๋ฐ์ผ๋ก ์ฟผ๋ฆฌ๋ฌธ์ด ์๋ ์์ฑ/์ฌ์ฉ ํ ์ ์๋ค.
ReactiveMongoRepository
Copy import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
// ReactiveCurdRepository ๊ตฌํ์ฒด
@Repository
public interface BookReactiveMongoRepository extends ReactiveMongoRepository<Book, ObjectId> {
Mono<Book> findOneByTitle(Mono<String> title);
Flux<Book> findManyByTitleRegex(String regexp);
@Meta(maxScanDocuments = 3)
Flux<Book> findByAuthorsOrderByPublishingYearDesc(Publisher<String> authors);
@Query("{ 'authors.1': { $exists: true } }")
Flux<Book> booksWithFewAuthors();
Flux<Book> findByPublishingYearBetweenOrderByPublishingYear(
Integer from,
Integer to,
Pageable pageable
);
}
ReactiveMongoTemplate
ReactiveMongoRepository
์ธ์๋ ReactiveMongoTemplate
๋ฅผ ์ฌ์ฉํด ์ฟผ๋ฆฌ ์กฐ์์ด ๊ฐ๋ฅํ๋ค.
Copy @Service
@RequiredArgsConstructor
public class RxMongoTemplateQueryService {
private static final String BOOK_COLLECTION = "book";
private final ReactiveMongoTemplate mongoTemplate; // ReactiveMongoTemplate implements ReactiveMongoOperations
public Flux<Book> findBooksByTitle(String title) {
Query query = Query.query(new Criteria("title")
.regex(".*" + title + ".*"))
.limit(100);
return mongoTemplate.find(query, Book.class, BOOK_COLLECTION);
}
}
MongoClient
๋ชฝ๊ณ DB ์์ ์ ๊ณตํ๋ ๋ฆฌ์กํฐ๋ธ ๋๋ผ์ด๋ฒ ๊ตฌํ์ฒด๊ฐ com.mongodb.reactivestreams.client.MongoClient
ํด๋์ค์ด๋ค.
https://mongodb.github.io/mongo-java-driver-reactivestreams/
org.mongodb:mongodb-driver-reactivestreams
๋ชจ๋์์ ์ ๊ณตํ๋ฉฐ spring-boot-starter-data-mongodb-reactive
์์ ๋ด๋ถ์ ์ผ๋ก ์ฌ์ฉํ๋ค.
MongoClient
ํด๋์ค๋ฅผ ์ฌ์ฉํด๋ ์ฟผ๋ฆฌ์กฐ์์ด ๊ฐ๋ฅํ๋ค.
Copy @Service
@RequiredArgsConstructor
public class RxMongoDriverQueryService {
private final MongoClient mongoClient;
public Flux<Book> findBooksByTitle(String title, boolean negate) {
return Flux.defer(() -> {
Bson query = Filters.regex("title", ".*" + title + ".*");
if (negate) query = Filters.not(query);
return mongoClient
.getDatabase("test-db")
.getCollection("book")
.find(query);
}).map(doc -> new Book(
doc.getObjectId("id"), // Document
doc.getString("title"),
doc.getInteger("pubYear")));
}
}
ํธ๋์ญ์
(ReactiveMongoTemplate.inTransaction)
MongoDB 4.0
๋ฒ์ ์ด์ ๊น์ง ํ๋์ ๋ฌธ์ ์ ๋ํด์๋ง ํธ๋์ญ์
์ ์ ๊ณตํ๋ Single-Document Transaction
๊ธฐ๋ฅ๋ง ์์๋ค.
ํ๋์ ๋ฌธ์์ ๋ชจ๋ ์ ๋ณด๋ฅผ ์ฝ์
ํ์ฌ ์ฌ์ฉํ๊ธฐ์ ํ๋์ ํธ๋์ญ์
์ ํ๋์ ๋ฌธ์๋ง ๊ฑด๋ค์ฌ์ Single-Document Transaction
์ผ๋ก๋ ์ถฉ๋ถํด์ผ ํ์ง๋ง ํญ์ ์์ธ๊ฐ ์๋๋ฒ,
๊ฒฐ๊ตญ ์ฌ๋ฌ ๋ฌธ์์ ๋ํ ํธ๋์ญ์
Multi-Document Transaction
๊ธฐ๋ฅ์ MongoDB 4.0
๋ถํฐ ์ง์ํ๋ค.
WiredTiger ์คํ ๋ฆฌ์ง ์์ง์ ์ค๋ฉ์ค์ ์ด ๋์ด ์์ง ์๊ณ ๋ณต์ ์ค์ ์ผ ๊ฒฝ์ฐ์๋ง Multi-Document Transaction
์ ์ง์ํ๋ค.
ReactiveMongoTemplate
์ inTransaction
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด
Copy private Mono<TxResult> doTransferMoney(String from, String to, Integer amount) {
return mongoTemplate.inTransaction().execute(session -> session
.findOne(queryForOwner(from), Wallet.class)
.flatMap(fromWallet -> session
.findOne(queryForOwner(to), Wallet.class)
.flatMap(toWallet -> {
if (fromWallet.hasEnoughFunds(amount)) {
fromWallet.withdraw(amount);
toWallet.deposit(amount);
return session.save(fromWallet)
.then(session.save(toWallet))
.then(ReactiveMongoContext.getSession())
// An example how to resolve the current session
.doOnNext(tx -> log.info("Current session: {}", tx))
.then(Mono.just(TxResult.SUCCESS));
} else {
return Mono.just(TxResult.NOT_ENOUGH_FUNDS);
}
})))
.onErrorResume(e -> Mono.error(new RuntimeException("Conflict")))
.last();
}
์คํ๋ง ๋ฐ์ดํฐ R2DBC
R2DBC: Reactive Relational Database Connectivity
https://r2dbc.io/ https://spring.io/projects/spring-data-r2dbc https://spring.io/projects/spring-data-r2dbc
์๋์ ๊ฐ์ DBMS ์ ๋ํ์ฌ r2dbc ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ ๊ณต
Copy H2 (io.r2dbc:r2dbc-h2)
MariaDB (org.mariadb:r2dbc-mariadb)
Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
MySQL (dev.miku:r2dbc-mysql)
jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
Postgres (io.r2dbc:r2dbc-postgresql)
Oracle (com.oracle.database.r2dbc:oracle-r2dbc)
์ง๊ธ๊น์ง ์คํ๋ง JDBC
ํน์ ์คํ๋ง ๋ฐ์ดํฐ JDBC
ํน์ JPA
๋ฅผ ์ฌ์ฉํด ์์ฑ๋ Hikari CP
์์ ์ฐ๊ฒฐ๊ฐ์ฒด๊ฐ JDBC
๋๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํด ๊ด๊ณํ DB ๋ฅผ ์ฌ์ฉํด ์๋ค.
Copy @Repository
public interface BookSpringDataJdbcRepository extends CrudRepository<Book, Integer> {
@Query("SELECT * FROM book b WHERE b.title = :title")
CompletableFuture<Book> findBookByTitleAsync(@Param("title") String title);
}
JDBC, JPA
๋ฑ์ ๊ด๊ณํ DB ๋ผ์ด๋ธ๋ฌ๋ฆฌ
๋ค์ ๋ชจ๋ ๋๊ธฐ/๋ธ๋ญํน
๋ฐฉ์์ผ๋ก ๋์ํ๋ค.
๋คํ์ด๋ spring data jdbc
๋ฅผ ๊ฐ๋ฐํ ์คํ๋ง ๋ฐ์ดํฐ Relational
ํ๋ก์ ํธ ํ์์
๋ฆฌ์กํฐ๋ธ์ ์ ํฉ ํ ์๋ฐ DB ๋๋ผ์ด๋ฒ
์ธ ๋ฆฌ์กํฐ๋ธ ๋๋ผ์ด๋ฒ
๋ฅผ ๊ฐ๋ฐ์ค์ด๋ค.
์ด ๋ฆฌ์กํฐ๋ธ ๋๋ผ์ด๋ฒ
๋ฅผ ์ฌ์ฉํ ํ๋ก์ ํธ๊ฐ R2DBC
ํ๋ก์ ํธ์ด๋ค.
๋์ด์ JDBC
๋ฅผ ์ฌ์ฉํ์ง ์๊ณ ๋ฆฌ์กํฐ๋ธ ์คํ์ ์ ํฉํ ๋ฆฌ์กํฐ๋ธ ๋๋ผ์ด๋ฒ
๋ฅผ ์ฌ์ฉํด DB ์ ์ ๊ทผ, ๋ฐ์ดํฐ๋ฅผ ์กฐ์ํ๋ค.
์ํ๊น๊ฒ๋ JPA
๋ ๊ธฐ์กด ์ฝ๋๊ฐ ๋๋ฌด ๋ณต์กํ๋์ง ๋ฆฌ์กํฐ๋ธ ์ง์์ ํ์ง ์์๊ฒ์ผ๋ก ๋ณด์ธ๋ค.
ReactiveCrudRepository
Copy @Repository
public interface MemberRepository extends ReactiveCrudRepository<Member, Long> {
Mono<Member> findByName(String name);
Mono<Member> findByUserName(String name);
@Query("SELECT * FROM member WHERE name = :name AND user_name = :userName")
Mono<Member> findByNameAndUserName(String name, String userName);
}
R2dbcEntityTemplate
Copy @Service
@RequiredArgsConstructor
public class MemberDynamicRepository {
private final R2dbcEntityTemplate r2dbcEntityTemplate;
public Flux<Member> findTest(String userName) {
Query query = Query.query(where("user_name").like("%" + userName + "%"))
.limit(10)
.offset(0);
return r2dbcEntityTemplate.select(Member.class)
.matching(query)
.all();
}
}
์คํ๋ง ๋ฐ์ดํฐ Redis
spring-boot-starter-data-redis-reactive
๋ชจ๋์ ์ฌ์ฉ ReactiveRedisTemplate
ํด๋์ค๊ฐ Redis
์ปค๋ฅ์
์ ํต์ฌํด๋์ค์ด๋ค.
๋ค๋ฅธ ์คํ๋ง ๋ฐ์ดํฐ ํ๋ก์ ํธ์ ๋ฌ๋ฆฌ Repository
๊ฐ ์กด์ฌํ์ง ์์
์ผ๋ฐ์ ์ธ ๋ฐ์ดํฐ ๊ด๋ฆฌ ์ธ์๋ ๊ตฌ๋
/๋ฐํ ๊ตฌ์กฐ์ ๋ฉ์์ง ๊ธฐ๋ฅ๋ ์ง์ํ๋ค.
spring-boot-starter-data-redis-reactive
๋ชจ๋์ ๋ด๋ถ์ ์ผ๋ก Lettuce
๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ค.
https://lettuce.io/, ํ์ฌ non blokcing ์ ์ง์ํ๋ redis ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ Lettuce ๊ฐ ์ ์ผํ๋ค.
๋ํ Lettuce
๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ด์์ Reactor
๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ค.
Copy public class Sample {
private String name;
private String description;
}
@Configuration
public class RedisConfig {
@Value("${redis.host}")
private String host;
@Value("${redis.port}")
private Integer port;
@Bean
public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
@Bean
public ReactiveRedisTemplate<String, Sample> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Sample> valueSerializer = new Jackson2JsonRedisSerializer<>(Sample.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, Sample> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Sample> context = builder.value(valueSerializer).build();
return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, context);
}
}
Copy @Service
@RequiredArgsConstructor
public class SampleService {
private final ReactiveRedisTemplate<String, Sample> redisTemplate;
public Mono<Boolean> put(String key, Sample sample) {
return redisTemplate.opsForValue().set(key, sample);
}
public Mono<Sample> get(String key) {
return redisTemplate.opsForValue().get(key);
}
public Flux<Sample> getAll(String keyPattern){
return redisTemplate.keys(keyPattern)
.flatMap(key-> redisTemplate.opsForValue().get(key));
}
public Mono<Boolean> delete(String key) {
return redisTemplate.opsForValue().delete(key);
}
}
๊ธฐํ
ListenableFuture
์คํ๋ง์์ ์ ๊ณตํ๋ Future
๊ตฌํ ํด๋์ค
Copy public interface ListenableFuture<T> extends Future<T> {
void addCallback(ListenableFutureCallback<? super T> callback);
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}
์์ SseEmitter
์ ๋ง์ฐฌ๊ฐ์ง๋ก ์คํ๋ง ๋ฆฌ์กํฐ๋ธ์ ๋ฐํ๋ฐ์ดํฐ๋ก ์ฌ์ฉ๋๋ค.
Copy // ๋น๋๊ธฐ RestTemplate
AsyncRestTemplate httpClient = new AsyncRestTemplate();
@GetMapping
public ListenableFuture<?> requestData() {
AsyncDatabaseClient databaseClient = new FakeAsyncDatabaseClient();
// /hello ์ ํธ์ถ๊ฒฐ๊ณผ๋ฅผ CompletableFuture ์ผ๋ก ๋ฐํํ๋ ์ด๋ํฐ
CompletionStage<String> completionStage = AsyncAdapters.toCompletion(httpClient.execute(
"http://localhost:8080/hello",
HttpMethod.GET, null,
new HttpMessageConverterExtractor<>(String.class, messageConverters) // http ์ body ๋ถ๋ถ ์ปจ๋ฒํฐ๋ค ์ง์
));
// CompletionStage(CompletableFuture ์ ์ธํฐํ์ด์ค) ๋ฅผ ListenableFuture ๋ก ๋ณํ
return AsyncAdapters.toListenable(databaseClient.store(completionStage));
}
AsyncRestTemplate.execute
๊ฐ ๋ฐํํ๋ ListenableFuture
๋ฅผ CompletionStage
๋ก ๋ณํ ๋ฐํ๋ CompletionStage
๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅํ ๋ค์ ๋ฐํ๋ CompletionStage
๋ฅผ ListenableFuture
๋ก ๋ณํํ๋ค.
์ผ๋ฐ์ ์ผ๋ก ๊ฐ๋จํ ๋น๋๊ธฐ ์ฒ๋ฆฌ๋ Future
๋ฅผ ๊ตฌํํ CompletableFuture
(CompletionStage
๊ตฌํ์ฒด) ๋ฅผ ์ฃผ๋ก ์ฌ์ฉํ๋ค.
๋ฉ์๋ ์ ์์ ๋ฐํ๊ฐ์ ์คํ๋ง์์ ๊ธฐ๋ณธ ์ ๊ณตํ๋ ListenableFuture
๋ฅผ ๊ตฌํํ๋ ๊ฐ์ฒด๋ก ๋ฐํํ๊ธฐ ํ๋ค๊ธฐ์
์์๊ฐ์ AsyncAdapters
๋ฅผ ์ฌ์ฉํด ๋น๋๊ธฐ ๊ฒฐ๊ณผ๋ฅผ ListenableFuture
๋ก ๋ณํํด์ฃผ๋ ์ด๋ํฐ๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ํธํ๋ค.
Copy public final class AsyncAdapters {
// ListenableFuture -> completionStage
public static <T> CompletionStage<T> toCompletion(ListenableFuture<T> future) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
future.addCallback(completableFuture::complete, completableFuture::completeExceptionally);
return completableFuture;
}
// completionStage -> ListenableFuture
public static <T> ListenableFuture<T> toListenable(CompletionStage<T> stage) {
SettableListenableFuture<T> future = new SettableListenableFuture<>();
stage.whenComplete((v, t) -> {
if (t == null) future.set(v);
else future.setException(t);
});
return future;
}
}
AsyncRestTemplate
์คํ๋ง ๋ฆฌ์กํฐ๋ธ์์ ๋๊ธฐ๋ฐฉ์์ธ ์ผ๋ฐ RestTemplate
์ ์ฌ์ฉํ์ง ์๊ณ AsyncRestTemplate
๋ฅผ ์ฌ์ฉํ๋ค.
ํํ์ฌ์ฉํ๋ execute
๋ฉ์๋์ ๋ฐํ๊ฐ์ด ListenableFuture
๊ฐ์ฒด์ด๋ค.
Copy @Override
public <T> ListenableFuture<T> execute(String url, HttpMethod method, AsyncRequestCallback requestCallback,
ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {
URI expanded = getUriTemplateHandler().expand(url, uriVariables);
return doExecute(expanded, method, requestCallback, responseExtractor);
}
์ถ์ฒ : https://kouzie.github.io/spring/Spring-Boot-%EC%8A%A4%ED%94%84%EB%A7%81-%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EA%B0%9C%EC%9A%94/#webflux