Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 141 additions & 47 deletions airflow-core/docs/howto/create-custom-decorator.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,76 +18,170 @@
Creating Custom ``@task`` Decorators
====================================

As of Airflow 2.2 it is possible add custom decorators to the TaskFlow interface from within a provider
As of Airflow 2.2 it is possible to add custom decorators to the TaskFlow interface from within a provider
package and have those decorators appear natively as part of the ``@task.____`` design.

For an example. Let's say you were trying to create an easier mechanism to run python functions as "foo"
tasks. The steps to create and register ``@task.foo`` are:
This is useful when you already have an operator that should execute a Python callable, but you want DAG
authors to use it with the TaskFlow API:

1. Create a ``FooDecoratedOperator``
.. code-block:: python

In this case, we are assuming that you have an existing ``FooOperator`` that takes a python function as an
argument. By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and
``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required
to treat your new class as a taskflow native class.
from airflow.sdk import task

You should also override the ``custom_operator_name`` attribute to provide a custom name for the task. For
example, ``_DockerDecoratedOperator`` in the ``apache-airflow-providers-docker`` provider sets this to
``@task.docker`` to indicate the decorator name it implements.

2. Create a ``foo_task`` function
@task.foo(task_id="run_in_foo")
def transform_customer_data(customer_id: str):
return {"customer_id": customer_id}

Once you have your decorated class, create a function like this, to convert
the new ``FooDecoratedOperator`` into a TaskFlow function decorator!
At a high level, a custom TaskFlow decorator has three parts:

.. code-block:: python
* a decorated operator class that combines your operator with
``airflow.sdk.bases.decorator.DecoratedOperator``;
* a small factory function that calls ``airflow.sdk.bases.decorator.task_decorator_factory``;
* provider metadata that tells Airflow the decorator name and the import path of the factory function.

from typing import TYPE_CHECKING
from airflow.sdk.bases.decorator import task_decorator_factory
The examples below use ``FooOperator`` and register it as ``@task.foo``. ``FooOperator`` is assumed to be
an operator you already maintain in your provider.

if TYPE_CHECKING:
from airflow.sdk.bases.decorator import TaskDecorator
If you only need a helper inside one DAG file or one internal project, you may not need a registered
``@task.foo`` decorator at all. A regular Python function that wraps ``@task`` can be enough. Registering a
new ``@task.<name>`` decorator is the provider-level approach: it makes the decorator discoverable anywhere
the provider is installed.

1. Create a decorated operator
------------------------------

def foo_task(
python_callable: Callable | None = None,
multiple_outputs: bool | None = None,
Create a decorated operator that inherits from both ``DecoratedOperator`` and the operator you want to
expose through TaskFlow:

.. code-block:: python

from __future__ import annotations

from collections.abc import Callable, Collection, Mapping
from typing import Any

from airflow.sdk.bases.decorator import DecoratedOperator
from airflow.providers.foo.operators.foo import FooOperator


class FooDecoratedOperator(DecoratedOperator, FooOperator):
"""Wrap a Python callable in FooOperator for use with ``@task.foo``."""

custom_operator_name = "@task.foo"

def __init__(
self,
*,
python_callable: Callable,
op_args: Collection[Any] | None = None,
op_kwargs: Mapping[str, Any] | None = None,
**kwargs,
) -> "TaskDecorator":
return task_decorator_factory(
) -> None:
super().__init__(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=FooDecoratedOperator,
op_args=op_args,
op_kwargs=op_kwargs,
**kwargs,
)

3. Register your new decorator in get_provider_info of your provider
``DecoratedOperator`` captures the decorated Python function and the arguments passed when the task is
called in the DAG. Your operator stays responsible for the actual execution behavior.

Set ``custom_operator_name`` to the public decorator name. This is the name users see in UI and task
representations. For example, the Docker provider uses ``@task.docker``.

If the base operator already has arguments named ``python_callable``, ``op_args``, or ``op_kwargs``, pass
those values through ``kwargs_to_upstream`` when calling ``DecoratedOperator.__init__``. The standard
``@task`` implementation does this because ``PythonOperator`` also uses those argument names.

2. Create the decorator factory function
----------------------------------------

Create a function that turns ``FooDecoratedOperator`` into a TaskFlow decorator:

.. code-block:: python

from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING

from airflow.sdk.bases.decorator import task_decorator_factory

if TYPE_CHECKING:
from airflow.sdk.bases.decorator import TaskDecorator


def foo_task(
python_callable: Callable | None = None,
multiple_outputs: bool | None = None,
**kwargs,
) -> "TaskDecorator":
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=FooDecoratedOperator,
**kwargs,
)

Users can then write either ``@task.foo`` or ``@task.foo(...)``. Any keyword arguments that are not handled
by ``task_decorator_factory`` are forwarded to ``FooDecoratedOperator`` when the decorated function is
called.

.. note::

Airflow providers that support both Airflow 2 and Airflow 3 often import ``DecoratedOperator``,
``TaskDecorator``, and ``task_decorator_factory`` from ``airflow.providers.common.compat.sdk`` instead
of importing from ``airflow.sdk`` directly. Use the compatibility module when your provider needs that
wider version support.

3. Register the decorator in provider metadata
----------------------------------------------

Airflow discovers custom TaskFlow decorators from provider metadata. The metadata entry contains:

``name``
The attribute that will be added under ``task``. For ``@task.foo``, use ``foo``.

``class-name``
The import path to the decorator factory function, not the decorated operator class.

For providers in the Airflow repository, add the entry to the provider's ``provider.yaml`` file:

.. code-block:: yaml

task-decorators:
- name: foo
class-name: airflow.providers.foo.decorators.foo.foo_task

The generated ``get_provider_info.py`` file will include this metadata after provider metadata is
regenerated. Do not edit the generated file directly in the Airflow repository.

Finally, add a key-value ``task-decorators`` to the dict returned from the provider entrypoint as described
in :doc:`apache-airflow-providers:howto/create-custom-providers`. This should be
a list with each item containing ``name`` and ``class-name`` keys. When Airflow starts, the
``ProviderManager`` class will automatically import this value and ``task.foo`` will work as a new decorator!
For third-party providers that expose provider metadata from a Python entrypoint, add the same information
to the dict returned by ``get_provider_info`` as described in
:doc:`apache-airflow-providers:howto/create-custom-providers`:

.. code-block:: python
.. code-block:: python

def get_provider_info():
return {
"package-name": "foo-provider-airflow",
"name": "Foo",
"task-decorators": [
{
"name": "foo",
# "Import path" and function name of the `foo_task`
"class-name": "name.of.python.package.foo_task",
}
],
# ...
}
def get_provider_info():
return {
"package-name": "foo-provider-airflow",
"name": "Foo",
"task-decorators": [
{
"name": "foo",
"class-name": "airflow.providers.foo.decorators.foo.foo_task",
}
],
# ...
}

Please note that the ``name`` must be a valid python identifier.
The ``name`` must be a valid Python identifier. When Airflow starts, ``ProviderManager`` reads the
``task-decorators`` metadata and makes the factory function available as ``task.foo``.

(Optional) Adding IDE auto-completion support
=============================================
---------------------------------------------

.. note::

Expand Down
Loading