Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions udf/worker/proto/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";

package org.apache.spark.udf.worker;

option java_package = "org.apache.spark.udf.worker";

option java_multiple_files = true;

// The UDF in- & output data format.
enum UDFWorkerDataFormat {
  // Default value; never a valid format. Indicates the format was not set.
  UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0;

  // The worker accepts and produces Apache Arrow batches.
  // Prefixed with the enum type name: enum values share their parent's
  // scope (C++ scoping rules), so unprefixed values can collide.
  UDF_WORKER_DATA_FORMAT_ARROW = 1;
}

// The UDF execution type/shape.
//
// Wrapped in a oneof so that engines other than Spark can
// contribute their own shape enums as additional fields later.
message UDFShape {
  oneof shape {
    // The shape of a Spark UDF; see SparkUDFShapes below.
    SparkUDFShapes spark = 1;
  }
}

// The execution shapes a Spark UDF can have.
enum SparkUDFShapes {
  // Default value; never a valid shape.
  SPARK_UDF_SHAPE_UNSPECIFIED = 0;

  // UDF receives a row with 0+ columns as input
  // and produces a single, scalar value as output.
  // Prefixed with the enum name, consistent with the zero value:
  // unprefixed enum values leak into the enclosing scope and can collide.
  SPARK_UDF_SHAPE_EXPRESSION = 1;

  // UDF receives an iterator over batches of rows as input and
  // produces an iterator over batches of rows as output.
  SPARK_UDF_SHAPE_MAP_PARTITIONS = 2;
}
202 changes: 202 additions & 0 deletions udf/worker/proto/worker_spec.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";

import "udf/worker/proto/src/main/protobuf/common.proto";

package org.apache.spark.udf.worker;

option java_package = "org.apache.spark.udf.worker";
option java_multiple_files = true;

///
/// Worker specification
///
message UDFWorkerSpecification {
  // Lifecycle hooks (setup, verification, cleanup) for the
  // environment the worker runs in.
  WorkerEnvironment environment = 1;

  // Capabilities the engine consults during query planning.
  WorkerCapabilities capabilities = 2;

  // How to create new workers.
  // At the moment, only direct creation is supported.
  // This can be extended with indirect/provisioned creation in the future.
  oneof worker {
    DirectWorker direct = 3;
  }
}

// Lifecycle hooks for the environment a UDF worker runs in.
// All hooks are optional; absent hooks are simply not invoked.
message WorkerEnvironment {
  // Callable that is responsible for environment setup.
  optional ProcessCallable installation = 1;

  // Callable, which is called to verify that an environment
  // is suitable to start the UDF worker. This callable
  // needs to verify that
  // - The worker code itself is present
  // - Any needed dependencies are present
  optional ProcessCallable environment_verification = 2;

  // Callable, which is invoked after the worker has been terminated.
  // This can be used to clean up dependencies or temporary resources.
  // Be careful not to clean up resources that could be used by
  // other workers running in parallel.
  optional ProcessCallable environment_cleanup = 3;
}

// Capabilities used for query planning.
message WorkerCapabilities {
  // The data formats that the worker supports for UDF data in- & output.
  // Every worker MUST at least support ARROW.
  //
  // It is expected that for each UDF execution, the input format
  // always matches the output format.
  //
  // If a worker supports multiple data formats, the engine will select
  // the most suitable one for each UDF invocation. Which format was chosen
  // is reported by the engine as part of the UDF protocol's init message.
  repeated UDFWorkerDataFormat supported_data_formats = 1;

  // Which types of UDFs this worker supports.
  // This should list all supported shapes.
  // Which shape a specific UDF has is communicated
  // in the initial message of the UDF protocol.
  //
  // If an execution for an unsupported UDF type is requested,
  // the query will fail during query planning.
  repeated UDFShape supported_udf_shapes = 2;

  // Whether multiple, concurrent UDF
  // connections are supported by this worker
  // (for example via multi-threading).
  //
  // In the first implementation of the engine-side
  // worker specification, this property will not be used.
  //
  // Usage of this property can be enabled in the future if the
  // engine implements more advanced resource management (TBD).
  bool supports_concurrent_udfs = 3;

  // Whether compatible workers may be reused.
  // If this is not supported, the worker is
  // terminated after every single UDF invocation.
  bool supports_reuse = 4;

  // To be extended with UDF chaining, ...
}

// The worker that will be created to process UDFs.
message DirectWorker {
  // Blocking callable that is terminated by SIGTERM/SIGKILL.
  // (See UDFWorkerProperties.graceful_termination_timeout_ms for
  // the SIGTERM -> SIGKILL escalation.)
  ProcessCallable runner = 1;

  // Lifecycle and connection properties for the spawned worker.
  UDFWorkerProperties properties = 2;
}

// Lifecycle and connection properties of a UDF worker process.
message UDFWorkerProperties {
  // Maximum amount of time (in milliseconds) to wait until the worker
  // can accept connections.
  //
  // The engine will use this timeout, if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
  //
  // uint32: a timeout is never negative, and uint32 stays within the
  // varint wire-type family of the previous int32.
  optional uint32 initialization_timeout_ms = 1;

  // Used for graceful engine-initiated termination signaled via SIGTERM.
  // After this time (in milliseconds), the worker process should have
  // terminated itself. Otherwise, the process will be forcefully killed
  // using SIGKILL.
  //
  // The engine will use this timeout, if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
  optional uint32 graceful_termination_timeout_ms = 2;

  // The connection this [[UDFWorker]] supports. Note that a single
  // connection is sufficient to run multiple UDFs and (gRPC) services.
  //
  // On [[UDFWorker]] creation, connection information
  // is passed to the callable as a string parameter.
  // The string format depends on the [[WorkerConnection]]:
  //
  // For example, when using TCP, the callable argument will be:
  //   --connection PORT
  // Here is a concrete example:
  //   --connection 8080
  //
  // For the format of each specific transport type, see the comments below.
  WorkerConnection connection = 3;
}

// The transport used for engine <-> worker communication.
// Exactly one transport must be set.
message WorkerConnection {
  oneof transport {
    // Communication over a UNIX domain socket (see UnixDomainSocket).
    UnixDomainSocket unix_domain_socket = 1;

    // Communication over a localhost TCP connection (see TcpConnection).
    TcpConnection tcp = 2;
  }
}

// Communication between the engine and worker
// is done using a UNIX domain socket.
//
// On [[UDFWorker]] creation, a path to a socket
// to listen on is passed as an argument.
// Examples:
//   /tmp/channel-uuid.sock
//   /some/system/path/channel-1234.sock
//
// Intentionally empty: the message acts as a transport selector;
// the socket path is supplied at worker-creation time.
message UnixDomainSocket {}

// Communication between the engine and worker
// is done using a localhost TCP connection.
//
// On [[UDFWorker]] creation, a PORT
// is passed as a connection parameter.
//
// It is expected that the worker binds to this
// port on both IPv4 and IPv6 localhost interfaces.
// E.g. the worker server should be reachable via
// 127.0.0.1:PORT and [::1]:PORT.
//
// Examples:
//   8080
//   1234
//
// Intentionally empty: the message acts as a transport selector;
// the port is supplied at worker-creation time.
message TcpConnection {}

// A spawnable external process: executable, arguments, and environment.
// Used for worker runners and environment lifecycle hooks.
message ProcessCallable {
  // Executable to invoke.
  // Examples:
  //   ["python3", "-m"]
  //   ["worker.bin"]
  //   ["java", "worker.java"]
  //   ["/bin/bash", "-c"]
  // This executable should block until the task is finished.
  // Termination is requested via a SIGTERM signal.
  //
  // Success/Failure can be indicated via exit codes:
  //   Exit code 0 -> Success
  //   Exit code != 0 -> Failure
  repeated string command = 1;

  // Arguments passed directly to the executable.
  // Examples:
  //   ["udf_worker.py"]
  //   [""]
  //   ["--max_concurrency", "5"]
  //   ["\"echo 'Test'\""]
  //
  // Every executable will ALWAYS receive a
  // --id argument. This argument CANNOT be part of the below list of arguments.
  // The value of the id argument is a string with the engine-assigned
  // id of this UDF Worker. This can be used in logs and other state information.
  repeated string arguments = 2;

  // Environment variables for the invoked process.
  map<string, string> environment_variables = 3;
}