
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.camel.kafkaconnector.CamelSinkConnectorConfig #251

Closed
saranyaeu2987 opened this issue May 28, 2020 · 30 comments


@saranyaeu2987

I am trying to run camel-kafka-connector on Minikube with Strimzi, but I am getting:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.camel.kafkaconnector.CamelSinkConnectorConfig

Here is my image's Dockerfile:

FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

Here is my plugins folder, which contains the following jars:
(screenshot: Screen Shot 2020-05-27 at 10 25 32 PM)

When I check the pod logs, this is what I see:

2020-05-28 16:17:11,313 INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/kafka/plugins/camel-kafka-connector-0.2.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [main]
2020-05-28 16:17:11,313 INFO Added plugin 'org.apache.camel.kafkaconnector.CamelSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [main]
2020-05-28 16:17:11,313 INFO Added plugin 'org.apache.camel.kafkaconnector.CamelSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [main]
2020-05-28 16:17:11,313 INFO Added plugin 'org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [main]
2020-05-28 16:17:11,313 INFO Added plugin 'org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) [main]

I don't see org.apache.camel.kafkaconnector.CamelSinkConnectorConfig getting loaded from camel-kafka-connector-0.2.0.jar, which I believe is what causes the following error:

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.camel.kafkaconnector.CamelSinkConnectorConfig
	at org.apache.camel.kafkaconnector.CamelSinkConnector.config(CamelSinkConnector.java:67)
	at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
...

**Full log from connector pod**
logs-from-my-connect-cluster-connect-in-my-connect-cluster-connect-8866c5d89-rd4zs.txt

Any suggestions?

@oscerd
Contributor

oscerd commented May 28, 2020

Can you please show your configuration file?

@saranyaeu2987
Author

saranyaeu2987 commented May 28, 2020

@oscerd
Here is my Strimzi KafkaConnect deployment YAML file:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: selumalai/selumalai-s3-kafkaconnect # (built from the Dockerfile in the comment above; my-plugins contains all the camel connector jars)
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  replicas: 1
  bootstrapServers: 34.212.160.12:9092
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false

Here is my KafkaConnector deployment file:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: my-replicated-topic
    camel.sink.url: aws-s3://selumalai-kafka-s3?keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: US_WEST_2
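
For completeness, the aws-credentials.properties file inside the aws-credentials secret only needs the two keys referenced above (values redacted here):

aws_access_key_id=AKIAXXXXXXXXXXXXXXXX
aws_secret_access_key=****************************************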

@valdar
Member

valdar commented May 28, 2020

@saranyaeu2987 can you please try with this Dockerfile:

FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/camel-aws-s3/
USER 1001
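
The key point is that each connector needs its own subdirectory under /opt/kafka/plugins, containing the connector jar together with all of its dependency jars. With the jars sitting directly under /opt/kafka/plugins/ (as in your log: pluginLocation=file:/opt/kafka/plugins/camel-kafka-connector-0.2.0.jar), the connector jar is loaded on its own, without the Camel dependencies next to it, which is why the static initialization of CamelSinkConnectorConfig fails. After the change above, the image should end up with a layout roughly like this (jar names are only indicative):

/opt/kafka/plugins/
└── camel-aws-s3/
    ├── camel-kafka-connector-0.2.0.jar
    ├── camel-core-3.x.jar
    ├── camel-aws-s3-3.x.jar
    └── ... (the rest of the jars from my-plugins)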

@saranyaeu2987
Author

saranyaeu2987 commented May 28, 2020

That worked. Thanks a lot!

BTW, I have this:
camel.sink.url: aws-s3://selumalai-kafka-s3?keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}

  1. Why are the variables not resolved, so that the file is saved literally as ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}?

(screenshot: Screen Shot 2020-05-28 at 5 33 25 PM)

  2. Is there a batch size property for the S3 sink (I couldn't find any example for the S3 sink in https://github.com/apache/camel-kafka-connector/tree/master/examples)? Something like this:
     camel.sink.maxBatchPollSize = 1000

@oscerd
Contributor

oscerd commented May 29, 2020

Simple language expressions are not resolved in the sink URL.

@saranyaeu2987
Author

@valdar @oscerd

I want my CamelSinkConnector (S3) to consume in batches of 1000 messages, or at an interval of 10 seconds.
How can I set this in the YAML file?

@oscerd
Contributor

oscerd commented May 29, 2020

The S3 sink connector doesn't provide a batch size option, because it is mapped onto the Camel producer options, and the producer doesn't work as a batch job.

@saranyaeu2987
Author

So this property, CamelBatchSize, from https://camel.apache.org/manual/latest/batch-consumer.html will not be applicable to the S3 sink connector?

@oscerd
Contributor

oscerd commented May 29, 2020

Yes, because you're mixing up the concepts: a Camel consumer is used when you're using a source connector, so you'll have the consumer options in that case.
For example, if you use the source connector you can set

camel.source.endpoint.maxMessagesPerPoll

to define how many messages will be part of each consumed batch.

On the sink side you will be using the producer options.

Source options: https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-aws-s3-kafka-source-connector.html
Sink options: https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-aws-s3-kafka-sink-connector.html

You're using a sink connector (in Camel terms, you're using a producer).
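
For illustration only, a source connector spec using that option could look roughly like this (an untested sketch; the bucket, topic and credentials just reuse the names from your sink example, and converters are omitted):

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSourceConnector
  tasksMax: 1
  config:
    topics: my-replicated-topic
    camel.source.url: aws-s3://selumalai-kafka-s3
    # consumer option: cap how many objects are consumed per poll
    camel.source.endpoint.maxMessagesPerPoll: 1000
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: US_WEST_2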

@saranyaeu2987
Author

Got it.
I want to add a variable date in camel.sink.url; is there a way to do that?
Without the variable part, the file in S3 keeps getting overwritten.

Here is my YAML file for your reference:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: my-replicated-topic
    camel.sink.url: aws-s3://selumalai-kafka-s3?keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: US_WEST_2

@oscerd
Contributor

oscerd commented May 29, 2020

I don't think it's possible with simple language. So I guess you'll need to add something dynamic there.

@saranyaeu2987
Author

saranyaeu2987 commented May 29, 2020

@oscerd
I see both ${date:now:yyyyMMdd} and exchangeId in this document (https://camel.apache.org/components/latest/languages/simple-language.html).

Why is it not possible with simple language? Pardon me if I am mixing up concepts again.

@oscerd
Contributor

oscerd commented May 29, 2020

It's not possible because you're declaring this stuff in a static file; simple language is meant to be used in a real Camel route. The Kafka configuration for a connector is not dynamic, it's static.

@saranyaeu2987
Author

saranyaeu2987 commented May 29, 2020

@oscerd @valdar

If that's the case, how is the file variable resolved?

NOT RESOLVED --> camel.sink.url: aws-s3://selumalai-kafka-s3?keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
RESOLVED --> camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}

=======================================================================

Basically, I want to preserve all the Kafka topic data consumed via the sink connector in S3, without overwriting.

Any suggestions on:

  1. How to prevent overwriting the S3 file after consuming data from a Kafka topic?
     OR
     How can I add a dynamic part to the URL so that the sink connector writes topic data to different files in S3?
  2. Is there some way to write to S3 in batches?

Current state: the S3 file is overwritten after every consumption.
Desired state: preserve every consumed message in S3. If batch consumption (say 1000 messages/file) were available, it would be fantastic!

@oscerd
Contributor

oscerd commented May 29, 2020

We are working on enabling the dynamic resolver with #252. (The ${file:...} placeholders are resolved by Kafka Connect itself, through the FileConfigProvider you configured, when the connector configuration is loaded; ${date:...} and ${exchangeId} are Camel simple expressions that have to be evaluated per exchange at runtime, which is what #252 enables.)

@saranyaeu2987
Author

saranyaeu2987 commented May 30, 2020

@oscerd
Thanks a lot. I rebuilt the jar locally with that change, and new files are created as expected!

Any suggestions on a code change for a batch consume mode for the S3 sink?

@oscerd
Contributor

oscerd commented May 30, 2020

It's not supported on the sink side. The Camel producer is not designed for batching.

@valdar
Member

valdar commented Jun 11, 2020

@saranyaeu2987 I am closing this one for the moment, since the work is tracked in #252. Feel free to reopen this one or open another issue if something still needs to be clarified.

valdar closed this as completed Jun 11, 2020
@ksingh7

ksingh7 commented Jun 13, 2020

@saranyaeu2987 Did you manage to dump Kafka data onto S3 in batches? With your examples, I managed to get data moving from a Kafka topic onto S3; however, for every new Kafka message it creates a new object inside the S3 bucket, which is highly inefficient.

So I am wondering if you managed to fix this using some camel.sink properties. Any help on this is much appreciated.

@saranyaeu2987
Author

saranyaeu2987 commented Jun 13, 2020 via email

@oscerd
Contributor

oscerd commented Jun 13, 2020

That's how the component has been designed.

@saranyaeu2987
Author

saranyaeu2987 commented Jun 13, 2020 via email

@oscerd
Contributor

oscerd commented Jun 13, 2020

I don't see how it would be useful. In Camel, when you send a message to S3 through a producer, it is written as an S3 object directly. In S3 there is no append operation, so I really don't see why we should change the behavior. Also, batch operation in what sense? Writing multiple lines to the same file? Or writing multiple files in one shot? There is no batch support in the S3 SDK v1 as far as I know.

@saranyaeu2987
Author

saranyaeu2987 commented Jun 13, 2020 via email

@oscerd
Contributor

oscerd commented Jun 13, 2020

There is no plan for this at the moment. Also, the aws-s3 component is old, so we won't introduce new features there; it would be easier to do on aws2-s3.

For the second question, no. There is no folder concept in S3, and we didn't focus on it.

@saranyaeu2987
Author

saranyaeu2987 commented Jun 13, 2020

@oscerd

  1. Does aws2-s3 also not have a batch feature?
  2. Is simple supported in aws2-s3 sink connectors?
  3. Can aws2-s3 be used instead of aws-s3 for my example above?
  4. Is there any doc on the differences between them?

@oscerd
Contributor

oscerd commented Jun 13, 2020

  1. No.
  2. Once we merge the dynamic PR, all the connectors will support simple.
  3. It should work.
  4. The only difference is the SDK version, 2 vs 1. The features are the same.
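
For reference, switching the earlier sink spec to aws2-s3 should mostly be a matter of changing the URI scheme and the component prefix, roughly like this (an untested sketch; it assumes the camel-aws2-s3 jars are also added to the plugin directory, and note that aws2-s3 expects the lowercase region name):

    camel.sink.url: aws2-s3://selumalai-kafka-s3?keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.component.aws2-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws2-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws2-s3.region: us-west-2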

@ksingh7

ksingh7 commented Jun 13, 2020

@oscerd First of all, thanks for building this connector; I am happy that it works \o/

I'd like to share a use case of Kafka messages ===batching===> S3 (inspired by the RHT Open Hybrid Edge Computing project).

Suppose we want to ingest live sensor data from a fleet of thousands of IoT devices (e.g. connected smart cars, industrial sensors, etc.) into Kafka. We want to persist the data for long-term processing and big data analytics, so the data should be moved to object storage (S3).

Given this, if we have say 10K devices, and each device sends 1 message every 10 seconds (a simplification; in reality the rate is much higher), then in 24 hours this will generate

10,000 x 8,640 = 86,400,000 messages/day

So if these 86.4 million messages occupy 1 object per message, we will end up with 86.4 million objects in S3 each day, which is overwhelming. Ideally, if we could batch this so that, say, the messages generated over 10 minutes get stored in 1 S3 object (file), it would drastically reduce the number of objects in the S3 bucket (from 86.4M to 144K objects):

86,400,000 messages / 600 = 144K S3 objects

Answering the point from your previous comment:

Also batch operation in what sense? Writing multiple lines on the same file? Or write multiple files in one shot?

With batch, we mean writing multiple lines (messages) to the same file.

BTW, this is a feature request, not a bug :) happy to discuss more on this topic.

@oscerd
Contributor

oscerd commented Jun 15, 2020

Directly in the AWS S3 SDK, it's not possible to do this. In the Camel component, it is not something we can do directly, because S3 doesn't support appending to an existing S3 object.

@oscerd
Contributor

oscerd commented Jun 15, 2020

I'll have a look at Kinesis Firehose in combination with S3.
