Skip to content

Commit

Permalink
Fix typo
Browse files Browse the repository at this point in the history
Signed-off-by: Jiandong Chen <cjd19940801@gmail.com>
  • Loading branch information
jiandongchen committed Apr 18, 2024
1 parent 56feac7 commit 9f93df0
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,20 +423,20 @@ private void validateTableStructure(TableSchema flinkSchema) {
throw new IllegalArgumentException("Couldn't get the sink table's column info.");
}
// validate primary keys
List<String> primayKeys = new ArrayList<>();
List<String> primaryKeys = new ArrayList<>();
for (int i = 0; i < rows.size(); i++) {
String keysType = rows.get(i).get("COLUMN_KEY").toString();
if (!"PRI".equals(keysType)) {
continue;
}
primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase());
primaryKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase());
}
if (!primayKeys.isEmpty()) {
if (!primaryKeys.isEmpty()) {
if (!constraint.isPresent()) {
throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
}
if (constraint.get().getColumns().size() != primayKeys.size() ||
!constraint.get().getColumns().stream().allMatch(col -> primayKeys.contains(col.toLowerCase()))) {
if (constraint.get().getColumns().size() != primaryKeys.size() ||
!constraint.get().getColumns().stream().allMatch(col -> primaryKeys.contains(col.toLowerCase()))) {
throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
}
sinkOptions.enableUpsertDelete();
Expand Down

0 comments on commit 9f93df0

Please sign in to comment.