diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java index 1d0ef260ed48..9aa01ad4acfb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java @@ -36,19 +36,20 @@ public void setUp() { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); + setUpConfig(); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + void setUpConfig() { // enable auto create schema senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); - // for IoTDBSubscriptionConsumerGroupIT - receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true); - // 10 min, assert that the operations will not time out senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); - - senderEnv.initClusterEnvironment(); - receiverEnv.initClusterEnvironment(); } @After diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java index 07d5b4ed791b..992d151520f9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java @@ -108,6 +108,15 @@ static final class SubscriptionInfo { } } + @Override + void setUpConfig() { + super.setUpConfig(); + + // Enable air gap receiver + receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true); + } + + @Override @Before public void setUp() { super.setUp(); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java new file mode 100644 index 000000000000..2b1cc407b7f7 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.dual; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; + +import org.apache.tsfile.write.record.Tablet; +import org.awaitility.Awaitility; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2Subscription.class}) +public class IoTDBSubscriptionTimePrecisionIT extends AbstractSubscriptionDualIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class); + + @Override + void setUpConfig() { + super.setUpConfig(); + + // Set timestamp precision to nanosecond + senderEnv.getConfig().getCommonConfig().setTimestampPrecision("ns"); + receiverEnv.getConfig().getCommonConfig().setTimestampPrecision("ns"); + } + + @Test + public void testTopicTimePrecision() throws Exception { + final String host = senderEnv.getIP(); + final int port = Integer.parseInt(senderEnv.getPort()); + + // Insert some historical data on sender + final long currentTime1 = System.currentTimeMillis() * 1000_000L; // in nanosecond + try (final ISession session = senderEnv.getSessionConnection()) { + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s2) values (%s, 1)", currentTime1 - i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Create topic on sender + final String topic1 = "topic1"; + final String topic2 = "topic2"; + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + { + final Properties config = new Properties(); + config.put(TopicConstant.START_TIME_KEY, currentTime1 - 99); + config.put( + TopicConstant.END_TIME_KEY, + TopicConstant.NOW_TIME_VALUE); // now should be strictly larger than current time 1 + session.createTopic(topic1, config); + } + { + final Properties config = new Properties(); + config.put( + TopicConstant.START_TIME_KEY, + TopicConstant.NOW_TIME_VALUE); // now should be strictly smaller than current time 2 + session.createTopic(topic2, config); + } + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Insert some historical data on sender again + final long currentTime2 = System.currentTimeMillis() * 1000_000L; // in nanosecond + try (final ISession session = senderEnv.getSessionConnection()) { + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d2(time, s1) values (%s, 1)", currentTime2 + i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Subscribe on sender and insert on receiver + final AtomicBoolean isClosed = new AtomicBoolean(false); + final Thread thread = + new Thread( + () -> { + try (final SubscriptionPullConsumer consumer = + new SubscriptionPullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c1") + .consumerGroupId("cg1") + .autoCommit(false) + .buildPullConsumer(); + final ISession session = receiverEnv.getSessionConnection()) { + consumer.open(); + consumer.subscribe(topic1, topic2); + while (!isClosed.get()) { + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time + final List messages = + consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final Iterator it = + message.getSessionDataSetsHandler().tabletIterator(); + it.hasNext(); ) { + final Tablet tablet = it.next(); + session.insertTablet(tablet); + } + } + consumer.commitSync(messages); + } + // Auto unsubscribe topics + } catch (final Exception e) { + e.printStackTrace(); + // Avoid failure + } finally { + LOGGER.info("consumer exiting..."); + } + }); + thread.start(); + + // Check data on receiver + try { + try (final Connection connection = receiverEnv.getConnection(); + final Statement statement = connection.createStatement()) { + // Keep retrying if there are execution failures + Awaitility.await() + .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS) + .pollInterval( + IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS) + .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertSingleResultSetEqual( + TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), + new HashMap() { + { + put("count(root.db.d1.s2)", "100"); + put("count(root.db.d2.s1)", "100"); + } + })); + } + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + isClosed.set(true); + thread.join(); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java index 7091c93b4dbc..0cd6bc0d0a10 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java @@ -506,7 +506,7 @@ public void testTopicInvalidTimeRangeConfig() throws Exception { session.open(); final Properties properties = new Properties(); properties.put(TopicConstant.START_TIME_KEY, "2024-01-32"); - properties.put(TopicConstant.END_TIME_KEY, "now"); + properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE); session.createTopic("topic1", properties); fail(); } catch (final Exception ignored) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java index ed3d59bd3d48..afdbfe244d72 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java @@ -20,7 +20,9 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.topic; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -76,7 +78,9 @@ protected void executeFromValidate(ConfigNodeProcedureEnv env) throws Subscripti topicMeta = new TopicMeta( createTopicReq.getTopicName(), - System.currentTimeMillis(), + CommonDateTimeUtils.convertMilliTimeWithPrecision( + System.currentTimeMillis(), + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), createTopicReq.getTopicAttributes()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index b5bff97ec98a..dc86e82e361d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -144,11 +144,13 @@ public void validate(final PipeParameterValidator validator) { if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) { throw new PipeParameterNotValidException( String.format( - "%s or %s should be less than or equal to %s or %s.", + "%s (%s) [%s] should be less than or equal to %s (%s) [%s].", SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, + historicalDataExtractionStartTime, SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY)); + EXTRACTOR_END_TIME_KEY, + historicalDataExtractionEndTime)); } } catch (final Exception e) { // compatible with the current validation framework @@ -191,11 +193,13 @@ public void validate(final PipeParameterValidator validator) { if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) { throw new PipeParameterNotValidException( String.format( - "%s (%s) should be less than or equal to %s (%s).", + "%s (%s) [%s] should be less than or equal to %s (%s) [%s].", EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY, + historicalDataExtractionStartTime, EXTRACTOR_HISTORY_END_TIME_KEY, - SOURCE_HISTORY_END_TIME_KEY)); + SOURCE_HISTORY_END_TIME_KEY, + historicalDataExtractionEndTime)); } } catch (final Exception e) { // Compatible with the current validation framework diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index d809bd72a818..ead1ef1d849d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -133,11 +133,13 @@ public void validate(final PipeParameterValidator validator) throws Exception { if (realtimeDataExtractionStartTime > realtimeDataExtractionEndTime) { throw new PipeParameterNotValidException( String.format( - "%s or %s should be less than or equal to %s or %s.", + "%s (%s) [%s] should be less than or equal to %s (%s) [%s].", SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, + realtimeDataExtractionStartTime, SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY)); + EXTRACTOR_END_TIME_KEY, + realtimeDataExtractionEndTime)); } } catch (final Exception e) { // compatible with the current validation framework @@ -302,7 +304,7 @@ protected void extractHeartbeat(final PipeRealtimeEvent event) { // Record the pending queue size before trying to put heartbeatEvent into queue ((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue); - Event lastEvent = pendingQueue.peekLast(); + final Event lastEvent = pendingQueue.peekLast(); if (lastEvent instanceof PipeRealtimeEvent && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent && (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d0ebc9333823..cb4d6f57fb6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -49,6 +49,7 @@ import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.udf.service.UDFExecutableManager; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; @@ -1917,7 +1918,12 @@ public SettableFuture createTopic(CreateTopicStatement createT // Validate topic config final TopicMeta temporaryTopicMeta = - new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes); + new TopicMeta( + topicName, + CommonDateTimeUtils.convertMilliTimeWithPrecision( + System.currentTimeMillis(), + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), + topicAttributes); try { PipeAgent.plugin().validateExtractor(temporaryTopicMeta.generateExtractorAttributes()); PipeAgent.plugin().validateProcessor(temporaryTopicMeta.generateProcessorAttributes()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index 33f5065cbae6..a0e80db04e06 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -39,7 +39,7 @@ public class TopicMeta { private String topicName; - private long creationTime; + private long creationTime; // raw timestamp based on system timestamp precision private TopicConfig config; private Set subscribedConsumerGroupIds;