Skip to content

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Dec 2, 2024
1 parent 9bcba4e commit 494b51a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.subscription.it;

import org.apache.iotdb.session.Session;

import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;

Expand Down Expand Up @@ -51,9 +52,10 @@ public interface WrappedVoidSupplier {
}

public static void AWAIT_WITH_FLUSH(final Session session, final WrappedVoidSupplier assertions) {
AWAIT.untilAsserted(() -> {
session.executeQueryStatement("flush");
assertions.get();
});
AWAIT.untilAsserted(
() -> {
session.executeQueryStatement("flush");
assertions.get();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH;

/***
Expand Down Expand Up @@ -190,15 +189,17 @@ public void do_test()
session_src.executeNonQueryStatement("flush");
System.out.println("src: " + getCount(session_src, sql));

AWAIT_WITH_FLUSH(session_src, () -> {
check_count_non_strict(
9, "select count(s_0) from " + device, "Consumption data: s_0 " + device);
check_count_non_strict(
9, "select count(s_1) from " + device, "Consumption data: s_1 " + device);
check_count(0, "select count(s_0) from " + device2, "Consumption data: s_0 " + device2);
check_count(0, "select count(s_1) from " + device2, "Consumption data: s_1 " + device2);
check_count(0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2");
});
AWAIT_WITH_FLUSH(
session_src,
() -> {
check_count_non_strict(
9, "select count(s_0) from " + device, "Consumption data: s_0 " + device);
check_count_non_strict(
9, "select count(s_1) from " + device, "Consumption data: s_1 " + device);
check_count(0, "select count(s_0) from " + device2, "Consumption data: s_0 " + device2);
check_count(0, "select count(s_1) from " + device2, "Consumption data: s_1 " + device2);
check_count(0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2");
});

// Unsubscribe
consumer.unsubscribe(topicName);
Expand All @@ -213,14 +214,16 @@ public void do_test()

// Consumption data: Progress is not retained after cancellation and re-subscription. Full
// synchronization.
AWAIT_WITH_FLUSH(session_src, () -> {
check_count_non_strict(
11, "select count(s_0) from " + device, "consume data again: s_0 " + device);
check_count_non_strict(
11, "select count(s_1) from " + device, "Consumption data: s_1 " + device);
check_count(0, "select count(s_0) from " + device2, "Consumption data: s_0 " + device2);
check_count(0, "select count(s_1) from " + device2, "Consumption data: s_1 " + device2);
check_count(0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2");
});
AWAIT_WITH_FLUSH(
session_src,
() -> {
check_count_non_strict(
11, "select count(s_0) from " + device, "consume data again: s_0 " + device);
check_count_non_strict(
11, "select count(s_1) from " + device, "Consumption data: s_1 " + device);
check_count(0, "select count(s_0) from " + device2, "Consumption data: s_0 " + device2);
check_count(0, "select count(s_1) from " + device2, "Consumption data: s_1 " + device2);
check_count(0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH;

/***
Expand Down Expand Up @@ -203,9 +202,11 @@ public void do_test()
thread.start();
thread.join();

AWAIT_WITH_FLUSH(session_src, () -> {
assertEquals(rowCount1.get(), 7, "pattern1");
assertEquals(rowCount2.get(), 5, "pattern2");
});
AWAIT_WITH_FLUSH(
session_src,
() -> {
assertEquals(rowCount1.get(), 7, "pattern1");
assertEquals(rowCount2.get(), 5, "pattern2");
});
}
}

0 comments on commit 494b51a

Please sign in to comment.