Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ task_database_file_path=system/database/task.db
# Proactively triggers the interval for batch deliveries
# Effective mode: on every start
# Data type: long
executor_cron_heartbeat_event_interval_seconds=20
executor_cron_heartbeat_event_interval_seconds=5

# Task progress storage interval
# Effective mode: on every start
Expand Down Expand Up @@ -114,7 +114,7 @@ max_retry_times=5
# Bind with rpc_address
# Effective mode: on every start
# Data type: int
rpc_port=6667
rpc_port=6668

# Used for connection of IoTDB native clients(Session)
# Could set 127.0.0.1(for local test) or ipv4 address
Expand Down Expand Up @@ -156,4 +156,9 @@ pipe_leader_cache_memory_usage_percentage=0.1
# Enable/disable reference tracking for pipe events
# Effective mode: on every start
# Data type: boolean
pipe_event_reference_tracking_enabled=true
pipe_event_reference_tracking_enabled=true

# The maximum number of tablets that can be in a batch
# Effective mode: on every start
# Data type: int
pipe_max_allowed_event_count_in_tablet_batch=100
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,79 @@
}
};

public static volatile Option<Integer> DATA_NODE_ID =
public static final Option<Integer> DATA_NODE_ID =
new Option<Integer>("data_node_id", -1) {
@Override
public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Boolean> PIPE_MEMORY_MANAGEMENT_ENABLED =
new Option<Boolean>("pipe_memory_management_enabled", true) {
@Override
public void setValue(String valueString) {
value = Boolean.parseBoolean(valueString);
}
};

public static final Option<Integer> PIPE_MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES =
new Option<Integer>("pipe_memory_allocate_min_size_in_bytes", 32) {
@Override
public void setValue(String valueString) {
value = Integer.parseInt(valueString);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
};

public static final Option<Double> PIPE_TOTAL_FLOATING_MEMORY_PROPORTION =
new Option<Double>("pipe_total_floating_memory_proportion", 0.2) {
@Override
public void setValue(String valueString) {
value = Double.parseDouble(valueString);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
};

public static final Option<Integer> PIPE_MAX_ALLOWED_EVENT_COUNT_IN_TABLET_BATCH =
new Option<Integer>("pipe_max_allowed_event_count_in_tablet_batch", 100) {
@Override
public void setValue(String valueString) {
value = Integer.parseInt(valueString);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
};

public static final Option<Integer> PIPE_MEMORY_ALLOCATE_MAX_RETRIES =
new Option<Integer>("pipe_memory_allocate_max_retries", 10) {
@Override
public void setValue(String valueString) {
value = Integer.parseInt(valueString);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
};

public static final Option<Integer> PIPE_MEMORY_ALLOCATE_RETRY_INTERVAL_MS =
new Option<Integer>("pipe_memory_allocate_retry_interval_ms", 50) {
@Override
public void setValue(String valueString) {
value = Integer.parseInt(valueString);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
};

public static final Option<Double>
PIPE_DATA_STRUCTURE_TABLET_MEMORY_BLOCK_ALLOCATION_REJECT_THRESHOLD =
new Option<Double>(
"pipe_data_structure_tablet_memory_block_allocation_reject_threshold", 0.4) {
@Override
public void setValue(String valueString) {
value = Double.parseDouble(valueString);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
};

public static final Option<Double>
PIPE_DATA_STRUCTURE_TS_FILE_MEMORY_BLOCK_ALLOCATION_REJECT_THRESHOLD =
new Option<Double>(
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold", 0.4) {
@Override
public void setValue(String valueString) {
value = Double.parseDouble(valueString);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.collector.plugin.api;

import org.apache.iotdb.collector.plugin.api.customizer.CollectorParameters;
import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

import java.util.Optional;

import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_DEVICE_ID_DEFAULT_VALUE;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_DEVICE_ID_KEY;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_IS_ALIGNED_DEFAULT_VALUE;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_IS_ALIGNED_KEY;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_REPORT_TIME_INTERVAL_DEFAULT_VALUE;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_REPORT_TIME_INTERVAL_KEY;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_SQL_DIALECT_DEFAULT_VALUE;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_SQL_DIALECT_KEY;
import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.SOURCE_SQL_DIALECT_VALUE_SET;

public abstract class BaseSource implements PipeSource {

protected ProgressIndex startIndex;
protected int instanceIndex;

protected String deviceId;
protected Boolean isAligned;
protected String sqlDialect;
protected int reportTimeInterval;

@Override
public void validate(final PipeParameterValidator validator) throws Exception {
CollectorParameters.validateBooleanParam(
validator, SOURCE_IS_ALIGNED_KEY, SOURCE_IS_ALIGNED_DEFAULT_VALUE);

CollectorParameters.validateSetParam(
validator,
SOURCE_SQL_DIALECT_KEY,
SOURCE_SQL_DIALECT_VALUE_SET,
SOURCE_SQL_DIALECT_DEFAULT_VALUE);

CollectorParameters.validateIntegerParam(
validator,
SOURCE_REPORT_TIME_INTERVAL_KEY,
SOURCE_REPORT_TIME_INTERVAL_DEFAULT_VALUE,
value -> value > 0);
}

@Override
public void customize(
final PipeParameters pipeParameters,
final PipeSourceRuntimeConfiguration pipeSourceRuntimeConfiguration)
throws Exception {
deviceId =
pipeParameters.getStringOrDefault(SOURCE_DEVICE_ID_KEY, SOURCE_DEVICE_ID_DEFAULT_VALUE);
isAligned =
pipeParameters.getBooleanOrDefault(SOURCE_IS_ALIGNED_KEY, SOURCE_IS_ALIGNED_DEFAULT_VALUE);
sqlDialect =
pipeParameters.getStringOrDefault(SOURCE_SQL_DIALECT_KEY, SOURCE_SQL_DIALECT_DEFAULT_VALUE);
reportTimeInterval =
pipeParameters.getIntOrDefault(
SOURCE_REPORT_TIME_INTERVAL_KEY, SOURCE_REPORT_TIME_INTERVAL_DEFAULT_VALUE);
}

@Override
public final void customize(

Check notice

Code scanning / CodeQL

Confusing overloading of methods Note

Method BaseSource.customize(..) could be confused with overloaded method
customize
, since dispatch depends on static types.
PipeParameters pipeParameters,
PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration)
throws Exception {
throw new UnsupportedOperationException();
}

public abstract Optional<ProgressIndex> report();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@

package org.apache.iotdb.collector.plugin.api;

import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

import java.util.Optional;

public abstract class PullSource implements PipeSource {
public abstract class PullSource extends BaseSource implements PipeSource {

@Override
public final void customize(
PipeParameters pipeParameters,
PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) {
throw new UnsupportedOperationException();
public void validate(final PipeParameterValidator validator) throws Exception {
super.validate(validator);
}

public abstract Optional<ProgressIndex> report();
@Override
public void customize(
final PipeParameters pipeParameters,
final PipeSourceRuntimeConfiguration pipeSourceRuntimeConfiguration)
throws Exception {
super.customize(pipeParameters, pipeSourceRuntimeConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@

package org.apache.iotdb.collector.plugin.api;

import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.collector.runtime.task.TaskDispatch;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;

import java.util.Optional;

public abstract class PushSource implements PipeSource {
public abstract class PushSource extends BaseSource implements PipeSource {

private TaskDispatch dispatch;

Expand All @@ -40,11 +38,16 @@ public PushSource() {
}

@Override
public final void customize(
PipeParameters pipeParameters,
PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration)
public void validate(final PipeParameterValidator validator) throws Exception {
super.validate(validator);
}

@Override
public void customize(
final PipeParameters pipeParameters,
final PipeSourceRuntimeConfiguration pipeSourceRuntimeConfiguration)
throws Exception {
throw new UnsupportedOperationException();
super.customize(pipeParameters, pipeSourceRuntimeConfiguration);
}

public final void setCollector(final EventCollector collector) {
Expand Down Expand Up @@ -75,6 +78,4 @@ public final void supply(final Event event) throws Exception {
public final void setDispatch(final TaskDispatch dispatch) {
this.dispatch = dispatch;
}

public abstract Optional<ProgressIndex> report();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,65 @@

package org.apache.iotdb.collector.plugin.api.customizer;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;

import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;

import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.BOOLEAN_SET;

public class CollectorParameters {
private static final Set<String> PARAM_SET =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("source", "processor", "sink")));

public static boolean matchAnyParam(final String param) {
return PARAM_SET.contains(param);
public static void validateStringRequiredParam(
final PipeParameterValidator validator, final String paramKey) {
validator.validate(
o -> Objects.nonNull(validator.getParameters().getString((String) o)),
String.format("%s is required, but git null.", paramKey),
paramKey);
}

public static void validateBooleanParam(
final PipeParameterValidator validator, final String paramKey, final boolean defaultValue) {
validateSetParam(validator, paramKey, BOOLEAN_SET, String.valueOf(defaultValue));
}

public static void validateSetParam(
final PipeParameterValidator validator,
final String paramKey,
final Set<String> valueSet,
final String defaultValue) {
final String paramValue = validator.getParameters().getStringOrDefault(paramKey, defaultValue);

validator.validate(
o -> valueSet.contains(o.toString()),
String.format("%s must be one of %s, but got %s.", paramKey, valueSet, paramValue),
paramValue);
}

public static void validateIntegerParam(
final PipeParameterValidator validator,
final String paramKey,
final Integer paramDefaultValue,
final Predicate<Integer> validationCondition) {
final int paramValue = validator.getParameters().getIntOrDefault(paramKey, paramDefaultValue);

validator.validate(
value -> validationCondition.test((Integer) value),
String.format("%s must be > 0, but got %d.", paramKey, paramValue),
paramValue);
}

public static void validateLongParam(
final PipeParameterValidator validator,
final String paramKey,
final Long paramDefaultValue,
final Predicate<Long> validationCondition) {
final long paramValue = validator.getParameters().getLongOrDefault(paramKey, paramDefaultValue);

validator.validate(
value -> validationCondition.test((Long) value),
String.format("%s must be > 0, but got %d.", paramKey, paramValue),
paramValue);
}
}
Loading