diff --git a/udf/worker/proto/common.proto b/udf/worker/proto/common.proto
new file mode 100644
index 0000000000000..15890e2e855a6
--- /dev/null
+++ b/udf/worker/proto/common.proto
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.spark.udf.worker;
+
+option java_package = "org.apache.spark.udf.worker";
+option java_multiple_files = true;
+
+// The UDF in- & output data format.
+enum UDFWorkerDataFormat {
+  UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0;
+
+  // The worker accepts and produces Apache Arrow batches.
+  UDF_WORKER_DATA_FORMAT_ARROW = 1;
+}
+
+// The UDF execution type/shape.
+message UDFShape {
+  oneof shape {
+    SparkUDFShapes spark = 1;
+  }
+}
+
+// The supported Spark UDF shapes.
+enum SparkUDFShapes {
+  SPARK_UDF_SHAPE_UNSPECIFIED = 0;
+
+  // UDF receives a row with 0+ columns as input
+  // and produces a single, scalar value as output.
+  SPARK_UDF_SHAPE_EXPRESSION = 1;
+
+  // UDF receives an iterator over a batch of rows as input and
+  // produces an iterator over a batch of rows as output.
+  SPARK_UDF_SHAPE_MAP_PARTITIONS = 2;
+}
diff --git a/udf/worker/proto/worker_spec.proto b/udf/worker/proto/worker_spec.proto
new file mode 100644
index 0000000000000..6c9a4d3d10b48
--- /dev/null
+++ b/udf/worker/proto/worker_spec.proto
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.spark.udf.worker;
+
+import "udf/worker/proto/common.proto";
+
+option java_package = "org.apache.spark.udf.worker";
+option java_multiple_files = true;
+
+///
+/// Worker specification
+///
+message UDFWorkerSpecification {
+  WorkerEnvironment environment = 1;
+  WorkerCapabilities capabilities = 2;
+
+  // How to create new workers.
+  // At the moment, only direct creation is supported.
+  // This can be extended with indirect/provisioned creation in the future.
+  oneof worker {
+    DirectWorker direct = 3;
+  }
+}
+
+message WorkerEnvironment {
+  // Callable that is responsible for environment setup.
+  optional ProcessCallable installation = 1;
+  // Callable, which is called to verify that an environment
+  // is suitable to start the UDF worker. This callable
+  // needs to verify that
+  // - The worker code itself is present
+  // - Any needed dependencies are present
+  optional ProcessCallable environment_verification = 2;
+  // Callable, which is invoked after the worker has been terminated.
+  // This can be used to clean up dependencies or temporary resources.
+  // Be careful not to clean up resources that could be used by
+  // other workers running in parallel.
+  optional ProcessCallable environment_cleanup = 3;
+}
+
+// Capabilities used for query planning
+message WorkerCapabilities {
+  // The data formats that the worker supports for UDF data in- & output.
+  // Every worker MUST at least support ARROW.
+  //
+  // It is expected that for each UDF execution, the input format
+  // always matches the output format.
+  //
+  // If a worker supports multiple data formats, the engine will select
+  // the most suitable one for each UDF invocation. Which format was chosen
+  // is reported by the engine as part of the UDF protocol's init message.
+  repeated UDFWorkerDataFormat supported_data_formats = 1;
+
+  // Which types of UDFs this worker supports.
+  // This should list all supported shapes.
+  // Which shape a specific UDF has will be communicated
+  // in the initial message of the UDF protocol.
+  //
+  // If an execution for an unsupported UDF type is requested,
+  // the query will fail during query planning.
+  repeated UDFShape supported_udf_shapes = 2;
+
+  // Whether multiple, concurrent UDF
+  // connections are supported by this worker
+  // (for example via multi-threading).
+  //
+  // In the first implementation of the engine-side
+  // worker specification, this property will not be used.
+  //
+  // Usage of this property can be enabled in the future if the
+  // engine implements more advanced resource management (TBD).
+  bool supports_concurrent_udfs = 3;
+
+  // Whether compatible workers may be reused.
+  // If this is not supported, the worker is
+  // terminated after every single UDF invocation.
+  bool supports_reuse = 4;
+
+  // To be extended with UDF chaining, ...
+}
+
+// The worker that will be created to process UDFs
+message DirectWorker {
+  // Blocking callable that is terminated by SIGTERM/SIGKILL
+  ProcessCallable runner = 1;
+
+  UDFWorkerProperties properties = 2;
+}
+
+message UDFWorkerProperties {
+  // Maximum amount of time to wait until the worker can accept connections.
+  //
+  // The engine will use this timeout, if it does not exceed an
+  // engine-configurable maximum time (e.g. 30 seconds).
+  optional int32 initialization_timeout_ms = 1;
+
+  // Used for graceful engine-initiated termination signaled via SIGTERM.
+  // After this time, the worker process should have terminated itself.
+  // Otherwise, the process will be forcefully killed using SIGKILL.
+  //
+  // The engine will use this timeout, if it does not exceed an
+  // engine-configurable maximum time (e.g. 30 seconds).
+  optional int32 graceful_termination_timeout_ms = 2;
+
+  // The connection this [[UDFWorker]] supports. Note that a single
+  // connection is sufficient to run multiple UDFs and (gRPC) services.
+  //
+  // On [[UDFWorker]] creation, connection information
+  // is passed to the callable as a string parameter.
+  // The string format depends on the [[WorkerConnection]]:
+  //
+  // For example, when using TCP, the callable argument will be:
+  //   --connection PORT
+  // Here is a concrete example:
+  //   --connection 8080
+  //
+  // For the format of each specific transport type, see the comments below.
+  WorkerConnection connection = 3;
+}
+
+message WorkerConnection {
+  oneof transport {
+    UnixDomainSocket unix_domain_socket = 1;
+    TcpConnection tcp = 2;
+  }
+}
+
+// Communication between the engine and worker
+// is done using a UNIX domain socket.
+//
+// On [[UDFWorker]] creation, a path to a socket
+// to listen on is passed as an argument.
+//
+// Examples:
+//   /tmp/channel-uuid.sock
+//   /some/system/path/channel-1234.sock
+message UnixDomainSocket {}
+
+// Communication between the engine and worker
+// is done using a localhost TCP connection.
+//
+// On [[UDFWorker]] creation, a PORT
+// is passed as a connection parameter.
+//
+// It is expected that the worker binds to this
+// port on both IPv4 and IPv6 localhost interfaces.
+// E.g. the worker server should be reachable via
+// 127.0.0.1:PORT and [::1]:PORT.
+//
+// Examples:
+//   8080
+//   1234
+message TcpConnection {}
+
+message ProcessCallable {
+  // Executable to invoke.
+  // Examples:
+  //   ["python3", "-m"]
+  //   ["worker.bin"]
+  //   ["java", "worker.java"]
+  //   ["/bin/bash", "-c"]
+  // This executable should be blocking, until the task is finished.
+  // Termination is requested via a SIGTERM signal.
+  //
+  // Success/Failure can be indicated via exit codes:
+  //   Exit code 0 -> Success
+  //   Exit code != 0 -> Failure
+  repeated string command = 1;
+
+  // Arguments passed directly to the executable.
+  // Examples:
+  //   ["udf_worker.py"]
+  //   [""]
+  //   ["--max_concurrency", "5"]
+  //   ["\"echo 'Test'\""]
+  //
+  // Every executable will ALWAYS receive a
+  // --id argument. This argument CANNOT be part of the below list of arguments.
+  // The value of the id argument is a string with the engine-assigned
+  // id of this UDF Worker. This can be used in logs and other state information.
+  repeated string arguments = 2;
+
+  // Environment variables for the invoked process.
+  // NOTE(review): original patch read `map environment_variables`, which is
+  // invalid proto3 — type parameters restored; confirm string/string is intended.
+  map<string, string> environment_variables = 3;
+}