From dcb16e72b2cf8d03849412c13b3e03fec11d98f5 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 5 Jun 2024 12:51:43 +0800 Subject: [PATCH 1/6] Subscription: fix topic now timestamp precision --- .../it/dual/AbstractSubscriptionDualIT.java | 13 +++++++------ .../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 9 +++++++++ .../it/dual/IoTDBSubscriptionTopicIT.java | 2 +- .../subscription/topic/CreateTopicProcedure.java | 6 +++++- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java index 1d0ef260ed48..9aa01ad4acfb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java @@ -36,19 +36,20 @@ public void setUp() { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); + setUpConfig(); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + void setUpConfig() { // enable auto create schema senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); - // for IoTDBSubscriptionConsumerGroupIT - receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true); - // 10 min, assert that the operations will not time out senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); - - senderEnv.initClusterEnvironment(); - receiverEnv.initClusterEnvironment(); } @After diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java index 07d5b4ed791b..992d151520f9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java @@ -108,6 +108,15 @@ static final class SubscriptionInfo { } } + @Override + void setUpConfig() { + super.setUpConfig(); + + // Enable air gap receiver + receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true); + } + + @Override @Before public void setUp() { super.setUp(); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java index 7091c93b4dbc..0cd6bc0d0a10 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java @@ -506,7 +506,7 @@ public void testTopicInvalidTimeRangeConfig() throws Exception { session.open(); final Properties properties = new Properties(); properties.put(TopicConstant.START_TIME_KEY, "2024-01-32"); - properties.put(TopicConstant.END_TIME_KEY, "now"); + properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE); session.createTopic("topic1", properties); fail(); } catch (final Exception ignored) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java index ed3d59bd3d48..afdbfe244d72 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java @@ -20,7 +20,9 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.topic; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -76,7 +78,9 @@ protected void executeFromValidate(ConfigNodeProcedureEnv env) throws Subscripti topicMeta = new TopicMeta( createTopicReq.getTopicName(), - System.currentTimeMillis(), + CommonDateTimeUtils.convertMilliTimeWithPrecision( + System.currentTimeMillis(), + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), createTopicReq.getTopicAttributes()); } From 49cc9ee7310f0137514a710d35bb29651420007d Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 5 Jun 2024 12:52:26 +0800 Subject: [PATCH 2/6] add IT --- .../IoTDBSubscriptionTimePrecisionIT.java | 182 ++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java new file mode 100644 index 000000000000..ca6a3a875804 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.dual; + +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; + +import org.apache.tsfile.write.record.Tablet; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2Subscription.class}) +public class IoTDBSubscriptionTimePrecisionIT extends AbstractSubscriptionDualIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class); + + @Override + void setUpConfig() { + super.setUpConfig(); + + // Set timestamp precision to nanosecond + senderEnv.getConfig().getCommonConfig().setTimestampPrecision("ns"); + receiverEnv.getConfig().getCommonConfig().setTimestampPrecision("ns"); + } + + @Test + public void testTopicTimePrecision() throws Exception { + // Insert some historical data on sender + final long currentTime = System.currentTimeMillis() * 1000_000L; // in nanosecond + try (final ISession session = senderEnv.getSessionConnection()) { + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); + session.executeNonQueryStatement( + String.format("insert into root.db.d2(time, s) values (%s, 1)", currentTime - i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Create topic on sender + final String topicName = "topic0"; + final String host = senderEnv.getIP(); + final int port = Integer.parseInt(senderEnv.getPort()); + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE); + session.createTopic(topicName, config); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + assertTopicCount(1); + + // Subscribe on sender and insert on receiver + final AtomicBoolean isClosed = new AtomicBoolean(false); + final Thread thread = + new Thread( + () -> { + try (final SubscriptionPullConsumer consumer = + new SubscriptionPullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c1") + .consumerGroupId("cg1") + .autoCommit(false) + .buildPullConsumer(); + final ISession session = receiverEnv.getSessionConnection()) { + consumer.open(); + consumer.subscribe(topicName); + while (!isClosed.get()) { + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time + final List messages = + consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final Iterator it = + message.getSessionDataSetsHandler().tabletIterator(); + it.hasNext(); ) { + final Tablet tablet = it.next(); + session.insertTablet(tablet); + } + } + consumer.commitSync(messages); + } + consumer.unsubscribe(topicName); + } catch (final Exception e) { + e.printStackTrace(); + // Avoid failure + } finally { + LOGGER.info("consumer exiting..."); + } + }); + thread.start(); + + // Check data on receiver + try { + try (final Connection connection = receiverEnv.getConnection(); + final Statement statement = connection.createStatement()) { + // Keep retrying if there are execution failures + Awaitility.await() + .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, TimeUnit.SECONDS) + .pollInterval( + IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, TimeUnit.SECONDS) + .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertSingleResultSetEqual( + TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), + new HashMap() { + { + put("count(root.db.d1.s)", "100"); + put("count(root.db.d2.s)", "100"); + } + })); + } + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + isClosed.set(true); + thread.join(); + } + } + + private void assertTopicCount(final int count) throws Exception { + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List showTopicResult = + client.showTopic(new TShowTopicReq()).topicInfoList; + Assert.assertEquals(count, showTopicResult.size()); + } + } +} From d5bac48ef5655cc80b62dbc6be0523c359f7181b Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 5 Jun 2024 17:02:52 +0800 Subject: [PATCH 3/6] improve --- .../IoTDBSubscriptionTimePrecisionIT.java | 67 +++++++++++-------- ...peHistoricalDataRegionTsFileExtractor.java | 12 ++-- .../PipeRealtimeDataRegionExtractor.java | 6 +- .../executor/ClusterConfigTaskExecutor.java | 8 ++- 4 files changed, 59 insertions(+), 34 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java index ca6a3a875804..2b1cc407b7f7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java @@ -19,9 +19,6 @@ package org.apache.iotdb.subscription.it.dual; -import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; -import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo; -import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.isession.ISession; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -34,7 +31,6 @@ import org.apache.tsfile.write.record.Tablet; import org.awaitility.Awaitility; -import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -71,14 +67,17 @@ void setUpConfig() { @Test public void testTopicTimePrecision() throws Exception { + final String host = senderEnv.getIP(); + final int port = Integer.parseInt(senderEnv.getPort()); + // Insert some historical data on sender - final long currentTime = System.currentTimeMillis() * 1000_000L; // in nanosecond + final long currentTime1 = System.currentTimeMillis() * 1000_000L; // in nanosecond try (final ISession session = senderEnv.getSessionConnection()) { for (int i = 0; i < 100; ++i) { session.executeNonQueryStatement( - String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); + String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); session.executeNonQueryStatement( - String.format("insert into root.db.d2(time, s) values (%s, 1)", currentTime - i)); + String.format("insert into root.db.d1(time, s2) values (%s, 1)", currentTime1 - i)); } session.executeNonQueryStatement("flush"); } catch (final Exception e) { @@ -87,19 +86,42 @@ public void testTopicTimePrecision() throws Exception { } // Create topic on sender - final String topicName = "topic0"; - final String host = senderEnv.getIP(); - final int port = Integer.parseInt(senderEnv.getPort()); + final String topic1 = "topic1"; + final String topic2 = "topic2"; try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE); - session.createTopic(topicName, config); + { + final Properties config = new Properties(); + config.put(TopicConstant.START_TIME_KEY, currentTime1 - 99); + config.put( + TopicConstant.END_TIME_KEY, + TopicConstant.NOW_TIME_VALUE); // now should be strictly larger than current time 1 + session.createTopic(topic1, config); + } + { + final Properties config = new Properties(); + config.put( + TopicConstant.START_TIME_KEY, + TopicConstant.NOW_TIME_VALUE); // now should be strictly smaller than current time 2 + session.createTopic(topic2, config); + } + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Insert some historical data on sender again + final long currentTime2 = System.currentTimeMillis() * 1000_000L; // in nanosecond + try (final ISession session = senderEnv.getSessionConnection()) { + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d2(time, s1) values (%s, 1)", currentTime2 + i)); + } + session.executeNonQueryStatement("flush"); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); } - assertTopicCount(1); // Subscribe on sender and insert on receiver final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -116,7 +138,7 @@ public void testTopicTimePrecision() throws Exception { .buildPullConsumer(); final ISession session = receiverEnv.getSessionConnection()) { consumer.open(); - consumer.subscribe(topicName); + consumer.subscribe(topic1, topic2); while (!isClosed.get()) { LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time final List messages = @@ -131,7 +153,7 @@ public void testTopicTimePrecision() throws Exception { } consumer.commitSync(messages); } - consumer.unsubscribe(topicName); + // Auto unsubscribe topics } catch (final Exception e) { e.printStackTrace(); // Avoid failure @@ -157,8 +179,8 @@ public void testTopicTimePrecision() throws Exception { TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), new HashMap() { { - put("count(root.db.d1.s)", "100"); - put("count(root.db.d2.s)", "100"); + put("count(root.db.d1.s2)", "100"); + put("count(root.db.d2.s1)", "100"); } })); } @@ -170,13 +192,4 @@ public void testTopicTimePrecision() throws Exception { thread.join(); } } - - private void assertTopicCount(final int count) throws Exception { - try (final SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - final List showTopicResult = - client.showTopic(new TShowTopicReq()).topicInfoList; - Assert.assertEquals(count, showTopicResult.size()); - } - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index b5bff97ec98a..dc86e82e361d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -144,11 +144,13 @@ public void validate(final PipeParameterValidator validator) { if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) { throw new PipeParameterNotValidException( String.format( - "%s or %s should be less than or equal to %s or %s.", + "%s (%s) [%s] should be less than or equal to %s (%s) [%s].", SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, + historicalDataExtractionStartTime, SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY)); + EXTRACTOR_END_TIME_KEY, + historicalDataExtractionEndTime)); } } catch (final Exception e) { // compatible with the current validation framework @@ -191,11 +193,13 @@ public void validate(final PipeParameterValidator validator) { if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) { throw new PipeParameterNotValidException( String.format( - "%s (%s) should be less than or equal to %s (%s).", + "%s (%s) [%s] should be less than or equal to %s (%s) [%s].", EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY, + historicalDataExtractionStartTime, EXTRACTOR_HISTORY_END_TIME_KEY, - SOURCE_HISTORY_END_TIME_KEY)); + SOURCE_HISTORY_END_TIME_KEY, + historicalDataExtractionEndTime)); } } catch (final Exception e) { // Compatible with the current validation framework diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index d809bd72a818..47c220d28c70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -133,11 +133,13 @@ public void validate(final PipeParameterValidator validator) throws Exception { if (realtimeDataExtractionStartTime > realtimeDataExtractionEndTime) { throw new PipeParameterNotValidException( String.format( - "%s or %s should be less than or equal to %s or %s.", + "%s (%s) [%s] should be less than or equal to %s (%s) [%s].", SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, + realtimeDataExtractionStartTime, SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY)); + EXTRACTOR_END_TIME_KEY, + realtimeDataExtractionEndTime)); } } catch (final Exception e) { // compatible with the current validation framework diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d0ebc9333823..cb4d6f57fb6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -49,6 +49,7 @@ import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.udf.service.UDFExecutableManager; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; @@ -1917,7 +1918,12 @@ public SettableFuture createTopic(CreateTopicStatement createT // Validate topic config final TopicMeta temporaryTopicMeta = - new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes); + new TopicMeta( + topicName, + CommonDateTimeUtils.convertMilliTimeWithPrecision( + System.currentTimeMillis(), + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), + topicAttributes); try { PipeAgent.plugin().validateExtractor(temporaryTopicMeta.generateExtractorAttributes()); PipeAgent.plugin().validateProcessor(temporaryTopicMeta.generateProcessorAttributes()); From 0cb803889468ac7e9dcc0d576c223d907d81b10b Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 5 Jun 2024 17:03:33 +0800 Subject: [PATCH 4/6] improve --- .../dataregion/realtime/PipeRealtimeDataRegionExtractor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index 47c220d28c70..ead1ef1d849d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -304,7 +304,7 @@ protected void extractHeartbeat(final PipeRealtimeEvent event) { // Record the pending queue size before trying to put heartbeatEvent into queue ((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue); - Event lastEvent = pendingQueue.peekLast(); + final Event lastEvent = pendingQueue.peekLast(); if (lastEvent instanceof PipeRealtimeEvent && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent && (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage() From b5094f2772a081c20c19103d07cc9beaa5b316a1 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 5 Jun 2024 17:24:23 +0800 Subject: [PATCH 5/6] empty commit From 7518e1634b3e5b1e90ef55d8cf317c2a5163ec81 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 5 Jun 2024 19:43:18 +0800 Subject: [PATCH 6/6] add comment --- .../apache/iotdb/commons/subscription/meta/topic/TopicMeta.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index 33f5065cbae6..a0e80db04e06 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -39,7 +39,7 @@ public class TopicMeta { private String topicName; - private long creationTime; + private long creationTime; // raw timestamp based on system timestamp precision private TopicConfig config; private Set subscribedConsumerGroupIds;