Spring Webflux + Reactive Kafka (2) - Consumer

1. ๊ฐœ์š”

์ด๋ฒˆ์—” ์ปจ์Šˆ๋จธ๋ฅผ ๊ตฌ์„ฑํ•œ๋‹ค. ์ด๋ฒˆ ์—ญ์‹œ Reactive Kafka ๋ฅผ ํ†ตํ•˜์—ฌ ์ปจ์Šˆ๋จธ๋ฅผ ๊ตฌ์„ฑํ•  ์˜ˆ์ •์ด๊ณ  ์•ž์„œ ๊ตฌ์„ฑํ•œ ํ”„๋กœ๋“€์„œ์™€ ์—ฐ๋™ํ•˜์—ฌ ์‹ค์ œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š”๊ฒƒ๊นŒ์ง€ ๊ตฌํ˜„ํ•ด๋ณด๋ ค ํ•œ๋‹ค.

์ „์ฒด ์†Œ์Šค์ฝ”๋“œ : Github ๋งํฌ


2. ์นดํ”„์นด ํ† ํ”ฝ ๊ตฌ์„ฑ๋‚ด์šฉ

ํ˜„์žฌ ๊ฐœ์ธ ์„œ๋ฒ„ ์นดํ”„์นด์— ๊ตฌ์„ฑ๋œ ํ† ํ”ฝ์€ ์ด 2๊ฐœ๋กœ deogicorgi-message, deogicorgi-uri ๊ฐ€ ์กด์žฌํ•œ๋‹ค. ๊ฐ ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์€ 1๊ฐœ์”ฉ ๊ตฌ์„ฑํ•˜์˜€์œผ๋ฉฐ ๊ทธ๋ ‡๊ธฐ ๋•Œ๋ฌธ์— ํ•˜๋‚˜์˜ ํ† ํ”ฝ์—๋Š” ํ•˜๋‚˜์˜ ์ปจ์Šˆ๋จธ๋งŒ ๋ถ™์ผ ์ˆ˜ ์žˆ๋Š” ์ƒํƒœ์ด๋‹ค. (ํŒŒํ‹ฐ์…˜๊ณผ ์ปจ์Šˆ๋จธ์˜ ๊ด€๊ณ„๋Š” ๋” ๊ณต๋ถ€ ํ•ด๋ด์•ผ ํ•œ๋‹ค.)

[appuser@d97c9a0c3c7e ~]$ kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
deogicorgi-message
deogicorgi-uri
[appuser@d97c9a0c3c7e ~]$

์ •์ƒ์ ์œผ๋กœ ์นดํ”„์นด ํ† ํ”ฝ์ด ์กด์žฌํ•จ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค. (๋„์ปค ์ปจํ…Œ์ด๋„ˆ ๋‚ด์—์„œ ์ˆ˜ํ–‰ํ•œ ์ปค๋งจ๋“œ์ด๋‹ค.)


3. ์ปจ์Šˆ๋จธ ์„ค์ •(Consumer)

์šฐ์„  ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š” ๊ธฐ๋Šฅ๋งŒ ๊ตฌ์„ฑํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ„๋‹จํ•˜๊ฒŒ ์„ค์ •ํ•˜์˜€๋‹ค. ์ฐจํ›„ ๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค์‹œ ํ”„๋กœ๋“€์‹ฑํ•˜๊ฑฐ๋‚˜ RDBMS ๋˜๋Š” NoSQL์— ์ €์žฅํ•˜๋Š” ๋กœ์ง ์—ญ์‹œ ์ถ”๊ฐ€ํ•  ์˜ˆ์ •์ด๋‹ค. ํ˜„์žฌ ๊ณ„ํšํ•˜๋Š” ์ „์ฒด์ ์ธ ํ”Œ๋กœ์šฐ๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

ํ”„๋กœ๋“€์„œ - ์ปจ์Šˆ๋จธ - ์ €์žฅ๋งค์ฒด or ํ”„๋กœ๋“€์„œ (์ด ๋ชจ๋“ ๊ณผ์ •์ด ๋…ผ๋ธ”๋กœํ‚น์œผ๋กœ ๊ตฌ์„ฑ๋˜์–ด์•ผ ํ•œ๋‹ค)

3-1. application.yml

์นดํ”„์นด๊ฐ€ ์„ค์น˜๋œ ๊ฐœ์ธ์„œ๋ฒ„ ์ฃผ์†Œ ๋ฐ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์„ ํ† ํ”ฝ, ์ปจ์Šˆ๋จธ์˜ ๊ทธ๋ฃน ์•„์ด๋”” ๋“ฑ์„ ์„ค์ •ํ•œ๋‹ค.

kafka:
  hosts: deogicorgi.home:29092
  receiver :
    uri:
      name : deogicorgiUri
      topic : deogicorgi-uri
      groupId : deogicorgi-uri-1
    message:
      name : deogicorgiMessage
      topic : deogicorgi-message
      groupId : deogicorgi-message-1

3-2. KafkaProperties.java

์œ„ ์˜ application.yml ๊ณผ ๋งคํ•‘๋  ํด๋ž˜์Šค์ด๋‹ค. yml ํŒŒ์ผ ๋‚ด receiver 2๊ฐœ๋ฅผ ํ•˜๋‚˜์˜ ๋งต์œผ๋กœ ๋ฐ›๋„๋ก ๊ตฌ์„ฑํ–ˆ๋‹ค.

@Getter
@Setter
@Component
@EnableConfigurationProperties
@ConfigurationProperties("kafka")
public class KafkaProperties {

    // ์นดํ”„์นด ํ˜ธ์ŠคํŠธ
    private String hosts;

    // ๋ฆฌ์‹œ๋ฒ„ ํ”„๋กœํผํ‹ฐ ๋งต
    private Map<String, KafkaReceiverProperty> receiver = new HashMap<>();

    public void setReceiver(Map<String, KafkaReceiverProperty> receivers) {
        this.receiver = receivers;
    }

    public Optional<Map.Entry<String, KafkaReceiverProperty>> getProperty(String key) {
        return this.receiver.entrySet()
                .stream().filter(entry -> entry.getValue().getName().equals(key))
                .findFirst();
    }
}

3-3. KafkaReceiverProperty.java

์œ„ ์˜ KafkaProperties ์—์„œ ๋ฆฌ์‹œ๋ฒ„ ๋งต์— ์‚ฌ์šฉ๋˜๋Š” ๋ชจ๋ธํด๋ž˜์Šค์ด๋‹ค.

/**
 * ์นดํ”„์นด ๋ฆฌ์‹œ๋ฒ„ ์„ค์ • ํ”„๋กœํผํ‹ฐ
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class KafkaReceiverProperty {

    // ๋ฆฌ์‹œ๋ฒ„ ์ด๋ฆ„
    private String name;

    // ๋‹ด๋‹นํ•  ํ† ํ”ฝ
    private String topic;

    // ๋ฆฌ์‹œ๋ฒ„ ๊ทธ๋ฃน ์•„์ด๋””
    private String groupId;
}

3-4. KafkaConfig.java

๋‘๊ฐœ์˜ ํ† ํ”ฝ์„ ๊ฐ๊ฐ ์ปจ์Šˆ๋ฐํ•˜๋Š” ๋ฆฌ์‹œ๋ฒ„๋“ค๋ฅผ ๋นˆ์œผ๋กœ ๋“ฑ๋กํ–ˆ๋‹ค. ๊ฐ๊ฐ์˜ ๋นˆ๋“ค์€ ์„ค์ •๋œ ํ† ํ”ฝ์˜ ๋ฉ”์‹œ์ง€๋งŒ ๋‹ด๋‹นํ•˜์—ฌ ๋ฐ›๊ฒŒ๋  ๊ฒƒ์ด๊ณ  ๊ฐ ํ† ํ”ฝ๋“ค์˜ ํŒŒํ‹ฐ์…˜์€ 1๊ฐœ๋กœ ๊ตฌ์„ฑํ•˜์˜€๊ธฐ ๋•Œ๋ฌธ์— ์‹ฑ๊ธ€ ์Šค๋ ˆ๋“œ๋กœ ๋™์ž‘ํ•˜๊ฒŒ ๋ ๊ฒƒ์ด๋‹ค. ์—ฌ๊ธฐ์„œ๋Š” ๋ฆฌ์‹œ๋ฒ„๋‹น ํ† ํ”ฝ์„ 1๊ฐœ์”ฉ๋งŒ ์ง€์ •ํ•˜์˜€์ง€๋งŒ ํ•˜๋‚˜์˜ ๋ฆฌ์‹œ๋ฒ„์— ์—ฌ๋Ÿฌ๊ฐœ์˜ ํ† ํ”ฝ์„ ์ง€์ • ํ•  ์ˆ˜ ๋„์žˆ๋‹ค. ๊ทธ๋ ‡๊ฒŒ ๋˜๋ฉด ๋“ฑ๋ก๋œ ์—ฌ๋Ÿฌ๊ฐœ์˜ ํ† ํ”ฝ์„ ์‹ฑ๊ธ€ ์Šค๋ ˆ๋“œ๋กœ ๋ฐ›๊ฒŒ๋œ๋‹ค.

@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    private final KafkaProperties properties;

    // deogicorgi-uri ๋ฆฌ์‹œ๋ฒ„
    @Bean("uriMessageReceiver")
    public KafkaReceiver<Integer, String> uriMessageReceiver() throws Exception {
        Map.Entry<String, KafkaReceiverProperty> deogicorgiUri = properties.getProperty("deogicorgiUri").orElse(null);

        if (ObjectUtils.isEmpty(deogicorgiUri)) {
            throw new Exception("property is null");
        }

        KafkaReceiverProperty property = deogicorgiUri.getValue();

        ReceiverOptions<Integer, String> receiverOptions =
                ReceiverOptions.<Integer, String>create(getConsumerProps(property))
                        .subscription(Collections.singleton(property.getTopic()));

        return KafkaReceiver.create(receiverOptions);
    }

    // deogicorgi-message ๋ฆฌ์‹œ๋ฒ„
    @Bean("messageReceiver")
    public KafkaReceiver<Integer, String> messageReceiver() throws Exception {
        Map.Entry<String, KafkaReceiverProperty> deogicorgiUri = properties.getProperty("deogicorgiMessage").orElse(null);

        if (ObjectUtils.isEmpty(deogicorgiUri)) {
            throw new Exception("property is null");
        }

        KafkaReceiverProperty property = deogicorgiUri.getValue();

        ReceiverOptions<Integer, String> receiverOptions =
                ReceiverOptions.<Integer, String>create(getConsumerProps(property))
                        .subscription(Collections.singleton(property.getTopic()));

        return KafkaReceiver.create(receiverOptions);
    }

    /******************************************************************
     ************************ Consumer Options ************************
     ******************************************************************/

    // ์ปจ์Šˆ๋จธ ์˜ต์…˜
    private Map<String, Object> getConsumerProps(KafkaReceiverProperty property) {
        return new HashMap<>() {{
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
            put(ConsumerConfig.GROUP_ID_CONFIG, property.getGroupId());
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
        }};
    }
}

3-5. KafkaMessageReceiver.java

๋นˆ์œผ๋กœ ๋“ฑ๋ก๋œ ๋ฆฌ์‹œ๋ฒ„๋“ค์„ ์ œ๊ณต๋ฐ›์•„ ์‹ค์ œ๋กœ ์ปจ์Šˆ๋ฐํ•˜๋Š” ์ฝ”๋“œ์ด๋‹ค. KafkaMessageReceiver ๋Š” ๋นˆ์œผ๋กœ ๋“ฑ๋ก ๋  ๋•Œ ์ •์˜๋œ ๋ชจ๋“  ๋ฆฌ์‹œ๋ฒ„๋ฅผ ๋ฐ›์•„์™€ ์ปจ์Šˆ๋ฐ์„ ์‹œ์ž‘ํ•œ๋‹ค. ์Šคํ”„๋ง ๋นˆ์˜ ๋ผ์ดํ”„์‚ฌ์ดํด์„ ์•Œ๊ณ ์žˆ๋‹ค๋ฉด ํ•ด๋‹น ํด๋ž˜์Šค๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  ์ œ๊ฑฐ๋ ๋•Œ ์ˆ˜ํ–‰๋˜๋Š” ๋ฉ”์„œ๋“œ๋“ค์„ ์˜ค๋ฒ„๋ผ์ด๋”ฉ ํ•˜์—ฌ ์ „์ฒ˜๋ฆฌ ๋ฐ ํ›„์ฒ˜๋ฆฌ๋ฅผ ์ง„ํ–‰ ํ•  ์ˆ˜ ์žˆ๋‹ค.

/**
 * ์นดํ”„์นด ๋ฆฌ์‹œ๋ฒ„
 */
@Slf4j
@Component
public class KafkaMessageReceiver {

    /**
     * KafkaMessageReceiver๊ฐ€ ์ƒ์„ฑ๋  ๋•Œ ๋ชจ๋“  ์นดํ”„์นด ๋ฆฌ์‹œ๋ฒ„ ์‹œ์ž‘
     */
    public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers) {
        for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
            this.start(receiver);
        }
    }

    public void start(KafkaReceiver<Integer, String> receiver) {
        receiver.receive().subscribe(record -> {
            log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset());
        });
    }
}

3-6. ๋กœ๊ทธ

Producer์˜ ๊ฒฝ์šฐ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ์‹ฑ๊ธ€ ์Šค๋ ˆ๋“œ๋กœ ์ฒ˜๋ฆฌ ํ•œ๋‹ค. ๊ฐ๊ฐ ์˜ ํ† ํ”ฝ์— ํ•˜๋‚˜์˜ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋ƒˆ์„ ๊ฒฝ์šฐ kafka-producer-network-thread ๋ผ๋Š” ์ด๋ฆ„์˜ ์Šค๋ ˆ๋“œ๋ฅผ ํ†ตํ•˜์—ฌ ์ฒ˜๋ฆฌ๋œ๋‹ค. ํ•ด๋‹น ์Šค๋ ˆ๋“œ๋Š” ๊ธฐ๋ณธ๊ฐ’์œผ๋กœ, ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œํ’€์„ ์‚ฌ์šฉํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด ์„ค์ •์—์„œ ์‚ฌ์šฉํ•˜๊ณ ์‹ถ์€ ์Šค๋ ˆ๋“œ๋ฅผ ๋“ฑ๋กํ•ด์ฃผ๋ฉด ๋œ๋‹ค.

Consumer์˜ ๊ฒฝ์šฐ ๊ฐ๊ฐ์˜ ๋ฆฌ์‹œ๋ฒ„๊ฐ€ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค. reactive-kafka-* ๋กœ ์‹œ์ž‘ํ•˜๋Š” ์Šค๋ ˆ๋“œ๋ฅผ ๊ฐ๊ฐ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ์œผ๋ฉฐ ํ˜„์žฌ ๊ฐ ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์ด 1๊ฐœ์”ฉ์ด๋ฏ€๋กœ ํ•ด๋‹น ์Šค๋ ˆ๋“œ๋งŒ ๊ณ„์† ์‚ฌ์šฉํ•˜์—ฌ ์ฒ˜๋ฆฌํ•  ๊ฒƒ์ด๋‹ค. ์ดํ›„ ๊ฐ ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์„ ๋Š˜๋ฆฌ๋ฉด ๊ฐ ๋ฆฌ์‹œ๋ฒ„๋“ค์€ ๋“ฑ๋ก๋œ ์“ฐ๋ ˆ๋“œํ’€์„ ํ†ตํ•˜์—ฌ ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋“œ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ฒŒ ๋ ๊ฒƒ์ด๋‹ค.

์ถœ์ฒ˜ : https://velog.io/@deogicorgi/Spring-Webflux-Reactive-Kafka-2-Consumer (๊ต‰์žฅํžˆ ์ž˜ ์ •๋ฆฌํ•ด์ฃผ์…จ๋‹ค)

Last updated