diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java index e5769a5c6f6c7..c4c3986259f55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java @@ -24,7 +24,9 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.i18n.PipeMessages; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils; import org.apache.iotdb.commons.queryengine.common.SessionInfo; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.auth.AuthorityChecker; @@ -54,6 +56,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.file.Paths; import java.time.ZoneId; import java.util.Map; import java.util.Objects; @@ -267,9 +270,11 @@ private SyncIdentityInfo getCurrentSyncIdentityInfo() { * @param tsFilePipeData pipeData * @param fileDir path of file data dir */ - private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir) { + private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir) + throws IOException { final String tsFileName = tsFilePipeData.getTsFileName(); - final File dir = new File(fileDir); + final File tsFile = resolveFileInFileDataDir(fileDir, tsFileName); + final File dir = tsFile.getParentFile(); final File[] targetFiles = dir.listFiles((dir1, name) -> name.startsWith(tsFileName) && name.endsWith(PATCH_SUFFIX)); if (targetFiles != null) { @@ -311,10 +316,18 @@ public TSStatus transportFile(final TSyncTransportMetaInfo metaInfo, final ByteB final String fileDir = getFileDataDir(identityInfo); final String fileName = metaInfo.fileName; final long startIndex = metaInfo.startIndex; - final File file = new File(fileDir, fileName + PATCH_SUFFIX); + final File file; + final File fileWithoutPatch; + try { + fileWithoutPatch = resolveFileInFileDataDir(fileDir, fileName); + file = resolveFileInFileDataDir(fileDir, fileName + PATCH_SUFFIX); + } catch (final IOException e) { + LOGGER.warn(e.getMessage()); + return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage()); + } // step2. check startIndex - final IndexCheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex); + final IndexCheckResult result = checkStartIndexValid(fileWithoutPatch, startIndex); if (!result.isResult()) { return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex()); } @@ -326,7 +339,7 @@ public TSStatus transportFile(final TSyncTransportMetaInfo metaInfo, final ByteB final byte[] byteArray = new byte[length]; buff.get(byteArray); randomAccessFile.write(byteArray); - recordStartIndex(new File(fileDir, fileName), startIndex + length); + recordStartIndex(fileWithoutPatch, startIndex + length); LOGGER.debug( DataNodePipeMessages.SYNC_START_AT_TO_IS_DONE, fileName, startIndex, startIndex + length); } catch (final IOException e) { @@ -337,6 +350,23 @@ public TSStatus transportFile(final TSyncTransportMetaInfo metaInfo, final ByteB return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""); } + private static File resolveFileInFileDataDir(final String fileDir, final String fileName) + throws IOException { + if (StringUtils.isEmpty(fileName)) { + throw new IOException(String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName)); + } + + final String illegalError = FileUtils.getIllegalError4Directory(fileName); + if (Objects.nonNull(illegalError)) { + throw new IOException( + String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName) + + ", " + + illegalError); + } + + return PipeReceiverFilePathUtils.resolveFilePath(Paths.get(fileDir), fileName).toFile(); + } + private IndexCheckResult checkStartIndexValid(final File file, final long startIndex) { // get local index from memory map long localIndex = getCurrentFileStartIndex(file.getAbsolutePath()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index 829c9aed6b988..5ae7942d20133 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; @@ -52,6 +53,9 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; +import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; import org.apache.iotdb.session.pool.SessionPool; @@ -66,6 +70,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.time.ZoneId; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -229,6 +234,7 @@ public void handshake() throws Exception { useSSL, trustStore, trustStorePwd); + openClientSession(); final TSyncIdentityInfo identityInfo = new TSyncIdentityInfo( pipeName, System.currentTimeMillis(), syncConnectorVersion, databaseName); @@ -259,6 +265,26 @@ public void handshake() throws Exception { .build(); } + private void openClientSession() throws TException { + final TSOpenSessionReq openSessionReq = new TSOpenSessionReq(); + openSessionReq.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3); + openSessionReq.setUsername(user); + openSessionReq.setPassword(password); + openSessionReq.setZoneId(ZoneId.systemDefault().toString()); + openSessionReq.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString()); + openSessionReq.putToConfiguration("sql_dialect", "tree"); + + final TSOpenSessionResp openSessionResp = client.openSession(openSessionReq); + if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + final String errorMsg = + String.format( + "Failed to login to receiver %s:%s for legacy pipe transfer because %s", + ipAddress, port, openSessionResp.getStatus().getMessage()); + LOGGER.warn(errorMsg); + throw new PipeRuntimeCriticalException(errorMsg); + } + } + @Override public void heartbeat() throws Exception { // do nothing diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 35ac06036b223..b9069d5910d97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.audit.AuditEventType; import org.apache.iotdb.commons.audit.AuditLogFields; import org.apache.iotdb.commons.audit.AuditLogOperation; +import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -3398,24 +3399,58 @@ public TSStatus createTimeseriesUsingSchemaTemplate(TCreateTimeseriesUsingSchema @Override public TSStatus handshake(final TSyncIdentityInfo info) throws TException { - return PipeDataNodeAgent.receiver() - .legacy() - .handshake( - info, - SESSION_MANAGER.getCurrSession().getClientAddress(), - partitionFetcher, - schemaFetcher); + try { + final TSStatus status = checkLegacyPipeReceiverPermission(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + return PipeDataNodeAgent.receiver() + .legacy() + .handshake( + info, + SESSION_MANAGER.getCurrSession().getClientAddress(), + partitionFetcher, + schemaFetcher); + } finally { + SESSION_MANAGER.updateIdleTime(); + } } @Override public TSStatus sendPipeData(final ByteBuffer buff) throws TException { - return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff); + try { + final TSStatus status = checkLegacyPipeReceiverPermission(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff); + } finally { + SESSION_MANAGER.updateIdleTime(); + } } @Override public TSStatus sendFile(final TSyncTransportMetaInfo metaInfo, final ByteBuffer buff) throws TException { - return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff); + try { + final TSStatus status = checkLegacyPipeReceiverPermission(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff); + } finally { + SESSION_MANAGER.updateIdleTime(); + } + } + + private TSStatus checkLegacyPipeReceiverPermission() { + final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return getNotLoggedInStatus(); + } + return AuthorityChecker.getTSStatus( + AuthorityChecker.checkSystemPermission(clientSession.getUsername(), PrivilegeType.USE_PIPE), + PrivilegeType.USE_PIPE); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java new file mode 100644 index 0000000000000..5ce4df74f7fa7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.protocol.legacy; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; +import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +public class IoTDBLegacyPipeReceiverAgentTest { + + private static final String PIPE_NAME = "poc"; + private static final long CREATE_TIME = 1700000000000L; + private static final String REMOTE_ADDRESS = "127.0.0.1"; + + private String originalSyncDir; + private Path syncDir; + private IoTDBLegacyPipeReceiverAgent agent; + + @Before + public void setUp() throws Exception { + originalSyncDir = CommonDescriptor.getInstance().getConfig().getSyncDir(); + syncDir = Files.createTempDirectory("legacy-pipe-receiver"); + CommonDescriptor.getInstance().getConfig().setSyncDir(syncDir.toString()); + + agent = new IoTDBLegacyPipeReceiverAgent(); + final TSStatus status = + agent.handshake( + new TSyncIdentityInfo(PIPE_NAME, CREATE_TIME, "UNKNOWN", ""), + REMOTE_ADDRESS, + null, + null); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + + @After + public void tearDown() throws Exception { + if (agent != null) { + agent.handleClientExit(); + } + CommonDescriptor.getInstance().getConfig().setSyncDir(originalSyncDir); + if (syncDir != null) { + org.apache.tsfile.external.commons.io.FileUtils.deleteDirectory(syncDir.toFile()); + } + } + + @Test + public void testTransportFileRejectsPathTraversal() throws Exception { + final String traversal = + ".." + File.separator + ".." + File.separator + ".." + File.separator + "pwned"; + + final TSStatus status = + agent.transportFile( + new TSyncTransportMetaInfo(traversal, 0), + ByteBuffer.wrap("pwned".getBytes(StandardCharsets.UTF_8))); + + Assert.assertEquals(TSStatusCode.SYNC_FILE_ERROR.getStatusCode(), status.getCode()); + Assert.assertTrue(status.getMessage().contains("Illegal fileName")); + Assert.assertFalse(Files.exists(syncDir.resolve("pwned.patch"))); + } + + @Test + public void testTransportFileWritesPlainFileUnderFileDataDir() throws Exception { + final String fileName = "1-2-3-4.tsfile"; + final byte[] payload = "iotdb".getBytes(StandardCharsets.UTF_8); + + final TSStatus status = + agent.transportFile(new TSyncTransportMetaInfo(fileName, 0), ByteBuffer.wrap(payload)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + final Path patchFile = getFileDataDir().resolve(fileName + ".patch"); + Assert.assertArrayEquals(payload, Files.readAllBytes(patchFile)); + } + + @Test + public void testTransportPipeDataRejectsPathTraversalTsFileName() throws Exception { + final String traversal = ".." + File.separator + "evil.tsfile"; + + final TSStatus status = + agent.transportPipeData(ByteBuffer.wrap(new TsFilePipeData("", traversal, -1).serialize())); + + Assert.assertEquals(TSStatusCode.PIPESERVER_ERROR.getStatusCode(), status.getCode()); + Assert.assertTrue(status.getMessage().contains("Illegal fileName")); + } + + private Path getFileDataDir() { + return syncDir + .resolve("receiver") + .resolve(String.format("%s-%d-%s", PIPE_NAME, CREATE_TIME, REMOTE_ADDRESS)) + .resolve("file-data"); + } +}