1. ๊ฐ์
Spring Webflux & Reactive Kafka๋ฅผ ํ์ฉํ์ฌ API๋ฅผ ํตํ ํ๋ก๋์์ ์ปจ์๋จธ๋ฅผ ๊ตฌ์ฑํด๋ณด๋ ค ํ๋ค.
Bloking IO๋ฅผ ์ฌ์ฉํ ๋์ ๊ฐ๋ฐ๊ณผ๋ ์ ํ ๋ฌ๋ผ์ ์ต์ํด์ง๋๋ฐ ๊ฝค๋ ๊ฑธ๋ฆด ๋ฏ ํ๋ค. Webflux์ ๋ผ์ฐํ
๋ฐฉ์์ด ์๋ RestController๋ฅผ ์ฌ์ฉํ์ฌ ๊ตฌ์ฑํ๋ค.
๊ธฐ๋ณธ์ ์ธ ๊ตฌ์ฑ๋๋ ๋งค์ฐ ๊ฐ๋จํ๋ค. ๋จ์ํ RestController๋ฅผ ํตํด ๋ค์ด์จ ๋ฉ์์ง๋ฅผ ์นดํ์นด๋ก ์ ์กํ๊ณ ์ปจ์๋จธ๋ ์นดํ์นด์์ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๊ธฐ๋ง ํ๋ ๊ต์ฅํ ๋จ์ํ ํ๋ฆ์ด๋ค. ๋จ์ง ์ด ๋ชจ๋ ๊ณผ์ ์ ๋
ผ๋ธ๋กํน I/O๋ก ์ฒ๋ฆฌํ๋๊ฒ์ด ํต์ฌ์ด๋ค.
์ ์ฒด ์์ค ์ฝ๋ : Github ๋งํฌ
2. ์นดํ์นด ๊ตฌ์ฑ
๋ก์ปฌํ๊ฒฝ์ด ์๋ ์ค๋ฌดํ๊ฒฝ๊ณผ ๋น์ทํ๊ฒ ์ธ๋ถ์ ๋์ปค๋ฅผ ์ด์ฉํ์ฌ ๊ตฌ์ฑํ์๋ค. ์ฃผํคํผ์ ๋ธ๋ก์ปค๋ ๊ฐ 1๋์ฉ์ผ๋ก ์ฐ์ ๊ตฌ์ฑํ์๊ณ 2๋ ์ด์ ํด๋ฌ์คํฐ๋ ์ฐจํ ์๊ฐ๋ ๋ ๊ตฌ์ฑํ๋ ค ํ๋ค.
2-1. docker-compose.yml
๊ฐ์ธ ํ์๋ฒ์ ๊ตฌ์ฑํ์๊ณ ํํฐ์
์ 1๊ฐ๋ก ์ค์ ํ๋ค.
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://deogicorgi.home:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3. ํ๋ก๋์ ์ค์ (Producer)
Controller๋ฅผ ํตํด ๋ค์ด์จ ๋ฉ์์ง๋ฅผ ์นดํ์นด๋ก ์ ์กํ๊ธฐ ์ํ Bean ์ค์ ๋ฐ ์ต์
์ค์ ์ด๋ค. ํ์ฌ ํ๋ก๋์์ ์ปจ์๋จธ๊ฐ ๋
๋ฆฝ์ ์ธ ํ๋ก์ ํธ๋ก ๊ตฌ์ฑ๋์ด์๊ธฐ์ ํด๋น ์ค์ ์๋ ํ๋ก๋์์ฉ ์ค์ ๋ง ํด๋์ ์ํ์ด๋ค.
3-1. KafkaConfig.java
Spring Kafka์ ์ค์ ๊ณผ๋ ์ข ๋ฌ๋ผ์ง ๋ถ๋ถ๋ค์ด ๋ง๋ค. ๋งค์ฐ ๊ธฐ๋ณธ์ ์ธ ์ค์ ๋ง ๊ตฌ์ฑํ ์ํ์ด๊ณ ์ด ์ธ์๋ ๋ ํผ๋ฐ์ค๋ฅผ ์ฐพ์๋ณด๋ฉด ๋ค์ํ๊ณ ๋ณต์กํ ์ค์ ๋ค์ ์ฐพ์ ์ ์๋ค.
๊ณต์ ๋ ํผ๋ฐ์ค : ๊ณต์ ๋ ํผ๋ฐ์ค ๋งํฌ
/**
* Kafka ์ค์
*/
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
private final KafkaProperties properties;
/******************************************************************
************************ Producer Options ************************
******************************************************************/
// ๊ธฐ๋ณธ ์ค์ ๋ค๋ก ๊ตฌ์ฑ
@Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
// ํ๋ก๋์ ์ต์
private Map<String, Object> getProducerProps() {
return new HashMap<>() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // ์ ์ก ์๊ฐ ์ ํ์ 1000ms๋ก ์ค์
}};
}
}
3-2. ProduceService.java
KafkaService ํด๋์ค์ ๋์ด์จ AbstractKafkaMessage ํด๋์ค๋ Controller๋ฅผ ํตํ์ฌ ์์ฒญ๋ฐ์ @RequestBody ๋ฐ์ดํฐ์ด๋ค. ๋ด๋ถ์ ์ธ 5XX ์๋ฒ ์๋ฌ๋ฅผ ๋ฆฌํดํด์ฃผ๊ณ ์ถ์ง ์๊ธฐ ๋๋ฌธ์ KafkaProduceResult ๋ผ๋ ํด๋์ค๋ฅผ ๋ง๋ค์ด ์์ฒด์ ์ผ๋ก ์ฒ๋ฆฌํ๋๋ก ๊ตฌ์ฑํ๋ ค ํ๋ค. ์ฆ ์์ฒญ ์ธก์์๋ ์ ์ก ๊ฒฐ๊ณผ์ StatusCode๋ ๋ฌด์กฐ๊ฑด 2XX๋ก ๋ฐ๊ฒ ๋ ๊ฒ์ด๊ณ ๋ด๋ถ์ ์ผ๋ก๋ ๋ฉ์์ง ์ ์ก์ ์คํจํ ๊ฒฝ์ฐ NOSQL์ด๋ ๊ธฐํ ๋ฐฉ๋ฒ๋ฑ์ ํ์ฉํ์ฌ ์ฌ์ ์ก ์ฒ๋ฆฌ๋ฅผ ํ ์ ์๋๋ก ํ๋ ค ํ๋ค.
/**
* ํ๋ก๋์ฑ ์๋น์ค
* Kafka ํ๋ก๋์ฑ ์ ๋ก์ง ์ฒ๋ฆฌ
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ProduceService {
private final KafkaService kafkaService;
private final FailureMessageService failureMessageService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
return kafkaService.send(message)
.map(produceResult -> {
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
if (produceResult.hasError()) {
failureMessageService.produceFailure();
// TODO ์นดํ์นด ํ๋ก๋์ฑ ์คํจ์ผ ๊ฒฝ์ฐ ์ฒ๋ฆฌ
// ex ) ์ฒ๋ฆฌํ์ง๋ชปํ ์์ฒญ์ ๋ชฝ๊ณ ๋ฑ์ ์ ์ฅ ํ ์ฌ์๋, ๋ก๊น
๋ฑ๋ฑ
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
});
}
}
3-3. KafkaService.java
์ค์ ์์ฒญ๋ฐ์ ๋ฉ์์ง๋ฅผ ์นดํ์นด๋ก ๋ณด๋ด๋ ์ฝ๋์ด๋ค. ์คํํด๋ณด๋ฉด 100๊ฑด, 1000๊ฑด, 10000๊ฑด์ด๊ฑด ๊ฐ์ ์ฑ๊ธ์ค๋ ๋๋ก ์ฒ๋ฆฌ๋๋๋ฐ ์ด ๋ถ๋ถ์ ๋ฉํฐ์ฐ๋ ๋๋ก ๋๋ฆฌ๊ณ ์ถ์ด ๊ตฌ๊ธ๋ง์ ์ด์ฌํ ํด๋ณธ ๊ฒฐ๊ณผ Sender์ ๊ฒฝ์ฐ ์ ์ด์ ์ฑ๊ธ์ค๋ ๋๋ก ๋์๊ฐ๋๋ก ๊ตฌํ๋์ด์๋ค๊ณ ํ๋ค. ์ต์
์ ์ค์ผ์ฅด๋ฌ๋ ๋ค๋ฅด๊ฒ ์ง์ ํด๋ณด๊ณ ์ฝ์ง์ด๋ ์ฝ์ง์ ๋คํด๋ดค๋๋ฐ...
/**
* ์นดํ์นด ์๋น์ค
* ์ค์ ์นดํ์นด๋ก ๋ฉ์์ง ํ๋ก๋์ฑ
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaService {
private final KafkaSender<String, Object> producer;
public Mono<KafkaProduceResult> send(AbstractKafkaMessage message) {
return producer.createOutbound()
// ์ง์ ๋ ํ ํฝ์ผ๋ก ๋ฉ์์ง ์ ์ก
.send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage())))
.then()
// ์๋ฌ ์์ด ์ ์ก์ด ์๋ฃ ๋์์ ๊ฒฝ์ฐ
.thenReturn(new KafkaProduceResult(message))
// ์๋ฌ๊ฐ ๋ฐ์ํ์ ๊ฒฝ์ฐ
.onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
}
}
4. ๊ทธ ์ธ ๋ฉ์์ง ํด๋์ค
์์ฒญ๋ฐ์ ๋ฉ์์ง ๋งคํ ๋ฐ ์ ์ก ์ฒ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฆฌํดํ๊ธฐ ์ํ ๋ชจ๋ธ ํด๋์ค๋ค์ด๋ค. ์ Kafka๊ด๋ จ ํด๋์ค๋ค๊ณผ๋ ์ฐ๊ด ์๋ ํด๋์ค์ด๋ค. ๋์ถฉ ํ ์ดํ๋ก์ ํธ์ ์๋๋ฅผ ๋ณด์ฌ์ฃผ๊ธฐ ์ํจ์ด๋ค.
4-1. AbstractKafkaMessage.java
๋์ถฉ ์ด๋
ธํ
์ด์
์ ๋ณด๋ฉด @RequstBody ๋ฅผ ์ด์ฉํด ๋งคํ๋๋ ํด๋์ค๋ก KafkaUriMessage ํ์
๊ณผ KafkaBodyMessage ํ์
์๋ค๋ ๊ฒ์ ์ ์์๋ค. ์ด๋ ํน์ ์ ์ก์คํจํ ๊ฒฝ์ฐ ๋๊ฐ์ ํ์
์ ๋ค๋ฅด๊ฒ ์ฒ๋ฆฌํ๋ ค๊ณ ๋๋ ๋์ ๊ฒ์ด๋ค.
/**
* ์นดํ์นด ๋ฉ์์ง ๋ฒ ์ด์ค
* ํ๋ก๋์ ๋ด ์๋ฌ ๋ฐ์์ ์ฒ๋ฆฌ๋ฅผ ์ฝ๊ฒํ๊ธฐ ์ํด URI ํํ์ Message ํํ๋ก ๋๋
*/
@Getter
@Setter
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "type",
defaultImpl = KafkaUriProduceMessage.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaUriMessage.class, names = {"uri", "Uri", "URI"}),
@JsonSubTypes.Type(value = KafkaBodyMessage.class, names = {"message", "Message", "MESSAGE"})
})
public abstract class AbstractKafkaMessage {
// ์์ฒญ ํ ํฝ
protected String topic;
// ๋ฉ์์ง ํ์
(uri , message)
protected ProduceMessageType type;
// ์์ฒญ ์๊ฐ
protected LocalDateTime requestedAt;
public abstract String getRequestedMessage();
}
4-2. KafkaProduceResult.java
๋ง์ง๋ง์ผ๋ก ์ ์ก๊ฒฐ๊ณผ๊ฐ ๋งคํ๋ ํด๋์ค์ด๋ค. ์์ฒญ ์ธก์์๋ ํด๋น ํด๋์ค์ ๋ด์ฉ์ ๋ฐ๋ผ ์ ์ก ์ฑ๊ณต & ์คํจ๋ฅผ ํ์ธํ ์ ์๋ค.
/**
* ์นดํ์นด ๋ฉ์์ง ์ ์ก๊ฒฐ๊ณผ ํด๋์ค
*/
@Getter
public class KafkaProduceResult {
// ๋ฉ์์ง ์ ์ก ์ํ - true : ์ ์ก์๋ฃ, false : ์ ์ก์คํจ
private Boolean status = true;
// ๋ฉ์์ง ์ ์ก ํ ํฝ
private String topic;
// ์์ฒญ๋ฐ์ ๋ฉ์์ง ํ์
(uri, message)
private ProduceMessageType messageType;
// ์์ฒญ๋ฐ์ ๋ฉ์์ง - URI ๋๋ JSON String
private String requestedMessage;
// ์๋ฌ - ์ ์ก๊ณผ์ ์ค ๋ฐ์๋ ์๋ฌ, ์ ์ก์๋ฃ ์ผ ๊ฒฝ์ฐ null
@JsonIgnore
private Throwable error = null;
// ์๋ฌ ๋ฉ์์ง - ์ ์ก๊ณผ์ ์ค ๋ฐ์๋ ์๋ฌ, ์ ์ก์๋ฃ ์ผ ๊ฒฝ์ฐ null
private String errorMessage = null;
// ๋ฉ์์ง๋ฅผ ์์ฒญ๋ฐ์ ์๊ฐ
private LocalDateTime requestedAt;
// ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์๊ฐ
private LocalDateTime producedAt;
public KafkaProduceResult(AbstractKafkaMessage message) {
this.setRequestedMessage(message);
}
public KafkaProduceResult(AbstractKafkaMessage message, Throwable e) {
this.setRequestedMessage(message);
this.status = false;
this.error = e;
this.errorMessage = e.getMessage();
this.producedAt = null;
}
public Boolean hasError() {
return error != null;
}
private void setRequestedMessage(AbstractKafkaMessage requestedMessage) {
this.topic = requestedMessage.getTopic();
this.messageType = requestedMessage.getType();
this.requestedMessage = requestedMessage.getRequestedMessage();
this.producedAt = LocalDateTime.now();
this.requestedAt = requestedMessage.getRequestedAt();
}
}
์ถ์ฒ : https://velog.io/@deogicorgi/Spring-Webflux-Reactive-Kafka-1 (๊ต์ฅํ์ ์ ๋ฆฌํด์ฃผ์
จ๋ค~)