Spring Webflux + Reactive Kafka (1) - Producer

1. 개요

Spring Webflux & Reactive Kafka를 활용하여 API를 통한 프로듀서와 컨슈머를 구성해보려 한다. Bloking IO를 사용할때의 개발과는 전혀 달라서 익숙해지는데 꽤나 걸릴 듯 하다. Webflux의 라우팅 방식이 아닌 RestController를 사용하여 구성했다.

기본적인 구성도는 매우 간단하다. 단순히 RestController를 통해 들어온 메시지를 카프카로 전송하고 컨슈머는 카프카에서 메시지를 가져오기만 하는 굉장히 단순한 흐름이다. 단지 이 모든 과정을 논블로킹 I/O로 처리하는것이 핵심이다.

전체 소스 코드 : Github 링크


2. 카프카 구성

로컬환경이 아닌 실무환경과 비슷하게 외부에 도커를 이용하여 구성하였다. 주키퍼와 브로커는 각 1대씩으로 우선 구성하였고 2대 이상 클러스터는 차후 시간날때 구성하려 한다.

2-1. docker-compose.yml

개인 홈서버에 구성하였고 파티션은 1개로 설정했다.


3. 프로듀서 설정(Producer)

Controller를 통해 들어온 메시지를 카프카로 전송하기 위한 Bean 설정 및 옵션 설정이다. 현재 프로듀서와 컨슈머가 독립적인 프로젝트로 구성되어있기에 해당 설정에는 프로듀서용 설정만 해놓은 상태이다.

3-1. KafkaConfig.java

Spring Kafka의 설정과는 좀 달라진 부분들이 많다. 매우 기본적인 설정만 구성한 상태이고 이 외에도 레퍼런스를 찾아보면 다양하고 복잡한 설정들을 찾을 수 있다.

공식 레퍼런스 : 공식 레퍼런스 링크

3-2. ProduceService.java

KafkaService 클래스에 넘어온 AbstractKafkaMessage 클래스는 Controller를 통하여 요청받은 @RequestBody 데이터이다. 내부적인 5XX 서버 에러를 리턴해주고 싶지 않기 때문에 KafkaProduceResult 라는 클래스를 만들어 자체적으로 처리하도록 구성하려 한다. 즉 요청 측에서는 전송 결과의 StatusCode는 무조건 2XX로 받게 될것이고 내부적으로는 메시지 전송에 실패할 경우 NOSQL이나 기타 방법등을 활용하여 재전송 처리를 할수 있도록 하려 한다.

3-3. KafkaService.java

실제 요청받은 메시지를 카프카로 보내는 코드이다. 실행해보면 100건, 1000건, 10000건이건 간에 싱글스레드로 처리되는데 이 부분을 멀티쓰레드로 돌리고 싶어 구글링을 열심히 해본 결과 Sender의 경우 애초에 싱글스레드로 돌아가도록 구현되어있다고 한다. 옵션에 스케쥴러도 다르게 지정해보고 삽질이란 삽질은 다해봤는데...


4. 그 외 메시지 클래스

요청받은 메시지 매핑 및 전송 처리 결과를 리턴하기 위한 모델 클래스들이다. 위 Kafka관련 클래스들과는 연관 없는 클래스이다. 대충 토이프로젝트의 의도를 보여주기 위함이다.

4-1. AbstractKafkaMessage.java

대충 어노테이션을 보면 @RequstBody 를 이용해 매핑되는 클래스로 KafkaUriMessage 타입과 KafkaBodyMessage 타입 있다는 것을 알 수있다. 이는 혹시 전송실패할 경우 두개의 타입을 다르게 처리하려고 나눠놓은 것이다.

4-2. KafkaProduceResult.java

마지막으로 전송결과가 매핑될 클래스이다. 요청 측에서는 해당 클래스의 내용에 따라 전송 성공 & 실패를 확인할 수 있다.

출처 : https://velog.io/@deogicorgi/Spring-Webflux-Reactive-Kafka-1 (굉장히잘 정리해주셨다~)

Last updated