diff --git a/airflow-core/docs/howto/create-custom-decorator.rst b/airflow-core/docs/howto/create-custom-decorator.rst index 32942393de92c..0b1cebe197c71 100644 --- a/airflow-core/docs/howto/create-custom-decorator.rst +++ b/airflow-core/docs/howto/create-custom-decorator.rst @@ -18,76 +18,170 @@ Creating Custom ``@task`` Decorators ==================================== -As of Airflow 2.2 it is possible add custom decorators to the TaskFlow interface from within a provider +As of Airflow 2.2 it is possible to add custom decorators to the TaskFlow interface from within a provider package and have those decorators appear natively as part of the ``@task.____`` design. -For an example. Let's say you were trying to create an easier mechanism to run python functions as "foo" -tasks. The steps to create and register ``@task.foo`` are: +This is useful when you already have an operator that should execute a Python callable, but you want DAG +authors to use it with the TaskFlow API: -1. Create a ``FooDecoratedOperator`` +.. code-block:: python - In this case, we are assuming that you have an existing ``FooOperator`` that takes a python function as an - argument. By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and - ``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required - to treat your new class as a taskflow native class. + from airflow.sdk import task - You should also override the ``custom_operator_name`` attribute to provide a custom name for the task. For - example, ``_DockerDecoratedOperator`` in the ``apache-airflow-providers-docker`` provider sets this to - ``@task.docker`` to indicate the decorator name it implements. -2. Create a ``foo_task`` function + @task.foo(task_id="run_in_foo") + def transform_customer_data(customer_id: str): + return {"customer_id": customer_id} - Once you have your decorated class, create a function like this, to convert - the new ``FooDecoratedOperator`` into a TaskFlow function decorator! +At a high level, a custom TaskFlow decorator has three parts: - .. code-block:: python +* a decorated operator class that combines your operator with + ``airflow.sdk.bases.decorator.DecoratedOperator``; +* a small factory function that calls ``airflow.sdk.bases.decorator.task_decorator_factory``; +* provider metadata that tells Airflow the decorator name and the import path of the factory function. - from typing import TYPE_CHECKING - from airflow.sdk.bases.decorator import task_decorator_factory +The examples below use ``FooOperator`` and register it as ``@task.foo``. ``FooOperator`` is assumed to be +an operator you already maintain in your provider. - if TYPE_CHECKING: - from airflow.sdk.bases.decorator import TaskDecorator +If you only need a helper inside one DAG file or one internal project, you may not need a registered +``@task.foo`` decorator at all. A regular Python function that wraps ``@task`` can be enough. Registering a +new ``@task.`` decorator is the provider-level approach: it makes the decorator discoverable anywhere +the provider is installed. +1. Create a decorated operator +------------------------------ - def foo_task( - python_callable: Callable | None = None, - multiple_outputs: bool | None = None, +Create a decorated operator that inherits from both ``DecoratedOperator`` and the operator you want to +expose through TaskFlow: + +.. code-block:: python + + from __future__ import annotations + + from collections.abc import Callable, Collection, Mapping + from typing import Any + + from airflow.sdk.bases.decorator import DecoratedOperator + from airflow.providers.foo.operators.foo import FooOperator + + + class FooDecoratedOperator(DecoratedOperator, FooOperator): + """Wrap a Python callable in FooOperator for use with ``@task.foo``.""" + + custom_operator_name = "@task.foo" + + def __init__( + self, + *, + python_callable: Callable, + op_args: Collection[Any] | None = None, + op_kwargs: Mapping[str, Any] | None = None, **kwargs, - ) -> "TaskDecorator": - return task_decorator_factory( + ) -> None: + super().__init__( python_callable=python_callable, - multiple_outputs=multiple_outputs, - decorated_operator_class=FooDecoratedOperator, + op_args=op_args, + op_kwargs=op_kwargs, **kwargs, ) -3. Register your new decorator in get_provider_info of your provider +``DecoratedOperator`` captures the decorated Python function and the arguments passed when the task is +called in the DAG. Your operator stays responsible for the actual execution behavior. + +Set ``custom_operator_name`` to the public decorator name. This is the name users see in UI and task +representations. For example, the Docker provider uses ``@task.docker``. + +If the base operator already has arguments named ``python_callable``, ``op_args``, or ``op_kwargs``, pass +those values through ``kwargs_to_upstream`` when calling ``DecoratedOperator.__init__``. The standard +``@task`` implementation does this because ``PythonOperator`` also uses those argument names. + +2. Create the decorator factory function +---------------------------------------- + +Create a function that turns ``FooDecoratedOperator`` into a TaskFlow decorator: + +.. code-block:: python + + from __future__ import annotations + + from collections.abc import Callable + from typing import TYPE_CHECKING + + from airflow.sdk.bases.decorator import task_decorator_factory + + if TYPE_CHECKING: + from airflow.sdk.bases.decorator import TaskDecorator + + + def foo_task( + python_callable: Callable | None = None, + multiple_outputs: bool | None = None, + **kwargs, + ) -> "TaskDecorator": + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=FooDecoratedOperator, + **kwargs, + ) + +Users can then write either ``@task.foo`` or ``@task.foo(...)``. Any keyword arguments that are not handled +by ``task_decorator_factory`` are forwarded to ``FooDecoratedOperator`` when the decorated function is +called. + +.. note:: + + Airflow providers that support both Airflow 2 and Airflow 3 often import ``DecoratedOperator``, + ``TaskDecorator``, and ``task_decorator_factory`` from ``airflow.providers.common.compat.sdk`` instead + of importing from ``airflow.sdk`` directly. Use the compatibility module when your provider needs that + wider version support. + +3. Register the decorator in provider metadata +---------------------------------------------- + +Airflow discovers custom TaskFlow decorators from provider metadata. The metadata entry contains: + +``name`` + The attribute that will be added under ``task``. For ``@task.foo``, use ``foo``. + +``class-name`` + The import path to the decorator factory function, not the decorated operator class. + +For providers in the Airflow repository, add the entry to the provider's ``provider.yaml`` file: + +.. code-block:: yaml + + task-decorators: + - name: foo + class-name: airflow.providers.foo.decorators.foo.foo_task + +The generated ``get_provider_info.py`` file will include this metadata after provider metadata is +regenerated. Do not edit the generated file directly in the Airflow repository. - Finally, add a key-value ``task-decorators`` to the dict returned from the provider entrypoint as described - in :doc:`apache-airflow-providers:howto/create-custom-providers`. This should be - a list with each item containing ``name`` and ``class-name`` keys. When Airflow starts, the - ``ProviderManager`` class will automatically import this value and ``task.foo`` will work as a new decorator! +For third-party providers that expose provider metadata from a Python entrypoint, add the same information +to the dict returned by ``get_provider_info`` as described in +:doc:`apache-airflow-providers:howto/create-custom-providers`: - .. code-block:: python +.. code-block:: python - def get_provider_info(): - return { - "package-name": "foo-provider-airflow", - "name": "Foo", - "task-decorators": [ - { - "name": "foo", - # "Import path" and function name of the `foo_task` - "class-name": "name.of.python.package.foo_task", - } - ], - # ... - } + def get_provider_info(): + return { + "package-name": "foo-provider-airflow", + "name": "Foo", + "task-decorators": [ + { + "name": "foo", + "class-name": "airflow.providers.foo.decorators.foo.foo_task", + } + ], + # ... + } - Please note that the ``name`` must be a valid python identifier. +The ``name`` must be a valid Python identifier. When Airflow starts, ``ProviderManager`` reads the +``task-decorators`` metadata and makes the factory function available as ``task.foo``. (Optional) Adding IDE auto-completion support -============================================= +--------------------------------------------- .. note::