1. ๊ฐ์
์ด๋ฒ์ ์ปจ์๋จธ๋ฅผ ๊ตฌ์ฑํ๋ค. ์ด๋ฒ ์ญ์ Reactive Kafka ๋ฅผ ํตํ์ฌ ์ปจ์๋จธ๋ฅผ ๊ตฌ์ฑํ ์์ ์ด๊ณ ์์ ๊ตฌ์ฑํ ํ๋ก๋์์ ์ฐ๋ํ์ฌ ์ค์ ๋ฉ์์ง๋ฅผ ๋ฐ๋๊ฒ๊น์ง ๊ตฌํํด๋ณด๋ ค ํ๋ค.
์ ์ฒด ์์ค์ฝ๋ : Github ๋งํฌ
2. ์นดํ์นด ํ ํฝ ๊ตฌ์ฑ๋ด์ฉ
ํ์ฌ ๊ฐ์ธ ์๋ฒ ์นดํ์นด์ ๊ตฌ์ฑ๋ ํ ํฝ์ ์ด 2๊ฐ๋ก deogicorgi-message, deogicorgi-uri ๊ฐ ์กด์ฌํ๋ค. ๊ฐ ํ ํฝ์ ํํฐ์
์ 1๊ฐ์ฉ ๊ตฌ์ฑํ์์ผ๋ฉฐ ๊ทธ๋ ๊ธฐ ๋๋ฌธ์ ํ๋์ ํ ํฝ์๋ ํ๋์ ์ปจ์๋จธ๋ง ๋ถ์ผ ์ ์๋ ์ํ์ด๋ค. (ํํฐ์
๊ณผ ์ปจ์๋จธ์ ๊ด๊ณ๋ ๋ ๊ณต๋ถ ํด๋ด์ผ ํ๋ค.)
[appuser@d97c9a0c3c7e ~]$ kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
deogicorgi-message
deogicorgi-uri
[appuser@d97c9a0c3c7e ~]$
์ ์์ ์ผ๋ก ์นดํ์นด ํ ํฝ์ด ์กด์ฌํจ์ ํ์ธํ ์ ์๋ค. (๋์ปค ์ปจํ
์ด๋ ๋ด์์ ์ํํ ์ปค๋งจ๋์ด๋ค.)
3. ์ปจ์๋จธ ์ค์ (Consumer)
์ฐ์ ๋ฉ์์ง๋ฅผ ๋ฐ๋ ๊ธฐ๋ฅ๋ง ๊ตฌ์ฑํ๊ธฐ ๋๋ฌธ์ ๊ฐ๋จํ๊ฒ ์ค์ ํ์๋ค. ์ฐจํ ๋ฉ์์ง๋ฅผ ๋ค์ ํ๋ก๋์ฑํ๊ฑฐ๋ RDBMS ๋๋ NoSQL์ ์ ์ฅํ๋ ๋ก์ง ์ญ์ ์ถ๊ฐํ ์์ ์ด๋ค. ํ์ฌ ๊ณํํ๋ ์ ์ฒด์ ์ธ ํ๋ก์ฐ๋ ์๋์ ๊ฐ๋ค.
ํ๋ก๋์ - ์ปจ์๋จธ - ์ ์ฅ๋งค์ฒด or ํ๋ก๋์ (์ด ๋ชจ๋ ๊ณผ์ ์ด ๋
ผ๋ธ๋กํน์ผ๋ก ๊ตฌ์ฑ๋์ด์ผ ํ๋ค)
3-1. application.yml
์นดํ์นด๊ฐ ์ค์น๋ ๊ฐ์ธ์๋ฒ ์ฃผ์ ๋ฐ ๋ฉ์์ง๋ฅผ ๋ฐ์ ํ ํฝ, ์ปจ์๋จธ์ ๊ทธ๋ฃน ์์ด๋ ๋ฑ์ ์ค์ ํ๋ค.
kafka:
hosts: deogicorgi.home:29092
receiver :
uri:
name : deogicorgiUri
topic : deogicorgi-uri
groupId : deogicorgi-uri-1
message:
name : deogicorgiMessage
topic : deogicorgi-message
groupId : deogicorgi-message-1
3-2. KafkaProperties.java
์ ์ application.yml ๊ณผ ๋งคํ๋ ํด๋์ค์ด๋ค. yml ํ์ผ ๋ด receiver 2๊ฐ๋ฅผ ํ๋์ ๋งต์ผ๋ก ๋ฐ๋๋ก ๊ตฌ์ฑํ๋ค.
@Getter
@Setter
@Component
@EnableConfigurationProperties
@ConfigurationProperties("kafka")
public class KafkaProperties {
// ์นดํ์นด ํธ์คํธ
private String hosts;
// ๋ฆฌ์๋ฒ ํ๋กํผํฐ ๋งต
private Map<String, KafkaReceiverProperty> receiver = new HashMap<>();
public void setReceiver(Map<String, KafkaReceiverProperty> receivers) {
this.receiver = receivers;
}
public Optional<Map.Entry<String, KafkaReceiverProperty>> getProperty(String key) {
return this.receiver.entrySet()
.stream().filter(entry -> entry.getValue().getName().equals(key))
.findFirst();
}
}
3-3. KafkaReceiverProperty.java
์ ์ KafkaProperties ์์ ๋ฆฌ์๋ฒ ๋งต์ ์ฌ์ฉ๋๋ ๋ชจ๋ธํด๋์ค์ด๋ค.
/**
* ์นดํ์นด ๋ฆฌ์๋ฒ ์ค์ ํ๋กํผํฐ
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class KafkaReceiverProperty {
// ๋ฆฌ์๋ฒ ์ด๋ฆ
private String name;
// ๋ด๋นํ ํ ํฝ
private String topic;
// ๋ฆฌ์๋ฒ ๊ทธ๋ฃน ์์ด๋
private String groupId;
}
3-4. KafkaConfig.java
๋๊ฐ์ ํ ํฝ์ ๊ฐ๊ฐ ์ปจ์๋ฐํ๋ ๋ฆฌ์๋ฒ๋ค๋ฅผ ๋น์ผ๋ก ๋ฑ๋กํ๋ค. ๊ฐ๊ฐ์ ๋น๋ค์ ์ค์ ๋ ํ ํฝ์ ๋ฉ์์ง๋ง ๋ด๋นํ์ฌ ๋ฐ๊ฒ๋ ๊ฒ์ด๊ณ ๊ฐ ํ ํฝ๋ค์ ํํฐ์
์ 1๊ฐ๋ก ๊ตฌ์ฑํ์๊ธฐ ๋๋ฌธ์ ์ฑ๊ธ ์ค๋ ๋๋ก ๋์ํ๊ฒ ๋ ๊ฒ์ด๋ค. ์ฌ๊ธฐ์๋ ๋ฆฌ์๋ฒ๋น ํ ํฝ์ 1๊ฐ์ฉ๋ง ์ง์ ํ์์ง๋ง ํ๋์ ๋ฆฌ์๋ฒ์ ์ฌ๋ฌ๊ฐ์ ํ ํฝ์ ์ง์ ํ ์ ๋์๋ค. ๊ทธ๋ ๊ฒ ๋๋ฉด ๋ฑ๋ก๋ ์ฌ๋ฌ๊ฐ์ ํ ํฝ์ ์ฑ๊ธ ์ค๋ ๋๋ก ๋ฐ๊ฒ๋๋ค.
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
private final KafkaProperties properties;
// deogicorgi-uri ๋ฆฌ์๋ฒ
@Bean("uriMessageReceiver")
public KafkaReceiver<Integer, String> uriMessageReceiver() throws Exception {
Map.Entry<String, KafkaReceiverProperty> deogicorgiUri = properties.getProperty("deogicorgiUri").orElse(null);
if (ObjectUtils.isEmpty(deogicorgiUri)) {
throw new Exception("property is null");
}
KafkaReceiverProperty property = deogicorgiUri.getValue();
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(getConsumerProps(property))
.subscription(Collections.singleton(property.getTopic()));
return KafkaReceiver.create(receiverOptions);
}
// deogicorgi-message ๋ฆฌ์๋ฒ
@Bean("messageReceiver")
public KafkaReceiver<Integer, String> messageReceiver() throws Exception {
Map.Entry<String, KafkaReceiverProperty> deogicorgiUri = properties.getProperty("deogicorgiMessage").orElse(null);
if (ObjectUtils.isEmpty(deogicorgiUri)) {
throw new Exception("property is null");
}
KafkaReceiverProperty property = deogicorgiUri.getValue();
ReceiverOptions<Integer, String> receiverOptions =
ReceiverOptions.<Integer, String>create(getConsumerProps(property))
.subscription(Collections.singleton(property.getTopic()));
return KafkaReceiver.create(receiverOptions);
}
/******************************************************************
************************ Consumer Options ************************
******************************************************************/
// ์ปจ์๋จธ ์ต์
private Map<String, Object> getConsumerProps(KafkaReceiverProperty property) {
return new HashMap<>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
put(ConsumerConfig.GROUP_ID_CONFIG, property.getGroupId());
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
}};
}
}
3-5. KafkaMessageReceiver.java
๋น์ผ๋ก ๋ฑ๋ก๋ ๋ฆฌ์๋ฒ๋ค์ ์ ๊ณต๋ฐ์ ์ค์ ๋ก ์ปจ์๋ฐํ๋ ์ฝ๋์ด๋ค. KafkaMessageReceiver ๋ ๋น์ผ๋ก ๋ฑ๋ก ๋ ๋ ์ ์๋ ๋ชจ๋ ๋ฆฌ์๋ฒ๋ฅผ ๋ฐ์์ ์ปจ์๋ฐ์ ์์ํ๋ค.
์คํ๋ง ๋น์ ๋ผ์ดํ์ฌ์ดํด์ ์๊ณ ์๋ค๋ฉด ํด๋น ํด๋์ค๊ฐ ์์ฑ๋๊ณ ์ ๊ฑฐ๋ ๋ ์ํ๋๋ ๋ฉ์๋๋ค์ ์ค๋ฒ๋ผ์ด๋ฉ ํ์ฌ ์ ์ฒ๋ฆฌ ๋ฐ ํ์ฒ๋ฆฌ๋ฅผ ์งํ ํ ์ ์๋ค.
/**
* ์นดํ์นด ๋ฆฌ์๋ฒ
*/
@Slf4j
@Component
public class KafkaMessageReceiver {
/**
* KafkaMessageReceiver๊ฐ ์์ฑ๋ ๋ ๋ชจ๋ ์นดํ์นด ๋ฆฌ์๋ฒ ์์
*/
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers) {
for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
this.start(receiver);
}
}
public void start(KafkaReceiver<Integer, String> receiver) {
receiver.receive().subscribe(record -> {
log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset());
});
}
}
3-6. ๋ก๊ทธ
Producer์ ๊ฒฝ์ฐ ๋ชจ๋ ๋ฉ์์ง๋ฅผ ์ฑ๊ธ ์ค๋ ๋๋ก ์ฒ๋ฆฌ ํ๋ค. ๊ฐ๊ฐ ์ ํ ํฝ์ ํ๋์ ๋ฉ์์ง๋ฅผ ๋ณด๋์ ๊ฒฝ์ฐ kafka-producer-network-thread ๋ผ๋ ์ด๋ฆ์ ์ค๋ ๋๋ฅผ ํตํ์ฌ ์ฒ๋ฆฌ๋๋ค. ํด๋น ์ค๋ ๋๋ ๊ธฐ๋ณธ๊ฐ์ผ๋ก, ๋ค๋ฅธ ์ค๋ ๋ํ์ ์ฌ์ฉํ๊ณ ์ถ๋ค๋ฉด ์ค์ ์์ ์ฌ์ฉํ๊ณ ์ถ์ ์ค๋ ๋๋ฅผ ๋ฑ๋กํด์ฃผ๋ฉด ๋๋ค.
Consumer์ ๊ฒฝ์ฐ ๊ฐ๊ฐ์ ๋ฆฌ์๋ฒ๊ฐ ๋ค๋ฅธ ์ค๋ ๋๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค. reactive-kafka-* ๋ก ์์ํ๋ ์ค๋ ๋๋ฅผ ๊ฐ๊ฐ ์ฌ์ฉํ๊ณ ์์ผ๋ฉฐ ํ์ฌ ๊ฐ ํ ํฝ์ ํํฐ์
์ด 1๊ฐ์ฉ์ด๋ฏ๋ก ํด๋น ์ค๋ ๋๋ง ๊ณ์ ์ฌ์ฉํ์ฌ ์ฒ๋ฆฌํ ๊ฒ์ด๋ค. ์ดํ ๊ฐ ํ ํฝ์ ํํฐ์
์ ๋๋ฆฌ๋ฉด ๊ฐ ๋ฆฌ์๋ฒ๋ค์ ๋ฑ๋ก๋ ์ฐ๋ ๋ํ์ ํตํ์ฌ ๋ฉํฐ์ฐ๋ ๋๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๊ฒ ๋ ๊ฒ์ด๋ค.
์ถ์ฒ : https://velog.io/@deogicorgi/Spring-Webflux-Reactive-Kafka-2-Consumer (๊ต์ฅํ ์ ์ ๋ฆฌํด์ฃผ์
จ๋ค)