Skip to content

Commit

Permalink
[Feature][Connector-V2][FTS] Improve codes
Browse files Browse the repository at this point in the history
  • Loading branch information
TyrantLucifer committed Mar 14, 2023
1 parent 77cbcd7 commit 3e3620b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ public void close() throws IOException {
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
synchronized (output.getCheckpointLock()) {
FlinkTableStoreSourceSplit split = sourceSplits.poll();
final FlinkTableStoreSourceSplit split = sourceSplits.poll();
if (Objects.nonNull(split)) {
// read logic
try (RecordReader<RowData> reader =
try (final RecordReader<RowData> reader =
table.newRead().createReader(split.getSplit())) {
RecordReaderIterator<RowData> rowIterator = new RecordReaderIterator<>(reader);
final RecordReaderIterator<RowData> rowIterator = new RecordReaderIterator<>(reader);
while (rowIterator.hasNext()) {
RowData row = rowIterator.next();
SeaTunnelRow seaTunnelRow = RowConverter.convert(row, seaTunnelRowType);
final RowData row = rowIterator.next();
final SeaTunnelRow seaTunnelRow = RowConverter.convert(row, seaTunnelRowType);
output.collect(seaTunnelRow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private void assignSplit(int taskId) {
// if parallelism > 1, according to hashCode of split's id to determine whether to
// allocate the current task
for (FlinkTableStoreSourceSplit fileSourceSplit : pendingSplit) {
int splitOwner =
final int splitOwner =
getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
if (splitOwner == taskId) {
currentTaskSplits.add(fileSourceSplit);
Expand All @@ -146,9 +146,9 @@ private void assignSplit(int taskId) {

/** Get all splits of table */
private Set<FlinkTableStoreSourceSplit> getTableSplits() {
Set<FlinkTableStoreSourceSplit> tableSplits = new HashSet<>();
final Set<FlinkTableStoreSourceSplit> tableSplits = new HashSet<>();
// TODO Support columns projection
List<Split> splits = table.newScan().plan().splits();
final List<Split> splits = table.newScan().plan().splits();
splits.forEach(split -> tableSplits.add(new FlinkTableStoreSourceSplit(split)));
return tableSplits;
}
Expand Down

0 comments on commit 3e3620b

Please sign in to comment.