🐛 Source Postgres: Duplicate key error when two tables from different pg schemas have the same name. #9552

Closed
Tracked by #11845
alafanechere opened this issue Jan 17, 2022 · 5 comments · Fixed by #13855
@alafanechere
Contributor

alafanechere commented Jan 17, 2022

Environment

  • Airbyte version: example is 0.22.0-alpha
  • OS Version / Instance: Ubuntu
  • Deployment: example are Docker or Kubernetes deploy env
  • Source Connector and version: Postgres 0.4.2
  • Severity: Critical
  • Step where error happened: Update latest source schema

Current Behavior

When a database has two tables with the same name in different schemas, the discovery step fails.

airbyte-server      | 2022-01-17 12:00:41 ERROR i.a.s.e.UncaughtExceptionMapper(toResponse):22 - Uncaught exception
airbyte-server      | java.lang.IllegalStateException: Duplicate key account (attempted merging values class AirbyteStreamAndConfiguration {
airbyte-server      |     stream: class AirbyteStream {
airbyte-server      |         name: account
...
airbyte-server      |         supportedSyncModes: [full_refresh, incremental]
airbyte-server      |         sourceDefinedCursor: null
airbyte-server      |         defaultCursorField: []
airbyte-server      |         sourceDefinedPrimaryKey: [[id]]
airbyte-server      |         namespace: <my_first_pg_schema>
airbyte-server      |     }
airbyte-server      |     config: class AirbyteStreamConfiguration {
airbyte-server      |         syncMode: full_refresh
airbyte-server      |         cursorField: []
airbyte-server      |         destinationSyncMode: overwrite
airbyte-server      |         primaryKey: [[id]]
airbyte-server      |         aliasName: account
airbyte-server      |         selected: true
airbyte-server      |     }
airbyte-server      | } and class AirbyteStreamAndConfiguration {
airbyte-server      |     stream: class AirbyteStream {
airbyte-server      |         name: account
...
airbyte-server      |         supportedSyncModes: [full_refresh, incremental]
airbyte-server      |         sourceDefinedCursor: null
airbyte-server      |         defaultCursorField: []
airbyte-server      |         sourceDefinedPrimaryKey: [[id]]
airbyte-server      |         namespace: <my_second_pg_schema>
airbyte-server      |     }
airbyte-server      |     config: class AirbyteStreamConfiguration {
airbyte-server      |         syncMode: full_refresh
airbyte-server      |         cursorField: []
airbyte-server      |         destinationSyncMode: overwrite
airbyte-server      |         primaryKey: [[id]]
airbyte-server      |         aliasName: account
airbyte-server      |         selected: true
airbyte-server      |     }
airbyte-server      | })
airbyte-server      | 	at java.util.stream.Collectors.duplicateKeyException(Collectors.java:135) ~[?:?]
airbyte-server      | 	at java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:182) ~[?:?]
airbyte-server      | 	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:?]
airbyte-server      | 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
airbyte-server      | 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
airbyte-server      | 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
airbyte-server      | 	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[?:?]
airbyte-server      | 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
airbyte-server      | 	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) ~[?:?]

This is probably because the map of streams is keyed by name only in updateSchemaWithDiscovery, in /airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java:

final Map<String, AirbyteStreamAndConfiguration> originalStreamsByName = original.getStreams()
        .stream()
        .collect(toMap(s -> s.getStream().getName(), s -> s));
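
To illustrate why keying by name alone fails, here is a minimal standalone sketch. DemoStream and DuplicateKeyDemo are hypothetical stand-ins and not Airbyte classes. The only thing taken from the report is the shape of the toMap call. With two entries sharing the name "account", the two-argument Collectors.toMap has no merge function and throws IllegalStateException.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for AirbyteStreamAndConfiguration: only the two fields that matter here.
record DemoStream(String namespace, String name) {}

class DuplicateKeyDemo {
    // Same shape as the snippet in the report: the map key is the stream name only.
    static Map<String, DemoStream> byName(List<DemoStream> streams) {
        return streams.stream().collect(Collectors.toMap(DemoStream::name, s -> s));
    }

    public static void main(String[] args) {
        List<DemoStream> streams = List.of(
            new DemoStream("schema_a", "account"),
            new DemoStream("schema_b", "account"));
        try {
            byName(streams);
        } catch (IllegalStateException e) {
            // Both entries map to the key "account", so the collector rejects the second one.
            System.out.println("Collision: " + e.getMessage());
        }
    }
}
```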

Expected Behavior

The discovery step for this source should handle tables with the same name but in different namespaces.

Logs

server-logs.txt

Steps to Reproduce

  1. Create two tables with the same name in different Postgres schemas
  2. Create a connection using a Postgres source
  3. Run schema discovery

Related slack conversation

@alafanechere alafanechere added type/bug Something isn't working area/connectors Connector related issues priority/critical Critical priority! community connectors/source/postgres labels Jan 17, 2022
@alafanechere
Contributor Author

@tuliren could you please confirm this issue is legit? Maybe it's already a known problem.
If it is, I think it's not Postgres-specific but applies to all namespaced sources.

@tuliren
Contributor

tuliren commented Jan 17, 2022

Yes, it's a legit issue. But I don't think it is a general problem. In most places that refer to stream names, we use AirbyteStreamNameNamespacePair instead of just a String. So I think we just need to change the key of the offending map to this class.


Actually AirbyteStreamNameNamespacePair is not available in this module. But this can be fixed with a simple Java record.
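
A rough sketch of what a record-based key could look like, assuming Java 16+ for records. SketchStream, StreamKey and NamespacedKeySketch are hypothetical names for illustration. This is not the change that was eventually merged, and the real key would be built from the stream's getName() and getNamespace().

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for AirbyteStreamAndConfiguration.
record SketchStream(String namespace, String name) {}

// Hypothetical composite key. Records generate value-based equals/hashCode,
// and a null namespace is tolerated by both.
record StreamKey(String namespace, String name) {}

class NamespacedKeySketch {
    // Key the map by (namespace, name) so same-named tables in different schemas coexist.
    static Map<StreamKey, SketchStream> byNamespaceAndName(List<SketchStream> streams) {
        return streams.stream().collect(Collectors.toMap(
            s -> new StreamKey(s.namespace(), s.name()),
            s -> s));
    }

    public static void main(String[] args) {
        Map<StreamKey, SketchStream> streams = byNamespaceAndName(List.of(
            new SketchStream("schema_a", "account"),
            new SketchStream("schema_b", "account")));
        System.out.println(streams.size());
    }
}
```

Two streams that share both namespace and name would still collide under this key, which seems like the right behavior since they would be true duplicates.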

@smoussa

smoussa commented Mar 30, 2022

Hey @tuliren, @alafanechere. Any updates on this? We have to rebuild our connections every time our table schemas change.

@grishick
Contributor

grishick commented May 3, 2022

Hey team! Please add your planning poker estimate with ZenHub @tuliren @edgao

@grishick
Contributor

grishick commented May 3, 2022

Please add your planning poker estimate with ZenHub @noahkawasaki-airbyte


6 participants