Skip to content

Commit

Permalink
Supports scaling MySQL tables that only contain a unique index (#17786)
Browse files Browse the repository at this point in the history
* Load indexes in PipelineTableMetaDataLoader

* Load unique indexes only

* Add nullable into PipelineColumnMetaData

* Supports scaling tables that only contain a unique index

* Fix checkstyle

* Fix PipelineTableMetaDataTest

* Complete InventoryTaskSplitterTest

* Complete PipelineTableMetaDataLoaderTest
  • Loading branch information
TeslaCN authored May 19, 2022
1 parent a393962 commit c3e474f
Show file tree
Hide file tree
Showing 22 changed files with 202 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {

private String logicTableName;

// TODO rename to uniqueKey
private String primaryKey;
private String uniqueKey;

private Integer uniqueKeyDataType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public final class Column {

private final boolean updated;

private final boolean primaryKey;
private final boolean uniqueKey;

public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
this(name, null, value, updated, primaryKey);
public Column(final String name, final Object value, final boolean updated, final boolean uniqueKey) {
this(name, null, value, updated, uniqueKey);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
*/
@Getter
@Setter
@EqualsAndHashCode(of = {"tableName", "primaryKeyValue"}, callSuper = false)
@EqualsAndHashCode(of = {"tableName", "uniqueKeyValue"}, callSuper = false)
@ToString
public final class DataRecord extends Record {

private final List<Column> columns;

private final List<Object> primaryKeyValue = new LinkedList<>();
private final List<Object> uniqueKeyValue = new LinkedList<>();

private final List<Object> oldPrimaryKeyValues = new ArrayList<>();
private final List<Object> oldUniqueKeyValues = new ArrayList<>();

private String type;

Expand All @@ -59,9 +59,9 @@ public DataRecord(final IngestPosition<?> position, final int columnCount) {
*/
public void addColumn(final Column data) {
columns.add(data);
if (data.isPrimaryKey()) {
primaryKeyValue.add(data.getValue());
oldPrimaryKeyValues.add(data.getOldValue());
if (data.isUniqueKey()) {
uniqueKeyValue.add(data.getValue());
oldUniqueKeyValues.add(data.getOldValue());
}
}

Expand Down Expand Up @@ -90,7 +90,7 @@ public Column getColumn(final int index) {
* @return key
*/
public Key getKey() {
return new Key(tableName, primaryKeyValue);
return new Key(tableName, uniqueKeyValue);
}

/**
Expand All @@ -99,7 +99,7 @@ public Key getKey() {
* @return key
*/
public Key getOldKey() {
return new Key(tableName, oldPrimaryKeyValues);
return new Key(tableName, oldUniqueKeyValues);
}

@EqualsAndHashCode
Expand All @@ -108,6 +108,6 @@ public static class Key {

private final String tableName;

private final List<Object> primaryKeyValues;
private final List<Object> uniqueKeyValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private void executeUpdate(final Connection connection, final DataRecord record)
}
for (int i = 0; i < conditionColumns.size(); i++) {
Column keyColumn = conditionColumns.get(i);
ps.setObject(updatedColumns.size() + i + 1, (keyColumn.isPrimaryKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
ps.setObject(updatedColumns.size() + i + 1, (keyColumn.isUniqueKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
}
int updateCount = ps.executeUpdate();
if (1 != updateCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key,
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
mergedDataRecord.addColumn(new Column(
dataRecord.getColumn(i).getName(),
dataRecord.getColumn(i).isPrimaryKey()
dataRecord.getColumn(i).isUniqueKey()
? beforeDataRecord.getColumn(i).getOldValue()
: beforeDataRecord.getColumn(i).getValue(),
true,
dataRecord.getColumn(i).isPrimaryKey()));
dataRecord.getColumn(i).isUniqueKey()));
}
mergedDataRecord.setTableName(dataRecord.getTableName());
mergedDataRecord.setType(IngestDataChangeType.DELETE);
Expand All @@ -153,12 +153,12 @@ private DataRecord mergeColumn(final DataRecord preDataRecord, final DataRecord
for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
result.addColumn(new Column(
curDataRecord.getColumn(i).getName(),
preDataRecord.getColumn(i).isPrimaryKey()
preDataRecord.getColumn(i).isUniqueKey()
? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
: null,
curDataRecord.getColumn(i).getValue(),
preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
curDataRecord.getColumn(i).isPrimaryKey()));
curDataRecord.getColumn(i).isUniqueKey()));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ protected void doStart() {
private void dump() {
String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), uniqueKeyDataType, true);
String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), uniqueKeyDataType, false);
String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, true);
String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, false);
IngestPosition<?> position = dumperConfig.getPosition();
log.info("inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}", uniqueKeyDataType, firstSQL, laterSQL, position);
if (position instanceof FinishedPosition) {
Expand Down Expand Up @@ -167,12 +167,12 @@ private Optional<Object> dump0(final Connection conn, final String sql, final in
record.setType(IngestDataChangeType.INSERT);
record.setTableName(logicTableName);
for (int i = 1; i <= metaData.getColumnCount(); i++) {
boolean isPrimaryKey = tableMetaData.isPrimaryKey(i - 1);
boolean isUniqueKey = tableMetaData.isUniqueKey(i - 1);
Object value = readValue(resultSet, i);
if (isPrimaryKey) {
if (isUniqueKey) {
maxUniqueKeyValue = value;
}
record.addColumn(new Column(metaData.getColumnName(i), value, true, isPrimaryKey));
record.addColumn(new Column(metaData.getColumnName(i), value, true, isUniqueKey));
}
pushRecord(record);
rowCount++;
Expand All @@ -198,8 +198,8 @@ private Object getPositionEndValue(final IngestPosition<?> position) {
}

private IngestPosition<?> newPosition(final ResultSet rs) throws SQLException {
return null == dumperConfig.getPrimaryKey() ? new PlaceholderPosition()
: PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getPrimaryKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
return null == dumperConfig.getUniqueKey() ? new PlaceholderPosition()
: PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getUniqueKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
}

protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -108,17 +113,36 @@ private Map<TableName, PipelineTableMetaData> loadTableMetaData0(final Connectio
throw ex;
}
boolean primaryKey = primaryKeys.contains(columnName);
PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, primaryKey);
boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, isNullable, primaryKey);
columnMetaDataMap.put(columnName, columnMetaData);
}
}
Map<TableName, PipelineTableMetaData> result = new LinkedHashMap<>();
for (Entry<String, Map<String, PipelineColumnMetaData>> entry : tablePipelineColumnMetaDataMap.entrySet()) {
result.put(new TableName(entry.getKey()), new PipelineTableMetaData(entry.getKey(), entry.getValue()));
String tableName = entry.getKey();
result.put(new TableName(tableName), new PipelineTableMetaData(tableName, entry.getValue(), loadIndexesOfTable(connection, schemaName, entry.getValue(), tableName)));
}
return result;
}

/**
 * Load unique indexes of a table via JDBC database meta data.
 *
 * @param connection JDBC connection
 * @param schemaName schema name (may be null depending on database type)
 * @param columns already-loaded column meta data of the table, keyed by column name
 * @param tableName table name
 * @return unique index meta data, each with its columns in ordinal position order
 * @throws SQLException if JDBC meta data access fails
 */
private Collection<PipelineIndexMetaData> loadIndexesOfTable(final Connection connection, final String schemaName, final Map<String, PipelineColumnMetaData> columns,
                                                             final String tableName) throws SQLException {
    Map<String, PipelineIndexMetaData> result = new LinkedHashMap<>();
    Map<String, SortedMap<Short, String>> orderedColumnsOfIndexes = new LinkedHashMap<>();
    // unique=true requests unique indexes only; approximate=false requests accurate results.
    try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, false)) {
        while (resultSet.next()) {
            String indexName = resultSet.getString("INDEX_NAME");
            // Per JDBC, rows of type tableIndexStatistic have a null INDEX_NAME and describe no real index — skip them
            // to avoid creating a bogus null-named PipelineIndexMetaData entry.
            if (null == indexName) {
                continue;
            }
            result.computeIfAbsent(indexName, unused -> new PipelineIndexMetaData(indexName, new LinkedList<>()));
            // Collect column names keyed by ORDINAL_POSITION so multi-column indexes keep their declared order.
            orderedColumnsOfIndexes.computeIfAbsent(indexName, unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
        }
    }
    // Resolve each ordered column name to its column meta data and attach it to the owning index.
    for (PipelineIndexMetaData each : result.values()) {
        orderedColumnsOfIndexes.get(each.getName()).values().stream().map(columns::get).forEach(each.getColumns()::add);
    }
    return result.values();
}

private Set<String> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
Set<String> result = new LinkedHashSet<>();
// TODO order primary keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class PipelineColumnMetaData implements Comparable<PipelineColumnMe

private final String dataTypeName;

private final boolean nullable;

private final boolean primaryKey;

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.metadata.model;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.List;

/**
 * Pipeline meta data of index.
 *
 * <p>Immutable holder for one database index: its name and the column meta data
 * of its member columns. Constructor and getters are generated by Lombok.</p>
 */
@RequiredArgsConstructor
@Getter
public final class PipelineIndexMetaData {
    
    // Index name as reported by database meta data.
    private final String name;
    
    // Columns belonging to this index; the loader fills them in ordinal position order.
    private final List<PipelineColumnMetaData> columns;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -47,14 +48,18 @@ public final class PipelineTableMetaData {
@Getter
private final List<String> primaryKeyColumns;

public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap) {
@Getter
private final Collection<PipelineIndexMetaData> uniqueIndexes;

/**
 * Construct pipeline table meta data, deriving the sorted column name list
 * and the primary key column list from the column meta data map.
 *
 * @param name table name
 * @param columnMetaDataMap column meta data keyed by column name
 * @param uniqueIndexes unique indexes of the table
 */
public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap, final Collection<PipelineIndexMetaData> uniqueIndexes) {
    this.name = name;
    this.columnMetaDataMap = columnMetaDataMap;
    // Sort columns by their natural order (PipelineColumnMetaData implements Comparable) before extracting names.
    List<PipelineColumnMetaData> columnMetaDataList = new ArrayList<>(columnMetaDataMap.values());
    Collections.sort(columnMetaDataList);
    columnNames = Collections.unmodifiableList(columnMetaDataList.stream().map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
    primaryKeyColumns = Collections.unmodifiableList(columnMetaDataList.stream().filter(PipelineColumnMetaData::isPrimaryKey)
            .map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
    // NOTE(review): unmodifiableCollection is a live read-only view, not a copy — later mutation of the
    // caller's collection would be visible here; confirm callers always pass a freshly built collection.
    this.uniqueIndexes = Collections.unmodifiableCollection(uniqueIndexes);
}

/**
Expand Down Expand Up @@ -82,13 +87,17 @@ public PipelineColumnMetaData getColumnMetaData(final String columnName) {
}

/**
* Judge whether column is primary key or not.
* Judge whether column is unique key or not.
*
* @param columnIndex column index
* @return true if the column is primary key, otherwise false
* @return true if the column is unique key, otherwise false
*/
public boolean isPrimaryKey(final int columnIndex) {
return columnIndex < columnNames.size() && columnMetaDataMap.get(columnNames.get(columnIndex)).isPrimaryKey();
public boolean isUniqueKey(final int columnIndex) {
    // Out-of-range indexes are not unique keys.
    if (columnIndex >= columnNames.size()) {
        return false;
    }
    String columnName = columnNames.get(columnIndex);
    // Primary key columns always count as unique keys.
    if (columnMetaDataMap.get(columnName).isPrimaryKey()) {
        return true;
    }
    // Guard: a table may have neither primary key nor unique index; without this check,
    // iterator().next() would throw NoSuchElementException on an empty collection.
    if (uniqueIndexes.isEmpty()) {
        return false;
    }
    // Otherwise the column is a unique key only if it is the first column of the first unique index.
    return columnName.equals(uniqueIndexes.iterator().next().getColumns().get(0).getName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class RecordUtil {
public static List<Column> extractPrimaryColumns(final DataRecord dataRecord) {
List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
for (Column each : dataRecord.getColumns()) {
if (each.isPrimaryKey()) {
if (each.isUniqueKey()) {
result.add(each);
}
}
Expand All @@ -58,7 +58,7 @@ public static List<Column> extractPrimaryColumns(final DataRecord dataRecord) {
public static List<Column> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
for (Column each : dataRecord.getColumns()) {
if (each.isPrimaryKey() || shardingColumns.contains(each.getName())) {
if (each.isUniqueKey() || shardingColumns.contains(each.getName())) {
result.add(each);
}
}
Expand Down
Loading

0 comments on commit c3e474f

Please sign in to comment.