34.3 Apache Kafka Support by ys

Apache Kafkaspring-kafka프로젝트 의 자동 구성을 제공하여 지원됩니다 .

Apache 카프카란?

아파치 재단의 카프카는 pub-sub모델의 메세지 큐이고, 분산환경에 특화되어 설계되어 있다는 특징을 가짐으로써, 기존의 RabbitMQ와 같은 다른 메세지큐와의 성능 차이가 난다(훨씬 빠르게 처리한다). 그 외에도 클러스터 구성, fail-over, replication와 같은 여러 가지 특징들을 가지고 있다.

Use Cases (카프카의 사용 용도의 예)

  • Messaging System: 가장 일반적으로 많이 사용되고 있는 용도로 메세지 제공자 (Producer 또는 Source)와 수신자 (Consumer 또는 Sink) 사이에서 메세지를 전달해주는 역할을 합니다. 각각의 컨슈머 (또는 컨슈머 그룹)는 전달받기를 원하는 메세지의 토픽에 구독 신청해야 하며 하나의 토픽에 여러 컨슈머가 구독 신청 할 수 있습니다. (이 경우에 메세지는 구독신청한 모든 컨슈머한테 Broadcast 됩니다.)

  • Website Activity Checking 및 Monitoring: 링크드인에서 처음 만들고 사용했던 목적처럼 웹사이트가 정상적으로 돌아가는지 또는 웹사이트 사용 시 유저들의 패턴이 어떻게 되는지 모니터링 또는 웹사이트 이벤트 체킹의 목적으로도 사용 가능하며 (중간에서 메세지를 전달하는 중간자의 역할을 할 수도 있지만 메세지 자체가 디스크에 일정 기간 동안 로깅이 되어 있기 때문에 직접 분석도 가능합니다.)

  • Log Aggregation: 하나의 웹사이트가 여러 대의 서버로 운영되고 있다면 (대부분의 엔터프라이즈 웹사이트들이 그렇듯이) 각각의 서버에 있는 로그를 통합해주는 시스템 구축에도 사용 가능합니다.

  • Stream Processing & Batch Processing: 요즘 빅데이터 쪽에서 가장 핫한 Spark나 Storm같은 Stream Processing (스트림 처리)을 지원하는 플랫폼이나 Hadoop과 같이 Batch Processing (일괄 처리)을 지원하는 플랫폼과 연결햐여 메세지의 변환도 가능합니다.

  • Etc: 그 외에 연결된 DB나 서치 엔진의 일시적 서비스 장애 때문에 다운이 되었을 때 메세지들을 잠시 저장해줄 수 있는 임시 버퍼의 역할도 가능하며 Operational metrics (각각의 토픽에 대해 들어오는 메세지의 수를 정기적으로 체크하여 그 수가 너무 낮거나 높을 때 문제가 있는 확인차 운영팀에 메일등을 통해 알려주는 용도)나 Event sourcing (특정 이벤트들을 시간 순으로 기록하여 나중에 필요할 때 사용하는 용도) 등의 용도로도 사용되고 있습니다.

궁금하면 보기 : https://epicdevs.com/17

Kafka 구성은 spring.kafka.*의 외부 구성 등록 정보에 의해 제어됩니다. 예를 들어 application.properties에 다음섹션을 선언 할 수 있습니다.

spring.kafka.bootstrap-servers = localhost : 9092
 spring.kafka.consumer.group-id = myGroup

시작시 주제를 작하려면 NewTopic유형의 bean을 추가하십시오. 주제가 이미 존재하면, bean은 무시됩니다.

34.3.1 메시지 보내기

Spring KafkaTemplate은 자동으로 구성되며 다음 예제와 같이 직접 bean에서 자동으로 autowire 할 수 있습니다.

@Component
public class MyBean {

	private final KafkaTemplate kafkaTemplate;

	@Autowired
	public MyBean(KafkaTemplate kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

}

spring.kafka.producer.transaction-id-prefix속성이 정의되면 KafkaTransactionManager가 자동으로 구성됩니다. 또한 RecordMessageConverter bean이 정의되면 자동으로 구성된 KafkaTemplate 과 연관됩니다 .

34.3.2 메시지 받기

Apache Kafka 하부 구조가 존재하면, 모든 bean에 @KafkaListener주석을 붙여 리스너 엔드포인점을 작성할 수 있습니다 . KafkaListenerContainerFactory가 정의되지 않은 경우 spring.kafka.listener.*에 정의 된 키를 사용하여 기본값이 자동으로 구성됩니다 .

다음 구성 요소는 someTopic주제에 리스너 엔드 포인트를 작성합니다 .

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}

KafkaTransactionManagerbean 정의된 경우, 자동 컨테이너 팩토에 연결됩니다. 마찬가지로 RecordMessageConverter, ErrorHandler또는 AfterRollbackProcessorbean이 정의된 경, 자동으로 기본 팩토리에 연결되어 있습니다.

사용자 정의 ChainedKafkaTransactionManager는 일반적으로 자동 구성된 KafkaTransactionManagerbean을 참조 하므로 @Primary로 표시해야합니다 .

34.3.3 카프카 Streams

Apache 용 Spring Kafka는 StreamsBuilder 객체를 생성하고 스트림의 라이프 사이클을 관리하기위한 팩토리 빈을 제공합니다. Spring Boot는kafka-streams이 classpath에 있고 Kafka Streams가 @EnableKafkaStreams annotation을 통해 활성화되어있는 한 필요한 KafkaStreamsConfiguration 빈을 자동으로 구성합니다.

Kafka 스트림을 사용하도록 설정하면 응용 프로그램 ID 및 부트 스트랩 서버를 설정해야합니다. 전자는 spring.kafka.streams.application-id를 사용하여 구성 할 수 있으며, 설정하지 않으면 spring.application.name이 기본값이됩니다. 후자는 전 세계적으로 또는 스트림에 대해서만 재정의되도록 설정할 수 있습니다.

전용 속성을 사용하여 몇 가지 추가 속성을 사용할 수 있습니다. 다른 임의의 카프카 속성은 spring.kafka.streams.properties네임 스페이스를 사용하여 설정할 수 있습니다 . 더 자세한 정보 는 34.3.4 절. "추가 카프카 속성" 을 참고하십시오.

Factory Bean을 사용하려면 다음 예제와 같이 StreamsBuilder@Bean으로 연결하면됩니다.

@Configuration
@EnableKafkaStreams
static class KafkaStreamsExampleConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
				Produced.with(Serdes.Integer(), new JsonSerde<>()));
		return stream;
	}

}

기본적으로 생성하는 StreamBuilder 객체에 의해 관리되는 스트림은 자동으로 시작됩니다. spring.kafka.streams.auto-startup 속성을 사용하여이 동작을 사용자 정의 할 수 있습니다.

34.3.4 추가 카프카 속성

자동 구성에서 지원되는 등록 정보는 부록 A, 일반적인 응용 프로그램 등록 정보나와 있습니다 . 대부분의 경우 이러한 속성 (하이픈 또는 낙타)은 Apache Kafka 점선으로 표시된 속성에 직접 매핑됩니다. 자세한 내용은 Apache Kafka 문서를 참조하십시오.

이러한 속성의 처음 몇 가지는 모든 구성 요소 (제작자, 소비자, 관리자 및 스트림)에 적용되지만 다른 값을 사용하려는 경우 구성 요소 수준에서 지정할 수 있습니다. Apache Kafka는 중요도가 HIGH, MEDIUM 또는 LOW 인 속성을 지정합니다. 스프링 부트 자동 구성은 모든 중요도 높은 속성, 일부 선택된 중간 및 낮음 속성 및 기본값이없는 속성을 지원합니다.

Kafka가 지원하는 속성의 하위 집합 만 KafkaProperties클래스를 통해 직접 사용할 수 있습니다 . 직접 지원되지 않는 추가 속성으로 제작자 또는 소비자를 구성하려면 다음 속성을 사용하십시오.

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

이것은 일반적인 prop.oneKafka 속성을 first(생산자, 소비자 및 관리자에게 적용), prop.two관리자 속성을 second, prop.three 소비자 속성을 third, prop.four프로듀서 속성을 fourth, prop.five스트림 속성을 fifth로 설정합니다.

다음과 같이 스프링 카프카 JsonDeserializer를 설정할 수도 있습니다 :

spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.JsonDeserializer
 spring.kafka.consumer.properties.spring.json.value.default.type = com.example.Invoice
 spring.kafka.consumer. properties.spring.json.trusted.packages = com.example, org.acme

마찬가지로 JsonSerializer헤더에 유형 정보를 보내는 기본 동작을 비활성화 할 수 있습니다.

spring.kafka.producer.value-serializer = org.springframework.kafka.support.serializer.JsonSerializer
 spring.kafka.producer.properties.spring.json.add.type.headers = false

중대한

이 방법으로 설정된 특성은 스프링 부트가 명시 적으로 지원하는 모든 구성 항목보다 우선합니다.

Last updated