Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription: support topic loose range for path and time #12760

Merged
merged 7 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,9 @@ public static void assertDataEventuallyOnEnv(
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
Expand All @@ -720,6 +723,36 @@ public static void assertDataEventuallyOnEnv(
}
}

public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
}

public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult, long timeoutSeconds) {
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try {
TestUtils.assertSingleResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeaderWithResult);
} catch (Exception e) {
Assert.fail();
}
});
} catch (Exception e) {
e.printStackTrace();
fail();
}
}

public static void assertDataAlwaysOnEnv(
BaseEnv env, String sql, String expectedHeader, Set<String> expectedResSet) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10);
Expand All @@ -735,6 +768,9 @@ public static void assertDataAlwaysOnEnv(
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(consistentSeconds, TimeUnit.SECONDS)
.failFast(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,12 +1013,23 @@ public void testRealtimeLooseRange() throws Exception {
return;
}

TestUtils.tryExecuteNonQueriesWithRetry(
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1 (time, at1)" + " values (5000, 1), (16000, 3)",
"insert into root.db.d1 (time, at1, at2)" + " values (5001, 1, 2), (6001, 3, 4)",
"flush"));
"flush"))) {
return;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(at1) from root.db.d1 where time >= 2000 and time <= 10000",
new HashMap<String, String>() {
{
put("count(root.db.d1.at1)", "4");
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class IoTDBSubscriptionITConstant {

private static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
private static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
private static final long AWAITILITY_POLL_INTERVAL_SECOND = 1L;
private static final long AWAITILITY_AT_MOST_SECOND = 600L;

public static final ConditionFactory AWAIT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ public void testTopicInvalidTimeRangeConfig() throws Exception {
@Test
public void testTopicInvalidPathConfig() throws Exception {
// Test invalid path when using tsfile format
// NOTE: Delete this test after the restriction "on path/time range/processor when subscribing
// to tsfile" is removed.
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
Expand All @@ -542,6 +544,8 @@ public void testTopicInvalidPathConfig() throws Exception {
@Test
public void testTopicInvalidProcessorConfig() throws Exception {
// Test invalid processor when using tsfile format
// NOTE: Delete this test after the restriction "on path/time range/processor when subscribing
// to tsfile" is removed.
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put("processor", "tumbling-time-sampling-processor");
Expand Down Expand Up @@ -578,6 +582,7 @@ public void testTopicWithQueryMode() throws Exception {
e.printStackTrace();
fail(e.getMessage());
}
assertTopicCount(1);

// Subscription
final AtomicInteger rowCount = new AtomicInteger();
Expand Down Expand Up @@ -652,6 +657,136 @@ public void testTopicWithQueryMode() throws Exception {
}
}

@Test
public void testTopicWithLooseRange() throws Exception {
// Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (1000, 1, 2), (2000, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db1.d1 (time, at1, at2) values (1000, 1, 2), (2000, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (3000, 1, 2), (4000, 3, 4)");
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Create topic
final String topicName = "topic12";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.LOOSE_RANGE_KEY, TopicConstant.LOOSE_RANGE_TIME_AND_PATH_VALUE);
config.put(TopicConstant.PATH_KEY, "root.db.d1.at1");
config.put(TopicConstant.START_TIME_KEY, "1500");
config.put(TopicConstant.END_TIME_KEY, "2500");
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
assertTopicCount(1);

final AtomicBoolean dataPrepared = new AtomicBoolean(false);
final AtomicBoolean topicSubscribed = new AtomicBoolean(false);
final AtomicBoolean result = new AtomicBoolean(false);
final List<Thread> threads = new ArrayList<>();

// Subscribe on sender
threads.add(
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe(topicName);
topicSubscribed.set(true);
while (!result.get()) {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
if (dataPrepared.get()) {
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
for (final SubscriptionMessage message : messages) {
for (final Iterator<Tablet> it =
message.getSessionDataSetsHandler().tabletIterator();
it.hasNext(); ) {
final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
}
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
},
String.format("%s - consumer", testName.getMethodName())));

// Insert some realtime data on sender
threads.add(
new Thread(
() -> {
while (!topicSubscribed.get()) {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
}
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (1001, 1, 2), (2001, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db1.d1 (time, at1, at2) values (1001, 1, 2), (2001, 3, 4)");
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1, at2) values (3001, 1, 2), (4001, 3, 4)");
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
dataPrepared.set(true);
},
String.format("%s - data inserter", testName.getMethodName())));

for (final Thread thread : threads) {
thread.start();
}

try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
AWAIT.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(
statement,
"select count(at1) from root.db.d1 where time >= 1500 and time <= 2500"),
new HashMap<String, String>() {
{
put("count(root.db.d1.at1)", "2");
}
}));
}

result.set(true);
for (final Thread thread : threads) {
thread.join();
}
}

private void testTopicInvalidRuntimeConfigTemplate(
final String topicName, final Properties config) throws Exception {
// Create topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class TopicConfig extends PipeParameters {

Expand All @@ -40,14 +43,6 @@ public TopicConfig(final Map<String, String> attributes) {
super(attributes);
}

private static final Map<String, String> LOOSE_RANGE_TIME_CONFIG =
new HashMap<String, String>() {
{
put("history.loose-range", "time");
put("realtime.loose-range", "time");
}
};

private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
Collections.singletonMap("realtime.mode", "batch");
private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
Expand All @@ -58,6 +53,15 @@ public TopicConfig(final Map<String, String> attributes) {
private static final Map<String, String> SUBSCRIBE_MODE_CONFIG =
Collections.singletonMap("mode", "subscribe");

private static final Set<String> LOOSE_RANGE_KEY_SET =
Collections.unmodifiableSet(
new HashSet<String>() {
{
add("history.loose-range");
add("realtime.loose-range");
}
});

/////////////////////////////// de/ser ///////////////////////////////

public void serialize(final DataOutputStream stream) throws IOException {
Expand Down Expand Up @@ -102,12 +106,6 @@ public Map<String, String> getAttributesWithTimeRange(final long creationTime) {
attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime);
}

// enable loose range when using tsfile format
if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
attributes.getOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE))) {
attributesWithTimeRange.putAll(LOOSE_RANGE_TIME_CONFIG);
}

return attributesWithTimeRange;
}

Expand All @@ -125,6 +123,14 @@ public Map<String, String> getAttributesWithSourceMode() {
: SUBSCRIBE_MODE_CONFIG;
}

public Map<String, String> getAttributesWithSourceLooseRange() {
final String looseRangeValue =
attributes.getOrDefault(
TopicConstant.LOOSE_RANGE_KEY, TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
return LOOSE_RANGE_KEY_SET.stream()
.collect(Collectors.toMap(key -> key, key -> looseRangeValue));
}

public Map<String, String> getAttributesWithProcessorPrefix() {
final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
attributes.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public class TopicConstant {
public static final String FORMAT_TS_FILE_HANDLER_VALUE = "TsFileHandler";
public static final String FORMAT_DEFAULT_VALUE = FORMAT_SESSION_DATA_SETS_HANDLER_VALUE;

public static final String LOOSE_RANGE_KEY = "loose-range";
public static final String LOOSE_RANGE_TIME_VALUE = "time";
public static final String LOOSE_RANGE_PATH_VALUE = "path";
public static final String LOOSE_RANGE_TIME_AND_PATH_VALUE = "time,path";
public static final String LOOSE_RANGE_DEFAULT_VALUE = "";

private TopicConstant() {
throw new IllegalStateException("Utility class");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ public Map<String, String> generateExtractorAttributes() {
extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
// source mode
extractorAttributes.putAll(config.getAttributesWithSourceMode());
// loose range
extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
return extractorAttributes;
}

Expand Down
Loading