From c262d20211253bd3e6d5acd97c6c71add1006ea1 Mon Sep 17 00:00:00 2001 From: Serhii Chvaliuk Date: Fri, 25 Mar 2022 14:49:54 +0200 Subject: [PATCH] 🎉 Destination Snowflake + Normalization Core: Added OAuth support (#11093) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [10033] Destination-Snowflake: added basic support for OAuth login mode * added basic logic for token refresh * Fixed code to support pooled connections * Hide DBT transformations in cloud (#10583) * Bump Airbyte version from 0.35.35-alpha to 0.35.36-alpha (#10584) Co-authored-by: timroes * :bug: Source Shopify: fix wrong field type for tax_exemptions (#10419) * fix(shopify): wrong type for tax_exemptions abandoned_checkouts customer tax_exemptions had the wrong field type * fix(shopify): wrong type for tax_exemptions abandoned_checkouts customer tax_exemptions had the wrong field type * bump connector version Co-authored-by: marcosmarxm * Remove storybook-addon-styled-component-theme (#10574) * Helm Chart: Secure chart for best practices (#10000) * 🐛 Source FB Marketing: fix `execute_in_batch` when batch is bigger than 50 (#10588) * fix execute_in_batch * add tests * fix pre-commit config Co-authored-by: Sherif A. Nada Co-authored-by: Eugene Kulak Co-authored-by: Sherif A. Nada * Bmoric/move flag check to handler (#10469) Move the feature flag checks to the handler instead of the configuration API. This could have avoided some bugs related to the missing flag check in the cloud project. * Documented product release stages (#10596) * Set resource limits for connector definitions: api layer (#10482) * Updated link to product release stages doc (#10599) * Change the block logic and block after the job creation (#10597) This is changing the check to see if a connection exists in order to make it more performant and more accurate. It makes sure that the workflow is reachable by trying to query it.
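The token-refresh logic added in this change exchanges the long-lived refresh token for a short-lived access token against Snowflake's /oauth/token-request endpoint (Snowflake access tokens expire after 10 minutes). A minimal Python sketch of the exchange that the Java implementation further below performs; the use of `requests` here is illustrative, not the connector's actual code:

    import base64
    import requests

    def refresh_access_token(host: str, client_id: str, client_secret: str, refresh_token: str) -> str:
        # Client credentials go in HTTP Basic auth; the refresh token goes in the form body.
        auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
        response = requests.post(
            f"https://{host}/oauth/token-request",
            headers={
                "Authorization": f"Basic {auth}",
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json",
            },
            data={"grant_type": "refresh_token", "refresh_token": refresh_token},
        )
        payload = response.json()
        if "access_token" not in payload:
            raise RuntimeError(f"Failed to obtain access token using refresh token: {payload}")
        return payload["access_token"]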
* Add timeout to connector pod init container command (#10592) * add timeout to init container command * add disk usage check into init command * fix up disk usage checking and logs from init entrypoint * run format * fix orchestrator restart problem for cloud (#10565) * test time ranges for cancellations * try with wait * fix cancellation on worker restart * revert for CI testing that the test fails without the retry policy * revert testing change * matrix test the different possible cases * re-enable new retry policy * switch to no_retry * switch back to new retry * parameterize correctly * revert to no-retry * re-enable new retry policy * speed up test + fixes * significantly speed up test * fix ordering * use multiple task queues in connection manager test * use versioning for task queue change * remove sync workflow registration for the connection manager queue * use more specific example * respond to parker's comments * Fix the toggle design (#10612) * Source Hubspot: cast timestamp to date/datetime (#10576) * cast timestamp to date * change test name * fix corner cases * fix corner cases 2 * format code * changed method name * add return typing * bump version * updated spec and def yaml Co-authored-by: auganbay * Update _helpers.tpl (#10617) Helm templates integers as float64, so using %d renders the value of external airbyte.minio.endpoint as "S3_MINIO_ENDPOINT: "http://minio-service:%!d(float64=9000)"; it therefore needed to be changed to %g * 🎉 Source Survey Monkey: add option to filter survey IDs (#8768) * Add custom survey_ids * bump version * Update survey_question schema * Add changelog * Allow null objects * merge master and format * Make all types safe with NULL and add survey_ids to all streams * Make additional types safe with NULL * Make additional types safe with NULL * One last safe NULL type * small fixes * solve conflict * small fixes * revert fb wrong commit * small fb correction * bump connector version Co-authored-by: marcosmarxm * Fix doc links/loading (#10621) * Allow frontmatter in rendered markdown (#10624) * Adjust to new normalization name (#10626) * sweep pods from end time not start time (#10614) * Source Pinterest: fix typo in schema fields (#10223) * :tada: add associations companies to deals, ticket and contacts stream (from PR 9027) (#10631) * Added associations to some CRM Object streams in Hubspot connector * Added associations in the relevant schemas * fix eof * bump connector version Co-authored-by: ksoenandar * Source Chargebee: add transaction stream (#10312) * added transactions model * changes * fix * few changes * fix * added new stream in configured_catalog*.json * changes * removed new stream in configured_catalog*.json * solve small schema issues * add eof * bump connector version Co-authored-by: marcosmarxm Co-authored-by: Marcos Marx * Add missing continue as new (#10636) * Bump Airbyte version from 0.35.36-alpha to 0.35.37-alpha (#10640) Co-authored-by: benmoriceau * exclude workers test from connectors builds on CI (#10615) * :tada: Source Google Workspace Admin Reports: add support for Google Meet Audit Activity Events (#10244) * source(google-workspace-admin-reports): add support for Google Meet Audit activity events Signed-off-by: Michele Zuccala * remove required fields * bump connector version * run format Co-authored-by: marcosmarxm * stabilize connection manager tests (#10606) * stabilize connection manager tests * just call shutdown once * another run just so we can see if it's passing * another run just so we can see
if it's passing * re-disable test * run another test * run another test * run another test * run another test * Log pod state if init pod wait condition times out (for debugging transient test issue) (#10639) * log pod state if init pod search times out * increase test timeout from 5 to 6 minutes to give kube pod process timeout time to trigger * format * upgrade gradle from 7.3.3 -> 7.4 (#10645) * upgrade temporal sdk to 1.8.1 (#10648) * upgrade temporal from mostly 1.6.0 to 1.8.1 * try bumping GSM to get newer grpc dep * Revert "try bumping GSM to get newer grpc dep" This reverts commit d8376502843a04771b76fe2f1c6c7d6de0ab5e5d. * upgrade temporal-testing as well * don't change version for temporal-testing-junit5 * 🎉 Source Google Ads: add network fields to click view stream * Google Ads #8331 - add network fields to click_view stream schema * Google Ads #8331 - add segments.ad_network_type to click_view pk according to PR review * Google Ads #8331 - bump version * Google Ads #8331 - update definition * Cloud Dashboard 1 (#10628) Publish metrics for: - created jobs tagged by release stage - failed jobs tagged by release stage - cancelled jobs tagged by release stage - succeeded jobs tagged by release stage * Correct cancelled job metric name. (#10658) * Add attempt status by release stage metrics. (#10659) Add, - attempt_created_by_release_stage - attempt_failed_by_release_stage - attempt_succeeded_by_release_stage * 🐛 Source CockroachDB: fix connector replication failure due to multiple open portals error (#10235) * fix cockroachdb connector replication failure due to multiple open portals error * bump connector version Co-authored-by: marcosmarxm * 🐙 octavia-cli: implement `generate` command (#10132) * Add try catch to make sure all handlers are closed (#10627) * Add try catch to make sure all handlers are closed * Handle exceptions while initializing writers * Bumpversion of connectors * bumpversion in seed * Fix bigquery denormalized tests * bumpversion seed of destination bigquery denormalized * Fix links in onboarding page (#10656) * Fix missing key inside map * Fix onboarding progress links * Add use-case links to onboarding (#10657) * Add use-case links to onboarding * Add new onboarding links * Set resource limits for connector definitions: expose in worker (#10483) * pipe through to worker * wip * pass source and dest def resource reqs to job client * fix test * use resource requirements utils to get resource reqs for legacy and new impls * undo changes to pass sync input to container launcher worker factory * remove import * fix hierarchy order of resource requirements * add nullable annotations * undo change to test * format * use destination resource reqs for normalization and make resource req utils more flexible * format * refactor resource requirements utils and add tests * switch to storing source/dest resource requirements directly on job sync config * fix tests and javadocs * use sync input resource requirements for container orchestrator pod * do not set connection resource reqs to worker reqs * add overridden requirement utils method + test + comment Co-authored-by: lmossman * add mocks to tests * Bump Airbyte version from 0.35.37-alpha to 0.35.38-alpha (#10668) Co-authored-by: lmossman * 🎉 Source Salesforce: speed up discovery >20x by leveraging parallel API calls (#10516) * 📖 improve salesforce docs & reorder properties in the spec (#10679) * Bump Airbyte version from 0.35.38-alpha to 0.35.39-alpha (#10680) Co-authored-by: sherifnada * Improve note in
salesforce docs about creating a RO user * Upgrade plop in connector generators (#10578) * Upgrade plop * Remove scaffolded code * Build fixes * Remove scaffolded code * Revert "Remove scaffolded code" This reverts commit 3911f527f8fb2049f9a1cd261002fa1bb5c083ae. * Revert "Remove scaffolded code" This reverts commit 549f790e3cd97f2d73b8f70f50ed9d1ce5a80d74. * Remove .gitignore changes * Remove .gitignore changes * Update scaffold generated code * Replace titleCase with capitalCase (#10654) * Add capitalCase helper * Replace titleCase with capitalCase * Update generated scaffold files Co-authored-by: LiRen Tu * 🐛 Fix toggle styling (#10684) * Fix error NPE in metrics emission. (#10675) * Fix missing type=button (#10683) * close ssh in case of exception during check in Postgres connector (#10620) * close ssh in case of exception * remove unwanted change * remove comment * format * do not close scanner * fix semi-colon * format * Refactor to enable support for optional JDBC parameters for all JDBC destinations (#10421) * refactoring to allow testing * MySQLDestination uses connection property map instead of url arguments * Update jdbc destinations * A little more generic * reset to master * reset to master * move to jdbcutils * Align when multiline * Align when multiline * Update postgres to use property map * Move tests to AbstractJdbcDestinationTest * clean * Align when multiline * return property map * Add postgres tests * update clickhouse * reformat * reset * reformat * fix test * reformat * fix bug * Add mssql tests * refactor test * fix oracle destination test * oracle tests * fix redshift acceptance test * Pass string * Revert "Pass string" This reverts commit 697821738cae042db6fcb10f47c4bae819d173ac. * Double deserialization * Revert "Double deserialization" This reverts commit ee8d75245b7eafa1a5feb0cab2419a3ecc05474e. * try updating json_operations * Revert "try updating json_operations" This reverts commit c8022c299431c8ff43ed4f507bb6475569a74553. * json parse * Revert "json parse" This reverts commit 11a6725eaa7c73b13be5e2c3ba7ed0ad25b43c3e. * Revert "Revert "Double deserialization"" This reverts commit 213f47acc42f59953dce42538cd3a07a8dbf3360. * Revert "Revert "Revert "Double deserialization""" This reverts commit 66822454afeaa0c0c7de641b9907c904b659e535. * move to constant * Add comment * map can be constant * Add comment * move map * hide in method * no need to create new map * no need to create new map * no need to create new map * enable mysql test * Update changelogs * Update changelog * update changelog * Bump versions * bump version * disable dbt support * update spec * update other oracle tests * update doc * bump seed * fix source test * update seed spec file * fix expected spec * Fix trial period time frame (#10714) * Bmoric/restore update with temporal (#10713) Restore the missing update call to temporal. Its absence was making schedule updates not take effect immediately. * Bump Airbyte version from 0.35.39-alpha to 0.35.40-alpha (#10716) Co-authored-by: benmoriceau * Fix CockroachDbSource compilation error (#10731) * Fix CockroachDbSource compilation error * fix test too * 🎉 Source Zendesk: sync rate improvement (#9456) * Update Source Zendesk request execution with future requests. * Revert "Update Source Zendesk request execution with future requests." This reverts commit 2a3c1f82b75a2b47ece13cde71f99e7be84065e6. * Add futures stream logics. * Fix stream * Fix full refresh streams. * Update streams.py. Fix all streams. Updated schema.
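The Zendesk sync-rate improvement above replaces sequential page fetches with "future" requests that are issued in parallel and collected afterwards. A rough sketch of that pattern, assuming a hypothetical fetch_page helper (the real connector's implementation differs in detail):

    from concurrent.futures import ThreadPoolExecutor

    def fetch_all_pages(fetch_page, page_numbers, max_workers=10):
        # Submit every page request up front so they run concurrently,
        # then collect the results in page order once all futures resolve.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(fetch_page, page) for page in page_numbers]
            return [future.result() for future in futures]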
* Add future request unit tests * Post review fixes. * Fix broken incremental streams. Fix SAT. Remove odd unit tests. * Comment few unit tests * Bump docker version * CDK: Ensure AirbyteLogger is thread-safe using Lock (#9943) * Ensure AirbyteLogger is thread-safe - Introduce a global lock to ensure `AirbyteLogger` is thread-safe. - The `logging` module is thread-safe, however `print` is not, and is currently used. This means that messages sent to stdout can clash if connectors use threading. This is obviously a huge problem when the IPC between the source/destination is stdout! - A `multiprocessing.Lock` could have been introduced however given that `logging` module is not multiprocess-safe I thought that thread-safety should be first goal. - IMO the `AirbyteLogger` should be a subclass of the `logging.Logger` so you have thread-safety automatically, however I didn't want to make a huge wholesale change here. * Revert lock and add deprecation warning instead * remove --cpu-shares flag (#10738) * Bump Airbyte version from 0.35.40-alpha to 0.35.41-alpha (#10740) Co-authored-by: jrhizor * Add Scylla destination to index (#10741) * Add scylla to destination_definitions * Add woocommerce source * Update definition id * Add icon * update docker repository * reset to master * fix version * generate spec * Update builds.md * run gradle format (#10746) * Bump Airbyte version from 0.35.41-alpha to 0.35.42-alpha (#10747) Co-authored-by: girarda * Change offer amount * Fix back link on signup page (#10732) * Fix back link on signup page * Add and correct uiConfig links * 🎉 Source redshift: implement privileges check (#9744) * update postgres source version (#10696) * update postgres source version * update spec * fix[api]: nullable connection schedule (#10107) * fix[api] inconsistent casing on OperationID for Operations API (#10464) * #10307 Fixes inconsistent casing on OperationID for Operations API * update generated doc Co-authored-by: alafanechere * Display numbers in usage per connection table (#10757) * Add connector stage to dropdown value (#10677) * Add connector stage to dropdown value * Remove line break from i18n message * Update snowflake destination docs for correct host (#10673) * Update snowflake destination docs for correct host * Update snowflake.md * Update README.md * Update spec.json * Update README.md * Update spec.json * Update README.md * Update snowflake.md * Update spec.json * Update spec.json * 📕 source salesforce: fix broken page anchor in spec.json & add guide for adding read only user (#10751) * 🎉 Source Facebook Marketing: add activities stream (#10655) * add facebook marketing activities stream * update incremental test * add overrides for activities specific logic * formatting * update readme docs * remove test limitation * update dockerfile airbyte version * correct tests * bump connector version in config module Co-authored-by: marcosmarxm * Add a note about running only in dev mode on M1 (#10772) Macs with M1 chips can run Airbyte only in dev mode right now, so to make it clear, I added a note about it and moved the hint about M1 chips to the top of the section.
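The AirbyteLogger change described above amounts to serializing writes to stdout: `print` is not thread-safe, and interleaved writes can corrupt the line-oriented stdout protocol between source and destination. A rough sketch of the locking idea, not the CDK's exact code:

    import threading

    _stdout_lock = threading.Lock()

    def emit_message(serialized_message: str) -> None:
        # Hold a global lock while writing so concurrent threads cannot
        # interleave partial lines on the stdout IPC channel.
        with _stdout_lock:
            print(serialized_message, flush=True)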
* push failures to segment (#10715) * test: new failures metadata for segment tracking * new failures metadata for segment tracking failure_reasons: array of all failures (as json objects) for a job - for general analytics on failures main_failure_reason: main failure reason (as json object) for this job - for operational usage (for Intercom) - currently this is just the first failure reason chronologically - we'll probably change this when we have more data on how to determine failure reasons more intelligently - added an attempt_id to failures so we can group failures by attempt - removed stacktrace from failures since it's not clear how we'd use these in an analytics use case (and because segment has a 32kb size limit for events) * remove attempt_id attempt info is already in failure metadata * explicitly sort failures array chronologically * replace "unknown" enums with null note: ImmutableMaps don't allow nulls * move sorting to the correct place * Update temporal retention TTL from 7 to 30 days (#10635) Increase the temporal retention to 30 days instead of 7. It will help with on-call investigation. * Add count connection functions (#10568) * Add count connection functions * Fix new configRepository queries - Remove unnecessary joins - Fix countConnection * Use existing mock data for tests * Adds default sidecar cpu request and limit and adds resources to the init container (#10759) * close ssh tunnel in case of exception in destination consumer (#10686) * close ssh tunnel in case of exception * format * fix salesforce docs markdown formatting * Fix typo in salesforce docs * Extract event from the temporal worker run factory (#10739) Extract the different events that can happen to a sync into a non-temporal-related interface. * Bump Airbyte version from 0.35.42-alpha to 0.35.43-alpha (#10778) Co-authored-by: sherifnada * Added a note about running in dev mode on M1 macs (#10776) Currently, Macs with M1 chips can run Airbyte only in dev mode. I added a note about that. * Destination Snowflake: add missing version in changelog (#10779) * Hide shopify in Cloud (#10783) * Metrics Reporter Queries Part 1 (#10663) Add all the simpler queries from https://docs.google.com/document/d/11pEUsHyKUhh4CtV3aReau3SUG-ncEvy6ROJRVln6YB4/edit?usp=sharing. - Num Pending Jobs - Num Concurrent Jobs - Oldest Pending Job - Oldest Running Job * Bump Airbyte version from 0.35.43-alpha to 0.35.44-alpha (#10789) * Bump Airbyte version from 0.35.43-alpha to 0.35.44-alpha * Commit. * Add exception block. * Why would having try catch work? * Add logging to figure out. * Undo all debugging changes. * Better comments.
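The failure-tracking entry above reduces to sorting a job's failure reasons chronologically, taking the earliest as the main failure reason, and dropping stack traces to stay under Segment's 32 KB event limit. Sketched in Python with illustrative field names (the actual implementation is Java):

    def build_failure_metadata(failures: list[dict]) -> dict:
        # Sort chronologically so the "main" failure is the first that occurred;
        # strip stacktraces to keep events under Segment's 32 KB limit.
        ordered = sorted(failures, key=lambda failure: failure["timestamp"])
        trimmed = [{k: v for k, v in failure.items() if k != "stacktrace"} for failure in ordered]
        return {
            "failure_reasons": trimmed,
            "main_failure_reason": trimmed[0] if trimmed else None,
        }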
Co-authored-by: davinchia Co-authored-by: Davin Chia * Update api-documentation.md * jdbc build fixes (#10799) * Update api-documentation.md * Exclude package.json from codeowners (#10805) * :tada: Source Chargebee: add credit note model (#10795) * feat(chargebee) add credit note model * fix(airbyte): update version Dockerfile * fix(airbyte): update version Dockerfile v2 * Source Chargebee: run format and correct unit test (#10811) * feat(chargebee) add credit note model * fix(airbyte): update version Dockerfile * fix(airbyte): update version Dockerfile v2 * correct unit test Co-authored-by: Koen Sengers * 🎉 Source Chartmogul: Add CustomerCount stream (#10756) * 🎉 Source Chartmogul: Add CustomerCount stream * Update description * address comments * update changelog * format source file * run seed file Co-authored-by: marcosmarxm * default to no resource limits for OSS (#10800) * Add autoformat (#10808) * Bump Airbyte version from 0.35.44-alpha to 0.35.45-alpha (#10818) Co-authored-by: lmossman * Set default values as current values in editMode (#10486) * Set default values as current values in editMode * Fix unit tests * Save signup fields (#10768) * Temporarily save signup fields into firebase_user.displayName * Use default values if no displayName was stored before * Move register to localStorage * Address PR comments * Source Woocommerce: fixes (#10529) * fixed issues * Fix: multiple issues * modify configured catalog * Fix: remove unused variables * Fix: orders request with parameters * Fix: add new line in configured catalogs * Fix: remove unused imports * Fix: catalog changes * Source woocommerce: publishing connector (#10791) * fixed issues * Fix: multiple issues * modify configured catalog * Fix: remove unused variables * Fix: orders request with parameters * Fix: add new line in configured catalogs * Fix: remove unused imports * Fix: catalog changes * fix: change schema for meta_data Co-authored-by: Manoj * Surface any active child thread of dying connectors (#10660) * Interrupt child thread of dying connectors to avoid getting stuck * Catch and print stacktrace * Add test on interrupt/kill time outs * Send message to sentry too * Add another token to alleviate API limit pressure. (#10826) We are running into Github API rate limits. This PR: - introduces another token as a temp solution. - reorganises the workflow file. * Add caching to all jobs in the main build. (#10801) Add build dependency caching to all jobs in the main build. This speeds things up by 5 mins over the previously uncached time. * 🐛 Handle try/catch in BigQuery destination consumers (#10755) * Handle try/catch in BigQuery destination consumers * Remove parallelStream * Bumpversion of connector * update changelogs * update seeds * Format code (#10837) * Regenerate MySQL outputs from normalization tests * format * Use cypress dashboard and stabilize e2e tests (#10807) * Record e2e tests to cypress dashboard * Make env variable accessible in script * Improve e2e_test script * Properly wait for server to be ready * Isolate test suites better * More test isolation * Revert baseUrl for development * 🐛 Source Github: add new streams `Deployments`, `ProjectColumns`, `PullRequestCommits` (#10385) Signed-off-by: Sergey Chvalyuk * Remove the use of ConfigPersistence for ActorCatalog operation (#10387) * Skip ConfigPersistence for ActorCatalog operations * Fix catalog insertion logic - ActorCatalog and ActorCatalogFetchEvent are stored within the same transaction.
- The function writing catalog now automatically handles deduplication. - Fixed function visibility: helper functions to handle ActorCatalog insertion are now private. * Fix fetch catalog query: take the catalog associated with the latest fetch event when multiple events are present for the same config, actorId, and actor version. * Fix name of columns used for insert * Add testing on deduplication of catalogs * Add javadoc for actor catalog functions * Rename sourceId to actorId * Fix formatting * Update integrations README.md (#10851) Updated verbiage from grades to stages. Updated connector stages to match cloud stage tags. Added connectors missing on README.md that appear on the cloud drop down. * [10033] Destination-Snowflake: added basic support for OAuth login mode * added basic logic for token refresh * Updated spec to support DBT normalization and OAuth * snowflake oauth Signed-off-by: Sergey Chvalyuk * test_transform_snowflake_oauth added Signed-off-by: Sergey Chvalyuk * [4654] Added backward compatibility * Added test to check backward compatibility * fixed oauth connection * Updated doc, fixed code as per comments in PR * to be more explicit Signed-off-by: Sergey Chvalyuk * Added executor service * Fixed merge conflict * Updated doc and bumped version * Bumped version * bump 0.1.71 -> 0.1.72 Signed-off-by: Sergey Chvalyuk * Updated doc * fix version in basic-normalization.md Signed-off-by: Sergey Chvalyuk * Added explicit re-set property, but even now it already works * dummy bumping version * updated spec Co-authored-by: ievgeniit Co-authored-by: Tim Roes Co-authored-by: Octavia Squidington III <90398440+octavia-squidington-iii@users.noreply.github.com> Co-authored-by: timroes Co-authored-by: Philippe Boyd Co-authored-by: marcosmarxm Co-authored-by: Álvaro Torres Cogollo Co-authored-by: Eugene Kulak Co-authored-by: Sherif A.
Nada Co-authored-by: Eugene Kulak Co-authored-by: Benoit Moriceau Co-authored-by: Amruta Ranade <11484018+Amruta-Ranade@users.noreply.github.com> Co-authored-by: Charles Co-authored-by: Parker Mossman Co-authored-by: Jared Rhizor Co-authored-by: augan-rymkhan <93112548+augan-rymkhan@users.noreply.github.com> Co-authored-by: auganbay Co-authored-by: keterslayter <32784192+keterslayter@users.noreply.github.com> Co-authored-by: Daniel Diamond <33811744+danieldiamond@users.noreply.github.com> Co-authored-by: Ronald Fortmann <72810611+rfortmann-ewolff@users.noreply.github.com> Co-authored-by: Marcos Marx Co-authored-by: ksoenandar Co-authored-by: Aaditya Sinha <75474786+aadityasinha-dotcom@users.noreply.github.com> Co-authored-by: benmoriceau Co-authored-by: Michele Zuccala Co-authored-by: vitaliizazmic <75620293+vitaliizazmic@users.noreply.github.com> Co-authored-by: Davin Chia Co-authored-by: Lakshmikant Shrinivas Co-authored-by: Augustin Co-authored-by: Christophe Duong Co-authored-by: lmossman Co-authored-by: lmossman Co-authored-by: Maksym Pavlenok Co-authored-by: sherifnada Co-authored-by: LiRen Tu Co-authored-by: Subodh Kant Chaturvedi Co-authored-by: girarda Co-authored-by: Vadym Hevlich Co-authored-by: jdclarke5 Co-authored-by: jrhizor Co-authored-by: girarda Co-authored-by: Azhar Dewji Co-authored-by: Alasdair Brown Co-authored-by: Julia Co-authored-by: Lucas Wiley Co-authored-by: Philip Corr Co-authored-by: Greg Solovyev Co-authored-by: Peter Hu Co-authored-by: Malik Diarra Co-authored-by: Thibaud Chardonnens Co-authored-by: davinchia Co-authored-by: Erica Struthers <93952107+erica-airbyte@users.noreply.github.com> Co-authored-by: Edward Gao Co-authored-by: Tim Roes Co-authored-by: ksengers <30521298+Koen03@users.noreply.github.com> Co-authored-by: Koen Sengers Co-authored-by: Titas Skrebe Co-authored-by: Artem Astapenko <3767150+Jamakase@users.noreply.github.com> Co-authored-by: Manoj Reddy KS Co-authored-by: Harshith Mullapudi Co-authored-by: Juan <80164312+jnr0790@users.noreply.github.com> --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 108 ++++++++++- .../bases/base-normalization/Dockerfile | 2 +- .../transform_config/transform.py | 12 +- .../base-normalization/snowflake.Dockerfile | 2 +- .../unit_tests/test_transform_config.py | 43 +++++ .../DestinationAcceptanceTest.java | 12 ++ .../destination-snowflake/Dockerfile | 2 +- .../destination-snowflake/build.gradle | 1 + .../snowflake/SnowflakeDatabase.java | 182 +++++++++++++++--- .../snowflake/SnowflakeDestination.java | 5 + .../src/main/resources/spec.json | 133 +++++++++++-- ...wflakeInsertDestinationAcceptanceTest.java | 15 +- .../java/io/airbyte/oauth/BaseOAuth2Flow.java | 10 +- .../oauth/OAuthImplementationFactory.java | 1 + .../flows/DestinationSnowflakeOAuthFlow.java | 146 ++++++++++++++ .../NormalizationRunnerFactory.java | 2 +- docs/integrations/destinations/snowflake.md | 18 ++ .../basic-normalization.md | 1 + 19 files changed, 644 insertions(+), 53 deletions(-) create mode 100644 airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index e103af789278..d0a0453bb72b 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -221,7 +221,7 @@ - name: Snowflake 
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.22 + dockerImageTag: 0.4.24 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake icon: snowflake.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 5769d9bec319..7dd28835db92 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3840,7 +3840,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.22" +- dockerImage: "airbyte/destination-snowflake:0.4.24" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake" connectionSpecification: @@ -3854,7 +3854,6 @@ - "database" - "schema" - "username" - - "password" additionalProperties: true properties: host: @@ -3906,11 +3905,56 @@ type: "string" title: "Username" order: 5 - password: - description: "The password associated with the username." - type: "string" - airbyte_secret: true - title: "Password" + credentials: + title: "Authorization Method" + type: "object" + oneOf: + - type: "object" + title: "OAuth2.0" + order: 0 + required: + - "access_token" + - "refresh_token" + properties: + auth_type: + type: "string" + const: "OAuth2.0" + enum: + - "OAuth2.0" + default: "OAuth2.0" + order: 0 + client_id: + type: "string" + title: "Client ID" + description: "The Client ID of your Snowflake developer application." + airbyte_secret: true + client_secret: + type: "string" + title: "Client Secret" + description: "The Client Secret of your Snowflake developer application." + airbyte_secret: true + access_token: + type: "string" + title: "Access Token" + description: "Access Token for making authenticated requests." + airbyte_secret: true + refresh_token: + type: "string" + title: "Refresh Token" + description: "Refresh Token for making authenticated requests." + airbyte_secret: true + - title: "Username and Password" + type: "object" + required: + - "password" + order: 1 + properties: + password: + description: "The password associated with the username."
+ type: "string" + airbyte_secret: true + title: "Password" + order: 1 order: 6 jdbc_url_params: description: "Additional properties to pass to the JDBC URL string when\ @@ -4150,6 +4194,56 @@ - "overwrite" - "append" - "append_dedup" + advanced_auth: + auth_flow_type: "oauth2.0" + predicate_key: + - "credentials" + - "auth_type" + predicate_value: "OAuth2.0" + oauth_config_specification: + oauth_user_input_from_connector_config_specification: + type: "object" + properties: + host: + type: "string" + path_in_connector_config: + - "host" + complete_oauth_output_specification: + type: "object" + additionalProperties: false + properties: + access_token: + type: "string" + path_in_connector_config: + - "credentials" + - "access_token" + refresh_token: + type: "string" + path_in_connector_config: + - "credentials" + - "refresh_token" + complete_oauth_server_input_specification: + type: "object" + additionalProperties: false + properties: + client_id: + type: "string" + client_secret: + type: "string" + complete_oauth_server_output_specification: + type: "object" + additionalProperties: false + properties: + client_id: + type: "string" + path_in_connector_config: + - "credentials" + - "client_id" + client_secret: + type: "string" + path_in_connector_config: + - "credentials" + - "client_secret" - dockerImage: "airbyte/destination-mariadb-columnstore:0.1.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mariadb-columnstore" diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index f3db3ce267c7..376a48295f68 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.71 +LABEL io.airbyte.version=0.1.72 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index 82ad24c96436..a3c743d61be4 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -211,7 +211,6 @@ def transform_snowflake(config: Dict[str, Any]): "type": "snowflake", "account": account, "user": config["username"].upper(), - "password": config["password"], "role": config["role"].upper(), "database": config["database"].upper(), "warehouse": config["warehouse"].upper(), @@ -224,6 +223,17 @@ def transform_snowflake(config: Dict[str, Any]): "connect_retries": 3, "connect_timeout": 15, } + + credentials = config.get("credentials", {}) + if credentials.get("auth_type") == "OAuth2.0": + dbt_config["authenticator"] = "oauth" + dbt_config["oauth_client_id"] = credentials["client_id"] + dbt_config["oauth_client_secret"] = credentials["client_secret"] + dbt_config["token"] = credentials["refresh_token"] + elif credentials.get("password"): + dbt_config["password"] = credentials["password"] + else: + dbt_config["password"] = config["password"] return dbt_config @staticmethod diff --git a/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile b/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile index b1777276e320..35c6d2885186 100644 --- 
a/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile +++ b/airbyte-integrations/bases/base-normalization/snowflake.Dockerfile @@ -29,5 +29,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.71 +LABEL io.airbyte.version=0.1.72 LABEL io.airbyte.name=airbyte/normalization-snowflake diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py index 334cb314d387..6fdc01f3919f 100644 --- a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py +++ b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py @@ -300,6 +300,49 @@ def test_transform_snowflake(self): assert expected == actual assert extract_schema(actual) == "AIRBYTE_SCHEMA" + def test_transform_snowflake_oauth(self): + + input = { + "host": "http://123abc.us-east-7.aws.snowflakecomputing.com", + "role": "AIRBYTE_ROLE", + "warehouse": "AIRBYTE_WAREHOUSE", + "database": "AIRBYTE_DATABASE", + "schema": "AIRBYTE_SCHEMA", + "username": "AIRBYTE_USER", + "credentials": { + "auth_type": "OAuth2.0", + "client_id": "AIRBYTE_CLIENT_ID", + "access_token": "AIRBYTE_ACCESS_TOKEN", + "client_secret": "AIRBYTE_CLIENT_SECRET", + "refresh_token": "AIRBYTE_REFRESH_TOKEN", + }, + } + + actual = TransformConfig().transform_snowflake(input) + expected = { + "account": "123abc.us-east-7.aws", + "client_session_keep_alive": False, + "database": "AIRBYTE_DATABASE", + "query_tag": "normalization", + "role": "AIRBYTE_ROLE", + "schema": "AIRBYTE_SCHEMA", + "threads": 5, + "retry_all": True, + "retry_on_database_errors": True, + "connect_retries": 3, + "connect_timeout": 15, + "type": "snowflake", + "user": "AIRBYTE_USER", + "warehouse": "AIRBYTE_WAREHOUSE", + "authenticator": "oauth", + "oauth_client_id": "AIRBYTE_CLIENT_ID", + "oauth_client_secret": "AIRBYTE_CLIENT_SECRET", + "token": "AIRBYTE_REFRESH_TOKEN", + } + + assert expected == actual + assert extract_schema(actual) == "AIRBYTE_SCHEMA" + def test_transform_mysql(self): input = { "type": "mysql5", diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index de20f4360f4c..891c550a7952 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -1020,6 +1020,18 @@ workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName( .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot); } + protected StandardCheckConnectionOutput.Status runCheckWithCatchedException(final JsonNode config) { + try { + final StandardCheckConnectionOutput standardCheckConnectionOutput = new DefaultCheckConnectionWorker( + workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null)) + .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot); + return standardCheckConnectionOutput.getStatus(); + } catch (final Exception e) { + 
LOGGER.error("Failed to check connection:" + e.getMessage()); + } + return Status.FAILED; + } + protected AirbyteDestination getDestination() { return new DefaultAirbyteDestination( workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null)); diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 8a7cae7a2d9f..2475447387bd 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.22 +LABEL io.airbyte.version=0.4.24 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 84895ea82253..d31f433dfc5b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation 'org.apache.commons:commons-csv:1.4' implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2' implementation "io.aesy:datasize:1.0.0" + implementation 'com.zaxxer:HikariCP:5.0.1' implementation 'com.azure:azure-storage-blob:12.12.0' implementation project(':airbyte-config:models') diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index 359b843a2a1d..543ae21bd9f1 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -4,53 +4,169 @@ package io.airbyte.integrations.destination.snowflake; +import static java.util.stream.Collectors.joining; + import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.db.Databases; +import com.zaxxer.hikari.HikariDataSource; +import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; +import java.io.IOException; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublisher; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Base64; +import java.util.HashMap; import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * SnowflakeDatabase contains helpers to create connections to and run queries on Snowflake. 
*/ public class SnowflakeDatabase { + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDatabase.class); + private static final int PAUSE_BETWEEN_TOKEN_REFRESH_MIN = 7; // snowflake access token's TTL is 10min and can't be modified + private static final Duration NETWORK_TIMEOUT = Duration.ofMinutes(1); private static final Duration QUERY_TIMEOUT = Duration.ofHours(3); private static final SnowflakeSQLNameTransformer nameTransformer = new SnowflakeSQLNameTransformer(); private static final String DRIVER_CLASS_NAME = "net.snowflake.client.jdbc.SnowflakeDriver"; - private static DataSource createDataSource(final JsonNode config) { - final String host = config.get("host").asText(); + private static final String REFRESH_TOKEN_URL = "https://%s/oauth/token-request"; + private static final HttpClient httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofSeconds(10)) + .build(); + + private static HikariDataSource createDataSource(final JsonNode config) { + final HikariDataSource dataSource = new HikariDataSource(); + + final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?", + config.get("host").asText())); final String username = config.get("username").asText(); - final String password = config.get("password").asText(); - final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?", host)); + final Properties properties = new Properties(); + + final JsonNode credentials = config.get("credentials"); + if (credentials != null && credentials.has("auth_type") && "OAuth2.0".equals( + credentials.get("auth_type").asText())) { + LOGGER.debug("OAuth login mode is used"); + // OAuth login option is selected on UI + final String accessToken; + try { + // accessToken is only valid for 10 minutes. So we need to get a new one before processing new + // stream + accessToken = getAccessTokenUsingRefreshToken(config.get("host").asText(), + credentials.get("client_id").asText(), + credentials.get("client_secret").asText(), credentials.get("refresh_token").asText()); + } catch (IOException e) { + throw new RuntimeException(e); + } + properties.put("client_id", credentials.get("client_id").asText()); + properties.put("client_secret", credentials.get("client_secret").asText()); + properties.put("refresh_token", credentials.get("refresh_token").asText()); + properties.put("host", config.get("host").asText()); + properties.put("authenticator", "oauth"); + properties.put("token", accessToken); + // the username is required for DBT normalization in OAuth connection + properties.put("username", username); + + // thread to keep the refresh token up to date + SnowflakeDestination.SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(getRefreshTokenTask(dataSource), + PAUSE_BETWEEN_TOKEN_REFRESH_MIN, PAUSE_BETWEEN_TOKEN_REFRESH_MIN, TimeUnit.MINUTES); + + } else if (credentials != null && credentials.has("password")) { + LOGGER.debug("User/password login mode is used"); + // Username and pass login option is selected on UI + dataSource.setUsername(username); + dataSource.setPassword(credentials.get("password").asText()); + + } else { + LOGGER.warn( + "Obsolete User/password login mode is used. 
Please re-create a connection to use the latest connector's version"); + // case to keep the backward compatibility + dataSource.setUsername(username); + dataSource.setPassword(config.get("password").asText()); + } + + properties.put("warehouse", config.get("warehouse").asText()); + properties.put("database", config.get("database").asText()); + properties.put("role", config.get("role").asText()); + properties.put("schema", nameTransformer.getIdentifier(config.get("schema").asText())); + + properties.put("networkTimeout", Math.toIntExact(NETWORK_TIMEOUT.toSeconds())); + properties.put("queryTimeout", Math.toIntExact(QUERY_TIMEOUT.toSeconds())); + // allows queries to contain any number of statements. + properties.put("MULTI_STATEMENT_COUNT", 0); + + // https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application + // identify airbyte traffic to snowflake to enable partnership & optimization opportunities + properties.put("application", "airbyte"); + // Needed for JDK17 - see + // https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow + properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON"); + + // https://docs.snowflake.com/en/user-guide/jdbc-configure.html#jdbc-driver-connection-string if (config.has("jdbc_url_params")) { jdbcUrl.append(config.get("jdbc_url_params").asText()); } - final Map properties = new ImmutableMap.Builder() - .put("warehouse", config.get("warehouse").asText()) - .put("database", config.get("database").asText()) - .put("role", config.get("role").asText()) - .put("schema", nameTransformer.getIdentifier(config.get("schema").asText())) - .put("networkTimeout", String.valueOf(Math.toIntExact(NETWORK_TIMEOUT.toSeconds()))) - .put("queryTimeout", String.valueOf(Math.toIntExact(QUERY_TIMEOUT.toSeconds()))) - // allows queries to contain any number of statements - .put("MULTI_STATEMENT_COUNT", "0") - // https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application - // identify airbyte traffic to snowflake to enable partnership & optimization opportunities - .put("application", "airbyte") - // Needed for JDK17 - see - // https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow - .put("JDBC_QUERY_RESULT_FORMAT", "JSON") - .build(); - - return Databases.createBasicDataSource(username, password, jdbcUrl.toString(), DRIVER_CLASS_NAME, properties); + dataSource.setDriverClassName(DRIVER_CLASS_NAME); + dataSource.setJdbcUrl(jdbcUrl.toString()); + dataSource.setDataSourceProperties(properties); + return dataSource; + } + + private static String getAccessTokenUsingRefreshToken(final String hostName, + final String clientId, + final String clientSecret, + final String refreshCode) + throws IOException { + final var refreshTokenUri = String.format(REFRESH_TOKEN_URL, hostName); + final Map requestBody = new HashMap<>(); + requestBody.put("grant_type", "refresh_token"); + requestBody.put("refresh_token", refreshCode); + + try { + final BodyPublisher bodyPublisher = BodyPublishers.ofString(requestBody.keySet().stream() + .map(key -> key + "=" + URLEncoder.encode(requestBody.get(key), StandardCharsets.UTF_8)) + .collect(joining("&"))); + + final byte[] authorization = Base64.getEncoder() + .encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); + + final HttpRequest request = HttpRequest.newBuilder() + .POST(bodyPublisher) + .uri(URI.create(refreshTokenUri)) + .header("Content-Type", 
"application/x-www-form-urlencoded") + .header("Accept", "application/json") + .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8)) + .build(); + + final HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + final JsonNode jsonResponse = Jsons.deserialize(response.body()); + if (jsonResponse.has("access_token")) { + return jsonResponse.get("access_token").asText(); + } else { + throw new RuntimeException( + "Failed to obtain accessToken using refresh token. " + jsonResponse); + } + } catch (final InterruptedException e) { + throw new IOException("Failed to refreshToken", e); + } } public static JdbcDatabase getDatabase(final JsonNode config) { @@ -58,4 +174,22 @@ public static JdbcDatabase getDatabase(final JsonNode config) { return new DefaultJdbcDatabase(dataSource); } + private static Runnable getRefreshTokenTask(final HikariDataSource dataSource){ + return () -> { + LOGGER.info("Refresh token process started"); + var props = dataSource.getDataSourceProperties(); + try { + var token = getAccessTokenUsingRefreshToken(props.getProperty("host"), + props.getProperty("client_id"), props.getProperty("client_secret"), + props.getProperty("refresh_token")); + props.setProperty("token", token); + dataSource.setDataSourceProperties(props); + + LOGGER.info("New refresh token has been obtained"); + } catch (IOException e) { + LOGGER.error("Failed to obtain a fresh accessToken:" + e); + } + }; + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 869dcd7900cf..4fe7030811f4 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -7,9 +7,13 @@ import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; public class SnowflakeDestination extends SwitchingDestination { + public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); + enum DestinationType { COPY_S3, COPY_GCS, @@ -24,6 +28,7 @@ public SnowflakeDestination() { public static void main(final String[] args) throws Exception { final Destination destination = new SnowflakeDestination(); new IntegrationRunner(destination).run(args); + SCHEDULED_EXECUTOR_SERVICE.shutdownNow(); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index b9b42544bd78..a060b49bb97a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json @@ -8,15 +8,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Snowflake Destination Spec", "type": "object", - "required": [ - "host", - "role", - "warehouse", - "database", - "schema", - "username", - "password" - ], + "required": 
["host", "role", "warehouse", "database", "schema", "username"], "additionalProperties": true, "properties": { "host": { @@ -64,11 +56,68 @@ "title": "Username", "order": 5 }, - "password": { - "description": "The password associated with the username.", - "type": "string", - "airbyte_secret": true, - "title": "Password", + "credentials": { + "title": "Authorization Method", + "type": "object", + "oneOf": [ + { + "type": "object", + "title": "OAuth2.0", + "order": 0, + "required": [ + "access_token", + "refresh_token" + ], + "properties": { + "auth_type": { + "type": "string", + "const": "OAuth2.0", + "enum": ["OAuth2.0"], + "default": "OAuth2.0", + "order": 0 + }, + "client_id": { + "type": "string", + "title": "Client ID", + "description": "The Client ID of your Drift developer application.", + "airbyte_secret": true + }, + "client_secret": { + "type": "string", + "title": "Client Secret", + "description": "The Client Secret of your Drift developer application.", + "airbyte_secret": true + }, + "access_token": { + "type": "string", + "title": "Access Token", + "description": "Access Token for making authenticated requests.", + "airbyte_secret": true + }, + "refresh_token": { + "type": "string", + "title": "Refresh Token", + "description": "Refresh Token for making authenticated requests.", + "airbyte_secret": true + } + } + }, + { + "title": "Username and Password", + "type": "object", + "required": ["password"], + "order": 1, + "properties": { + "password": { + "description": "The password associated with the username.", + "type": "string", + "airbyte_secret": true, + "title": "Password", + "order": 1 + } + } + } + ], "order": 6 }, "jdbc_url_params": { @@ -292,5 +341,61 @@ ] } } + }, + "advanced_auth": { + "auth_flow_type": "oauth2.0", + "predicate_key": ["credentials", "auth_type"], + "predicate_value": "OAuth2.0", + "oauth_config_specification": { + "oauth_user_input_from_connector_config_specification": { + "type": "object", + "properties": { + "host": { + "type": "string", + "path_in_connector_config": ["host"] + } + } + }, + "complete_oauth_output_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "access_token": { + "type": "string", + "path_in_connector_config": ["credentials", "access_token"] + }, + "refresh_token": { + "type": "string", + "path_in_connector_config": ["credentials", "refresh_token"] + } + } + }, + "complete_oauth_server_input_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "client_id": { + "type": "string" + }, + "client_secret": { + "type": "string" + } + } + }, + "complete_oauth_server_output_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "client_id": { + "type": "string", + "path_in_connector_config": ["credentials", "client_id"] + }, + "client_secret": { + "type": "string", + "path_in_connector_config": ["credentials", "client_secret"] + } + } + } + } } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java index a4a658e42286..28cb59fc4d44 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java @@ -14,6 +14,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; +import io.airbyte.config.StandardCheckConnectionOutput.Status; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; @@ -33,6 +34,7 @@ import java.util.Optional; import java.util.stream.Collectors; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -65,7 +67,7 @@ public JsonNode getStaticConfig() { @Override protected JsonNode getFailCheckConfig() { final JsonNode invalidConfig = Jsons.clone(config); - ((ObjectNode) invalidConfig).put("password", "wrong password"); + ((ObjectNode) invalidConfig.get("credentials")).put("password", "wrong password"); return invalidConfig; } @@ -169,6 +171,17 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception { database.close(); } + @Test + public void testBackwardCompatibilityAfterAddingOauth() { + final JsonNode deprecatedStyleConfig = Jsons.clone(config); + final JsonNode password = deprecatedStyleConfig.get("credentials").get("password"); + + ((ObjectNode) deprecatedStyleConfig).remove("credentials"); + ((ObjectNode) deprecatedStyleConfig).set("password", password); + + assertEquals(Status.SUCCEEDED, runCheckWithCatchedException(deprecatedStyleConfig)); + } + /** * This test is disabled because it is very slow, and should only be run manually for now. */ diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuth2Flow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuth2Flow.java index a9a8d1d9cd71..6a0e28f95389 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuth2Flow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuth2Flow.java @@ -47,6 +47,14 @@ public enum TOKEN_REQUEST_CONTENT_TYPE { String contentType; Function, String> converter; + public String getContentType() { + return contentType; + } + + public Function, String> getConverter() { + return converter; + } + TOKEN_REQUEST_CONTENT_TYPE(final String contentType, final Function, String> converter) { this.contentType = contentType; this.converter = converter; @@ -55,7 +63,7 @@ public enum TOKEN_REQUEST_CONTENT_TYPE { } protected final HttpClient httpClient; - private final TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType; + protected final TOKEN_REQUEST_CONTENT_TYPE tokenReqContentType; private final Supplier stateSupplier; public BaseOAuth2Flow(final ConfigRepository configRepository, final HttpClient httpClient) { diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java index 2f01f2bc9d6a..72500f200a27 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java @@ -61,6 +61,7 @@ public OAuthImplementationFactory(final ConfigRepository configRepository, final .put("airbyte/source-mailchimp", new MailchimpOAuthFlow(configRepository, httpClient)) .put("airbyte/source-shopify", new ShopifyOAuthFlow(configRepository, httpClient)) .put("airbyte/source-tiktok-marketing", new 
+  .put("airbyte/destination-snowflake", new DestinationSnowflakeOAuthFlow(configRepository, httpClient))
   .build();
 }
diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java
new file mode 100644
index 000000000000..cf85dae58d69
--- /dev/null
+++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.oauth.flows;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.persistence.ConfigRepository;
+import io.airbyte.oauth.BaseOAuth2Flow;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.utils.URIBuilder;
+
+public class DestinationSnowflakeOAuthFlow extends BaseOAuth2Flow {
+
+  private static final String AUTHORIZE_URL = "https://%s/oauth/authorize";
+  private static final String ACCESS_TOKEN_URL = "https://%s/oauth/token-request";
+
+  public DestinationSnowflakeOAuthFlow(
+      ConfigRepository configRepository,
+      HttpClient httpClient) {
+    super(configRepository, httpClient);
+  }
+
+  @Override
+  protected String formatConsentUrl(UUID definitionId,
+      String clientId,
+      String redirectUrl,
+      JsonNode inputOAuthConfiguration)
+      throws IOException {
+    try {
+      return new URIBuilder(
+          String.format(AUTHORIZE_URL, extractAuthorizeUrl(inputOAuthConfiguration)))
+          .addParameter("client_id", clientId)
+          .addParameter("redirect_uri", redirectUrl)
+          .addParameter("response_type", "code")
+          .addParameter("state", getState())
+          .build().toString();
+
+    } catch (final URISyntaxException e) {
+      throw new IOException("Failed to format Consent URL for OAuth flow", e);
+    }
+  }
+
+  @Override
+  protected String getAccessTokenUrl(JsonNode inputOAuthConfiguration) {
+    return String.format(ACCESS_TOKEN_URL, extractTokenUrl(inputOAuthConfiguration));
+  }
+
+  @Override
+  protected String extractCodeParameter(Map<String, Object> queryParams) throws IOException {
+    return super.extractCodeParameter(queryParams);
+  }
+
+  @Override
+  protected Map<String, String> getAccessTokenQueryParameters(String clientId,
+      String clientSecret,
+      String authCode,
+      String redirectUrl) {
+    return ImmutableMap.<String, String>builder()
+        // required
+        .put("grant_type", "authorization_code")
+        .put("code", authCode)
+        .put("redirect_uri", redirectUrl)
+        .build();
+  }
+
+  // --------------------------------------------
+  @Override
+  protected Map<String, Object> completeOAuthFlow(final String clientId,
+      final String clientSecret,
+      final String authCode,
+      final String redirectUrl,
+      final JsonNode inputOAuthConfiguration,
+      final JsonNode oAuthParamConfig)
+      throws IOException {
+    final var accessTokenUrl = getAccessTokenUrl(inputOAuthConfiguration);
+
+    final byte[] authorization = Base64.getEncoder()
+        .encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8));
+    final HttpRequest request = HttpRequest.newBuilder()
+        .POST(HttpRequest.BodyPublishers
+            .ofString(tokenReqContentType.getConverter().apply(
+                getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))))
+        .uri(URI.create(accessTokenUrl))
+        .header("Content-Type", tokenReqContentType.getContentType())
+        .header("Accept", "application/json")
+        .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8))
+        .build();
+    try {
+      final HttpResponse<String> response = httpClient.send(request,
+          HttpResponse.BodyHandlers.ofString());
+
+      return extractOAuthOutput(Jsons.deserialize(response.body()), accessTokenUrl);
+    } catch (final InterruptedException e) {
+      throw new IOException("Failed to complete OAuth flow", e);
+    }
+  }
+
+  /**
+   * Extracts all OAuth outputs from the remote API response and stores them in a flat map.
+   */
+  protected Map<String, Object> extractOAuthOutput(final JsonNode data, final String accessTokenUrl)
+      throws IOException {
+    final Map<String, Object> result = new HashMap<>();
+    // access_token is valid for only 10 minutes
+    if (data.has("access_token")) {
+      result.put("access_token", data.get("access_token").asText());
+    } else {
+      throw new IOException(String.format("Missing 'access_token' in the token response from %s",
+          accessTokenUrl));
+    }
+
+    if (data.has("refresh_token")) {
+      result.put("refresh_token", data.get("refresh_token").asText());
+    } else {
+      throw new IOException(String.format("Missing 'refresh_token' in the token response from %s",
+          accessTokenUrl));
+    }
+    return result;
+  }
+
+  private String extractAuthorizeUrl(JsonNode inputOAuthConfiguration) {
+    var url = inputOAuthConfiguration.get("host");
+    return url == null ? StringUtils.EMPTY : url.asText();
+  }
+
+  private String extractTokenUrl(JsonNode inputOAuthConfiguration) {
+    var url = inputOAuthConfiguration.get("host");
+    // var url = inputOAuthConfiguration.get("token_url");
+    return url == null ? StringUtils.EMPTY : url.asText();
+  }
+
+}
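Since `completeOAuthFlow` deserializes the response body and `extractOAuthOutput` above requires both tokens, the `/oauth/token-request` endpoint is assumed to answer with JSON of roughly the following shape. The values are placeholders; `token_type` and `expires_in` are typical OAuth response fields that this flow simply ignores:

```json
{
  "access_token": "<ten-minute-access-token>",
  "refresh_token": "<refresh-token>",
  "token_type": "Bearer",
  "expires_in": 600
}
```

If either `access_token` or `refresh_token` is absent, the flow fails with an `IOException` rather than persisting a partial credential set.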
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java
index bfec69ba6cf8..2e35902e99d4 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java
@@ -14,7 +14,7 @@ public class NormalizationRunnerFactory {
   public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization";
-  public static final String NORMALIZATION_VERSION = "0.1.71";
+  public static final String NORMALIZATION_VERSION = "0.1.72";
   static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder()
diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md
index 7b93447d3357..d1ed9c18639e 100644
--- a/docs/integrations/destinations/snowflake.md
+++ b/docs/integrations/destinations/snowflake.md
@@ -142,6 +142,9 @@ To use Azure Blob Storage, you will need to [create a storage account](https://d
 Navigate to the Airbyte UI to set up Snowflake as a destination. You'll need the following information to configure the Snowflake destination:
+#### Two authorization methods are supported: Login/Password and OAuth 2.0.
+
+### Login and Password
 | Field | Description |
 |---|---|
 | [Host](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html) | The host domain of the Snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com). Example: `accountname.us-east-2.aws.snowflakecomputing.com` |
@@ -153,6 +156,20 @@ Navigate to the Airbyte UI to set up Snowflake as a destination. You'll need the
 | Password | The password associated with the username. |
 | [JDBC URL Params](https://docs.snowflake.com/en/user-guide/jdbc-parameters.html) (Optional) | Additional properties to pass to the JDBC URL string when connecting to the database formatted as `key=value` pairs separated by the symbol `&`. Example: `key1=value1&key2=value2&key3=value3` |
+
+### OAuth 2.0
+| Field | Description |
+|---|---|
+| [Host](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html) | The host domain of the Snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com). Example: `accountname.us-east-2.aws.snowflakecomputing.com` |
+| [Role](https://docs.snowflake.com/en/user-guide/security-access-control-overview.html#roles) | The role you created in Step 1 for Airbyte to access Snowflake. Example: `AIRBYTE_ROLE` |
+| [Warehouse](https://docs.snowflake.com/en/user-guide/warehouses-overview.html#overview-of-warehouses) | The warehouse you created in Step 1 for Airbyte to sync data into. Example: `AIRBYTE_WAREHOUSE` |
+| [Database](https://docs.snowflake.com/en/sql-reference/ddl-database.html#database-schema-share-ddl) | The database you created in Step 1 for Airbyte to sync data into. Example: `AIRBYTE_DATABASE` |
+| [Schema](https://docs.snowflake.com/en/sql-reference/ddl-database.html#database-schema-share-ddl) | The default schema used as the target schema for all statements issued from the connection that do not explicitly specify a schema name. |
+| Username | The username you created in Step 1 to allow Airbyte to access the database. Example: `AIRBYTE_USER` |
+| OAuth2 | The OAuth credentials (Client ID, Client Secret, Access Token, and Refresh Token) used to obtain and refresh the auth token. |
+| [JDBC URL Params](https://docs.snowflake.com/en/user-guide/jdbc-parameters.html) (Optional) | Additional properties to pass to the JDBC URL string when connecting to the database formatted as `key=value` pairs separated by the symbol `&`. Example: `key1=value1&key2=value2&key3=value3` |
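A note on backward compatibility: configs created before this release keep `password` at the top level, and the connector continues to accept that shape (this is exactly what `testBackwardCompatibilityAfterAddingOauth` above exercises), while newly created configs nest it under `credentials`. A sketch of the two shapes, with placeholder values.

Deprecated, still accepted:

```json
{
  "username": "AIRBYTE_USER",
  "password": "<password>"
}
```

Current:

```json
{
  "username": "AIRBYTE_USER",
  "credentials": {
    "password": "<password>"
  }
}
```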
+
+
 To use AWS S3 as the cloud storage, enter the information for the S3 bucket you created in Step 2:
 | Field | Description |
@@ -218,6 +235,7 @@ Now that you have set up the Snowflake destination connector, check out the foll
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------| :----- | :------ |
+| 0.4.24 | 2022-03-24 | [\#11093](https://github.com/airbytehq/airbyte/pull/11093) | Added OAuth support |
 | 0.4.22 | 2022-03-18 | [\#10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters |
 | 0.4.21 | 2022-03-18 | [\#11071](https://github.com/airbytehq/airbyte/pull/11071) | Switch to compressed on-disk buffering before staging to s3/internal stage |
 | 0.4.20 | 2022-03-14 | [\#10341](https://github.com/airbytehq/airbyte/pull/10341) | Add Azure blob staging support |
diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md
index 4a3fc774a5c1..3376aced6d2d 100644
--- a/docs/understanding-airbyte/basic-normalization.md
+++ b/docs/understanding-airbyte/basic-normalization.md
@@ -350,6 +350,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need
 | Airbyte Version | Normalization Version | Date | Pull Request | Subject |
 |:----------------| :--- | :--- | :--- | :--- |
+| 0.35.59-alpha | 0.1.72 | 2022-03-24 | [\#11093](https://github.com/airbytehq/airbyte/pull/11093) | Added Snowflake OAuth2.0 support |
 | 0.35.53-alpha | 0.1.71 | 2022-03-14 | [\#11077](https://github.com/airbytehq/airbyte/pull/11077) | Enable BigQuery to handle project ID embedded inside dataset ID |
 | 0.35.49-alpha | 0.1.70 | 2022-03-11 | [\#11051](https://github.com/airbytehq/airbyte/pull/11051) | Upgrade dbt to 1.0.0 (except for MySQL and Oracle) |
 | 0.35.45-alpha | 0.1.69 | 2022-03-04 | [\#10754](https://github.com/airbytehq/airbyte/pull/10754) | Enable Clickhouse normalization over SSL |