-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25501][SS] Add kafka delegation token support. #22598
Conversation
ok to test |
What's diff with #22550 ? Let's close one of them. +@merlintang |
That said on the jira the mentioned PR by @merlintang is crashing on my cluster so not proposed for merge. Please close it. |
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As embedded Kafka is available for testing I would say having a unittest where the DT is tested is quite important for this change (especially to assure further developments does not break this feature).
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
@gaborgsomogyi thanks for your PR, I am going through the details and test on my local machine. |
Test build #96814 has finished for PR 22598 at commit
|
* Made KafkaDelegationTokenIdentifier private * Kerberos module name fix for IBM JVM * ScramLoginModule comes from the class name which fails compile time if it's moved * Extended HadoopDelegationTokenManagerSuite tests * Moved token parameter set into ConfigUpdater
Test build #96871 has finished for PR 22598 at commit
|
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From our previous talks the configuration stuff looked ok, given the limitations of how the Kafka connector is initialized. Couldn't really think of a different way that could cover all the cases.
If there's any unit test that can be written for the TokenUtil
code that would be great.
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
* Removed the additional token enable flag. * Increased test coverage. * Fixes for @vanzin's comments.
Test build #97114 has finished for PR 22598 at commit
|
Test build #97115 has finished for PR 22598 at commit
|
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala
Outdated
Show resolved
Hide resolved
Test build #97151 has finished for PR 22598 at commit
|
* Added comment for getKrb5LoginModuleName. * Used === operator in tests instead of equals.
Test build #97188 has finished for PR 22598 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've finished taking a first look at the diff. Please note that I've added some comments in comment threads which are marked as resolved.
Btw, looks like manual test is the only way to verify this patch: while I don't have time/environment to try this out, but I'll try to do it when I'm ready.
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
Show resolved
Hide resolved
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
Outdated
Show resolved
Hide resolved
* Reordered TokenUtil methods to make it more readable. * Enhanced additional parameters doc to make things more explicit.
Test build #97612 has finished for PR 22598 at commit
|
retest this, please |
Test build #97613 has finished for PR 22598 at commit
|
retest this, please |
Test build #99330 has finished for PR 22598 at commit
|
Test build #99453 has finished for PR 22598 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks ok pending tests and a few minor things.
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
Show resolved
Hide resolved
...l/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
Show resolved
Hide resolved
* Move configs to Kafka.scala * Minor cosmetic changes
core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
Outdated
Show resolved
Hide resolved
Test build #99469 has finished for PR 22598 at commit
|
Test build #99474 has finished for PR 22598 at commit
|
Test build #99476 has finished for PR 22598 at commit
|
Merging to master. |
## What changes were proposed in this pull request? It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing) What this PR contains: * Configuration parameters for the feature * Delegation token fetching from broker * Usage of token through dynamic JAAS configuration * Minor refactoring in the existing code What this PR doesn't contain: * Documentation changes because design can change ## How was this patch tested? Existing tests + added small amount of additional unit tests. Because it's an external service integration mainly tested on cluster. * 4 node cluster * Kafka broker version 1.1.0 * Topic with 4 partitions * security.protocol = SASL_SSL * sasl.mechanism = SCRAM-SHA-256 An example of obtaining a token: ``` 18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID HMAC OWNER RENEWERS ISSUEDATE EXPIRYDATE MAXDATE 18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user [] 2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07 18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67 ``` An example token usage: ``` 18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]"; 18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login. ``` Closes apache#22598 from gaborgsomogyi/SPARK-25501. Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request? Kafka delegation token support implemented in [PR#22598](apache#22598) but that didn't contain documentation because of rapid changes. Because it has been merged in this PR I've documented it. ## How was this patch tested? jekyll build + manual html check Closes apache#23195 from gaborgsomogyi/SPARK-26236. Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@gaborgsomogyi why all these configs use Spark conf? Does this mean we cannot support multiple tokens? We have supported to pass kafka configs via data source options, why not use data source options instead? Sorry if I missed any discussions here. |
As a small extract these parameters are parsed way after the delegation token subsystem started and tokens obtained. |
this is exciting to me. i tested it on a kafka 2.2.0 cluster that uses gssapi/kerberos for authentication, by enabling token support in kafka. to be specific in server.properties i changed:
note that i kept i could see my spark structured streaming job obtain kafka token successfully. after that i ran into some issues in driver that seemed kafka specific. i added comments here: |
Good to hear its used. Ping me if you see any Spark related issue. BTW, multi-cluster support is on the way... |
after adding the scram login module to my broker jaas configs (so brokers have both kerberos and scram) the kafka specific issue was resolved. so now everything works. i will be doing long running tests over next few days/weeks. thanks again. |
i have been testing long running structured streaming jobs from and to kafka using delegation tokens. the driver is launched by a user with a kerberos login and keytab, principal and keytab are provided to spark-submit. trigger is hourly.
it seems like login with SCRAM fails a few times and then succeeds again. is this expected? |
Since Kafka doesn't support dynamic parameter settings, yes. |
got it. thanks
…On Tue, Jun 4, 2019, 12:56 Gabor Somogyi ***@***.***> wrote:
Since Kafka doesn't support dynamic parameter settings, yes.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#22598?email_source=notifications&email_token=AAGIQJBOG73M6ZDTQNP7T4TPY2NERA5CNFSM4FYHCM42YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGODW5GLEQ#issuecomment-498754962>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAGIQJAOKP4QQJDLKKAUQULPY2NERANCNFSM4FYHCM4Q>
.
|
we just upgraded our dev cluster from SASL_PLAINTEXT to SASL_SSL but something is wrong with the certificates (or with kafkas hostname resolution? not sure yet). so as a workaround i temporarily need to set:
how i can generally change any of these settings? in future i could see how there are other settings in this kafka admin clients configuration that i would like to change as well, so i am looking for a general way to update it in a deployment. thanks! |
It's not yet possible. I'm having one week vacation but after that I'll add this feature. |
very much appreciated. i will test it once you have branch.
have a good vacation
…On Fri, Jun 7, 2019, 13:13 Gabor Somogyi ***@***.***> wrote:
It's not yet possible. I'm having one week vacation but after that I'll
add this feature.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#22598?email_source=notifications&email_token=AAGIQJGURJW5VCD63F2ACQTPZKJMVA5CNFSM4FYHCM42YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGODXGN4OA#issuecomment-499965496>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAGIQJDPIXQJZGK3WH5CI4LPZKJMVANCNFSM4FYHCM4Q>
.
|
What changes were proposed in this pull request?
It adds kafka delegation token support for structured streaming. Please see the relevant SPIP
What this PR contains:
What this PR doesn't contain:
How was this patch tested?
Existing tests + added small amount of additional unit tests.
Because it's an external service integration mainly tested on cluster.
An example of obtaining a token:
An example token usage: