TopiclessTimeBasedPartitioner
- I want to remove topicname(
<topic>
) from S3 Object Name when using S3 Sink Connector. S3 Sink Connector doesn't support it. - Someone said I can use Blank(" "), Empty String(""), Back Space("\b") in
topic.dir
property of connector's config file. But it also doesn't work.topic.dir
means just<prefix>
.
# TimeBasedPartitioner
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
# TopiclessTimeBasedPartitioner
<prefix>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
1. Install S3 Sink Connector
2. Add topicless-timebasedpartitioner.jar into the S3 sink connector directory in your connect plugin-path
- This jar file doesn't contain the S3 sink connector, only the partitioner.
e.g.
.(connect plugin-path)
└── confluentinc-kafka-connect-s3-10.5.0
├── assets
├── doc
├── etc
├── lib
├── manifest.json
└── topicless-timebasedpartitioner.jar
"partitioner.class": "io.github.yunanjeong.custom.TopiclessTimeBasedPartitioner"
File-Project Structure
진입Project Settings-Artifacts-Add(+)-Jar
에서Empty
선택From modules with dependencies
를 선택하는 것이 일반적이지만, 이미 원본 S3 sink connector에 중복되는 dependency가 모두 있으므로 상관없음
Name
에 Jar파일 이름을 적고,Output Layout-Available Element
에서 'topicless-timebasedpartitioner' compile output만 jar파일 포함대상으로 선택- 이후
intellij 메인 창-Build-Build Artifacts ...
선택하여 빌드를 진행한다. {project root path}/out/
에서 생성된 jar파일을 확인
# maven 설치
sudo apt install maven
# 빌드
mvn install -f pom.xml -Dcheckstyle.skip -DskipTests
#{project root path}/target/에서 생성된 jar 파일 확인
- TimeBasedPartitioner 수정 로직
- 로직 자체는 간단하지만 빌드 및 배포, kafka 적용 방법 등의 테스트가 필요해서 이 repository를 생성했다.
- kafka-connect-storage-common(커넥트 공통코드)
- kafka-connect-storage-cloud(S3 Sink Connector 소스 코드)
- S3 Sink Connector는 커넥트 공통 코드를 라이브러리로 가져와 쓰고 있다.
- 실제 TimeBasedPartitioner의 로직은 커넥트 공통코드에 구현되어있다.
- TimeBasedPartitioner는 DefaultPartitioner를 Override하고 있다.
- Custom Partitioner는 기존 Partitioner를 Override해서 만들면 된다고 공식홈페이지에서 언급한다.
- 현재는 TimeBasedPartitioner만 커스텀한 상태이나, 이와 같은 구조로 CustomPartitioner를 입맛에 맞게 만들 수 있다.
- 따라서, 커스텀 작업은 개별 프로젝트 OR S3 Sink Connector 소스 코드 기반으로 하되, Override를하기 위한 원본로직 참고는 커넥트 공통코드에서 하면된다.
- 관련 코드는 모두 Maven프로젝트이며, pom.xml파일에 Maven Repository로부터 참조하는 Dependency가 기술된다.
- S3경로
<prefix>/<topic>/<encodedPartition>
은 kafka-connect-storage-common(커넥트 공통코드)의DefaultPartitioner.java
에서 관리된다. - S3파일명
<topic>+<kafkaPartition>+<startOffset>.<format>
은 kafka-connect-storage-cloud(S3 Sink Connector 소스 코드)의TopicPartitionWriter.java
에서 관리된다. 파일명은 가급적 안바꾸는게 낫다. topic명, kafkaPartition(파티션넘버), startOffset, format 모두 트러블슈팅에 꼭 필요하다. offset 겹치거나 꼬여도 골치아프다.