Skip to content

Commit

Permalink
[#11392] Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 29, 2024
1 parent e3f0791 commit 8911fa1
Showing 1 changed file with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public <T> T execute(TableName tableName, AsyncTableCallback<T> action) {

@Override
public CompletableFuture<Void> put(TableName tableName, final Put put) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(put, "put");

CompletableFuture<Void> future = this.execute(tableName, new AsyncTableCallback<>() {
@Override
public CompletableFuture<Void> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -125,6 +128,9 @@ public CompletableFuture<Void> doInTable(AsyncTable<ScanResultConsumer> table) t

@Override
public List<CompletableFuture<Void>> put(TableName tableName, final List<Put> puts) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(puts, "puts");

List<CompletableFuture<Void>> futures = this.execute(tableName, new AsyncTableCallback<>() {
@Override
public List<CompletableFuture<Void>> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -137,6 +143,10 @@ public List<CompletableFuture<Void>> doInTable(AsyncTable<ScanResultConsumer> ta

@Override
public <T> CompletableFuture<T> get(TableName tableName, final Get get, final RowMapper<T> mapper) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(get, "get");
Objects.requireNonNull(mapper, "mapper");

CompletableFuture<T> futures = this.execute(tableName, new AsyncTableCallback<>() {
@Override
public CompletableFuture<T> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -159,6 +169,10 @@ public T apply(Result result) {

@Override
public <T> List<CompletableFuture<T>> get(TableName tableName, final List<Get> gets, final RowMapper<T> mapper) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(gets, "gets");
Objects.requireNonNull(mapper, "mapper");

List<CompletableFuture<T>> futures = this.execute(tableName, new AsyncTableCallback<>() {
@Override
public List<CompletableFuture<T>> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -185,6 +199,9 @@ public T apply(Result result) {

@Override
public CompletableFuture<Void> delete(TableName tableName, final Delete delete) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(delete, "delete");

CompletableFuture<Void> futures = this.execute(tableName, new AsyncTableCallback<>() {
@Override
public CompletableFuture<Void> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -198,6 +215,9 @@ public CompletableFuture<Void> doInTable(AsyncTable<ScanResultConsumer> table) t

@Override
public List<CompletableFuture<Result>> increment(final TableName tableName, final List<Increment> incrementList) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(incrementList, "incrementList");

List<CompletableFuture<Result>> futures = execute(tableName, new AsyncTableCallback<List<CompletableFuture<Result>>>() {
@Override
public List<CompletableFuture<Result>> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -211,6 +231,9 @@ public List<CompletableFuture<Result>> doInTable(AsyncTable<ScanResultConsumer>

@Override
public CompletableFuture<Result> increment(final TableName tableName, Increment increment) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(increment, "increment");

CompletableFuture<Result> future = execute(tableName, new AsyncTableCallback<CompletableFuture<Result>>() {
@Override
public CompletableFuture<Result> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -223,7 +246,10 @@ public CompletableFuture<Result> doInTable(AsyncTable<ScanResultConsumer> table)


@Override
public CompletableFuture<CasResult> maxColumnValue(TableName tableName, CheckAndMax max) {
public CompletableFuture<CasResult> maxColumnValue(final TableName tableName, CheckAndMax max) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(max, "max");

CompletableFuture<CasResult> result = this.execute(tableName, new AsyncTableCallback<>() {
@Override
public CompletableFuture<CasResult> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand Down Expand Up @@ -256,7 +282,9 @@ public CompletableFuture<CasResult> apply(CheckAndMutateResult checkAndMutateRes

}

public List<CompletableFuture<CasResult>> maxColumnValue(TableName tableName, List<CheckAndMax> maxs) {
public List<CompletableFuture<CasResult>> maxColumnValue(final TableName tableName, List<CheckAndMax> maxs) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(maxs, "maxs");

List<CheckAndMutate> checkAndMutates = new ArrayList<>(maxs.size());
for (CheckAndMax max : maxs) {
Expand Down Expand Up @@ -287,6 +315,9 @@ public List<CompletableFuture<CasResult>> doInTable(AsyncTable<ScanResultConsume

@Override
public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(TableName tableName, List<CheckAndMutate> checkAndMutates) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(checkAndMutates, "checkAndMutates");

List<CompletableFuture<CheckAndMutateResult>> futures = execute(tableName, new AsyncTableCallback<>() {
@Override
public List<CompletableFuture<CheckAndMutateResult>> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -298,6 +329,10 @@ public List<CompletableFuture<CheckAndMutateResult>> doInTable(AsyncTable<ScanRe
}

public <T> List<T> findParallel(final TableName tableName, final List<Scan> scans, final ResultsExtractor<T> action) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(scans, "scans");
Objects.requireNonNull(action, "action");

return execute(tableName, new AsyncTableCallback<>() {
@Override
public List<T> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand All @@ -314,6 +349,11 @@ public List<T> doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable
}

public <T> T executeDistributedScan(TableName tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor<T> action) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(scan, "scan");
Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor");
Objects.requireNonNull(action, "action");

final T result = execute(tableName, new AsyncTableCallback<>() {
@Override
public T doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
Expand Down Expand Up @@ -341,6 +381,11 @@ public T doInTable(AsyncTable<ScanResultConsumer> table) throws Throwable {
}

public <T> T executeParallelDistributedScan(TableName tableName, Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor<T> action, int numParallelThreads) {
Objects.requireNonNull(tableName, "tableName");
Objects.requireNonNull(scan, "scan");
Objects.requireNonNull(rowKeyDistributor, "rowKeyDistributor");
Objects.requireNonNull(action, "action");

try {
StopWatch watch = StopWatch.createStarted();

Expand Down

0 comments on commit 8911fa1

Please sign in to comment.