Airflow TaskFlow API: внутреннее устройство современного способа писать DAG-и

Страницы:  1

Ответить
 

Professor Seleznov


Apache Airflow долгое время ассоциировался с таким стилем описания workflow:
# объявляем задачи-таски
task1 = PythonOperator(...)
task2 = BashOperator(...)
# проставляем зависимости между ними
task1 >> task2
Это рабочий и до сих пор актуальный подход, но с Airflow 2.0.0 появился TaskFlow API — способ описывать DAG-и через обычные Python функции и декораторы:
@dag(dag_id="linear_demo")
def tutorial_dag()
@task
def extract():
return 42
@task
def transform(x):
return x * 2
# описываем зависимости и строим Flow
y = transform(extract())
# создаем даг
tutorial_dag()
TaskFlow в Airflow позволяет описывать DAG как обычный Python-код: @dag задает сам workflow/DAG, а @task превращает Python-функции в задачи Airflow. При вызове декорированных функций, например transform(extract()), выполняется не сам расчет, а создаются объекты задач, связи между ними и ссылки на их будущие результаты (через объект XComArg).
То есть TaskFlow - это декларативный DSL(domain-specific language) для построения DAG, где вызовы функций не выполняют вычисления, а описывают граф зависимостей.
Задачи статьи
В этой статье попробуем:
  • Заглянуть внутрь Airflow и понять, как работает TaskFlow API (для версии 3.2.1)
  • На основе этих идей написать собственный микро-фреймворк для закрепления понимания.
  • Сохранить названия и общую логику внутренних объектов Airflow.
  • Понять главный архитектурный принцип: описание DAG ≠ выполнение DAG.
Итак, давайте еще раз рассмотрим основные фазы TaskFlow:
  • Создается DAG через @dag и вызов функции DAG-а.
    В этот момент Airflow создает объект DAG, входит в его контекст и начинает исполнять тело функции DAG-а для сборки графа.
  • Внутри тела DAG-а @task декорирует Python-функции.
    То есть extract, transform становятся не обычными функциями, а объектами-декораторами (_TaskDecorator), которые умеют создавать Airflow-задачи.
  • При вызове декорированных функций, например
    y = transform(extract())
    создаются объекты задач (операторы) и зависимости между задачами.
    Именно здесь фактически собирается граф DAG.
  • Позже scheduler и worker исполняют уже собранный DAG.
Что делает @task
Объект task в Airflow является специальным вызываемым объектом (TaskDecoratorCollection). При декорировании функции он создаёт другой объект _TaskDecorator. см. исходники в [task-sdk\src\airflow\sdk\definitions\decorators\__init__.py]
Упрощённо:
class TaskDecoratorCollection:
def __call__(self, function):
return _TaskDecorator(function)
task = TaskDecoratorCollection()
Напомню:
@task
def extract():
эквивалентно: extract = task(extract)
А значит будет вызван:
TaskDecoratorCollection.__call__()
который вернёт _TaskDecorator, объявлен в [task-sdk\src\airflow\sdk\bases\decorator.py]
Переход от TaskDecoratorCollection к TaskDecorator немного запутан, если проследить - через _getattr__("python"), достает Python task decorator из provider registry
def python_task(...):
[providers\standard\src\airflow\providers\standard\decorators\python.py]
Далее вызывает task_decorator_factory(...), который и возвращает _TaskDecorator

Что такое _TaskDecorator
Это внутренний объект, который стоит за @task. Он хранит исходную функцию и умеет превращать её вызов в задачу DAG. 
class _TaskDecorator:
# храним функцию
def __init__(self, function):
self.function = function
# создается оператор Airflow и оборачивается в XComArg
def __call__(self, ...):
op = BaseOperator(
python_callable = self.function,
...
)
return XComArg(op)
Результат
  • создаёт operator
  • регистрирует его в DAG
  • возвращает ссылку на результат
Что такое XComArg
XComArg — это ленивая ссылка на будущий результат задачи, т.е. это не само значение, а декларативная ссылка на результат upstream-задачи, которая будет разрешена только во время выполнения DAG.
В нашем простом случае мы просто обернем оператор в блок init:
class XComArg:
def __init__(self, operator):
self.operator = operator
Смысл такой: _TaskDecorator создаёт operator, наружу возвращается XComArg(operator) и в логике проверяя аргументы, если видим XComArg, то понимаем что это задача.
То есть:
BaseOperator = задача в DAG
XComArg = ссылка на output этой задачи
Что такое оператор в Airflow
Оператор — это объект задачи в DAG. Идея:
operator = узел графа + правила его выполнения
Оператор описывает:
  • что запускать
  • от чего зависит задача
  • как её выполнять
  • параметры retries / pools / queue / timeout
В нашем учебном примере оператор хранит Python-функцию, а в реальном Airflow это может быть любая логика выполнения, не только Python-функция. Также нужен даг (чтобы зарегистрировать там задачу). В нашем примере это будет просто глобальная переменная _CURRENTDAG.
При создании объекта оператора мы также проставляем зависимости между задачами в текущем даге:
  • upstream_task_ids - Идентификаторы upstream-задач, от которых зависит текущая задача
  • downstream_task_ids - Идентификаторы downstream-задач, которые зависят от текущей задачи
  • dag.add_task(self) - добавляем текущий оператор в текущий даг
# храним текущий даг здесь
_CURRENT_DAG = None
class BaseOperator:
def __init__(self, python_callable, ...):
# Идентификатор задачи.
# В этом учебном примере это просто имя Python-функции.
self.task_id = python_callable.__name__
# Python-функция, которую будет выполнять эта задача.
self.python_callable = python_callable
# Для простоты берем текущий DAG из глобального контекста.
self.dag = _CURRENT_DAG
# Идентификаторы upstream-задач, от которых зависит текущая задача.
self.upstream_task_ids = set()
# Идентификаторы downstream-задач, которые зависят от текущей задачи.
self.downstream_task_ids = set()
# Ищем XComArg в аргументах задачи и по ним строим зависимости
# upstream_task_ids и downstream_task_ids
self._set_xcomarg_dependencies()
# Добавляем задачу в DAG.
self.dag.add_task(self)
# Выполнение задачи.
def execute(self, context):
return self.python_callable()
Реальные наследники BaseOperator
В Airflow это, например:
  • PythonOperator
  • BashOperator
  • Sensor operators
  • SQL operators
Что такое DAG и @dag
DAG — объект, который хранит описание workflow как граф задач. Он отвечает за:
  • dag_id, идентификатор
  • список задач
  • зависимости между задачами
  • контекст with DAG(...) и т.д.
В TaskFlow задается через декоратор @dag, но по сути @dag это удобная обертка над with DAG(...), см. [task-sdk\src\airflow\sdk\definitions\dag.py] в реализации декоратора def dag.
То есть конструкция вида:
@dag(...)
def tutorial_dag():
...
по смыслу близка к:
def tutorial_dag():
with DAG(...) as dag_obj:
...
return dag_obj
Для примера я сделал лишь версию с контекстным менеджером, чтобы не усложнять. И напомню, что у нас это будет глобальная переменная, в которую будем писать название текущего дага при входе/выходе из контекста.:
_CURRENTDAG = None
class DAG:
def __init__(self, dag_id):
self.dag_id = dag_id
# список задач для этого дага
self.task_dict = {}
# вызывается при создании оператора
def add_task(self, task):
self.task_dict[task.task_id] = task
@property
def tasks(self):
return list(self.task_dict.values())
# протокол контекстного менеджера
def __enter__(self):
global CURRENTDAG
_CURRENTDAG = self
return self
def __exit__(self, exc_type, exc, tb):
global CURRENTDAG
_CURRENTDAG = None
Почему нужен with DAG(...)
Когда создаётся задача внутри блока:
with DAG("demo"):
x = extract()
новая задача автоматически привязывается к текущему DAG.
Выполнение и что такое TaskInstance
Когда DAG уже описан, наступает runtime. Scheduler анализирует DAG и планирует выполнение задач, а worker исполняет конкретные TaskInstance — то есть конкретные запуски конкретных задач.
Идея:
  • BaseOperator — описание задачи
  • TaskInstance — конкретное исполнение этой задачи
Например, TaskInstance можно представить как:
  • task_id = "extract"
  • run_id или логическая дата запуска DAG-а
  • try_number = 2
При выполнении задачи ее результат может автоматически сохраняться через механизм XCom (механизм передачи данных между задачами).
  • в учебном примере результат сохраняется в XComStore
  • в настоящем Airflow результат сохраняется в XCom backend / metadata database
# хранилище сохраненных значений
class XComStore:
def __init__(self) -> None:
self.values: dict[tuple[str, str, str], Any] = {}
def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None:
self.values[(dag_id, task_id, key)] = value
def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any:
return self.values[(dag_id, task_id, key)]
class TaskInstance:
def __init__(self, task: BaseOperator, xcom_store: XComStore):
self.task = task
self.xcom_store = xcom_store
def xcom_push(self, key: str, value: Any) -> None:
self.xcom_store.push(...)
def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any:
return self.xcom_store.pull(...)
def run(self):
# запускаем на исполнение
result = self.task.execute(context={})
# сохраняем результат в хранилище
self.xcom_store.push(self.task.task_id, result)
Что такое XCom
XCom — механизм передачи данных между задачами. В примере это просто словарь c ключем (dag_id, task_id, key). В настоящем Airflow XCom хранится как записи в metadata database в модели XComModel, на практике это часто PostgreSQL.
Определяется в [airflow-core\src\airflow\models\xcom.py]
То есть по имени дага, таски, ключу можно получить что там сохранили. Например:
@task
def extract():
return 42
Возвращаемое значение TaskFlow-задачи автоматически сериализуется и сохраняется как XCom под специальным ключом return_value. Следующая задача может получить его:
@task
def transform(x):
return x * 2
TaskFlow API делает это автоматически. В классическом стиле Airflow можно делать вручную: ti.xcom_pull(task_ids="extract"),здесь ti- это экземпляр TaskInstance.
В реальном Airflow обычно недостаточно только (dag_id, task_id, key) — еще важны run_id и map_index
  • run_id  - идентификатор конкретного запуска DAG-а
  • map_index нужен для dynamic task mapping, когда одна задача разворачивается в несколько параллельных task instances
Что важно понимать
XCom хранится в metadata DB Airflow, поэтому передача больших объектов через XCom может резко замедлить scheduler и webserver. Поэтому XCom предназначен для небольших данных: числа, строки, json, id, пути к файлам, метаданные. Не стоит передавать большие DataFrame. Лучше:
1) task1 пишет parquet
2) task2 получает путь через XCom
Общий итог
Мы реализовали очень упрощенную версию следующих объектов airflow, попытались сохранить внутреннюю логику и названия:
  • DAG
  • Operator
  • TaskDecorator
  • XComArg
  • TaskInstance
  • XCom
То есть рассмотрели основные концепции Airflow и TaskFlow API.
А далее - минимальный рабочий пример . В нем добавлен объект LinearTaskRunner, который умеет запускать наш линейный ETL.
from __future__ import annotations
from collections.abc import Callable
from typing import Any
_CURRENT_DAG = None
XCOM_RETURN_KEY = "return_value"
class DAG:
def __init__(self, dag_id: str) -> None:
self.dag_id = dag_id
self.task_dict: dict[str, BaseOperator] = {}
def add_task(self, task: BaseOperator) -> None:
# проверка на дубликат task
#if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:
# raise ValueError(f"Task id {task.task_id!r} already exists in DAG")
self.task_dict[task.task_id] = task
@property
def tasks(self) -> list[BaseOperator]:
return list(self.task_dict.values())
def get_task(self, task_id: str) -> BaseOperator:
return self.task_dict[task_id]
def __enter__(self) -> DAG:
global _CURRENT_DAG
_CURRENT_DAG = self
return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
global _CURRENT_DAG
_CURRENT_DAG = None
class XComStore:
def __init__(self) -> None:
self.values: dict[tuple[str, str, str], Any] = {}
def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None:
self.values[(dag_id, task_id, key)] = value
def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any:
return self.values[(dag_id, task_id, key)]
class XComArg:
def __init__(self, operator: BaseOperator, key: str = XCOM_RETURN_KEY) -> None:
self.operator = operator
self.key = key
# Базовый класс оператора
class BaseOperator:
def __init__(
self,
python_callable: Callable[..., Any],
args: tuple[Any, ...],
kwargs: dict[str, Any],
dag: DAG | None = None,
) -> None:
self.task_id = python_callable.__name__
self.python_callable = python_callable
self.args = args
self.kwargs = kwargs
self.dag = dag or _CURRENT_DAG
self.upstream_task_ids: set[str] = set()
self.downstream_task_ids: set[str] = set()
self._set_xcomarg_dependencies()
# кладем эту задачу для текущего дага
if self.dag is not None:
self.dag.add_task(self)
def set_upstream(self, other: BaseOperator) -> None:
self.upstream_task_ids.add(other.task_id)
other.downstream_task_ids.add(self.task_id)
def _set_xcomarg_dependencies(self) -> None:
for arg in self.args:
if isinstance(arg, XComArg):
self.set_upstream(arg.operator)
for arg in self.kwargs.values():
if isinstance(arg, XComArg):
self.set_upstream(arg.operator)
def execute(self, context: dict[str, TaskInstance]) -> Any:
resolved_args = [
context["ti"].resolve(arg) if isinstance(arg, XComArg) else arg
for arg in self.args
]
resolved_kwargs = {
key: context["ti"].resolve(value) if isinstance(value, XComArg) else value
for key, value in self.kwargs.items()
}
return self.python_callable(*resolved_args, **resolved_kwargs)
class TaskInstance:
def __init__(self, task: BaseOperator, xcom_store: XComStore) -> None:
self.task = task
self.xcom_store = xcom_store
def xcom_push(self, key: str, value: Any) -> None:
self.xcom_store.push(
dag_id=self.task.dag.dag_id,
task_id=self.task.task_id,
key=key,
value=value,
)
def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any:
return self.xcom_store.pull(
dag_id=self.task.dag.dag_id,
task_id=task_ids,
key=key,
)
def resolve(self, value: Any) -> Any:
if isinstance(value, XComArg):
return self.xcom_pull(task_ids=value.operator.task_id, key=value.key)
return value
def run(self) -> Any:
context = {"ti": self}
result = self.task.execute(context)
self.xcom_push(XCOM_RETURN_KEY, result)
return result
class LinearTaskRunner:
def __init__(self, dag: DAG, xcom_store: XComStore) -> None:
self.dag = dag
self.xcom_store = xcom_store
def run(self, task: BaseOperator) -> Any:
self._run_task(task)
return self.xcom_store.pull(self.dag.dag_id, task.task_id)
def _run_task(self, task: BaseOperator) -> None:
""" def _run_task(self, task):
if task уже посчитан:
return
для каждого upstream:
_run_task(upstream)
запусти текущую задачу
"""
if (self.dag.dag_id, task.task_id, XCOM_RETURN_KEY) in self.xcom_store.values:
return
# Рекурсивный запуск upstream-задач
for upstream_task_id in task.upstream_task_ids:
upstream_task = self.dag.get_task(upstream_task_id)
self._run_task(upstream_task)
ti = TaskInstance(task=task, xcom_store=self.xcom_store)
ti.run()
class _TaskDecorator:
def __init__(self, function: Callable[..., Any]) -> None:
self.function = function
def __call__(self, *args: Any, **kwargs: Any) -> XComArg:
op = BaseOperator(
python_callable=self.function,
args=args,
kwargs=kwargs,
)
return XComArg(op)
class TaskDecoratorCollection:
def __call__(self, function: Callable[..., Any]) -> _TaskDecorator:
return _TaskDecorator(function)
task = TaskDecoratorCollection()
with DAG("linear_demo") as linear_dag:
@task
def extract():
print("extract")
return 3
@task
def transform(x):
print("transform")
return x + 2
@task
def load(x):
print("load")
return x * 10
result = load(transform(extract()))
linear_xcom_store = XComStore()
linear_runner = LinearTaskRunner(dag=linear_dag, xcom_store=linear_xcom_store)
print(linear_runner.run(result.operator)) # 50
print(linear_xcom_store.pull("linear_demo", "extract")) # 3
print(linear_xcom_store.pull("linear_demo", "transform")) # 5
print(linear_xcom_store.pull("linear_demo", "load")) # 50
-Источник
 
Loading...
Error