diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 21ae9545f97c7..340b53b0a257c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -533,7 +533,11 @@ repos: ^airflow-core/newsfragments/41368\.significant\.rst$| ^airflow-core/newsfragments/41761.significant\.rst$| ^airflow-core/newsfragments/43349\.significant\.rst$| + ^airflow-core/newsfragments/60921\.significant\.rst$| ^airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui/pnpm-lock\.yaml$| + ^airflow-core/src/airflow/api_fastapi/gunicorn_config\.py$| + ^airflow-core/src/airflow/cli/commands/api_server_command\.py$| + ^airflow-core/src/airflow/api_fastapi/gunicorn_monitor\.py$| ^airflow-core/src/airflow/cli/commands/local_commands/fastapi_api_command\.py$| ^airflow-core/src/airflow/config_templates/| ^airflow-core/src/airflow/models/baseoperator\.py$| diff --git a/airflow-core/docs/administration-and-deployment/web-stack.rst b/airflow-core/docs/administration-and-deployment/web-stack.rst index 5f2159649b3b0..e365bf60e0fa8 100644 --- a/airflow-core/docs/administration-and-deployment/web-stack.rst +++ b/airflow-core/docs/administration-and-deployment/web-stack.rst @@ -57,3 +57,84 @@ separately. This might be useful for scaling them independently or for deploying airflow api-server --apps core # serve only the Execution API Server airflow api-server --apps execution + +Server Types +------------ + +The API server supports two server types: ``uvicorn`` (default) and ``gunicorn``. + +Uvicorn (Default) +~~~~~~~~~~~~~~~~~ + +Uvicorn is the default server type. It's simple to set up and works on all platforms including Windows. + +.. 
code-block:: bash + + airflow api-server + +Gunicorn +~~~~~~~~ + +Gunicorn provides additional features for production deployments: + +- **Memory sharing**: Workers share memory via copy-on-write after fork, reducing total memory usage +- **Rolling worker restarts**: Zero-downtime worker recycling to prevent memory accumulation +- **Proper signal handling**: SIGTTOU kills the oldest worker (FIFO), enabling true rolling restarts + +.. note:: + + Gunicorn requires the ``gunicorn`` extra: ``pip install 'apache-airflow-core[gunicorn]'`` + + Gunicorn is Unix-only and does not work on Windows. + +To enable gunicorn mode: + +.. code-block:: bash + + export AIRFLOW__API__SERVER_TYPE=gunicorn + airflow api-server + +Rolling Worker Restarts +^^^^^^^^^^^^^^^^^^^^^^^ + +To enable periodic worker recycling (useful for long-running processes to prevent memory accumulation): + +.. code-block:: bash + + export AIRFLOW__API__SERVER_TYPE=gunicorn + export AIRFLOW__API__WORKER_REFRESH_INTERVAL=43200 # Restart workers every 12 hours + export AIRFLOW__API__WORKER_REFRESH_BATCH_SIZE=1 # Restart one worker at a time + airflow api-server + +The rolling restart process: + +1. Spawns new workers before killing old ones (zero downtime) +2. Waits for new workers to be ready (process title check) +3. Performs HTTP health check to verify workers can serve requests +4. Kills old workers (oldest first) +5. 
Repeats until all original workers are replaced + +Configuration Options +^^^^^^^^^^^^^^^^^^^^^ + +The following configuration options are available in the ``[api]`` section: + +- ``server_type``: ``uvicorn`` (default) or ``gunicorn`` +- ``worker_refresh_interval``: Seconds between worker refresh cycles (0 = disabled, default) +- ``worker_refresh_batch_size``: Number of workers to refresh per cycle (default: 1) +- ``reload_on_plugin_change``: Reload when plugin files change (default: False) + +When to Use Gunicorn +^^^^^^^^^^^^^^^^^^^^ + +Use gunicorn when you need: + +- Long-running API server processes where memory accumulation is a concern +- Multi-worker deployments where memory sharing matters +- Production environments requiring zero-downtime worker recycling + +Use the default uvicorn when: + +- Running on Windows +- Running in development or testing environments +- Running short-lived containers (e.g., Kubernetes pods that get recycled) diff --git a/airflow-core/docs/extra-packages-ref.rst b/airflow-core/docs/extra-packages-ref.rst index 9f307fd95a7ce..3f9780be9c094 100644 --- a/airflow-core/docs/extra-packages-ref.rst +++ b/airflow-core/docs/extra-packages-ref.rst @@ -126,6 +126,8 @@ other packages that can be used by airflow or some of its providers. 
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | graphviz | ``pip install 'apache-airflow[graphviz]'`` | Graphviz renderer for converting Dag to graphical output | +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ +| gunicorn | ``pip install 'apache-airflow[gunicorn]'`` | Gunicorn server with rolling worker restarts for the API server | ++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | ldap | ``pip install 'apache-airflow[ldap]'`` | LDAP authentication for users | +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | leveldb | ``pip install 'apache-airflow[leveldb]'`` | Required for use leveldb extra in google provider | diff --git a/airflow-core/newsfragments/60921.significant.rst b/airflow-core/newsfragments/60921.significant.rst new file mode 100644 index 0000000000000..c97bbcdf97a97 --- /dev/null +++ b/airflow-core/newsfragments/60921.significant.rst @@ -0,0 +1,51 @@ +Add gunicorn support for API server with zero-downtime worker recycling + +The API server now supports gunicorn as an alternative server with rolling worker restarts +to prevent memory accumulation in long-running processes. + +**Key Benefits:** + +* **Rolling worker restarts**: New workers spawn and pass health checks before old workers + are killed, ensuring zero downtime during worker recycling. + +* **Memory sharing**: Gunicorn uses preload + fork, so workers share memory via + copy-on-write. This significantly reduces total memory usage compared to uvicorn's + multiprocess mode where each worker loads everything independently. 
+ +* **Correct FIFO signal handling**: Gunicorn's SIGTTOU kills the oldest worker (FIFO), + not the newest (LIFO), which is correct for rolling restarts. + +**Configuration:** + +.. code-block:: ini + + [api] + # Use gunicorn instead of uvicorn + server_type = gunicorn + + # Enable rolling worker restarts every 12 hours + worker_refresh_interval = 43200 + + # Restart workers one at a time + worker_refresh_batch_size = 1 + +Or via environment variables: + +.. code-block:: bash + + export AIRFLOW__API__SERVER_TYPE=gunicorn + export AIRFLOW__API__WORKER_REFRESH_INTERVAL=43200 + +**Requirements:** + +Install the gunicorn extra: ``pip install 'apache-airflow-core[gunicorn]'`` + +**Note on uvicorn (default):** + +The default uvicorn mode does not support rolling worker restarts because: + +1. With workers=1, there is no master process to send signals to +2. uvicorn's SIGTTOU kills the newest worker (LIFO), defeating the purpose of rolling restarts +3. Each uvicorn worker loads everything independently with no memory sharing + +If you need worker recycling or memory-efficient multi-worker deployment, use gunicorn. 
diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index e964e101c05fe..34fab3b42dab7 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -188,6 +188,9 @@ dependencies = [ "memray" = [ "memray>=1.19.0", ] +"gunicorn" = [ + "gunicorn>=23.0.0", +] "otel" = [ "opentelemetry-exporter-prometheus>=0.47b0", ] @@ -195,7 +198,7 @@ dependencies = [ "statsd>=3.3.0", ] "all" = [ - "apache-airflow-core[graphviz,kerberos,otel,statsd]" + "apache-airflow-core[graphviz,gunicorn,kerberos,otel,statsd]" ] [project.scripts] diff --git a/airflow-core/src/airflow/api_fastapi/gunicorn_app.py b/airflow-core/src/airflow/api_fastapi/gunicorn_app.py new file mode 100644 index 0000000000000..d0b39c4cb4e70 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/gunicorn_app.py @@ -0,0 +1,243 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Gunicorn application with integrated worker monitoring. + +This module provides a custom Gunicorn application that integrates worker +monitoring directly into the arbiter process loop. By subclassing the ``Arbiter``, +we can perform rolling worker restarts without needing a separate monitoring +thread or subprocess. 
+ +The pattern follows gunicorn's recommended extension approach: +- Subclass ``BaseApplication`` to configure gunicorn programmatically +- Override ``run()`` to use a custom ``Arbiter`` +- Custom ``Arbiter`` hooks into manage_workers() for monitoring logic +""" + +from __future__ import annotations + +import logging +import signal +import sys +import time +from typing import TYPE_CHECKING, Any + +from gunicorn.app.base import BaseApplication +from gunicorn.arbiter import Arbiter + +from airflow.configuration import conf + +if TYPE_CHECKING: + from fastapi import FastAPI + from gunicorn.app.base import Application + +log = logging.getLogger(__name__) + + +class AirflowArbiter(Arbiter): + """Custom ``Arbiter`` with rolling worker restarts via manage_workers() hook.""" + + def __init__(self, app: Application): + super().__init__(app) + + # Worker refresh configuration + self.worker_refresh_interval = conf.getint("api", "worker_refresh_interval", fallback=0) + self.worker_refresh_batch_size = conf.getint("api", "worker_refresh_batch_size", fallback=1) + + # State tracking for rolling restarts + self._last_refresh_time = time.monotonic() + self._refresh_in_progress = False + self._workers_to_replace: set[int] = set() + + # Validate configuration + if self.worker_refresh_batch_size > self.num_workers: + log.warning( + "worker_refresh_batch_size (%d) > num_workers (%d), reducing batch size", + self.worker_refresh_batch_size, + self.num_workers, + ) + self.worker_refresh_batch_size = self.num_workers + + if self.worker_refresh_interval > 0: + log.info( + "Worker refresh enabled: interval=%ds, batch_size=%d", + self.worker_refresh_interval, + self.worker_refresh_batch_size, + ) + + def manage_workers(self) -> None: + """Maintain worker count and perform rolling restarts if configured.""" + super().manage_workers() + + # Check if worker refresh is enabled and due + if self.worker_refresh_interval > 0: + self._check_worker_refresh() + + def _check_worker_refresh(self) -> 
None: + """Check if it's time to start or continue a worker refresh cycle.""" + elapsed = time.monotonic() - self._last_refresh_time + + if not self._refresh_in_progress: + # Check if it's time to start a new refresh cycle + if elapsed >= self.worker_refresh_interval: + self._start_refresh_cycle() + else: + # Continue ongoing refresh cycle + self._continue_refresh_cycle() + + def _start_refresh_cycle(self) -> None: + """Start a new rolling worker refresh cycle.""" + if not self.WORKERS: + return + + self._refresh_in_progress = True + self._workers_to_replace = set(self.WORKERS.keys()) + log.info( + "Starting worker refresh cycle: %d workers to replace", + len(self._workers_to_replace), + ) + self._continue_refresh_cycle() + + def _continue_refresh_cycle(self) -> None: + """Continue rolling refresh: spawn new workers, kill old ones in batches.""" + # Remove workers that have already exited + current_pids = set(self.WORKERS.keys()) + self._workers_to_replace &= current_pids + + if not self._workers_to_replace: + # All original workers have been replaced + log.info("Worker refresh cycle completed") + self._refresh_in_progress = False + self._last_refresh_time = time.monotonic() + return + + # Check if we have capacity to spawn new workers + # We temporarily exceed num_workers during rolling restart + current_count = len(self.WORKERS) + max_during_refresh = self.num_workers + self.worker_refresh_batch_size + + if current_count < max_during_refresh: + # Spawn new workers up to batch size + workers_to_spawn = min( + self.worker_refresh_batch_size, + max_during_refresh - current_count, + ) + if workers_to_spawn > 0: + log.debug("Spawning %d new worker(s) for refresh", workers_to_spawn) + for _ in range(workers_to_spawn): + self.spawn_worker() + + # If we have more workers than target, kill old ones + if current_count > self.num_workers: + workers_to_kill = min( + current_count - self.num_workers, + self.worker_refresh_batch_size, + len(self._workers_to_replace), + ) + + # 
Kill oldest workers first (FIFO) + sorted_workers = sorted( + [(pid, w) for pid, w in self.WORKERS.items() if pid in self._workers_to_replace], + key=lambda x: x[1].age, + ) + + for pid, worker in sorted_workers[:workers_to_kill]: + log.info("Killing old worker %d (age: %s) for refresh", pid, worker.age) + self.kill_worker(pid, signal.SIGTERM) + self._workers_to_replace.discard(pid) + + +class AirflowGunicornApp(BaseApplication): + """Gunicorn application that uses AirflowArbiter for worker management.""" + + def __init__(self, options: dict[str, Any] | None = None): + self.options = options or {} + self.application: FastAPI | None = None + super().__init__() + + def load_config(self) -> None: + """Load configuration from options dict into gunicorn config.""" + for key, value in self.options.items(): + if key in self.cfg.settings and value is not None: + self.cfg.set(key.lower(), value) + + def load(self) -> Any: + """Load and return the WSGI/ASGI application.""" + if self.application is None: + from airflow.api_fastapi.main import app + + self.application = app + return self.application + + def run(self) -> None: + """Run the application with AirflowArbiter.""" + try: + AirflowArbiter(self).run() + except RuntimeError as e: + print(f"\nError: {e}\n", file=sys.stderr) + sys.stderr.flush() + sys.exit(1) + + +def create_gunicorn_app( + host: str, + port: int, + num_workers: int, + worker_timeout: int, + ssl_cert: str | None = None, + ssl_key: str | None = None, + access_log: bool = True, + log_level: str = "info", + proxy_headers: bool = False, +) -> AirflowGunicornApp: + """ + Create a configured AirflowGunicornApp instance. 
+ + :param host: Host to bind to + :param port: Port to bind to + :param num_workers: Number of worker processes + :param worker_timeout: Worker timeout in seconds + :param ssl_cert: Path to SSL certificate file + :param ssl_key: Path to SSL key file + :param access_log: Whether to enable access logging + :param log_level: Log level (debug, info, warning, error, critical) + :param proxy_headers: Whether to trust proxy headers + """ + options = { + "bind": f"{host}:{port}", + "workers": num_workers, + "worker_class": "uvicorn.workers.UvicornWorker", + "timeout": worker_timeout, + "graceful_timeout": worker_timeout, + "keepalive": worker_timeout, + "loglevel": log_level, + "preload_app": True, + # Use our gunicorn_config module for hooks (post_worker_init, worker_exit) + "config": "python:airflow.api_fastapi.gunicorn_config", + } + + if ssl_cert and ssl_key: + options["certfile"] = ssl_cert + options["keyfile"] = ssl_key + + if access_log: + options["accesslog"] = "-" # Log to stdout + + if proxy_headers: + options["forwarded_allow_ips"] = "*" + + return AirflowGunicornApp(options) diff --git a/airflow-core/src/airflow/api_fastapi/gunicorn_config.py b/airflow-core/src/airflow/api_fastapi/gunicorn_config.py new file mode 100644 index 0000000000000..4ba8e06d99961 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/gunicorn_config.py @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Gunicorn configuration hooks for the Airflow API server. + +This module provides Gunicorn server hooks that are loaded via the -c config option. +These hooks handle: +- Setting process titles to indicate worker readiness (useful for debugging) +- Cleaning up ORM connections on worker exit + +Usage: + gunicorn -c python:airflow.api_fastapi.gunicorn_config airflow.api_fastapi.main:app +""" + +from __future__ import annotations + +import logging +import sys + +from airflow import settings + +log = logging.getLogger(__name__) + +# On macOS, setproctitle has issues, so we skip it +if sys.platform == "darwin": + + def setproctitle(title): + log.debug("macOS detected, skipping setproctitle") +else: + from setproctitle import setproctitle + + +def post_worker_init(worker): + """ + Execute after a worker has been initialized. + + This hook runs in each worker process after the ASGI app has been loaded. + We set the process title with a ready prefix for debugging visibility. + """ + ready_prefix = settings.GUNICORN_WORKER_READY_PREFIX + proc_title = f"{ready_prefix}airflow api-server worker" + setproctitle(proc_title) + log.info("Worker initialized and ready, set process title: %s", proc_title) + + +def worker_exit(server, worker): + """ + Execute when a worker is about to exit. + + This hook runs in the worker process itself (not the arbiter) just before exit. + We clean up ORM connections to ensure proper resource cleanup. 
+ """ + log.debug("Worker %s exited, disposing ORM connections", worker.pid) + settings.dispose_orm(do_log=False) diff --git a/airflow-core/src/airflow/cli/commands/api_server_command.py b/airflow-core/src/airflow/cli/commands/api_server_command.py index 1753499916a45..dc7b4fabf9a3e 100644 --- a/airflow-core/src/airflow/cli/commands/api_server_command.py +++ b/airflow-core/src/airflow/cli/commands/api_server_command.py @@ -45,27 +45,72 @@ if TYPE_CHECKING: from argparse import Namespace -# This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor -# errors when shutting down workers. Despite the 'closed' status of the issue it is not solved, -# more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399 +def _run_api_server_with_gunicorn( + args, + apps: str, + num_workers: int, + worker_timeout: int, + proxy_headers: bool, +) -> None: + """ + Run the API server using gunicorn with uvicorn workers. + + Uses a custom Arbiter that integrates worker monitoring directly into + the arbiter process loop. 
This provides: + - Rolling worker restarts for memory management + - Direct access to worker state (no external monitoring needed) + - Proper signal handling through gunicorn's infrastructure + - Memory sharing via preload + fork copy-on-write + """ + from airflow.api_fastapi.gunicorn_app import create_gunicorn_app + + ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args) + + log_level = conf.get("logging", "uvicorn_logging_level", fallback="info").lower() + access_log_enabled = log_level not in ("error", "critical", "fatal") -@enable_memray_trace(component=MemrayTraceComponents.api) -def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, proxy_headers: bool): - """Run the API server.""" log.info( textwrap.dedent( f"""\ - Running the uvicorn with: + Running the API server with gunicorn: Apps: {apps} Workers: {num_workers} Host: {args.host}:{args.port} Timeout: {worker_timeout} - Logfiles: {args.log_file or "-"} =================================================================""" ) ) - # get ssl cert and key filepaths here instead of passing them as arguments to reduce the number of arguments + + gunicorn_app = create_gunicorn_app( + host=args.host, + port=args.port, + num_workers=num_workers, + worker_timeout=worker_timeout, + ssl_cert=ssl_cert, + ssl_key=ssl_key, + access_log=access_log_enabled, + log_level=log_level, + proxy_headers=proxy_headers, + ) + + # run() blocks until gunicorn exits + gunicorn_app.run() + + +def _run_api_server_with_uvicorn( + args, + apps: str, + num_workers: int, + worker_timeout: int, + proxy_headers: bool, +) -> None: + """ + Run the API server using uvicorn directly. + + This is the default mode. Note that uvicorn's multiprocess mode does not + share memory between workers (each worker loads everything independently). 
+ """ ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args) # setproctitle causes issue on Mac OS: https://github.com/benoitc/gunicorn/issues/3021 @@ -107,6 +152,48 @@ def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, prox ) +@enable_memray_trace(component=MemrayTraceComponents.api) +def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, proxy_headers: bool): + """Run the API server using the configured server type.""" + server_type = conf.get("api", "server_type", fallback="uvicorn").lower() + + if server_type == "gunicorn": + try: + import gunicorn # noqa: F401 + except ImportError: + raise AirflowConfigException( + "Gunicorn is not installed. Install it with: pip install 'apache-airflow-core[gunicorn]'" + ) + + _run_api_server_with_gunicorn( + args=args, + apps=apps, + num_workers=num_workers, + worker_timeout=worker_timeout, + proxy_headers=proxy_headers, + ) + else: + log.info( + textwrap.dedent( + f"""\ + Running the API server with uvicorn: + Apps: {apps} + Workers: {num_workers} + Host: {args.host}:{args.port} + Timeout: {worker_timeout} + Logfiles: {args.log_file or "-"} + =================================================================""" + ) + ) + _run_api_server_with_uvicorn( + args=args, + apps=apps, + num_workers=num_workers, + worker_timeout=worker_timeout, + proxy_headers=proxy_headers, + ) + + def with_api_apps_env(func: Callable[[Namespace], RT]) -> Callable[[Namespace], RT]: """We use AIRFLOW_API_APPS to specify which apps are initialized in the API server.""" diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 591e6de9134fe..bebb43ed42dd8 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -1492,6 +1492,53 @@ api: type: integer example: ~ default: "120" + server_type: + description: | + The server type to use for the API server. 
Options are: + + - ``uvicorn``: Default. Uses uvicorn directly. Simple setup, but does not + support rolling worker restarts with ``worker_refresh_interval`` because + uvicorn's multiprocess mode has no master process when workers=1 and + SIGTTOU kills the newest worker (LIFO), not the oldest. + + - ``gunicorn``: Uses gunicorn with uvicorn workers. Supports rolling worker + restarts via ``worker_refresh_interval``. Requires ``apache-airflow-core[gunicorn]`` + extra. Gunicorn always has master/worker architecture and SIGTTOU kills + the oldest worker (FIFO) which is correct for rolling restarts. + + Note: When using gunicorn, workers share memory via copy-on-write after fork, + reducing overall memory usage. With uvicorn multiprocess mode, each worker + loads everything independently with no memory sharing. + version_added: 3.2.0 + type: string + example: gunicorn + default: "uvicorn" + worker_refresh_interval: + description: | + Number of seconds between rolling worker refreshes for the API server. + When enabled, workers are periodically restarted using a rolling restart + pattern (new workers spawn before old ones are killed) to prevent memory + accumulation while maintaining zero downtime. + + Set to 0 to disable worker refresh (default). + + This option only works when ``server_type = gunicorn``. With uvicorn, + this setting is ignored because uvicorn's SIGTTOU kills the newest worker + (LIFO order) rather than the oldest, which defeats the purpose of rolling restarts. + version_added: 3.2.0 + type: integer + example: "43200" + default: "0" + worker_refresh_batch_size: + description: | + Number of workers to refresh at a time during rolling restarts. + Higher values refresh faster but temporarily use more resources. + + Only used when ``worker_refresh_interval > 0`` and ``server_type = gunicorn``. + version_added: 3.2.0 + type: integer + example: ~ + default: "1" + log_config: + description: | + Path to the logging configuration file for the uvicorn server. 
diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py index d93a1eb436350..0667302fcbb47 100644 --- a/airflow-core/src/airflow/settings.py +++ b/airflow-core/src/airflow/settings.py @@ -762,3 +762,7 @@ def initialize(): LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True) DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077") + +# Prefix used by gunicorn workers to indicate they are ready to serve requests +# Used by GunicornMonitor to track worker readiness via process titles +GUNICORN_WORKER_READY_PREFIX: str = "[ready] " diff --git a/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py b/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py new file mode 100644 index 0000000000000..c526efc8c5df1 --- /dev/null +++ b/airflow-core/tests/unit/cli/commands/test_gunicorn_monitor.py @@ -0,0 +1,450 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Tests for AirflowArbiter and AirflowGunicornApp.""" + +from __future__ import annotations + +from unittest import mock + +import pytest + + +class TestAirflowArbiter: + """Tests for the AirflowArbiter class.""" + + @pytest.fixture + def mock_app(self): + """Create a mock gunicorn application.""" + app = mock.MagicMock() + app.cfg = mock.MagicMock() + app.cfg.workers = 4 + app.cfg.settings = {} + return app + + def test_init_with_refresh_enabled(self, mock_app): + """Test AirflowArbiter initialization with worker refresh enabled.""" + + def mock_arbiter_init(self, app): + # Set up minimal state that Arbiter.__init__ would set + self._num_workers = 4 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 1800, + ("api", "worker_refresh_batch_size"): 2, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + + assert arbiter.worker_refresh_interval == 1800 + assert arbiter.worker_refresh_batch_size == 2 + assert arbiter._refresh_in_progress is False + assert arbiter._workers_to_replace == set() + + def test_init_batch_size_capped_to_workers(self, mock_app, caplog): + """Test that batch size is reduced when greater than worker count.""" + + def mock_arbiter_init(self, app): + self._num_workers = 4 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 1800, + ("api", "worker_refresh_batch_size"): 10, # Greater than workers + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = 
AirflowArbiter(mock_app) + + assert arbiter.worker_refresh_batch_size == 4 # Capped to num_workers + assert "reducing batch size" in caplog.text + + def test_init_with_refresh_disabled(self, mock_app): + """Test AirflowArbiter initialization with worker refresh disabled.""" + + def mock_arbiter_init(self, app): + self._num_workers = 4 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 0, # Disabled + ("api", "worker_refresh_batch_size"): 1, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + + assert arbiter.worker_refresh_interval == 0 + + def test_manage_workers_calls_parent(self, mock_app): + """Test that manage_workers calls parent implementation.""" + + def mock_arbiter_init(self, app): + self._num_workers = 4 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 0, + ("api", "worker_refresh_batch_size"): 1, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + with mock.patch("gunicorn.arbiter.Arbiter.manage_workers") as mock_parent: + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + arbiter.manage_workers() + + mock_parent.assert_called_once() + + def test_manage_workers_triggers_refresh_when_due(self, mock_app): + """Test that manage_workers starts refresh when interval elapsed.""" + + def mock_arbiter_init(self, app): + self._num_workers = 4 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, 
fallback=None: { + ("api", "worker_refresh_interval"): 1800, + ("api", "worker_refresh_batch_size"): 1, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + with mock.patch("gunicorn.arbiter.Arbiter.manage_workers"): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + arbiter.WORKERS = {100: mock.MagicMock(), 101: mock.MagicMock()} + arbiter.spawn_worker = mock.MagicMock() # Prevent deep call + + # Simulate time elapsed past refresh interval + with mock.patch("time.monotonic", return_value=arbiter._last_refresh_time + 2000): + arbiter.manage_workers() + + # Should have started a refresh cycle + assert arbiter._refresh_in_progress is True + assert arbiter._workers_to_replace == {100, 101} + + def test_start_refresh_cycle(self, mock_app): + """Test starting a refresh cycle marks workers for replacement.""" + + def mock_arbiter_init(self, app): + self._num_workers = 4 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 1800, + ("api", "worker_refresh_batch_size"): 1, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + arbiter.WORKERS = { + 100: mock.MagicMock(age=0), + 101: mock.MagicMock(age=1), + 102: mock.MagicMock(age=2), + } + arbiter.spawn_worker = mock.MagicMock() # Prevent deep call + + arbiter._start_refresh_cycle() + + assert arbiter._refresh_in_progress is True + assert arbiter._workers_to_replace == {100, 101, 102} + + def test_continue_refresh_cycle_spawns_workers(self, mock_app): + """Test that continue_refresh_cycle spawns new workers.""" + + def mock_arbiter_init(self, app): + self._num_workers = 2 + self.cfg = app.cfg + 
self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 1800, + ("api", "worker_refresh_batch_size"): 1, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + arbiter.WORKERS = {100: mock.MagicMock(age=0), 101: mock.MagicMock(age=1)} + arbiter._refresh_in_progress = True + arbiter._workers_to_replace = {100, 101} + arbiter.spawn_worker = mock.MagicMock() + arbiter.kill_worker = mock.MagicMock() + + arbiter._continue_refresh_cycle() + + # Should spawn 1 worker (batch_size) + arbiter.spawn_worker.assert_called_once() + + def test_continue_refresh_cycle_kills_old_workers(self, mock_app): + """Test that continue_refresh_cycle kills old workers when over capacity.""" + + def mock_arbiter_init(self, app): + self._num_workers = 2 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 1800, + ("api", "worker_refresh_batch_size"): 1, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + # 3 workers (1 over capacity) + arbiter.WORKERS = { + 100: mock.MagicMock(age=0), + 101: mock.MagicMock(age=1), + 102: mock.MagicMock(age=2), # New worker + } + arbiter._refresh_in_progress = True + arbiter._workers_to_replace = {100, 101} # Old workers to replace + arbiter.spawn_worker = mock.MagicMock() + arbiter.kill_worker = mock.MagicMock() + + arbiter._continue_refresh_cycle() + + # Should kill oldest worker (age=0, pid=100) + import signal + + arbiter.kill_worker.assert_called_once_with(100, 
signal.SIGTERM) + # Worker 100 should be removed from tracking + assert 100 not in arbiter._workers_to_replace + + def test_refresh_cycle_completes(self, mock_app): + """Test that refresh cycle completes when all workers replaced.""" + + def mock_arbiter_init(self, app): + self._num_workers = 2 + self.cfg = app.cfg + self.WORKERS = {} + + with mock.patch( + "airflow.api_fastapi.gunicorn_app.conf.getint", + side_effect=lambda section, key, fallback=None: { + ("api", "worker_refresh_interval"): 1800, + ("api", "worker_refresh_batch_size"): 1, + }.get((section, key), fallback), + ): + with mock.patch("gunicorn.arbiter.Arbiter.__init__", mock_arbiter_init): + from airflow.api_fastapi.gunicorn_app import AirflowArbiter + + arbiter = AirflowArbiter(mock_app) + # All new workers (none to replace) + arbiter.WORKERS = {102: mock.MagicMock(age=0), 103: mock.MagicMock(age=1)} + arbiter._refresh_in_progress = True + arbiter._workers_to_replace = {100, 101} # These are gone now + arbiter.spawn_worker = mock.MagicMock() + arbiter.kill_worker = mock.MagicMock() + + with mock.patch("time.monotonic", return_value=12345): + arbiter._continue_refresh_cycle() + + # Refresh should be complete + assert arbiter._refresh_in_progress is False + assert arbiter._workers_to_replace == set() + assert arbiter._last_refresh_time == 12345 + + +class TestAirflowGunicornApp: + """Tests for the AirflowGunicornApp class.""" + + def test_load_config(self): + """Test that options are loaded into gunicorn config.""" + from airflow.api_fastapi.gunicorn_app import AirflowGunicornApp + + def mock_init(self, options): + pass # Do nothing, we'll set up state manually + + with mock.patch.object(AirflowGunicornApp, "__init__", mock_init): + app = AirflowGunicornApp.__new__(AirflowGunicornApp) + app.options = {"workers": 4, "bind": "0.0.0.0:8080"} + app.cfg = mock.MagicMock() + app.cfg.settings = {"workers": mock.MagicMock(), "bind": mock.MagicMock()} + + app.load_config() + + assert app.cfg.set.call_count == 2 
+ app.cfg.set.assert_any_call("workers", 4) + app.cfg.set.assert_any_call("bind", "0.0.0.0:8080") + + def test_load_returns_airflow_app(self): + """Test that load() returns the Airflow FastAPI app.""" + from airflow.api_fastapi.gunicorn_app import AirflowGunicornApp + + def mock_init(self, options): + pass # Do nothing, we'll set up state manually + + with mock.patch.object(AirflowGunicornApp, "__init__", mock_init): + app = AirflowGunicornApp.__new__(AirflowGunicornApp) + app.application = None + + with mock.patch("airflow.api_fastapi.main.app", "mock_fastapi_app"): + result = app.load() + + assert result == "mock_fastapi_app" + assert app.application == "mock_fastapi_app" + + def test_run_uses_airflow_arbiter(self): + """Test that run() uses AirflowArbiter.""" + from airflow.api_fastapi.gunicorn_app import AirflowGunicornApp + + def mock_init(self, options): + pass # Do nothing, we'll set up state manually + + with mock.patch.object(AirflowGunicornApp, "__init__", mock_init): + app = AirflowGunicornApp.__new__(AirflowGunicornApp) + + with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowArbiter") as mock_arbiter: + mock_arbiter_instance = mock.MagicMock() + mock_arbiter.return_value = mock_arbiter_instance + + app.run() + + mock_arbiter.assert_called_once_with(app) + mock_arbiter_instance.run.assert_called_once() + + +class TestCreateGunicornApp: + """Tests for the create_gunicorn_app factory function.""" + + def test_create_basic_app(self): + """Test creating an app with basic settings.""" + from airflow.api_fastapi.gunicorn_app import create_gunicorn_app + + with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp") as mock_app_class: + create_gunicorn_app( + host="0.0.0.0", + port=8080, + num_workers=4, + worker_timeout=120, + ) + + mock_app_class.assert_called_once() + options = mock_app_class.call_args[0][0] + + assert options["bind"] == "0.0.0.0:8080" + assert options["workers"] == 4 + assert options["timeout"] == 120 + assert 
options["worker_class"] == "uvicorn.workers.UvicornWorker" + assert options["preload_app"] is True + + def test_create_app_with_ssl(self): + """Test creating an app with SSL settings.""" + from airflow.api_fastapi.gunicorn_app import create_gunicorn_app + + with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp") as mock_app_class: + create_gunicorn_app( + host="0.0.0.0", + port=8443, + num_workers=4, + worker_timeout=120, + ssl_cert="/path/to/cert.pem", + ssl_key="/path/to/key.pem", + ) + + options = mock_app_class.call_args[0][0] + + assert options["certfile"] == "/path/to/cert.pem" + assert options["keyfile"] == "/path/to/key.pem" + + def test_create_app_with_proxy_headers(self): + """Test creating an app with proxy headers enabled.""" + from airflow.api_fastapi.gunicorn_app import create_gunicorn_app + + with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp") as mock_app_class: + create_gunicorn_app( + host="0.0.0.0", + port=8080, + num_workers=4, + worker_timeout=120, + proxy_headers=True, + ) + + options = mock_app_class.call_args[0][0] + + assert options["forwarded_allow_ips"] == "*" + + def test_create_app_with_access_log(self): + """Test creating an app with access logging enabled.""" + from airflow.api_fastapi.gunicorn_app import create_gunicorn_app + + with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp") as mock_app_class: + create_gunicorn_app( + host="0.0.0.0", + port=8080, + num_workers=4, + worker_timeout=120, + access_log=True, + ) + + options = mock_app_class.call_args[0][0] + + assert options["accesslog"] == "-" + + def test_create_app_without_access_log(self): + """Test creating an app with access logging disabled.""" + from airflow.api_fastapi.gunicorn_app import create_gunicorn_app + + with mock.patch("airflow.api_fastapi.gunicorn_app.AirflowGunicornApp") as mock_app_class: + create_gunicorn_app( + host="0.0.0.0", + port=8080, + num_workers=4, + worker_timeout=120, + access_log=False, + ) + + 
options = mock_app_class.call_args[0][0] + + assert "accesslog" not in options diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 10886aa879b04..078b02b1dca5c 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1191,6 +1191,7 @@ muldelete multi-cloud multimodal Multinamespace +multiprocess mutex mv mwaa diff --git a/pyproject.toml b/pyproject.toml index 9d8d15bea2e0e..ec75205cd0f76 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,9 @@ packages = [] "graphviz" = [ "apache-airflow-core[graphviz]" ] +"gunicorn" = [ + "apache-airflow-core[gunicorn]" +] "kerberos" = [ "apache-airflow-core[kerberos]" ] @@ -385,7 +388,7 @@ packages = [] "apache-airflow-providers-zendesk>=4.9.0" ] "all" = [ - "apache-airflow[aiobotocore,amazon-aws-auth,apache-atlas,apache-webhdfs,async,cloudpickle,github-enterprise,google-auth,graphviz,kerberos,ldap,memray,otel,pandas,polars,rabbitmq,s3fs,sentry,statsd,uv]", + "apache-airflow[aiobotocore,amazon-aws-auth,apache-atlas,apache-webhdfs,async,cloudpickle,github-enterprise,google-auth,graphviz,gunicorn,kerberos,ldap,memray,otel,pandas,polars,rabbitmq,s3fs,sentry,statsd,uv]", "apache-airflow-core[all]", "apache-airflow-providers-airbyte>=5.0.0", "apache-airflow-providers-alibaba>=3.0.0",