CONCURRENT_FORECAST_MODELS =
+ Arrays.asList(
+ new FakeModelInfo("sundial", "sundial", "builtin", "active"),
+ new FakeModelInfo("timer_xl", "timer", "builtin", "active"));
+ private static final String CONCURRENT_FORECAST_SQL_TEMPLATE =
+ "SELECT * FROM FORECAST(model_id=>'%s', targets=>(SELECT time,s FROM concurrent_db.AI) ORDER BY time, output_length=>%d)";
+
@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().initClusterEnvironment(1, 1);
prepareDataInTree();
prepareDataInTable();
+ prepareDataForConcurrentForecast();
}
@AfterClass
@@ -410,6 +423,63 @@ public static void forecastTableFunctionErrorTest(
statement, invalidTimecolSQL2, "701: The type of the column [s0] is not as expected.");
}
+ // ========== Concurrent forecast tests ==========
+
+ @Test
+ public void concurrentForecastTest() throws SQLException, InterruptedException {
+ for (FakeModelInfo modelInfo : CONCURRENT_FORECAST_MODELS) {
+ concurrentGPUForecastTest(modelInfo, "0,1");
+ // TODO: Enable cpu test after optimize memory consumption
+ // concurrentGPUForecastTest(modelInfo, "cpu");
+ }
+ }
+
+ private void concurrentGPUForecastTest(FakeModelInfo modelInfo, String devices)
+ throws SQLException, InterruptedException {
+ final int forecastLength = 512;
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ final String forecastSQL =
+ String.format(CONCURRENT_FORECAST_SQL_TEMPLATE, modelInfo.getModelId(), forecastLength);
+ final int threadCnt = 10;
+ // PR CI keeps a concurrency smoke check; nightly/daily can dial this up if regressions
+ // appear.
+ final int loop = 10;
+ statement.execute(
+ String.format("LOAD MODEL %s TO DEVICES '%s'", modelInfo.getModelId(), devices));
+ checkModelOnSpecifiedDevice(statement, modelInfo.getModelId(), devices);
+ long startTime = System.currentTimeMillis();
+ concurrentInference(statement, forecastSQL, threadCnt, loop, forecastLength);
+ long endTime = System.currentTimeMillis();
+ LOGGER.info(
+ String.format(
+ "Model %s concurrent inference %d reqs (%d threads, %d loops) in GPU takes time: %dms",
+ modelInfo.getModelId(), threadCnt * loop, threadCnt, loop, endTime - startTime));
+ statement.execute(
+ String.format("UNLOAD MODEL %s FROM DEVICES '%s'", modelInfo.getModelId(), devices));
+ checkModelNotOnSpecifiedDevice(statement, modelInfo.getModelId(), devices);
+ }
+ }
+
+ private static void prepareDataForConcurrentForecast() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE concurrent_db");
+ statement.execute("CREATE TABLE concurrent_db.AI (s DOUBLE FIELD)");
+ for (int i = 0; i < 2880; i++) {
+ statement.addBatch(
+ String.format(
+ "INSERT INTO concurrent_db.AI(time, s) VALUES(%d, %f)",
+ i, Math.sin(i * Math.PI / 1440)));
+ if ((i + 1) % 500 == 0) {
+ statement.executeBatch();
+ statement.clearBatch();
+ }
+ }
+ statement.executeBatch();
+ }
+ }
+
// ========== InstanceManagement tests ==========
@Test
From 680e4a8ea0356be321ccc7b359951fcdb6b0edc8 Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Tue, 19 May 2026 13:16:43 +0800
Subject: [PATCH 4/4] Merge AINodeClusterConfigIT into AINodeSharedClusterIT
(run last)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The cluster-config test calls REMOVE AINODE, which permanently tears
the AINode out of the cluster, so we couldn't share its 1C1D1A startup
without guaranteeing it runs after every other @Test.
Use @FixMethodOrder(NAME_ASCENDING) on AINodeSharedClusterIT and rename
the migrated test to zzAiNodeRegisterAndRemoveMustRunLast — alphabetical
sorting then keeps it strictly last. IoTDBTestRunner extends
BlockJUnit4ClassRunner without overriding child-method ordering, so the
JUnit-standard FixMethodOrder applies. The class doc warns reviewers
not to add new @Test methods that sort after this one.
The other 16 @Test methods are net-zero by construction (paired
LOAD/UNLOAD, register/drop, or pure SHOW), so the alphabetical reorder
doesn't introduce inter-test coupling. Saves the last per-CI cluster
startup we couldn't fold in earlier.
---
.../ainode/it/AINodeClusterConfigIT.java | 127 ------------------
.../ainode/it/AINodeSharedClusterIT.java | 92 ++++++++++++-
2 files changed, 90 insertions(+), 129 deletions(-)
delete mode 100644 integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
deleted file mode 100644
index de6d3c48be3e0..0000000000000
--- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.ainode.it;
-
-import org.apache.iotdb.it.env.EnvFactory;
-import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.AIClusterIT;
-import org.apache.iotdb.itbase.env.BaseEnv;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkHeader;
-import static org.junit.Assert.assertEquals;
-
-@RunWith(IoTDBTestRunner.class)
-@Category({AIClusterIT.class})
-public class AINodeClusterConfigIT {
-
- @BeforeClass
- public static void setUp() throws Exception {
- EnvFactory.getEnv().initClusterEnvironment(1, 1);
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- EnvFactory.getEnv().cleanClusterEnvironment();
- }
-
- @Test
- public void aiNodeRegisterAndRemoveTest() throws SQLException {
- String show_sql = "SHOW AINODES";
- String title = "NodeID,Status,InternalAddress,InternalPort";
-
- // Verify AINode exists via both dialects before removal
- try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
- Statement statement = connection.createStatement()) {
- verifyAINodeExists(statement, show_sql, title);
- }
- try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
- Statement statement = connection.createStatement()) {
- verifyAINodeExists(statement, show_sql, title);
- }
-
- // Remove AINode
- try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
- Statement statement = connection.createStatement()) {
- statement.execute("REMOVE AINODE");
- waitForAINodeRemoval(statement, show_sql, title);
- }
-
- // Verify removal is visible via table dialect as well
- try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
- Statement statement = connection.createStatement()) {
- try (ResultSet resultSet = statement.executeQuery(show_sql)) {
- checkHeader(resultSet.getMetaData(), title);
- int count = 0;
- while (resultSet.next()) {
- count++;
- }
- assertEquals(0, count);
- }
- }
- }
-
- private static void verifyAINodeExists(Statement statement, String showSql, String title)
- throws SQLException {
- try (ResultSet resultSet = statement.executeQuery(showSql)) {
- checkHeader(resultSet.getMetaData(), title);
- int count = 0;
- while (resultSet.next()) {
- assertEquals("2", resultSet.getString(1));
- assertEquals("Running", resultSet.getString(2));
- count++;
- }
- assertEquals(1, count);
- }
- }
-
- private static void waitForAINodeRemoval(Statement statement, String showSql, String title)
- throws SQLException {
- for (int retry = 0; retry < 500; retry++) {
- try (ResultSet resultSet = statement.executeQuery(showSql)) {
- checkHeader(resultSet.getMetaData(), title);
- int count = 0;
- while (resultSet.next()) {
- count++;
- }
- if (count == 0) {
- return;
- }
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- Assert.fail("The target AINode is not removed successfully after all retries.");
- }
-}
diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java
index 1accdedb53464..1686e916ff0b7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java
@@ -29,9 +29,11 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,10 +64,16 @@
/**
* Consolidates AINodeDeviceManageIT, AINodeModelManageIT, AINodeCallInferenceIT, AINodeForecastIT,
- * AINodeInstanceManagementIT, and AINodeConcurrentForecastIT into a single class that shares one
- * 1C1D1A cluster, avoiding 6 redundant cluster startups.
+ * AINodeInstanceManagementIT, AINodeConcurrentForecastIT, and AINodeClusterConfigIT into a single
+ * class that shares one 1C1D1A cluster, avoiding 7 redundant cluster startups.
+ *
+ * Test methods run in alphabetical order via {@link FixMethodOrder} so that {@link
+ * #zzAiNodeRegisterAndRemoveMustRunLast()} (which calls {@code REMOVE AINODE} and tears the AINode
+ * out of the cluster) executes after every other test. Do not add new
+ * {@code @Test} methods whose names sort after {@code zz}.
*/
@RunWith(IoTDBTestRunner.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@Category({AIClusterIT.class})
public class AINodeSharedClusterIT {
@@ -562,6 +570,86 @@ private void instanceFailTest(Statement statement) {
"1510: Device ID list contains duplicate entries.");
}
+ // ========== AINode lifecycle (must run last) ==========
+
+ /**
+ * REMOVE AINODE permanently tears the AINode out of the cluster, so this test must run after
+ * every other {@code @Test} in the class. The {@code zz} prefix combined with {@link
+ * FixMethodOrder} on the class keeps it last under alphabetical sorting; do not add new test
+ * methods whose names sort after this one.
+ */
+ @Test
+ public void zzAiNodeRegisterAndRemoveMustRunLast() throws SQLException {
+ final String showSql = "SHOW AINODES";
+ final String title = "NodeID,Status,InternalAddress,InternalPort";
+
+ // Verify AINode exists via both dialects before removal
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ verifyAINodeExists(statement, showSql, title);
+ }
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ verifyAINodeExists(statement, showSql, title);
+ }
+
+ // Remove AINode
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("REMOVE AINODE");
+ waitForAINodeRemoval(statement, showSql, title);
+ }
+
+ // Verify removal is visible via table dialect as well
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(showSql)) {
+ checkHeader(resultSet.getMetaData(), title);
+ int count = 0;
+ while (resultSet.next()) {
+ count++;
+ }
+ assertEquals(0, count);
+ }
+ }
+ }
+
+ private static void verifyAINodeExists(Statement statement, String showSql, String title)
+ throws SQLException {
+ try (ResultSet resultSet = statement.executeQuery(showSql)) {
+ checkHeader(resultSet.getMetaData(), title);
+ int count = 0;
+ while (resultSet.next()) {
+ assertEquals("2", resultSet.getString(1));
+ assertEquals("Running", resultSet.getString(2));
+ count++;
+ }
+ assertEquals(1, count);
+ }
+ }
+
+ private static void waitForAINodeRemoval(Statement statement, String showSql, String title)
+ throws SQLException {
+ for (int retry = 0; retry < 500; retry++) {
+ try (ResultSet resultSet = statement.executeQuery(showSql)) {
+ checkHeader(resultSet.getMetaData(), title);
+ int count = 0;
+ while (resultSet.next()) {
+ count++;
+ }
+ if (count == 0) {
+ return;
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ Assert.fail("The target AINode is not removed successfully after all retries.");
+ }
+
// ========== Helper methods (from ModelManageIT) ==========
private static void registerUserDefinedModel(