From 93d5c8423b6b370dd12fa52a9ac16f65905911da Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Wed, 3 Jun 2026 13:26:48 +0200 Subject: [PATCH 1/2] Implement Docker support with image creation and configuration updates --- .dockerignore | 22 ++++ .github/workflows/docker.yml | 53 +++++++++ Dockerfile | 35 ++++++ bin/wayang-submit | 105 +++++++++--------- wayang-assembly/README.md | 49 ++++++++ .../java/channels/ChannelConversions.java | 95 +++++++++------- .../wayang/java/plugin/JavaBasicPlugin.java | 2 +- .../plugin/JavaChannelConversionPlugin.java | 2 +- 8 files changed, 269 insertions(+), 94 deletions(-) create mode 100644 .dockerignore create mode 100644 .github/workflows/docker.yml create mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..ab28f868b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +* +!Dockerfile +!wayang-assembly/ +wayang-assembly/** +!wayang-assembly/target/ +wayang-assembly/target/** +!wayang-assembly/target/*-dist.tar.gz diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml new file mode 100644 index 000000000..1eb2b9e80 --- /dev/null +++ b/.github/workflows/docker.yml @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: Docker + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +concurrency: + group: docker-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + build: + name: Build Docker image + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + + - name: Set up JDK 17 + uses: actions/setup-java@v5 + with: + java-version: 17 + distribution: temurin + cache: maven + + - name: Build Wayang distribution + run: | + ./mvnw clean install -B -Dmaven.test.skip=true + ./mvnw package -B -pl :wayang-assembly -Pdistribution -Dmaven.test.skip=true + + - name: Build Docker image + run: docker build -t apache-wayang:ci . 
+ + - name: Smoke test Java platform + run: docker run --rm apache-wayang:ci diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..f6edb4e84 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM eclipse-temurin:17-jre + +ARG WAYANG_DIST=wayang-assembly/target/*-dist.tar.gz + +ENV WAYANG_HOME=/opt/wayang +ENV PATH="${WAYANG_HOME}/bin:${PATH}" + +COPY ${WAYANG_DIST} /tmp/wayang-dist.tar.gz + +RUN mkdir -p /opt /root/.wayang \ + && tar -xzf /tmp/wayang-dist.tar.gz -C /opt \ + && extracted_dir="$(find /opt -mindepth 1 -maxdepth 1 -type d -name 'wayang-*' | head -n 1)" \ + && test -n "${extracted_dir}" \ + && ln -s "${extracted_dir}" "${WAYANG_HOME}" \ + && rm /tmp/wayang-dist.tar.gz + +WORKDIR /opt/wayang + +ENTRYPOINT ["/opt/wayang/bin/wayang-submit"] +CMD ["org.apache.wayang.apps.pi.PiEstimation", "java", "1"] diff --git a/bin/wayang-submit b/bin/wayang-submit index 92d87c44d..ca8ad9c07 100755 --- a/bin/wayang-submit +++ b/bin/wayang-submit @@ -41,21 +41,6 @@ if [ -z "${WAYANG_HOME}" ]; then export WAYANG_HOME=$( get_realpath ${BASH_SOURCE[0]} ) fi -if [ -z "${SPARK_HOME}" ]; then - echo "The variable SPARK_HOME it needs to be setup" >&2 - exit 1 -fi - -#if [ -z 
"${FLINK_HOME}" ]; then -# echo "The variable FLINK_HOME it needs to be setup" >&2 -# exit 1 -#fi - -if [ -z "${HADOOP_HOME}" ]; then - echo "The variable HADOOP_HOME it needs to be setup" >&2 - exit 1 -fi - # Find the java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" @@ -68,57 +53,77 @@ else fi fi -# Find Spark jars. -if [ -d "${SPARK_HOME}" ]; then - SPARK_JARS_DIR="${SPARK_HOME}/jars" -fi +WAYANG_CODE="${WAYANG_HOME}/jars" -# Find Hadoop jars. -if [ -d "${HADOOP_HOME}" ]; then - HADOOP_JARS_DIR="${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/common/lib/*" -fi +WAYANG_LIBS="${WAYANG_HOME}/libs" -if [ "$(ls ${SPARK_JARS_DIR} | grep ^hadoop | wc -l)" == "0" ]; then +WAYANG_CONF="${WAYANG_HOME}/conf" - HADOOP_JARS_DIR="${HADOOP_JARS_DIR}:${HADOOP_HOME}/share/hadoop/mapreduce/$(ls ${HADOOP_HOME}/share/hadoop/mapreduce | grep ^hadoop-mapreduce-client-common | grep -v tests | head -n 1)" - HADOOP_JARS_DIR="${HADOOP_JARS_DIR}:${HADOOP_HOME}/share/hadoop/mapreduce/$(ls ${HADOOP_HOME}/share/hadoop/mapreduce | grep ^hadoop-mapreduce-client-core | grep -v tests | head -n 1)" - HADOOP_JARS_DIR="${HADOOP_JARS_DIR}:${HADOOP_HOME}/share/hadoop/mapreduce/$(ls ${HADOOP_HOME}/share/hadoop/mapreduce | grep ^hadoop-mapreduce-client-jobclient | grep -v tests | head -n 1)" - HADOOP_JARS_DIR="${HADOOP_JARS_DIR}:${HADOOP_HOME}/share/hadoop/hdfs/$(ls ${HADOOP_HOME}/share/hadoop/hdfs | grep ^hadoop-hdfs-client | grep -v tests | head -n 1)" - HADOOP_JARS_DIR="${HADOOP_JARS_DIR}:${HADOOP_HOME}/share/hadoop/hdfs/lib/$(ls ${HADOOP_HOME}/share/hadoop/hdfs/lib | grep ^hadoop-annotations | grep -v tests | head -n 1)" - HADOOP_JARS_DIR="${HADOOP_JARS_DIR}:${HADOOP_HOME}/share/hadoop/hdfs/lib/$(ls ${HADOOP_HOME}/share/hadoop/hdfs/lib | grep ^hadoop-auth | grep -v tests | head -n 1)" +# Bootstrap the classpath. 
+WAYANG_CLASSPATH="${WAYANG_CONF}/*:${WAYANG_CODE}/*:${WAYANG_LIBS}/*" -fi +append_classpath() { + if [ -n "$1" ]; then + WAYANG_CLASSPATH="${WAYANG_CLASSPATH}:$1" + fi +} +append_first_matching_jar() { + local dir="$1" + local pattern="$2" + local jar -WAYANG_CODE="${WAYANG_HOME}/jars" + if [ -d "${dir}" ]; then + jar=$(find "${dir}" -maxdepth 1 -name "${pattern}" ! -name "*tests*" | sort | head -n 1) # sort first: find emits entries in unspecified order + append_classpath "${jar}" + fi +} -WAYANG_LIBS="${WAYANG_HOME}/libs" +if [ -n "${SPARK_HOME}" ]; then + if [ ! -d "${SPARK_HOME}/jars" ]; then + echo "SPARK_HOME is set but ${SPARK_HOME}/jars does not exist" >&2 + exit 1 + fi + append_classpath "${SPARK_HOME}/jars/*" +fi -WAYANG_CONF="${WAYANG_HOME}/conf" +if [ -n "${HADOOP_HOME}" ]; then + if [ ! -d "${HADOOP_HOME}/share/hadoop/common" ]; then + echo "HADOOP_HOME is set but ${HADOOP_HOME}/share/hadoop/common does not exist" >&2 + exit 1 + fi + append_classpath "${HADOOP_HOME}/share/hadoop/common/*" + append_classpath "${HADOOP_HOME}/share/hadoop/common/lib/*" + append_first_matching_jar "${HADOOP_HOME}/share/hadoop/mapreduce" "hadoop-mapreduce-client-common*.jar" + append_first_matching_jar "${HADOOP_HOME}/share/hadoop/mapreduce" "hadoop-mapreduce-client-core*.jar" + append_first_matching_jar "${HADOOP_HOME}/share/hadoop/mapreduce" "hadoop-mapreduce-client-jobclient*.jar" + append_first_matching_jar "${HADOOP_HOME}/share/hadoop/hdfs" "hadoop-hdfs-client*.jar" + append_first_matching_jar "${HADOOP_HOME}/share/hadoop/hdfs/lib" "hadoop-annotations*.jar" + append_first_matching_jar "${HADOOP_HOME}/share/hadoop/hdfs/lib" "hadoop-auth*.jar" +fi -# Bootstrap the classpath. -WAYANG_CLASSPATH="${WAYANG_CONF}/*:${WAYANG_CODE}/*:${WAYANG_LIBS}/*" -WAYANG_CLASSPATH="${WAYANG_CLASSPATH}:${SPARK_JARS_DIR}/*:${HADOOP_JARS_DIR}" +if [ -n "${FLINK_HOME}" ]; then + if [ ! 
-d "${FLINK_HOME}/lib" ]; then + echo "FLINK_HOME is set but ${FLINK_HOME}/lib does not exist" >&2 + exit 1 + fi + append_classpath "${FLINK_HOME}/lib/*" +fi + +append_classpath "${WAYANG_EXTRA_CLASSPATH}" -FLAGS="" +FLAGS=() if [ "${FLAG_LOG}" = "true" ]; then - FLAGS="${FLAGS} -Dlog4j.configuration=file://${WAYANG_CONF}/log4j.properties" + FLAGS+=("-Dlog4j.configuration=file://${WAYANG_CONF}/log4j.properties") fi if [ "${FLAG_WAYANG}" = "true" ]; then - FLAGS="${FLAGS} -Dwayang.configuration=file://${WAYANG_CONF}/wayang.properties" + FLAGS+=("-Dwayang.configuration=file://${WAYANG_CONF}/wayang.properties") fi if [ -n "${OTHER_FLAGS}" ]; then - FLAGS="${FLAGS} ${OTHER_FLAGS}" + read -r -a OTHER_FLAGS_ARRAY <<< "${OTHER_FLAGS}" + FLAGS+=("${OTHER_FLAGS_ARRAY[@]}") fi -# Wrap args in quotes to be able to execute args with parenthesis, spaces, etc -ARGS="" -for arg in $(echo ${@:2}) -do - ARGS="$ARGS \"${arg}\"" -done - -eval "$RUNNER $FLAGS -cp "${WAYANG_CLASSPATH}" $CLASS ${ARGS}" - +exec "$RUNNER" "${FLAGS[@]}" -cp "${WAYANG_CLASSPATH}" "$CLASS" "${@:2}" diff --git a/wayang-assembly/README.md b/wayang-assembly/README.md index 85c2928d8..52c59b3b6 100644 --- a/wayang-assembly/README.md +++ b/wayang-assembly/README.md @@ -41,3 +41,52 @@ To execute the Wayang Assembly you need to execute the following command in the ./mvnw clean install -DskipTests ./mvnw clean package -pl :wayang-assembly -Pdistribution ``` + +# Docker Image + +The project Docker image packages the Wayang assembly with a Java 17 runtime. It +does not distribute external execution platforms such as Apache Spark, Apache +Hadoop, Apache Flink, or database drivers. Those runtimes stay outside the image +and can be connected at container runtime. + +Build the assembly first, then build the image from the project root. Do not use +the Maven `standalone` profile for this image, because that profile changes +external platform dependencies from `provided` to `compile`. 
+ +```shell +./mvnw clean install -Dmaven.test.skip=true +./mvnw package -pl :wayang-assembly -Pdistribution -Dmaven.test.skip=true +docker build -t apache-wayang:local . +``` + +Run the default Java platform smoke test: + +```shell +docker run --rm apache-wayang:local +``` + +Run a specific Wayang application by passing the main class and its arguments to +`wayang-submit`: + +```shell +docker run --rm apache-wayang:local \ + org.apache.wayang.apps.pi.PiEstimation java 1 +``` + +To connect external platforms, mount or install their runtimes separately and +set the corresponding environment variables: + +```shell +docker run --rm \ + -v /path/to/spark:/opt/spark:ro \ + -v /path/to/hadoop:/opt/hadoop:ro \ + -e SPARK_HOME=/opt/spark \ + -e HADOOP_HOME=/opt/hadoop \ + apache-wayang:local \ + org.apache.wayang.apps.pi.PiEstimation spark 1 +``` + +Additional platform libraries, such as JDBC drivers, can be supplied with +`WAYANG_EXTRA_CLASSPATH`. A custom Wayang configuration can be mounted at +`${WAYANG_HOME}/conf/wayang.properties` and activated with `FLAG_WAYANG=true`. +Applications still need to register the platform plugins they use. 
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/channels/ChannelConversions.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/channels/ChannelConversions.java index e1e9661ca..8de868b8c 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/channels/ChannelConversions.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/channels/ChannelConversions.java @@ -27,9 +27,9 @@ import org.apache.wayang.java.operators.JavaObjectFileSink; import org.apache.wayang.java.operators.JavaObjectFileSource; import org.apache.wayang.java.operators.JavaTsvFileSink; -import org.apache.wayang.java.operators.JavaTsvFileSource; import org.apache.wayang.java.platform.JavaPlatform; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -38,6 +38,8 @@ */ public class ChannelConversions { + private static final String HADOOP_WRITABLE_CLASS = "org.apache.hadoop.io.Writable"; + public static final ChannelConversion STREAM_TO_COLLECTION = new DefaultChannelConversion( StreamChannel.DESCRIPTOR, CollectionChannel.DESCRIPTOR, @@ -46,49 +48,58 @@ public class ChannelConversions { // We could add a COLLECTION_TO_STREAM conversion, but it would probably never be used. 
- public static final ChannelConversion STREAM_TO_HDFS_TSV = new DefaultChannelConversion( - StreamChannel.DESCRIPTOR, - FileChannel.HDFS_TSV_DESCRIPTOR, - () -> new JavaTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class)) - ); - - public static final ChannelConversion COLLECTION_TO_HDFS_TSV = new DefaultChannelConversion( - CollectionChannel.DESCRIPTOR, - FileChannel.HDFS_TSV_DESCRIPTOR, - () -> new JavaTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class)) - ); - - public static final ChannelConversion HDFS_TSV_TO_STREAM = new DefaultChannelConversion( - FileChannel.HDFS_TSV_DESCRIPTOR, - StreamChannel.DESCRIPTOR, - () -> new JavaTsvFileSource<>(DataSetType.createDefault(Tuple2.class)) - ); + public static Collection<ChannelConversion> getAll() { // HDFS conversions are included only when Hadoop is on the classpath + Collection<ChannelConversion> channelConversions = new ArrayList<>(); + channelConversions.add(STREAM_TO_COLLECTION); - public static final ChannelConversion STREAM_TO_HDFS_OBJECT_FILE = new DefaultChannelConversion( - StreamChannel.DESCRIPTOR, - FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR, - () -> new JavaObjectFileSink<>(DataSetType.createDefault(Void.class)) - ); + if (isClassAvailable(HADOOP_WRITABLE_CLASS)) { + channelConversions.addAll(createHdfsChannelConversions()); + } - public static final ChannelConversion COLLECTION_TO_HDFS_OBJECT_FILE = new DefaultChannelConversion( - CollectionChannel.DESCRIPTOR, - FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR, - () -> new JavaObjectFileSink<>(DataSetType.createDefault(Void.class)) - ); + return channelConversions; + } - public static final ChannelConversion HDFS_OBJECT_FILE_TO_STREAM = new DefaultChannelConversion( - FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR, - StreamChannel.DESCRIPTOR, - () -> new JavaObjectFileSource<>(DataSetType.createDefault(Void.class)) - ); + private static Collection<ChannelConversion> createHdfsChannelConversions() { + return Arrays.asList( + new DefaultChannelConversion( + StreamChannel.DESCRIPTOR, + FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR, + () -> new 
JavaObjectFileSink<>(DataSetType.createDefault(Void.class)) + ), + new DefaultChannelConversion( + CollectionChannel.DESCRIPTOR, + FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR, + () -> new JavaObjectFileSink<>(DataSetType.createDefault(Void.class)) + ), + new DefaultChannelConversion( + FileChannel.HDFS_OBJECT_FILE_DESCRIPTOR, + StreamChannel.DESCRIPTOR, + () -> new JavaObjectFileSource<>(DataSetType.createDefault(Void.class)) + ), +// new DefaultChannelConversion( +// FileChannel.HDFS_TSV_DESCRIPTOR, +// StreamChannel.DESCRIPTOR, +// () -> new JavaTsvFileSource<>(DataSetType.createDefault(Tuple2.class)) +// ), + new DefaultChannelConversion( + StreamChannel.DESCRIPTOR, + FileChannel.HDFS_TSV_DESCRIPTOR, + () -> new JavaTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class)) + ), + new DefaultChannelConversion( + CollectionChannel.DESCRIPTOR, + FileChannel.HDFS_TSV_DESCRIPTOR, + () -> new JavaTsvFileSink<>(DataSetType.createDefaultUnchecked(Tuple2.class)) + ) + ); + } - public static Collection ALL = Arrays.asList( - STREAM_TO_COLLECTION, - STREAM_TO_HDFS_OBJECT_FILE, - COLLECTION_TO_HDFS_OBJECT_FILE, - HDFS_OBJECT_FILE_TO_STREAM, -// HDFS_TSV_TO_STREAM, - STREAM_TO_HDFS_TSV, - COLLECTION_TO_HDFS_TSV - ); + private static boolean isClassAvailable(String className) { + try { + Class.forName(className, false, ChannelConversions.class.getClassLoader()); + return true; + } catch (ClassNotFoundException | LinkageError e) { // LinkageError: class found but not loadable, treat as unavailable + return false; + } + } } diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaBasicPlugin.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaBasicPlugin.java index 47714011b..f85fb3e1b 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaBasicPlugin.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaBasicPlugin.java @@ -43,7 +43,7 @@ public Collection getMappings() { @Override public Collection getChannelConversions() { 
- return ChannelConversions.ALL; + return ChannelConversions.getAll(); } @Override diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaChannelConversionPlugin.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaChannelConversionPlugin.java index 451de2199..5792a29bb 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaChannelConversionPlugin.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/plugin/JavaChannelConversionPlugin.java @@ -41,7 +41,7 @@ public Collection getMappings() { @Override public Collection getChannelConversions() { - return ChannelConversions.ALL; + return ChannelConversions.getAll(); } @Override From 4f052d6744b86cdc460e71ca1da1c16fcb61a0ea Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Fri, 5 Jun 2026 09:48:14 +0200 Subject: [PATCH 2/2] Add Docker support for Python API and enhance Java integration - Updated Dockerfile for Python API with necessary configurations. - Created entrypoint script for Python container to ensure API availability. - Added example word count script for demonstration. - Enhanced Java integration with dynamic API URL configuration. - Updated dependencies in setup.cfg for compatibility. - Modified GitHub Actions workflow to include Python image build and tests. 
--- .dockerignore | 14 +++++ .github/workflows/docker.yml | 29 +++++++++- Dockerfile | 26 ++++++--- bin/wayang-submit | 15 +++++ python/Dockerfile | 55 +++++++++++++++++++ python/README.md | 50 +++++++++++++++++ python/docker/entrypoint.sh | 49 +++++++++++++++++ python/examples/wordcount.py | 34 ++++++++++++ python/setup.cfg | 6 +- python/src/pywy/core/core.py | 18 +++++- .../python/executor/PythonProcessCaller.java | 34 +++++++++--- .../wayang/core/util/fs/FileSystems.java | 30 ++++++++-- 12 files changed, 330 insertions(+), 30 deletions(-) create mode 100644 python/Dockerfile create mode 100644 python/docker/entrypoint.sh create mode 100644 python/examples/wordcount.py diff --git a/.dockerignore b/.dockerignore index ab28f868b..185fd7fde 100644 --- a/.dockerignore +++ b/.dockerignore @@ -15,6 +15,20 @@ * !Dockerfile +!python/ +python/** +!python/Dockerfile +!python/docker/ +!python/docker/** +!python/examples/ +!python/examples/** +!python/pyproject.toml +!python/setup.cfg +!python/README.md +!python/src/ +python/src/** +!python/src/pywy/ +!python/src/pywy/** !wayang-assembly/ wayang-assembly/** !wayang-assembly/target/ diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 1eb2b9e80..510cc813f 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -47,7 +47,30 @@ jobs: ./mvnw package -B -pl :wayang-assembly -Pdistribution -Dmaven.test.skip=true - name: Build Docker image - run: docker build -t apache-wayang:ci . + run: docker buildx build --load -t apache-wayang:ci . - - name: Smoke test Java platform - run: docker run --rm apache-wayang:ci + - name: Smoke test WordCount on Java platform + run: | + docker run --rm apache-wayang:ci \ + org.apache.wayang.apps.wordcount.Main \ + java \ + file:///opt/wayang/smoke/wordcount.txt + + - name: Build Python API Docker image + run: docker buildx build --load -f python/Dockerfile -t apache-wayang-python:ci . 
+ + - name: Smoke test Python API against REST host + run: | + set -e + docker network create wayang-python-smoke + cleanup() { + docker rm -f wayang-rest >/dev/null 2>&1 || true + docker network rm wayang-python-smoke >/dev/null 2>&1 || true + } + trap cleanup EXIT + docker run -d --name wayang-rest --network wayang-python-smoke apache-wayang-python:ci + docker run --rm --network wayang-python-smoke \ + --entrypoint /usr/local/bin/wayang-python-entrypoint \ + -e WAYANG_API_HOST=wayang-rest \ + apache-wayang-python:ci \ + python python/examples/wordcount.py diff --git a/Dockerfile b/Dockerfile index f6edb4e84..b75a5b82c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,22 +13,32 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM eclipse-temurin:17-jre +FROM eclipse-temurin:17-jre AS wayang-assembler ARG WAYANG_DIST=wayang-assembly/target/*-dist.tar.gz -ENV WAYANG_HOME=/opt/wayang -ENV PATH="${WAYANG_HOME}/bin:${PATH}" - COPY ${WAYANG_DIST} /tmp/wayang-dist.tar.gz -RUN mkdir -p /opt /root/.wayang \ - && tar -xzf /tmp/wayang-dist.tar.gz -C /opt \ - && extracted_dir="$(find /opt -mindepth 1 -maxdepth 1 -type d -name 'wayang-*' | head -n 1)" \ +RUN mkdir -p /opt/wayang-root /opt/wayang /opt/wayang/smoke \ + && tar -xzf /tmp/wayang-dist.tar.gz -C /opt/wayang-root \ + && extracted_dir="$(find /opt/wayang-root -mindepth 1 -maxdepth 1 -type d -name 'wayang-*' | head -n 1)" \ && test -n "${extracted_dir}" \ - && ln -s "${extracted_dir}" "${WAYANG_HOME}" \ + && cp -a "${extracted_dir}/." 
/opt/wayang/ \ + && find /opt/wayang -type f \( -name "*-sources.jar" -o -name "*-javadoc.jar" -o -name "README.md" \) -delete \ + && printf "apache wayang docker smoke test\n" > /opt/wayang/smoke/wordcount.txt \ && rm /tmp/wayang-dist.tar.gz +FROM eclipse-temurin:17-jre + +ENV WAYANG_HOME=/opt/wayang +ENV FLAG_WAYANG=true +ENV PATH="${WAYANG_HOME}/bin:${PATH}" + +COPY --from=wayang-assembler /opt/wayang /opt/wayang + +RUN mkdir -p /root/.wayang /opt/wayang/conf \ + && touch /opt/wayang/conf/wayang.properties + WORKDIR /opt/wayang ENTRYPOINT ["/opt/wayang/bin/wayang-submit"] diff --git a/bin/wayang-submit b/bin/wayang-submit index ca8ad9c07..a75380a68 100755 --- a/bin/wayang-submit +++ b/bin/wayang-submit @@ -113,6 +113,21 @@ fi append_classpath "${WAYANG_EXTRA_CLASSPATH}" FLAGS=() +FLAGS+=( + "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED" + "--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED" + "--add-opens=java.base/java.nio=ALL-UNNAMED" + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" + "--add-opens=java.base/java.lang=ALL-UNNAMED" + "--add-opens=java.base/java.util=ALL-UNNAMED" + "--add-opens=java.base/java.io=ALL-UNNAMED" + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" + "--add-opens=java.base/sun.reflect.annotation=ALL-UNNAMED" + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" + "--add-opens=java.base/java.net=ALL-UNNAMED" + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" +) + if [ "${FLAG_LOG}" = "true" ]; then FLAGS+=("-Dlog4j.configuration=file://${WAYANG_CONF}/log4j.properties") fi diff --git a/python/Dockerfile b/python/Dockerfile new file mode 100644 index 000000000..17d331c7a --- /dev/null +++ b/python/Dockerfile @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ARG WAYANG_IMAGE=apache-wayang:ci +FROM ${WAYANG_IMAGE} AS wayang-runtime + +FROM python:3.11-slim + +ENV PYTHONUNBUFFERED=1 +ENV WAYANG_API_HOST=wayang-rest +ENV WAYANG_API_PORT=8080 +ENV FLAG_WAYANG=true +ENV JAVA_HOME=/opt/java/openjdk +ENV WAYANG_HOME=/opt/wayang +ENV PATH=/opt/wayang/bin:/opt/java/openjdk/bin:/opt/wayang/python/.venv/bin:${PATH} + +USER root + +COPY --from=wayang-runtime /opt/java/openjdk /opt/java/openjdk +COPY --from=wayang-runtime /opt/wayang /opt/wayang + +WORKDIR /opt/wayang/python + +COPY python/ /opt/wayang/python/ + +RUN mkdir -p /root/.wayang \ + && python3 -m venv /opt/wayang/python/.venv \ + && python -m pip install --no-cache-dir -r src/pywy/requirements.txt \ + && python -m pip install --no-cache-dir . 
\ + && printf '%s\n' \ + 'wayang.api.python.worker = /opt/wayang/python/src/pywy/execution/worker.py' \ + 'wayang.api.python.path = /opt/wayang/python/.venv/bin/python' \ + 'wayang.api.python.env.path = /opt/wayang/python/src' \ + >> /opt/wayang/conf/wayang.properties + +COPY python/docker/entrypoint.sh /usr/local/bin/wayang-python-entrypoint + +RUN chmod +x /usr/local/bin/wayang-python-entrypoint + +WORKDIR /opt/wayang + +ENTRYPOINT ["/opt/wayang/bin/wayang-submit"] +CMD ["org.apache.wayang.api.json.Main", "8080"] diff --git a/python/README.md b/python/README.md index 915a5a17d..eafa2e7ee 100644 --- a/python/README.md +++ b/python/README.md @@ -96,5 +96,55 @@ if __name__ == "__main__": word_count ``` +By default, `pywy` submits plans to +`http://localhost:8080/wayang-api-json/submit-plan/json`. For a remote REST API +or another container, configure either the full URL: + +```python +from pywy.configuration import Configuration +from pywy.dataquanta import WayangContext + +configuration = Configuration() +configuration.set_property("wayang.api.python.url", "http://wayang-rest:8080/wayang-api-json/submit-plan/json") +ctx = WayangContext(configuration) +``` + +or configure host and port separately: + +```python +configuration.set_property("wayang.api.python.host", "wayang-rest") +configuration.set_property("wayang.api.python.port", "8080") +``` + +The same values can be supplied through environment variables: +`WAYANG_API_URL`, or `WAYANG_API_HOST` and `WAYANG_API_PORT`. + +## Docker + +The Python API can be run with the Wayang REST API as a separate container. +Build the assembly first, then build the base Wayang image and the optional +Python-enabled image from the repository root: + +```shell +./mvnw clean install -Dmaven.test.skip=true +./mvnw package -pl :wayang-assembly -Pdistribution -Dmaven.test.skip=true +docker buildx build --load -t apache-wayang:ci . +docker buildx build --load -f python/Dockerfile -t apache-wayang-python:ci . 
+``` + +Start the REST API and run the Python example on the same Docker network: + +```shell +docker network create wayang-python +docker run -d --name wayang-rest --network wayang-python apache-wayang-python:ci +docker run --rm --network wayang-python \ + --entrypoint /usr/local/bin/wayang-python-entrypoint \ + -e WAYANG_API_HOST=wayang-rest \ + apache-wayang-python:ci \ + python python/examples/wordcount.py +docker rm -f wayang-rest +docker network rm wayang-python +``` + ### Testing python code You can run the python tests by using pytest, the requirements for the tests are listed in `python/src/pywy/requirements.txt`. To run the tests navigate to the base wayang folder, e.g. `/var/www/html` and run `pytest -s python/src/pywy` if you need to pass a specific configuration for your use case you can also add a config flag `pytest -s --config=pathToYourConfig python/src/pywy/` diff --git a/python/docker/entrypoint.sh b/python/docker/entrypoint.sh new file mode 100644 index 000000000..1688eb8c1 --- /dev/null +++ b/python/docker/entrypoint.sh @@ -0,0 +1,49 @@ +#!/usr/bin/env sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +set -eu + +python - <<'PY' +import os +import socket +import sys +import time +import urllib.parse + +base_url = os.environ.get("WAYANG_API_URL") +if base_url is not None: + parsed_url = urllib.parse.urlparse(base_url) + host = parsed_url.hostname + port = parsed_url.port or (443 if parsed_url.scheme == "https" else 80) # no explicit port: use the scheme's default +else: + host = os.environ.get("WAYANG_API_HOST", "localhost") + port = int(os.environ.get("WAYANG_API_PORT", "8080")) + +deadline = time.time() + int(os.environ.get("WAYANG_API_WAIT_TIMEOUT", "60")) +while time.time() < deadline: + try: + with socket.create_connection((host, port), timeout=2): + pass + break + except Exception: + time.sleep(1) +else: + print(f"Timed out waiting for Wayang REST API at {host}:{port}", file=sys.stderr) + sys.exit(1) +PY + +exec "$@" diff --git a/python/examples/wordcount.py b/python/examples/wordcount.py new file mode 100644 index 000000000..a0fbf9f84 --- /dev/null +++ b/python/examples/wordcount.py @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from pywy.dataquanta import WayangContext +from pywy.platforms.java import JavaPlugin + + +def word_count(): + WayangContext() \ + .register({JavaPlugin}) \ + .textfile("file:///opt/wayang/smoke/wordcount.txt") \ + .flatmap(lambda line: line.split(), str, str) \ + .filter(lambda word: word.strip() != "", str) \ + .map(lambda word: (word.lower(), 1), str, (str, int)) \ + .reduce_by_key(lambda item: item[0], lambda left, right: (left[0], int(left[1]) + int(right[1])), (str, int)) \ + .store_textfile("file:///tmp/wayang-python-wordcount.txt", (str, int)) + + +if __name__ == "__main__": + word_count() diff --git a/python/setup.cfg b/python/setup.cfg index 36e5ee994..89c0c704d 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -37,9 +37,9 @@ package_dir = packages = find: python_requires = >=3.6 install_requires = - cloudpickle ==3.0.0 - requests ==2.31.0 - numpy ==1.19.5 + cloudpickle ==3.1.2 + requests ==2.33.0 + numpy ==1.24.0 tests_require = unitest ==1.3.5 diff --git a/python/src/pywy/core/core.py b/python/src/pywy/core/core.py index e6641a073..2babc52cd 100644 --- a/python/src/pywy/core/core.py +++ b/python/src/pywy/core/core.py @@ -16,6 +16,7 @@ from typing import Set, Iterable, Dict import json +import os import requests from pywy.configuration import Configuration @@ -126,10 +127,21 @@ def execute(self): json_data["operators"].append(serializer.serialize(operator)) pipeline = [] - port = self.configuration.get_property("wayang.api.python.port") or 8080 + api_url = self.configuration.get_property("wayang.api.python.url") or os.environ.get("WAYANG_API_URL") + if api_url is None: + host = ( + self.configuration.get_property("wayang.api.python.host") + or os.environ.get("WAYANG_API_HOST") + or "localhost" + ) + port = ( + self.configuration.get_property("wayang.api.python.port") + or os.environ.get("WAYANG_API_PORT") + or 8080 + ) + api_url = f"http://{host}:{port}/wayang-api-json/submit-plan/json" - url = 
f'http://localhost:{port}/wayang-api-json/submit-plan/json' headers = {'Content-type': 'application/json'} json_body = json.dumps(json_data) print(json_body) - response = requests.post(url, headers=headers, json=json_data) + response = requests.post(api_url, headers=headers, json=json_data) diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java index d3d66a333..106ec5d43 100644 --- a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java @@ -19,14 +19,17 @@ package org.apache.wayang.api.python.executor; import java.io.IOException; +import java.io.InputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.URL; import java.util.Arrays; import java.util.Map; import org.apache.wayang.core.api.Configuration; import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.util.ReflectionUtils; public class PythonProcessCaller { @@ -41,7 +44,7 @@ public class PythonProcessCaller { public PythonProcessCaller() { // TODO create documentation to how to the configuration in the code - this.configuration = new Configuration(); + this.configuration = createConfiguration(); this.ready = false; final byte[] addr = new byte[4]; addr[0] = 127; @@ -72,9 +75,9 @@ public PythonProcessCaller() { } catch (final Exception e) { final String msg = String.format( "Python worker failed with config %s, using python path %s, using worker %s, using env %s", configuration, - this.configuration.getStringProperty("wayang.api.python.path"), - this.configuration.getStringProperty("wayang.api.python.worker"), - this.configuration.getStringProperty("wayang.api.python.env.path")); + 
this.configuration.getStringProperty("wayang.api.python.path", ""), + this.configuration.getStringProperty("wayang.api.python.worker", ""), + this.configuration.getStringProperty("wayang.api.python.env.path", "")); throw new WayangException(msg, e); } @@ -88,13 +91,30 @@ public PythonProcessCaller() { final String msg = String.format( "Python worker failed to connect back, with config %s, using python path %s, using worker %s, using env %s", configuration, - this.configuration.getStringProperty("wayang.api.python.path"), - this.configuration.getStringProperty("wayang.api.python.worker"), - this.configuration.getStringProperty("wayang.api.python.env.path")); + this.configuration.getStringProperty("wayang.api.python.path", ""), + this.configuration.getStringProperty("wayang.api.python.worker", ""), + this.configuration.getStringProperty("wayang.api.python.env.path", "")); throw new WayangException(msg, e); } } + private static Configuration createConfiguration() { + final Configuration configuration = new Configuration((String) null); + final URL pythonDefaults = ReflectionUtils.getResourceURL("wayang-api-python-defaults.properties"); + if (pythonDefaults != null) { + try (InputStream inputStream = pythonDefaults.openStream()) { + configuration.load(inputStream); + } catch (IOException e) { + throw new WayangException("Could not load Python API defaults.", e); + } + } + final String configurationFileUrl = System.getProperty("wayang.configuration"); + if (configurationFileUrl != null) { + configuration.load(configurationFileUrl); + } + return configuration; + } + public Process getProcess() { return process; } diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java index 087a64d24..cf4b61009 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java +++ 
b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java @@ -24,7 +24,7 @@ import org.apache.logging.log4j.Logger; import java.io.FileNotFoundException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Optional; @@ -38,20 +38,38 @@ public class FileSystems { private static final Logger LOGGER = LogManager.getLogger(FileSystems.class); + private static final String HADOOP_FILE_SYSTEM_CLASS = "org.apache.hadoop.fs.FileSystem"; + /** * We need file sizes several times during the optimization process, so we cache them. */ private static final LruCache fileSizeCache = new LruCache<>(20); - private static Collection registeredFileSystems = Arrays.asList( - new LocalFileSystem(), - new HadoopFileSystem(), - new S3FileSystem() - ); + private static Collection registeredFileSystems = createRegisteredFileSystems(); private FileSystems() { } + private static Collection createRegisteredFileSystems() { + Collection fileSystems = new ArrayList<>(); + fileSystems.add(new LocalFileSystem()); + + if (isClassAvailable(HADOOP_FILE_SYSTEM_CLASS)) { + fileSystems.add(new HadoopFileSystem()); + } + + fileSystems.add(new S3FileSystem()); + return Collections.unmodifiableCollection(fileSystems); + } + + private static boolean isClassAvailable(String className) { + try { + Class.forName(className, false, FileSystems.class.getClassLoader()); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } public static Optional getFileSystem(String fileUrl) { return registeredFileSystems.stream()