Spring Webflux + Reactive Kafka (1) - Producer

1. ๊ฐœ์š”

Spring Webflux & Reactive Kafka๋ฅผ ํ™œ์šฉํ•˜์—ฌ API๋ฅผ ํ†ตํ•œ ํ”„๋กœ๋“€์„œ์™€ ์ปจ์Šˆ๋จธ๋ฅผ ๊ตฌ์„ฑํ•ด๋ณด๋ ค ํ•œ๋‹ค. Bloking IO๋ฅผ ์‚ฌ์šฉํ• ๋•Œ์˜ ๊ฐœ๋ฐœ๊ณผ๋Š” ์ „ํ˜€ ๋‹ฌ๋ผ์„œ ์ต์ˆ™ํ•ด์ง€๋Š”๋ฐ ๊ฝค๋‚˜ ๊ฑธ๋ฆด ๋“ฏ ํ•˜๋‹ค. Webflux์˜ ๋ผ์šฐํŒ… ๋ฐฉ์‹์ด ์•„๋‹Œ RestController๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ตฌ์„ฑํ–ˆ๋‹ค.

๊ธฐ๋ณธ์ ์ธ ๊ตฌ์„ฑ๋„๋Š” ๋งค์šฐ ๊ฐ„๋‹จํ•˜๋‹ค. ๋‹จ์ˆœํžˆ RestController๋ฅผ ํ†ตํ•ด ๋“ค์–ด์˜จ ๋ฉ”์‹œ์ง€๋ฅผ ์นดํ”„์นด๋กœ ์ „์†กํ•˜๊ณ  ์ปจ์Šˆ๋จธ๋Š” ์นดํ”„์นด์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ๋งŒ ํ•˜๋Š” ๊ต‰์žฅํžˆ ๋‹จ์ˆœํ•œ ํ๋ฆ„์ด๋‹ค. ๋‹จ์ง€ ์ด ๋ชจ๋“  ๊ณผ์ •์„ ๋…ผ๋ธ”๋กœํ‚น I/O๋กœ ์ฒ˜๋ฆฌํ•˜๋Š”๊ฒƒ์ด ํ•ต์‹ฌ์ด๋‹ค.

์ „์ฒด ์†Œ์Šค ์ฝ”๋“œ : Github ๋งํฌ


2. ์นดํ”„์นด ๊ตฌ์„ฑ

๋กœ์ปฌํ™˜๊ฒฝ์ด ์•„๋‹Œ ์‹ค๋ฌดํ™˜๊ฒฝ๊ณผ ๋น„์Šทํ•˜๊ฒŒ ์™ธ๋ถ€์— ๋„์ปค๋ฅผ ์ด์šฉํ•˜์—ฌ ๊ตฌ์„ฑํ•˜์˜€๋‹ค. ์ฃผํ‚คํผ์™€ ๋ธŒ๋กœ์ปค๋Š” ๊ฐ 1๋Œ€์”ฉ์œผ๋กœ ์šฐ์„  ๊ตฌ์„ฑํ•˜์˜€๊ณ  2๋Œ€ ์ด์ƒ ํด๋Ÿฌ์Šคํ„ฐ๋Š” ์ฐจํ›„ ์‹œ๊ฐ„๋‚ ๋•Œ ๊ตฌ์„ฑํ•˜๋ ค ํ•œ๋‹ค.

2-1. docker-compose.yml

๊ฐœ์ธ ํ™ˆ์„œ๋ฒ„์— ๊ตฌ์„ฑํ•˜์˜€๊ณ  ํŒŒํ‹ฐ์…˜์€ 1๊ฐœ๋กœ ์„ค์ •ํ–ˆ๋‹ค.

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://deogicorgi.home:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

3. ํ”„๋กœ๋“€์„œ ์„ค์ •(Producer)

Controller๋ฅผ ํ†ตํ•ด ๋“ค์–ด์˜จ ๋ฉ”์‹œ์ง€๋ฅผ ์นดํ”„์นด๋กœ ์ „์†กํ•˜๊ธฐ ์œ„ํ•œ Bean ์„ค์ • ๋ฐ ์˜ต์…˜ ์„ค์ •์ด๋‹ค. ํ˜„์žฌ ํ”„๋กœ๋“€์„œ์™€ ์ปจ์Šˆ๋จธ๊ฐ€ ๋…๋ฆฝ์ ์ธ ํ”„๋กœ์ ํŠธ๋กœ ๊ตฌ์„ฑ๋˜์–ด์žˆ๊ธฐ์— ํ•ด๋‹น ์„ค์ •์—๋Š” ํ”„๋กœ๋“€์„œ์šฉ ์„ค์ •๋งŒ ํ•ด๋†“์€ ์ƒํƒœ์ด๋‹ค.

3-1. KafkaConfig.java

Spring Kafka์˜ ์„ค์ •๊ณผ๋Š” ์ข€ ๋‹ฌ๋ผ์ง„ ๋ถ€๋ถ„๋“ค์ด ๋งŽ๋‹ค. ๋งค์šฐ ๊ธฐ๋ณธ์ ์ธ ์„ค์ •๋งŒ ๊ตฌ์„ฑํ•œ ์ƒํƒœ์ด๊ณ  ์ด ์™ธ์—๋„ ๋ ˆํผ๋Ÿฐ์Šค๋ฅผ ์ฐพ์•„๋ณด๋ฉด ๋‹ค์–‘ํ•˜๊ณ  ๋ณต์žกํ•œ ์„ค์ •๋“ค์„ ์ฐพ์„ ์ˆ˜ ์žˆ๋‹ค.

๊ณต์‹ ๋ ˆํผ๋Ÿฐ์Šค : ๊ณต์‹ ๋ ˆํผ๋Ÿฐ์Šค ๋งํฌ

/**
 * Kafka ์„ค์ •
 */
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    private final KafkaProperties properties;

    /******************************************************************
     ************************ Producer Options ************************
     ******************************************************************/
    
    // ๊ธฐ๋ณธ ์„ค์ •๋“ค๋กœ ๊ตฌ์„ฑ
    @Bean("kafkaSender")
    public KafkaSender<String, Object> kafkaSender() {
        SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
        senderOptions.scheduler(Schedulers.parallel());
        senderOptions.closeTimeout(Duration.ofSeconds(5));
        return KafkaSender.create(senderOptions);
    }
    
    // ํ”„๋กœ๋“€์„œ ์˜ต์…˜
    private Map<String, Object> getProducerProps() {
        return new HashMap<>() {{
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // ์ „์†ก ์‹œ๊ฐ„ ์ œํ•œ์„ 1000ms๋กœ ์„ค์ •
        }};
    }
}

3-2. ProduceService.java

KafkaService ํด๋ž˜์Šค์— ๋„˜์–ด์˜จ AbstractKafkaMessage ํด๋ž˜์Šค๋Š” Controller๋ฅผ ํ†ตํ•˜์—ฌ ์š”์ฒญ๋ฐ›์€ @RequestBody ๋ฐ์ดํ„ฐ์ด๋‹ค. ๋‚ด๋ถ€์ ์ธ 5XX ์„œ๋ฒ„ ์—๋Ÿฌ๋ฅผ ๋ฆฌํ„ดํ•ด์ฃผ๊ณ  ์‹ถ์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— KafkaProduceResult ๋ผ๋Š” ํด๋ž˜์Šค๋ฅผ ๋งŒ๋“ค์–ด ์ž์ฒด์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋„๋ก ๊ตฌ์„ฑํ•˜๋ ค ํ•œ๋‹ค. ์ฆ‰ ์š”์ฒญ ์ธก์—์„œ๋Š” ์ „์†ก ๊ฒฐ๊ณผ์˜ StatusCode๋Š” ๋ฌด์กฐ๊ฑด 2XX๋กœ ๋ฐ›๊ฒŒ ๋ ๊ฒƒ์ด๊ณ  ๋‚ด๋ถ€์ ์œผ๋กœ๋Š” ๋ฉ”์‹œ์ง€ ์ „์†ก์— ์‹คํŒจํ•  ๊ฒฝ์šฐ NOSQL์ด๋‚˜ ๊ธฐํƒ€ ๋ฐฉ๋ฒ•๋“ฑ์„ ํ™œ์šฉํ•˜์—ฌ ์žฌ์ „์†ก ์ฒ˜๋ฆฌ๋ฅผ ํ• ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋ ค ํ•œ๋‹ค.

/**
 * ํ”„๋กœ๋“€์‹ฑ ์„œ๋น„์Šค
 * Kafka ํ”„๋กœ๋“€์‹ฑ ์ „ ๋กœ์ง ์ฒ˜๋ฆฌ
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ProduceService {

    private final KafkaService kafkaService;
    private final FailureMessageService failureMessageService;

    public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
        return kafkaService.send(message)
                .map(produceResult -> {
                    log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
                    if (produceResult.hasError()) {
                    	failureMessageService.produceFailure();
                        // TODO ์นดํ”„์นด ํ”„๋กœ๋“€์‹ฑ ์‹คํŒจ์ผ ๊ฒฝ์šฐ ์ฒ˜๋ฆฌ
                        // ex ) ์ฒ˜๋ฆฌํ•˜์ง€๋ชปํ•œ ์š”์ฒญ์„ ๋ชฝ๊ณ ๋“ฑ์— ์ €์žฅ ํ›„ ์žฌ์‹œ๋„, ๋กœ๊น… ๋“ฑ๋“ฑ
                        log.error("Kafka produce error : {}", produceResult.getErrorMessage());
                    }
                    return produceResult;
                });
    }
}

3-3. KafkaService.java

์‹ค์ œ ์š”์ฒญ๋ฐ›์€ ๋ฉ”์‹œ์ง€๋ฅผ ์นดํ”„์นด๋กœ ๋ณด๋‚ด๋Š” ์ฝ”๋“œ์ด๋‹ค. ์‹คํ–‰ํ•ด๋ณด๋ฉด 100๊ฑด, 1000๊ฑด, 10000๊ฑด์ด๊ฑด ๊ฐ„์— ์‹ฑ๊ธ€์Šค๋ ˆ๋“œ๋กœ ์ฒ˜๋ฆฌ๋˜๋Š”๋ฐ ์ด ๋ถ€๋ถ„์„ ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋“œ๋กœ ๋Œ๋ฆฌ๊ณ  ์‹ถ์–ด ๊ตฌ๊ธ€๋ง์„ ์—ด์‹ฌํžˆ ํ•ด๋ณธ ๊ฒฐ๊ณผ Sender์˜ ๊ฒฝ์šฐ ์• ์ดˆ์— ์‹ฑ๊ธ€์Šค๋ ˆ๋“œ๋กœ ๋Œ์•„๊ฐ€๋„๋ก ๊ตฌํ˜„๋˜์–ด์žˆ๋‹ค๊ณ  ํ•œ๋‹ค. ์˜ต์…˜์— ์Šค์ผ€์ฅด๋Ÿฌ๋„ ๋‹ค๋ฅด๊ฒŒ ์ง€์ •ํ•ด๋ณด๊ณ  ์‚ฝ์งˆ์ด๋ž€ ์‚ฝ์งˆ์€ ๋‹คํ•ด๋ดค๋Š”๋ฐ...

/**
 * ์นดํ”„์นด ์„œ๋น„์Šค
 * ์‹ค์ œ ์นดํ”„์นด๋กœ ๋ฉ”์‹œ์ง€ ํ”„๋กœ๋“€์‹ฑ
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaService {

    private final KafkaSender<String, Object> producer;

    public Mono<KafkaProduceResult> send(AbstractKafkaMessage message) {

        return producer.createOutbound()
                // ์ง€์ •๋œ ํ† ํ”ฝ์œผ๋กœ ๋ฉ”์‹œ์ง€ ์ „์†ก
                .send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage())))
                .then()
                // ์—๋Ÿฌ ์—†์ด ์ „์†ก์ด ์™„๋ฃŒ ๋˜์—ˆ์„ ๊ฒฝ์šฐ
                .thenReturn(new KafkaProduceResult(message))
                // ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๊ฒฝ์šฐ
                .onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
    }
}

4. ๊ทธ ์™ธ ๋ฉ”์‹œ์ง€ ํด๋ž˜์Šค

์š”์ฒญ๋ฐ›์€ ๋ฉ”์‹œ์ง€ ๋งคํ•‘ ๋ฐ ์ „์†ก ์ฒ˜๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฆฌํ„ดํ•˜๊ธฐ ์œ„ํ•œ ๋ชจ๋ธ ํด๋ž˜์Šค๋“ค์ด๋‹ค. ์œ„ Kafka๊ด€๋ จ ํด๋ž˜์Šค๋“ค๊ณผ๋Š” ์—ฐ๊ด€ ์—†๋Š” ํด๋ž˜์Šค์ด๋‹ค. ๋Œ€์ถฉ ํ† ์ดํ”„๋กœ์ ํŠธ์˜ ์˜๋„๋ฅผ ๋ณด์—ฌ์ฃผ๊ธฐ ์œ„ํ•จ์ด๋‹ค.

4-1. AbstractKafkaMessage.java

๋Œ€์ถฉ ์–ด๋…ธํ…Œ์ด์…˜์„ ๋ณด๋ฉด @RequstBody ๋ฅผ ์ด์šฉํ•ด ๋งคํ•‘๋˜๋Š” ํด๋ž˜์Šค๋กœ KafkaUriMessage ํƒ€์ž…๊ณผ KafkaBodyMessage ํƒ€์ž… ์žˆ๋‹ค๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜์žˆ๋‹ค. ์ด๋Š” ํ˜น์‹œ ์ „์†ก์‹คํŒจํ•  ๊ฒฝ์šฐ ๋‘๊ฐœ์˜ ํƒ€์ž…์„ ๋‹ค๋ฅด๊ฒŒ ์ฒ˜๋ฆฌํ•˜๋ ค๊ณ  ๋‚˜๋ˆ ๋†“์€ ๊ฒƒ์ด๋‹ค.

/**
 * ์นดํ”„์นด ๋ฉ”์‹œ์ง€ ๋ฒ ์ด์Šค
 * ํ”„๋กœ๋“€์„œ ๋‚ด ์—๋Ÿฌ ๋ฐœ์ƒ์‹œ ์ฒ˜๋ฆฌ๋ฅผ ์‰ฝ๊ฒŒํ•˜๊ธฐ ์œ„ํ•ด URI ํ˜•ํƒœ์™€ Message ํ˜•ํƒœ๋กœ ๋‚˜๋ˆ”
 */
@Getter
@Setter
@JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        property = "type",
        defaultImpl = KafkaUriProduceMessage.class)
@JsonSubTypes({
        @JsonSubTypes.Type(value = KafkaUriMessage.class, names = {"uri", "Uri", "URI"}),
        @JsonSubTypes.Type(value = KafkaBodyMessage.class, names = {"message", "Message", "MESSAGE"})
})
public abstract class AbstractKafkaMessage {

    // ์š”์ฒญ ํ† ํ”ฝ
    protected String topic;

    // ๋ฉ”์‹œ์ง€ ํƒ€์ž… (uri , message)
    protected ProduceMessageType type;

    // ์š”์ฒญ ์‹œ๊ฐ„
    protected LocalDateTime requestedAt;

    public abstract String getRequestedMessage();

}

4-2. KafkaProduceResult.java

๋งˆ์ง€๋ง‰์œผ๋กœ ์ „์†ก๊ฒฐ๊ณผ๊ฐ€ ๋งคํ•‘๋  ํด๋ž˜์Šค์ด๋‹ค. ์š”์ฒญ ์ธก์—์„œ๋Š” ํ•ด๋‹น ํด๋ž˜์Šค์˜ ๋‚ด์šฉ์— ๋”ฐ๋ผ ์ „์†ก ์„ฑ๊ณต & ์‹คํŒจ๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

/**
 * ์นดํ”„์นด ๋ฉ”์‹œ์ง€ ์ „์†ก๊ฒฐ๊ณผ ํด๋ž˜์Šค
 */
@Getter
public class KafkaProduceResult {

    // ๋ฉ”์‹œ์ง€ ์ „์†ก ์ƒํƒœ - true : ์ „์†ก์™„๋ฃŒ, false : ์ „์†ก์‹คํŒจ
    private Boolean status = true;

    // ๋ฉ”์‹œ์ง€ ์ „์†ก ํ† ํ”ฝ
    private String topic;

    // ์š”์ฒญ๋ฐ›์€ ๋ฉ”์‹œ์ง€ ํƒ€์ž… (uri, message)
    private ProduceMessageType messageType;

    // ์š”์ฒญ๋ฐ›์€ ๋ฉ”์‹œ์ง€ - URI ๋˜๋Š” JSON String
    private String requestedMessage;

    // ์—๋Ÿฌ - ์ „์†ก๊ณผ์ • ์ค‘ ๋ฐœ์ƒ๋œ ์—๋Ÿฌ, ์ „์†ก์™„๋ฃŒ ์ผ ๊ฒฝ์šฐ null
    @JsonIgnore
    private Throwable error = null;

    // ์—๋Ÿฌ ๋ฉ”์‹œ์ง€ - ์ „์†ก๊ณผ์ • ์ค‘ ๋ฐœ์ƒ๋œ ์—๋Ÿฌ, ์ „์†ก์™„๋ฃŒ ์ผ ๊ฒฝ์šฐ null
    private String errorMessage = null;

    // ๋ฉ”์‹œ์ง€๋ฅผ ์š”์ฒญ๋ฐ›์€ ์‹œ๊ฐ„
    private LocalDateTime requestedAt;

    // ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•œ ์‹œ๊ฐ„
    private LocalDateTime producedAt;

    public KafkaProduceResult(AbstractKafkaMessage message) {
        this.setRequestedMessage(message);
    }

    public KafkaProduceResult(AbstractKafkaMessage message, Throwable e) {
        this.setRequestedMessage(message);
        this.status = false;
        this.error = e;
        this.errorMessage = e.getMessage();
        this.producedAt = null;
    }

    public Boolean hasError() {
        return error != null;
    }

    private void setRequestedMessage(AbstractKafkaMessage requestedMessage) {
        this.topic = requestedMessage.getTopic();
        this.messageType = requestedMessage.getType();
        this.requestedMessage = requestedMessage.getRequestedMessage();
        this.producedAt = LocalDateTime.now();
        this.requestedAt = requestedMessage.getRequestedAt();
    }
}

์ถœ์ฒ˜ : https://velog.io/@deogicorgi/Spring-Webflux-Reactive-Kafka-1 (๊ต‰์žฅํžˆ์ž˜ ์ •๋ฆฌํ•ด์ฃผ์…จ๋‹ค~)

Last updated