From 9e8568d170d0b0e8e16a4ec11e6e02ecd9b31ebb Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 19 May 2026 12:05:15 +0800 Subject: [PATCH 1/4] Speed up AINode IT and split CPU-only tests off GPU runner Two-tier isolation for org.apache.iotdb.ainode.it so that only the GPU-bound tests need a self-hosted GPU runner: - New AINodeIT category for tests that exercise only metadata/lifecycle paths (SHOW/DROP builtin model, REMOVE AINODE). These tests run on a plain ubuntu-latest runner via the new AINodeIT Maven profile and cluster-it-ainode-cpu.yml workflow. - AIClusterIT profile keeps tests that drive CALL INFERENCE, FORECAST, or LOAD MODEL TO DEVICES (CUDA), still running on the GPU runner. - AINodeBasicIT collects the 4 metadata tests previously mixed into AINodeSharedClusterIT; AINodeClusterConfigIT is re-tagged to AINodeIT. - AINodeWrapper now tolerates a missing /data/ainode/models cache, so CPU runners can boot AINode without the multi-GB weight bundle. When the cache is present, weights are symlinked instead of copied per fork to remove a large per-test-class IO cost. Additional speedups in the GPU pipeline: - AINodeConcurrentForecastIT loop count 100 -> 10 (still 100 reqs per model for a concurrency smoke check; nightly can dial up). - AINodeTestUtils.prepareDataInTree/Table/Table2 and AINodeConcurrentForecastIT.prepareDataForTableModel switched from per-row execute() to addBatch()/executeBatch() in chunks of 500. 
--- .github/workflows/cluster-it-ainode-cpu.yml | 70 +++++++++++ integration-test/pom.xml | 14 +++ .../it/env/cluster/node/AINodeWrapper.java | 83 ++++++++----- .../iotdb/itbase/category/AINodeIT.java | 27 ++++ .../apache/iotdb/ainode/it/AINodeBasicIT.java | 116 ++++++++++++++++++ .../ainode/it/AINodeClusterConfigIT.java | 4 +- .../ainode/it/AINodeConcurrentForecastIT.java | 11 +- .../ainode/it/AINodeSharedClusterIT.java | 54 -------- .../iotdb/ainode/utils/AINodeTestUtils.java | 21 +++- 9 files changed, 310 insertions(+), 90 deletions(-) create mode 100644 .github/workflows/cluster-it-ainode-cpu.yml create mode 100644 integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java create mode 100644 integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java diff --git a/.github/workflows/cluster-it-ainode-cpu.yml b/.github/workflows/cluster-it-ainode-cpu.yml new file mode 100644 index 0000000000000..6046a13ba048b --- /dev/null +++ b/.github/workflows/cluster-it-ainode-cpu.yml @@ -0,0 +1,70 @@ +name: AINode IT - CPU + +on: + push: + branches: + - master + - 'rel/*' + - 'rc/*' + paths-ignore: + - 'docs/**' + - 'site/**' + pull_request: + branches: + - master + - 'rel/*' + - 'rc/*' + - 'force_ci/**' + paths-ignore: + - 'docs/**' + - 'site/**' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 + MAVEN_ARGS: --batch-mode --no-transfer-progress + DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + +jobs: + AINode-CPU: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v5 + - name: Set up JDK + uses: actions/setup-java@v5 + with: + distribution: corretto + java-version: 17 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Cache Maven packages + uses: actions/cache@v5 + with: + path: 
~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2- + - name: Adjust Linux kernel somaxconn + shell: bash + run: sudo sysctl -w net.core.somaxconn=65535 + - name: IT Test + shell: bash + run: | + mvn clean verify \ + -P with-integration-tests,with-ainode \ + -DskipUTs \ + -DintegrationTest.forkCount=1 \ + -pl integration-test,iotdb-core/ainode \ + -am \ + -PAINodeIT + - name: Upload Artifact + if: failure() + uses: actions/upload-artifact@v6 + with: + name: ainode-cpu-logs + path: integration-test/target/*-logs + retention-days: 30 diff --git a/integration-test/pom.xml b/integration-test/pom.xml index c61509ed2c84c..dc60f3e62a954 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -693,6 +693,20 @@ AI + + AINodeIT + + false + + + org.apache.iotdb.itbase.category.ManualIT + org.apache.iotdb.itbase.category.AINodeIT + false + false + false + AI + + DailyIT diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java index d452e19f381a6..d21caebf10a9c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java @@ -131,37 +131,62 @@ public void start() { }, propertiesFile); - // copy built-in LTSM + // Link built-in LTSM weights from the runner-wide cache. These can be hundreds of MB to + // multiple GB; copying them per fork dominates IT startup. Symlinks share read-only weights + // across forks; we fall back to a copy on platforms / filesystems that reject symlinks. + // CPU-only runners that only run metadata-level AINode tests won't have the cache pre-staged + // — log and skip in that case rather than failing. 
String builtInModelPath = filePrefix + File.separator + BUILT_IN_MODEL_PATH; - new File(builtInModelPath).mkdirs(); - try { - if (new File(builtInModelPath).exists()) { - PathUtils.deleteDirectory(Paths.get(builtInModelPath)); + File builtInModelDir = new File(builtInModelPath); + Path cacheRoot = Paths.get(CACHE_BUILT_IN_MODEL_PATH); + if (!Files.isDirectory(cacheRoot)) { + logger.info( + "AINode model weight cache {} not present; starting AINode without preloaded weights", + cacheRoot); + builtInModelDir.mkdirs(); + } else { + try { + if (builtInModelDir.exists()) { + PathUtils.deleteDirectory(builtInModelDir.toPath()); + } + } catch (NoSuchFileException e) { + // ignored + } + Path destRoot = builtInModelDir.toPath(); + builtInModelDir.getParentFile().mkdirs(); + try { + Files.createSymbolicLink(destRoot, cacheRoot); + logger.info("AINode symlinked model weights {} -> {}", destRoot, cacheRoot); + } catch (UnsupportedOperationException | IOException symlinkErr) { + logger.warn( + "AINode failed to symlink {} -> {} ({}), falling back to copy", + destRoot, + cacheRoot, + symlinkErr.toString()); + builtInModelDir.mkdirs(); + try (Stream s = Files.walk(cacheRoot)) { + s.forEach( + source -> { + Path destination = + Paths.get( + builtInModelPath, + source.toString().substring(CACHE_BUILT_IN_MODEL_PATH.length())); + logger.info("AINode copying model weights from {} to {}", source, destination); + try { + Files.copy( + source, + destination, + LinkOption.NOFOLLOW_LINKS, + StandardCopyOption.COPY_ATTRIBUTES); + } catch (IOException e) { + logger.error("AINode got error copying model weights", e); + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + logger.error("AINode got error copying model weights", e); + } } - } catch (NoSuchFileException e) { - // ignored - } - try (Stream s = Files.walk(Paths.get(CACHE_BUILT_IN_MODEL_PATH))) { - s.forEach( - source -> { - Path destination = - Paths.get( - builtInModelPath, - 
source.toString().substring(CACHE_BUILT_IN_MODEL_PATH.length())); - logger.info("AINode copying model weights from {} to {}", source, destination); - try { - Files.copy( - source, - destination, - LinkOption.NOFOLLOW_LINKS, - StandardCopyOption.COPY_ATTRIBUTES); - } catch (IOException e) { - logger.error("AINode got error copying model weights", e); - throw new RuntimeException(e); - } - }); - } catch (Exception e) { - logger.error("AINode got error copying model weights", e); } // start AINode diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java b/integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java new file mode 100644 index 0000000000000..328ecbb0a83f0 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.itbase.category; + +/** + * Marker for AINode integration tests that exercise only metadata / lifecycle paths and therefore + * don't need a GPU. Tests tagged with this category can run on plain CPU runners; tests that drive + * inference, forecasting, or device-binding still belong in {@link AIClusterIT}. 
+ */ +public interface AINodeIT {} diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java new file mode 100644 index 0000000000000..04832ae120e12 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.ainode.it; + +import org.apache.iotdb.ainode.utils.AINodeTestUtils.FakeModelInfo; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.AINodeIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.iotdb.ainode.utils.AINodeTestUtils.BUILTIN_MODEL_MAP; +import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkHeader; +import static org.apache.iotdb.ainode.utils.AINodeTestUtils.errorTest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Metadata-only AINode tests that don't drive inference or bind GPU devices, so they can run on a + * plain CPU runner. Tests that do exercise CUDA paths live in {@link AINodeSharedClusterIT}. 
+ */ +@RunWith(IoTDBTestRunner.class) +@Category({AINodeIT.class}) +public class AINodeBasicIT { + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(1, 1); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void dropBuiltInModelErrorTestInTree() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); + } + } + + @Test + public void dropBuiltInModelErrorTestInTable() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); + } + } + + @Test + public void showBuiltInModelTestInTree() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + showBuiltInModelTest(statement); + } + } + + @Test + public void showBuiltInModelTestInTable() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + showBuiltInModelTest(statement); + } + } + + private void showBuiltInModelTest(Statement statement) throws SQLException { + int builtInModelCount = 0; + final String showSql = "SHOW MODELS"; + try (ResultSet resultSet = statement.executeQuery(showSql)) { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + checkHeader(resultSetMetaData, "ModelId,ModelType,Category,State"); + while (resultSet.next()) { + builtInModelCount++; + FakeModelInfo modelInfo = + new FakeModelInfo( + 
resultSet.getString(1), + resultSet.getString(2), + resultSet.getString(3), + resultSet.getString(4)); + assertTrue(BUILTIN_MODEL_MAP.containsKey(modelInfo.getModelId())); + assertEquals(BUILTIN_MODEL_MAP.get(modelInfo.getModelId()), modelInfo); + } + } + assertEquals(BUILTIN_MODEL_MAP.size(), builtInModelCount); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java index de6d3c48be3e0..5c4d8fafd1ae3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java @@ -21,7 +21,7 @@ import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.AIClusterIT; +import org.apache.iotdb.itbase.category.AINodeIT; import org.apache.iotdb.itbase.env.BaseEnv; import org.junit.AfterClass; @@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals; @RunWith(IoTDBTestRunner.class) -@Category({AIClusterIT.class}) +@Category({AINodeIT.class}) public class AINodeClusterConfigIT { @BeforeClass diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java index fd021099d5f43..3e1c616580e71 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java @@ -75,10 +75,15 @@ private static void prepareDataForTableModel() throws SQLException { statement.execute("CREATE DATABASE root"); statement.execute("CREATE TABLE root.AI (s DOUBLE FIELD)"); for (int i = 0; i < 2880; i++) { - statement.execute( + statement.addBatch( String.format( "INSERT INTO root.AI(time, s) VALUES(%d, %f)", i, 
Math.sin(i * Math.PI / 1440))); + if ((i + 1) % 500 == 0) { + statement.executeBatch(); + statement.clearBatch(); + } } + statement.executeBatch(); } } @@ -101,7 +106,9 @@ public void concurrentGPUForecastTest(AINodeTestUtils.FakeModelInfo modelInfo, S String.format( FORECAST_TABLE_FUNCTION_SQL_TEMPLATE, modelInfo.getModelId(), forecastLength); final int threadCnt = 10; - final int loop = 100; + // PR CI keeps a concurrency smoke check; nightly/daily can dial this up if regressions + // appear. + final int loop = 10; statement.execute( String.format("LOAD MODEL %s TO DEVICES '%s'", modelInfo.getModelId(), devices)); checkModelOnSpecifiedDevice(statement, modelInfo.getModelId(), devices); diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java index 7a71682f1671e..87930a1bfecf8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java @@ -54,7 +54,6 @@ import static org.apache.iotdb.ainode.utils.AINodeTestUtils.prepareDataInTree; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -168,59 +167,6 @@ public void userDefinedModelManagementTestInTable() throws SQLException, Interru } } - @Test - public void dropBuiltInModelErrorTestInTree() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); - } - } - - @Test - public void dropBuiltInModelErrorTestInTable() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement 
statement = connection.createStatement()) { - errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); - } - } - - @Test - public void showBuiltInModelTestInTree() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - showBuiltInModelTest(statement); - } - } - - @Test - public void showBuiltInModelTestInTable() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - showBuiltInModelTest(statement); - } - } - - private void showBuiltInModelTest(Statement statement) throws SQLException { - int built_in_model_count = 0; - final String showSql = "SHOW MODELS"; - try (ResultSet resultSet = statement.executeQuery(showSql)) { - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - checkHeader(resultSetMetaData, "ModelId,ModelType,Category,State"); - while (resultSet.next()) { - built_in_model_count++; - FakeModelInfo modelInfo = - new FakeModelInfo( - resultSet.getString(1), - resultSet.getString(2), - resultSet.getString(3), - resultSet.getString(4)); - assertTrue(BUILTIN_MODEL_MAP.containsKey(modelInfo.getModelId())); - assertEquals(BUILTIN_MODEL_MAP.get(modelInfo.getModelId()), modelInfo); - } - } - assertEquals(BUILTIN_MODEL_MAP.size(), built_in_model_count); - } - // ========== CallInference tests ========== @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java index d69a301c06680..8f5567c7c9cf7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java @@ -232,11 +232,16 @@ public static void prepareDataInTree() throws SQLException { try 
(Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); Statement statement = connection.createStatement()) { for (int i = 0; i < 5760; i++) { - statement.execute( + statement.addBatch( String.format( "INSERT INTO root.AI(timestamp,s0,s1,s2,s3) VALUES(%d,%f,%f,%d,%d)", i, (float) i, (double) i, i, i)); + if ((i + 1) % 500 == 0) { + statement.executeBatch(); + statement.clearBatch(); + } } + statement.executeBatch(); } } @@ -248,11 +253,16 @@ public static void prepareDataInTable() throws SQLException { statement.execute( "CREATE TABLE db.AI (s0 FLOAT FIELD, s1 DOUBLE FIELD, s2 INT32 FIELD, s3 INT64 FIELD)"); for (int i = 0; i < 5760; i++) { - statement.execute( + statement.addBatch( String.format( "INSERT INTO db.AI(time,s0,s1,s2,s3) VALUES(%d,%f,%f,%d,%d)", i, (float) i, (double) i, i, i)); + if ((i + 1) % 500 == 0) { + statement.executeBatch(); + statement.clearBatch(); + } } + statement.executeBatch(); } } @@ -264,7 +274,7 @@ public static void prepareDataInTable2() throws SQLException { statement.execute( "CREATE TABLE db.AI2 (s0 FLOAT FIELD, s1 DOUBLE FIELD, s2 INT32 FIELD, s3 INT64 FIELD, s4 FLOAT FIELD, s5 DOUBLE FIELD, s6 INT32 FIELD, s7 INT64 FIELD, s8 FLOAT FIELD, s9 DOUBLE FIELD)"); for (int i = 0; i < 2880; i++) { - statement.execute( + statement.addBatch( String.format( "INSERT INTO db.AI2(time,s0,s1,s2,s3,s4,s5,s6,s7,s8,s9) VALUES(%d,%f,%f,%d,%d,%f,%f,%d,%d,%f,%f)", i, @@ -278,7 +288,12 @@ public static void prepareDataInTable2() throws SQLException { i * 2, (float) (i * 3), (double) (i * 3))); + if ((i + 1) % 500 == 0) { + statement.executeBatch(); + statement.clearBatch(); + } } + statement.executeBatch(); } } From 202f83e4ef0447a9d20b5d7bb1d63fa7eb3f5f11 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 19 May 2026 12:40:23 +0800 Subject: [PATCH 2/4] Revert CPU/GPU split; keep IT speedups in shared GPU runner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
AINode strongly depends on the test-worker environment (model weight cache, GPU presence), so a CPU-only profile isn't a clean isolation boundary. Roll the metadata tests back into AINodeSharedClusterIT and keep the existing AIClusterIT GPU runner. What stays from the earlier commit: - AINodeWrapper symlinks /data/ainode/models instead of copying it per fork (with a copy fallback when the FS rejects symlinks). - AINodeConcurrentForecastIT loop count 100 -> 10 for a concurrency smoke check. - prepareDataInTree/Table/Table2 and prepareDataForTableModel switched from per-row execute() to addBatch()/executeBatch() in chunks of 500. What's reverted: - Drop AINodeIT category, AINodeBasicIT, the AINodeIT Maven profile, and the cluster-it-ainode-cpu.yml workflow. - Re-tag AINodeClusterConfigIT back to AIClusterIT. - Move the 4 metadata tests (drop builtin model error / show builtin models, both dialects) back into AINodeSharedClusterIT. - Drop the "tolerate missing weights cache" branch in AINodeWrapper — with no CPU-only runner the cache is always present. 
--- .github/workflows/cluster-it-ainode-cpu.yml | 70 ----------- integration-test/pom.xml | 14 --- .../it/env/cluster/node/AINodeWrapper.java | 87 ++++++------- .../iotdb/itbase/category/AINodeIT.java | 27 ---- .../apache/iotdb/ainode/it/AINodeBasicIT.java | 116 ------------------ .../ainode/it/AINodeClusterConfigIT.java | 4 +- .../ainode/it/AINodeSharedClusterIT.java | 56 +++++++++ 7 files changed, 97 insertions(+), 277 deletions(-) delete mode 100644 .github/workflows/cluster-it-ainode-cpu.yml delete mode 100644 integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java delete mode 100644 integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java diff --git a/.github/workflows/cluster-it-ainode-cpu.yml b/.github/workflows/cluster-it-ainode-cpu.yml deleted file mode 100644 index 6046a13ba048b..0000000000000 --- a/.github/workflows/cluster-it-ainode-cpu.yml +++ /dev/null @@ -1,70 +0,0 @@ -name: AINode IT - CPU - -on: - push: - branches: - - master - - 'rel/*' - - 'rc/*' - paths-ignore: - - 'docs/**' - - 'site/**' - pull_request: - branches: - - master - - 'rel/*' - - 'rc/*' - - 'force_ci/**' - paths-ignore: - - 'docs/**' - - 'site/**' - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -env: - MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 - MAVEN_ARGS: --batch-mode --no-transfer-progress - DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} - -jobs: - AINode-CPU: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v5 - - name: Set up JDK - uses: actions/setup-java@v5 - with: - distribution: corretto - java-version: 17 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Maven packages - uses: actions/cache@v5 - with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2- - - 
name: Adjust Linux kernel somaxconn - shell: bash - run: sudo sysctl -w net.core.somaxconn=65535 - - name: IT Test - shell: bash - run: | - mvn clean verify \ - -P with-integration-tests,with-ainode \ - -DskipUTs \ - -DintegrationTest.forkCount=1 \ - -pl integration-test,iotdb-core/ainode \ - -am \ - -PAINodeIT - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v6 - with: - name: ainode-cpu-logs - path: integration-test/target/*-logs - retention-days: 30 diff --git a/integration-test/pom.xml b/integration-test/pom.xml index dc60f3e62a954..c61509ed2c84c 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -693,20 +693,6 @@ AI - - AINodeIT - - false - - - org.apache.iotdb.itbase.category.ManualIT - org.apache.iotdb.itbase.category.AINodeIT - false - false - false - AI - - DailyIT diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java index d21caebf10a9c..91faeae8eeee4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java @@ -134,58 +134,49 @@ public void start() { // Link built-in LTSM weights from the runner-wide cache. These can be hundreds of MB to // multiple GB; copying them per fork dominates IT startup. Symlinks share read-only weights // across forks; we fall back to a copy on platforms / filesystems that reject symlinks. - // CPU-only runners that only run metadata-level AINode tests won't have the cache pre-staged - // — log and skip in that case rather than failing. 
String builtInModelPath = filePrefix + File.separator + BUILT_IN_MODEL_PATH; File builtInModelDir = new File(builtInModelPath); + try { + if (builtInModelDir.exists()) { + PathUtils.deleteDirectory(builtInModelDir.toPath()); + } + } catch (NoSuchFileException e) { + // ignored + } Path cacheRoot = Paths.get(CACHE_BUILT_IN_MODEL_PATH); - if (!Files.isDirectory(cacheRoot)) { - logger.info( - "AINode model weight cache {} not present; starting AINode without preloaded weights", - cacheRoot); + Path destRoot = builtInModelDir.toPath(); + builtInModelDir.getParentFile().mkdirs(); + try { + Files.createSymbolicLink(destRoot, cacheRoot); + logger.info("AINode symlinked model weights {} -> {}", destRoot, cacheRoot); + } catch (UnsupportedOperationException | IOException symlinkErr) { + logger.warn( + "AINode failed to symlink {} -> {} ({}), falling back to copy", + destRoot, + cacheRoot, + symlinkErr.toString()); builtInModelDir.mkdirs(); - } else { - try { - if (builtInModelDir.exists()) { - PathUtils.deleteDirectory(builtInModelDir.toPath()); - } - } catch (NoSuchFileException e) { - // ignored - } - Path destRoot = builtInModelDir.toPath(); - builtInModelDir.getParentFile().mkdirs(); - try { - Files.createSymbolicLink(destRoot, cacheRoot); - logger.info("AINode symlinked model weights {} -> {}", destRoot, cacheRoot); - } catch (UnsupportedOperationException | IOException symlinkErr) { - logger.warn( - "AINode failed to symlink {} -> {} ({}), falling back to copy", - destRoot, - cacheRoot, - symlinkErr.toString()); - builtInModelDir.mkdirs(); - try (Stream s = Files.walk(cacheRoot)) { - s.forEach( - source -> { - Path destination = - Paths.get( - builtInModelPath, - source.toString().substring(CACHE_BUILT_IN_MODEL_PATH.length())); - logger.info("AINode copying model weights from {} to {}", source, destination); - try { - Files.copy( - source, - destination, - LinkOption.NOFOLLOW_LINKS, - StandardCopyOption.COPY_ATTRIBUTES); - } catch (IOException e) { - 
logger.error("AINode got error copying model weights", e); - throw new RuntimeException(e); - } - }); - } catch (Exception e) { - logger.error("AINode got error copying model weights", e); - } + try (Stream s = Files.walk(cacheRoot)) { + s.forEach( + source -> { + Path destination = + Paths.get( + builtInModelPath, + source.toString().substring(CACHE_BUILT_IN_MODEL_PATH.length())); + logger.info("AINode copying model weights from {} to {}", source, destination); + try { + Files.copy( + source, + destination, + LinkOption.NOFOLLOW_LINKS, + StandardCopyOption.COPY_ATTRIBUTES); + } catch (IOException e) { + logger.error("AINode got error copying model weights", e); + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + logger.error("AINode got error copying model weights", e); } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java b/integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java deleted file mode 100644 index 328ecbb0a83f0..0000000000000 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/category/AINodeIT.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.itbase.category; - -/** - * Marker for AINode integration tests that exercise only metadata / lifecycle paths and therefore - * don't need a GPU. Tests tagged with this category can run on plain CPU runners; tests that drive - * inference, forecasting, or device-binding still belong in {@link AIClusterIT}. - */ -public interface AINodeIT {} diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java deleted file mode 100644 index 04832ae120e12..0000000000000 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeBasicIT.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.ainode.it; - -import org.apache.iotdb.ainode.utils.AINodeTestUtils.FakeModelInfo; -import org.apache.iotdb.it.env.EnvFactory; -import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.AINodeIT; -import org.apache.iotdb.itbase.env.BaseEnv; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; - -import static org.apache.iotdb.ainode.utils.AINodeTestUtils.BUILTIN_MODEL_MAP; -import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkHeader; -import static org.apache.iotdb.ainode.utils.AINodeTestUtils.errorTest; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Metadata-only AINode tests that don't drive inference or bind GPU devices, so they can run on a - * plain CPU runner. Tests that do exercise CUDA paths live in {@link AINodeSharedClusterIT}. 
- */ -@RunWith(IoTDBTestRunner.class) -@Category({AINodeIT.class}) -public class AINodeBasicIT { - - @BeforeClass - public static void setUp() throws Exception { - EnvFactory.getEnv().initClusterEnvironment(1, 1); - } - - @AfterClass - public static void tearDown() throws Exception { - EnvFactory.getEnv().cleanClusterEnvironment(); - } - - @Test - public void dropBuiltInModelErrorTestInTree() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); - } - } - - @Test - public void dropBuiltInModelErrorTestInTable() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); - } - } - - @Test - public void showBuiltInModelTestInTree() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - showBuiltInModelTest(statement); - } - } - - @Test - public void showBuiltInModelTestInTable() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - showBuiltInModelTest(statement); - } - } - - private void showBuiltInModelTest(Statement statement) throws SQLException { - int builtInModelCount = 0; - final String showSql = "SHOW MODELS"; - try (ResultSet resultSet = statement.executeQuery(showSql)) { - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - checkHeader(resultSetMetaData, "ModelId,ModelType,Category,State"); - while (resultSet.next()) { - builtInModelCount++; - FakeModelInfo modelInfo = - new FakeModelInfo( - 
resultSet.getString(1), - resultSet.getString(2), - resultSet.getString(3), - resultSet.getString(4)); - assertTrue(BUILTIN_MODEL_MAP.containsKey(modelInfo.getModelId())); - assertEquals(BUILTIN_MODEL_MAP.get(modelInfo.getModelId()), modelInfo); - } - } - assertEquals(BUILTIN_MODEL_MAP.size(), builtInModelCount); - } -} diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java index 5c4d8fafd1ae3..de6d3c48be3e0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java @@ -21,7 +21,7 @@ import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.AINodeIT; +import org.apache.iotdb.itbase.category.AIClusterIT; import org.apache.iotdb.itbase.env.BaseEnv; import org.junit.AfterClass; @@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals; @RunWith(IoTDBTestRunner.class) -@Category({AINodeIT.class}) +@Category({AIClusterIT.class}) public class AINodeClusterConfigIT { @BeforeClass diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java index 87930a1bfecf8..ffe8c6690653e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java @@ -54,6 +54,7 @@ import static org.apache.iotdb.ainode.utils.AINodeTestUtils.prepareDataInTree; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -167,6 +168,61 @@ public void userDefinedModelManagementTestInTable() throws 
SQLException, Interru } } + // ========== BuiltinModel tests ========== + + @Test + public void dropBuiltInModelErrorTestInTree() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); + } + } + + @Test + public void dropBuiltInModelErrorTestInTable() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + errorTest(statement, "drop model sundial", "1506: Cannot delete built-in model: sundial"); + } + } + + @Test + public void showBuiltInModelTestInTree() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + showBuiltInModelTest(statement); + } + } + + @Test + public void showBuiltInModelTestInTable() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + showBuiltInModelTest(statement); + } + } + + private void showBuiltInModelTest(Statement statement) throws SQLException { + int built_in_model_count = 0; + final String showSql = "SHOW MODELS"; + try (ResultSet resultSet = statement.executeQuery(showSql)) { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + checkHeader(resultSetMetaData, "ModelId,ModelType,Category,State"); + while (resultSet.next()) { + built_in_model_count++; + FakeModelInfo modelInfo = + new FakeModelInfo( + resultSet.getString(1), + resultSet.getString(2), + resultSet.getString(3), + resultSet.getString(4)); + assertTrue(BUILTIN_MODEL_MAP.containsKey(modelInfo.getModelId())); + assertEquals(BUILTIN_MODEL_MAP.get(modelInfo.getModelId()), modelInfo); + } + } + 
assertEquals(BUILTIN_MODEL_MAP.size(), built_in_model_count); + } + // ========== CallInference tests ========== @Test From 7e769c2084b7730bffa6573f1d9a37e50b167733 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 19 May 2026 13:06:12 +0800 Subject: [PATCH 3/4] Merge AINodeConcurrentForecastIT into AINodeSharedClusterIT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The concurrent forecast test only needs a 1C1D1A cluster — the same one AINodeSharedClusterIT already builds. Move the test, its helpers, and its data prep (under a separate concurrent_db database to avoid clashing with db.AI) into AINodeSharedClusterIT, and delete the standalone class. Saves one cluster startup per CI run. AINodeClusterConfigIT stays separate: its REMOVE AINODE call permanently breaks the cluster, so sharing risks bleeding into other tests. --- .../ainode/it/AINodeConcurrentForecastIT.java | 127 ------------------ .../ainode/it/AINodeSharedClusterIT.java | 74 +++++++++- 2 files changed, 72 insertions(+), 129 deletions(-) delete mode 100644 integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java deleted file mode 100644 index 3e1c616580e71..0000000000000 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.ainode.it; - -import org.apache.iotdb.ainode.utils.AINodeTestUtils; -import org.apache.iotdb.it.env.EnvFactory; -import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.AIClusterIT; -import org.apache.iotdb.itbase.env.BaseEnv; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Arrays; -import java.util.List; - -import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelNotOnSpecifiedDevice; -import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelOnSpecifiedDevice; -import static org.apache.iotdb.ainode.utils.AINodeTestUtils.concurrentInference; - -@RunWith(IoTDBTestRunner.class) -@Category({AIClusterIT.class}) -public class AINodeConcurrentForecastIT { - - private static final Logger LOGGER = LoggerFactory.getLogger(AINodeConcurrentForecastIT.class); - - private static final List MODEL_LIST = - Arrays.asList( - new AINodeTestUtils.FakeModelInfo("sundial", "sundial", "builtin", "active"), - new AINodeTestUtils.FakeModelInfo("timer_xl", "timer", "builtin", "active")); - - private static final String FORECAST_TABLE_FUNCTION_SQL_TEMPLATE = - "SELECT * FROM FORECAST(model_id=>'%s', targets=>(SELECT time,s FROM root.AI) ORDER BY time, output_length=>%d)"; - - @BeforeClass - 
public static void setUp() throws Exception { - // Init 1C1D1A cluster environment - EnvFactory.getEnv().initClusterEnvironment(1, 1); - prepareDataForTableModel(); - } - - @AfterClass - public static void tearDown() throws Exception { - EnvFactory.getEnv().cleanClusterEnvironment(); - } - - private static void prepareDataForTableModel() throws SQLException { - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - statement.execute("CREATE DATABASE root"); - statement.execute("CREATE TABLE root.AI (s DOUBLE FIELD)"); - for (int i = 0; i < 2880; i++) { - statement.addBatch( - String.format( - "INSERT INTO root.AI(time, s) VALUES(%d, %f)", i, Math.sin(i * Math.PI / 1440))); - if ((i + 1) % 500 == 0) { - statement.executeBatch(); - statement.clearBatch(); - } - } - statement.executeBatch(); - } - } - - @Test - public void concurrentForecastTest() throws SQLException, InterruptedException { - for (AINodeTestUtils.FakeModelInfo modelInfo : MODEL_LIST) { - concurrentGPUForecastTest(modelInfo, "0,1"); - // TODO: Enable cpu test after optimize memory consumption - // concurrentGPUForecastTest(modelInfo, "cpu"); - } - } - - public void concurrentGPUForecastTest(AINodeTestUtils.FakeModelInfo modelInfo, String devices) - throws SQLException, InterruptedException { - final int forecastLength = 512; - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - // Single forecast request can be processed successfully - final String forecastSQL = - String.format( - FORECAST_TABLE_FUNCTION_SQL_TEMPLATE, modelInfo.getModelId(), forecastLength); - final int threadCnt = 10; - // PR CI keeps a concurrency smoke check; nightly/daily can dial this up if regressions - // appear. 
- final int loop = 10; - statement.execute( - String.format("LOAD MODEL %s TO DEVICES '%s'", modelInfo.getModelId(), devices)); - checkModelOnSpecifiedDevice(statement, modelInfo.getModelId(), devices); - long startTime = System.currentTimeMillis(); - concurrentInference(statement, forecastSQL, threadCnt, loop, forecastLength); - long endTime = System.currentTimeMillis(); - LOGGER.info( - String.format( - "Model %s concurrent inference %d reqs (%d threads, %d loops) in GPU takes time: %dms", - modelInfo.getModelId(), threadCnt * loop, threadCnt, loop, endTime - startTime)); - statement.execute( - String.format("UNLOAD MODEL %s FROM DEVICES '%s'", modelInfo.getModelId(), devices)); - checkModelNotOnSpecifiedDevice(statement, modelInfo.getModelId(), devices); - } - } -} diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java index ffe8c6690653e..1accdedb53464 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java @@ -32,6 +32,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.ResultSet; @@ -49,6 +51,7 @@ import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkHeader; import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelNotOnSpecifiedDevice; import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelOnSpecifiedDevice; +import static org.apache.iotdb.ainode.utils.AINodeTestUtils.concurrentInference; import static org.apache.iotdb.ainode.utils.AINodeTestUtils.errorTest; import static org.apache.iotdb.ainode.utils.AINodeTestUtils.prepareDataInTable; import static 
org.apache.iotdb.ainode.utils.AINodeTestUtils.prepareDataInTree; @@ -59,13 +62,15 @@ /** * Consolidates AINodeDeviceManageIT, AINodeModelManageIT, AINodeCallInferenceIT, AINodeForecastIT, - * and AINodeInstanceManagementIT into a single class that shares one 1C1D1A cluster, avoiding 5 - * redundant cluster startups (~20 min saved). + * AINodeInstanceManagementIT, and AINodeConcurrentForecastIT into a single class that shares one + * 1C1D1A cluster, avoiding 6 redundant cluster startups. */ @RunWith(IoTDBTestRunner.class) @Category({AIClusterIT.class}) public class AINodeSharedClusterIT { + private static final Logger LOGGER = LoggerFactory.getLogger(AINodeSharedClusterIT.class); + private static final String TARGET_DEVICES_STR = "0,1"; private static final Set TARGET_DEVICES = new HashSet<>(Arrays.asList(TARGET_DEVICES_STR.split(","))); @@ -87,11 +92,19 @@ public class AINodeSharedClusterIT { + "timecol=>'%s'" + ")"; + private static final List CONCURRENT_FORECAST_MODELS = + Arrays.asList( + new FakeModelInfo("sundial", "sundial", "builtin", "active"), + new FakeModelInfo("timer_xl", "timer", "builtin", "active")); + private static final String CONCURRENT_FORECAST_SQL_TEMPLATE = + "SELECT * FROM FORECAST(model_id=>'%s', targets=>(SELECT time,s FROM concurrent_db.AI) ORDER BY time, output_length=>%d)"; + @BeforeClass public static void setUp() throws Exception { EnvFactory.getEnv().initClusterEnvironment(1, 1); prepareDataInTree(); prepareDataInTable(); + prepareDataForConcurrentForecast(); } @AfterClass @@ -410,6 +423,63 @@ public static void forecastTableFunctionErrorTest( statement, invalidTimecolSQL2, "701: The type of the column [s0] is not as expected."); } + // ========== Concurrent forecast tests ========== + + @Test + public void concurrentForecastTest() throws SQLException, InterruptedException { + for (FakeModelInfo modelInfo : CONCURRENT_FORECAST_MODELS) { + concurrentGPUForecastTest(modelInfo, "0,1"); + // TODO: Enable cpu test after optimize memory 
consumption + // concurrentGPUForecastTest(modelInfo, "cpu"); + } + } + + private void concurrentGPUForecastTest(FakeModelInfo modelInfo, String devices) + throws SQLException, InterruptedException { + final int forecastLength = 512; + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + final String forecastSQL = + String.format(CONCURRENT_FORECAST_SQL_TEMPLATE, modelInfo.getModelId(), forecastLength); + final int threadCnt = 10; + // PR CI keeps a concurrency smoke check; nightly/daily can dial this up if regressions + // appear. + final int loop = 10; + statement.execute( + String.format("LOAD MODEL %s TO DEVICES '%s'", modelInfo.getModelId(), devices)); + checkModelOnSpecifiedDevice(statement, modelInfo.getModelId(), devices); + long startTime = System.currentTimeMillis(); + concurrentInference(statement, forecastSQL, threadCnt, loop, forecastLength); + long endTime = System.currentTimeMillis(); + LOGGER.info( + String.format( + "Model %s concurrent inference %d reqs (%d threads, %d loops) in GPU takes time: %dms", + modelInfo.getModelId(), threadCnt * loop, threadCnt, loop, endTime - startTime)); + statement.execute( + String.format("UNLOAD MODEL %s FROM DEVICES '%s'", modelInfo.getModelId(), devices)); + checkModelNotOnSpecifiedDevice(statement, modelInfo.getModelId(), devices); + } + } + + private static void prepareDataForConcurrentForecast() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE concurrent_db"); + statement.execute("CREATE TABLE concurrent_db.AI (s DOUBLE FIELD)"); + for (int i = 0; i < 2880; i++) { + statement.addBatch( + String.format( + "INSERT INTO concurrent_db.AI(time, s) VALUES(%d, %f)", + i, Math.sin(i * Math.PI / 1440))); + if ((i + 1) % 500 == 0) { + statement.executeBatch(); + 
statement.clearBatch(); + } + } + statement.executeBatch(); + } + } + // ========== InstanceManagement tests ========== @Test From 680e4a8ea0356be321ccc7b359951fcdb6b0edc8 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 19 May 2026 13:16:43 +0800 Subject: [PATCH 4/4] Merge AINodeClusterConfigIT into AINodeSharedClusterIT (run last) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cluster-config test calls REMOVE AINODE, which permanently tears the AINode out of the cluster, so we couldn't share its 1C1D1A startup without guaranteeing it runs after every other @Test. Use @FixMethodOrder(NAME_ASCENDING) on AINodeSharedClusterIT and rename the migrated test to zzAiNodeRegisterAndRemoveMustRunLast — alphabetical sorting then keeps it strictly last. IoTDBTestRunner extends BlockJUnit4ClassRunner without overriding child-method ordering, so the JUnit-standard FixMethodOrder applies. The class doc warns reviewers not to add new @Test methods that sort after this one. The other 16 @Test methods are net-zero by construction (paired LOAD/UNLOAD, register/drop, or pure SHOW), so the alphabetical reorder doesn't introduce inter-test coupling. Saves the last per-CI cluster startup we couldn't fold in earlier. 
--- .../ainode/it/AINodeClusterConfigIT.java | 127 ------------------ .../ainode/it/AINodeSharedClusterIT.java | 92 ++++++++++++- 2 files changed, 90 insertions(+), 129 deletions(-) delete mode 100644 integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java deleted file mode 100644 index de6d3c48be3e0..0000000000000 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.ainode.it; - -import org.apache.iotdb.it.env.EnvFactory; -import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.AIClusterIT; -import org.apache.iotdb.itbase.env.BaseEnv; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - -import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkHeader; -import static org.junit.Assert.assertEquals; - -@RunWith(IoTDBTestRunner.class) -@Category({AIClusterIT.class}) -public class AINodeClusterConfigIT { - - @BeforeClass - public static void setUp() throws Exception { - EnvFactory.getEnv().initClusterEnvironment(1, 1); - } - - @AfterClass - public static void tearDown() throws Exception { - EnvFactory.getEnv().cleanClusterEnvironment(); - } - - @Test - public void aiNodeRegisterAndRemoveTest() throws SQLException { - String show_sql = "SHOW AINODES"; - String title = "NodeID,Status,InternalAddress,InternalPort"; - - // Verify AINode exists via both dialects before removal - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - verifyAINodeExists(statement, show_sql, title); - } - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - verifyAINodeExists(statement, show_sql, title); - } - - // Remove AINode - try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - statement.execute("REMOVE AINODE"); - waitForAINodeRemoval(statement, show_sql, title); - } - - // Verify removal is visible via table dialect as well - try 
(Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); - Statement statement = connection.createStatement()) { - try (ResultSet resultSet = statement.executeQuery(show_sql)) { - checkHeader(resultSet.getMetaData(), title); - int count = 0; - while (resultSet.next()) { - count++; - } - assertEquals(0, count); - } - } - } - - private static void verifyAINodeExists(Statement statement, String showSql, String title) - throws SQLException { - try (ResultSet resultSet = statement.executeQuery(showSql)) { - checkHeader(resultSet.getMetaData(), title); - int count = 0; - while (resultSet.next()) { - assertEquals("2", resultSet.getString(1)); - assertEquals("Running", resultSet.getString(2)); - count++; - } - assertEquals(1, count); - } - } - - private static void waitForAINodeRemoval(Statement statement, String showSql, String title) - throws SQLException { - for (int retry = 0; retry < 500; retry++) { - try (ResultSet resultSet = statement.executeQuery(showSql)) { - checkHeader(resultSet.getMetaData(), title); - int count = 0; - while (resultSet.next()) { - count++; - } - if (count == 0) { - return; - } - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - Assert.fail("The target AINode is not removed successfully after all retries."); - } -} diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java index 1accdedb53464..1686e916ff0b7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java @@ -29,9 +29,11 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.experimental.categories.Category; import 
org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,10 +64,16 @@ /** * Consolidates AINodeDeviceManageIT, AINodeModelManageIT, AINodeCallInferenceIT, AINodeForecastIT, - * AINodeInstanceManagementIT, and AINodeConcurrentForecastIT into a single class that shares one - * 1C1D1A cluster, avoiding 6 redundant cluster startups. + * AINodeInstanceManagementIT, AINodeConcurrentForecastIT, and AINodeClusterConfigIT into a single + * class that shares one 1C1D1A cluster, avoiding 7 redundant cluster startups. + * + *
<p>
Test methods run in alphabetical order via {@link FixMethodOrder} so that {@link + * #zzAiNodeRegisterAndRemoveMustRunLast()} (which calls {@code REMOVE AINODE} and tears the AINode + * out of the cluster) executes after every other test. Do not add new + * {@code @Test} methods whose names sort after {@code zz}. */ @RunWith(IoTDBTestRunner.class) +@FixMethodOrder(MethodSorters.NAME_ASCENDING) @Category({AIClusterIT.class}) public class AINodeSharedClusterIT { @@ -562,6 +570,86 @@ private void instanceFailTest(Statement statement) { "1510: Device ID list contains duplicate entries."); } + // ========== AINode lifecycle (must run last) ========== + + /** + * REMOVE AINODE permanently tears the AINode out of the cluster, so this test must run after + * every other {@code @Test} in the class. The {@code zz} prefix combined with {@link + * FixMethodOrder} on the class keeps it last under alphabetical sorting; do not add new test + * methods whose names sort after this one. + */ + @Test + public void zzAiNodeRegisterAndRemoveMustRunLast() throws SQLException { + final String showSql = "SHOW AINODES"; + final String title = "NodeID,Status,InternalAddress,InternalPort"; + + // Verify AINode exists via both dialects before removal + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + verifyAINodeExists(statement, showSql, title); + } + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + verifyAINodeExists(statement, showSql, title); + } + + // Remove AINode + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("REMOVE AINODE"); + waitForAINodeRemoval(statement, showSql, title); + } + + // Verify removal is visible via table dialect as well + try (Connection connection = 
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + try (ResultSet resultSet = statement.executeQuery(showSql)) { + checkHeader(resultSet.getMetaData(), title); + int count = 0; + while (resultSet.next()) { + count++; + } + assertEquals(0, count); + } + } + } + + private static void verifyAINodeExists(Statement statement, String showSql, String title) + throws SQLException { + try (ResultSet resultSet = statement.executeQuery(showSql)) { + checkHeader(resultSet.getMetaData(), title); + int count = 0; + while (resultSet.next()) { + assertEquals("2", resultSet.getString(1)); + assertEquals("Running", resultSet.getString(2)); + count++; + } + assertEquals(1, count); + } + } + + private static void waitForAINodeRemoval(Statement statement, String showSql, String title) + throws SQLException { + for (int retry = 0; retry < 500; retry++) { + try (ResultSet resultSet = statement.executeQuery(showSql)) { + checkHeader(resultSet.getMetaData(), title); + int count = 0; + while (resultSet.next()) { + count++; + } + if (count == 0) { + return; + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + Assert.fail("The target AINode is not removed successfully after all retries."); + } + // ========== Helper methods (from ModelManageIT) ========== private static void registerUserDefinedModel(