Другое

Исправление ошибки 'State Failed' в Airflow 3 Docker

Узнайте, как устранить ошибку 'Завершено с состоянием failed, но атрибут состояния экземпляра задачи находится в очереди' в настройках Apache Airflow 3 Docker. Исправьте проблемы с подключением и синхронизацией состояний с пошаговыми решениями.

Что означает ошибка ‘Завершено с состоянием failed, но атрибут состояния экземпляра задачи имеет значение queued’ в Apache Airflow 3, и как можно её исправить при запуске DAG с помощью Docker?

Я пытаюсь настроить Apache Airflow 3 с Docker на виртуальной машине, используя следующую конфигурацию:

yaml
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE}
  restart: unless-stopped
  env_file: .env
  environment:
    # ---- Airflow (основные) ----
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: ${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN}
    AIRFLOW__SDK__API_BASE_URL: ${AIRFLOW__SDK__API_BASE_URL}
    AIRFLOW__CORE__EXECUTOR: ${AIRFLOW__CORE__EXECUTOR}
    AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW__CORE__FERNET_KEY}
    AIRFLOW__CORE__LOAD_EXAMPLES: ${AIRFLOW__CORE__LOAD_EXAMPLES}
    AIRFLOW__CORE__DEFAULT_TIMEZONE: ${AIRFLOW__CORE__DEFAULT_TIMEZONE}
    AIRFLOW__WEBSERVER__BASE_URL: ${AIRFLOW__WEBSERVER__BASE_URL}
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: ${AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE}
    AIRFLOW__DATABASE__SQL_ALCHEMY_ENGINE_OPTIONS: ${AIRFLOW__DATABASE__SQL_ALCHEMY_ENGINE_OPTIONS}
    AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS: ${AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS}
    AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_PASSWORDS_FILE: ${AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_PASSWORDS_FILE}

    # ---- Сеть / Прокси : отключить прокси для внутренних вызовов ----
    NO_PROXY: "localhost,127.0.0.1,web,af_web,airflow.local,*.local"
    no_proxy: "localhost,127.0.0.1,web,af_web,airflow.local,*.local"
    HTTP_PROXY: ""
    HTTPS_PROXY: ""
    http_proxy: ""
    https_proxy: ""
  user: "${AIRFLOW_UID}:${AIRFLOW_GID}"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./simple_auth_passwords.json:/opt/airflow/simple_auth_passwords.json
  networks:
    airflow_net: {}

services:
  # === API + UI Airflow ===
  web:
    <<: *airflow-common
    container_name: af_web
    ports:
      - "8080:8080"
    command: >
      bash -lc "
        airflow db migrate &&
        airflow api-server --host 0.0.0.0 --port 8080 --proxy-headers --workers 4
      "
    healthcheck:
      test: ["CMD", "bash", "-lc", "curl -sf http://localhost:8080/api/v2/monitor/health | grep -q 'healthy'"]
      interval: 10s
      timeout: 5s
      retries: 12
    networks:
      airflow_net:
        aliases: [web, af_web]

  # === Планировщик ===
  scheduler:
    <<: *airflow-common
    container_name: af_scheduler
    command: >
      bash -lc "
        export NO_PROXY='localhost,127.0.0.1,web,af_web,airflow.local,*.local' &&
        export no_proxy='$NO_PROXY' &&
        export HTTP_PROXY= HTTPS_PROXY= http_proxy= https_proxy= &&
        export AIRFLOW__SDK__API_BASE_URL='http://web:8080' &&
        airflow db migrate &&
        airflow scheduler
      "
    depends_on:
      web:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "airflow", "db", "check"]
      interval: 20s
      timeout: 5s
      retries: 10

  # === Обработчик DAG (парсинг/сериализация DAG) ===
  dag_processor:
    <<: *airflow-common
    container_name: af_dagproc
    command: >
      bash -lc "
        export NO_PROXY='localhost,127.0.0.1,web,af_web,airflow.local,*.local' &&
        export no_proxy='$NO_PROXY' &&
        export HTTP_PROXY= HTTPS_PROXY= http_proxy= https_proxy= &&
        export AIRFLOW__SDK__API_BASE_URL='http://web:8080' &&
        airflow dag-processor
      "
    depends_on:
      web:
        condition: service_healthy
      scheduler:
        condition: service_started

  # === Триггер (асинхронные сенсоры) ===
  triggerer:
    <<: *airflow-common
    container_name: af_triggerer
    command: >
      bash -lc "
        export NO_PROXY='localhost,127.0.0.1,web,af_web,airflow.local,*.local' &&
        export no_proxy='$NO_PROXY' &&
        export HTTP_PROXY= HTTPS_PROXY= http_proxy= https_proxy= &&
        export AIRFLOW__SDK__API_BASE_URL='http://web:8080' &&
        airflow triggerer
      "
    depends_on:
      web:
        condition: service_healthy
      scheduler:
        condition: service_started

networks:
  airflow_net:
    driver: bridge

Каждый раз, когда я пытаюсь запустить DAG, я получаю ошибку: ‘Завершено с состоянием failed, но атрибут состояния экземпляра задачи имеет значение queued.’ В логах планировщика я вижу: ‘httpx.ConnectError: [Errno 111] Connection refused.’

Я проверил, что базовый URL API установлен в ‘http://web:8080’, что кажется правильным. Что может вызывать эту проблему, и как её можно исправить?

Ошибка “Finished with state failed, but the task instance’s state attribute is queued” в Apache Airflow 3 возникает при несоответствии между состоянием, сообщенным исполнителем (executor), и состоянием, хранящимся в базе данных. Обычно это происходит, когда планировщик (scheduler) теряет связь с исполнителем или когда проблемы с подключением предотвращают правильную синхронизацию состояний между компонентами в вашей Docker-среде.


Содержание


Понимание ошибки

Это сообщение об ошибке указывает на проблему синхронизации состояний, когда два компонента Airflow не согласны относительно статуса задачи. Согласно официальной документации Airflow, планировщик помечает задачи как завершенные с ошибкой, когда:

  1. Задача находится в очереди дольше, чем scheduler.task_queued_timeout
  2. Истекло время ожидания сердцебиения (heartbeat) экземпляра задачи
  3. Внешний процесс изменил состояние задачи

Конкретная ошибка, которую вы видите, возникает, когда:

  • Исполнитель сообщает о состоянии задачи как “failed” (ошибка)
  • Планировщик по-прежнему показывает задачу как “queued” (в очереди)
  • Это несоответствие происходит из-за сбоев связи между компонентами

В вашем случае httpx.ConnectError: [Errno 111] Connection refused указывает на то, что планировщик не может установить связь с исполнителем или сервером API, что вызывает несоответствие состояний.


Распространенные причины в Docker-средах

На основе множества проблем на GitHub и обсуждений, эта проблема особенно распространена в настройках Docker Airflow 3.x по нескольким причинам:

1. Проблемы обнаружения сервисов

Docker-контейнеры могут корректно не разрешать имена хостов между сервисами, особенно с новой архитектурой Airflow 3.x, которая включает отдельные компоненты API, планировщика и исполнителя.

2. Проблемы конфигурации сети

Ваша конфигурация показывает изоляцию сети с airflow_net, но псевдонимы сервисов и разрешение имен хостов могут работать некорректно.

3. Коммуникация с сервером API

Ошибка указывает на то, что планировщик не может подключиться к серверу API по адресу http://web:8080, несмотря на вашу конфигурацию AIRFLOW__SDK__API_BASE_URL.


Проблемы с подключением и конфигурацией сети

Ошибка httpx.ConnectError: [Errno 111] Connection refused указывает конкретно на проблемы сетевой коммуникации. Вот возможные причины:

Проверки состояния сервисов

Ваша конфигурация включает проверки состояния для веб-сервиса:

yaml
healthcheck:
  test: ["CMD", "bash", "-lc", "curl -sf http://localhost:8080/api/v2/monitor/health | grep -q 'healthy'"]

Однако планировщик может пытаться подключиться до того, как сервер API будет полностью готов, даже с условиями service_healthy.

Разрешение имен хостов

Планировщик использует http://web:8080 для подключения к серверу API, но Docker-сеть может некорректно разрешать это имя хоста во время запуска.

Настройки прокси

Хотя вы настроили параметры прокси, переменные окружения NO_PROXY могут не предотвращать попытки подключения, которые завершаются ошибкой.


Проблемы связи между планировщиком и исполнителем

В Airflow 3.x архитектура значительно изменилась с введением:

  1. Сервер API: Обрабатывает веб-интерфейс и внешние коммуникации
  2. Планировщик: Управляет планированием задач и состоянием
  3. Обработчик DAG: Обрабатывает парсинг DAG
  4. Триггерер: Управляет операциями сенсоров

Ошибка несоответствия состояний возникает, когда:

  • Планировщик ставит задачу в очередь
  • Исполнитель пытается выполнить ее, но не из-за проблем с подключением
  • Исполнитель сообщает о состоянии “failed”
  • Планировщик никогда не получает это обновление и продолжает считать задачу “queued” (в очереди)

Это создает несоответствие, с которым вы столкнулись.


Проблемы конфигурации

Несколько проблем конфигурации в вашей настройке могут вызывать эту проблему:

1. Отсутствующая конфигурация исполнителя

Вы установили AIRFLOW__CORE__EXECUTOR, но не указали тип исполнителя. Для Docker-настроек обычно следует использовать:

  • LocalExecutor для одноконтейнерных настроек
  • CeleryExecutor для распределенных настроек с Redis/RabbitMQ

2. Конфигурация URL сервера API

Настройка AIRFLOW__SDK__API_BASE_URL может быть недостаточной. Airflow 3.x требует дополнительной конфигурации для правильной коммуникации между сервисами.

3. Отсутствующая конфигурация рабочего процесса

Ваша конфигурация не включает отдельный сервис рабочего процесса (worker), что может вызывать сбой при запуске исполнителя внутри контейнера планировщика.


Пошаговые решения

Решение 1: Исправление коммуникации с сервером API

Обновите конфигурации планировщика и триггерера для использования правильного URL API:

yaml
scheduler:
  # ... существующая конфигурация ...
  command: >
    bash -lc "
      export NO_PROXY='localhost,127.0.0.1,web,af_web,airflow.local,*.local' &&
      export no_proxy='$NO_PROXY' &&
      export HTTP_PROXY= HTTPS_PROXY= http_proxy= https_proxy= &&
      export AIRFLOW__API__API_SERVER_HOST='web' &&
      export AIRFLOW__API__API_SERVER_PORT='8080' &&
      export AIRFLOW__API__API_SERVER_PROTOCOL='http' &&
      airflow db migrate &&
      airflow scheduler
    "

Решение 2: Добавление отдельного сервиса рабочего процесса

Добавьте отдельный сервис рабочего процесса в ваш docker-compose:

yaml
worker:
  <<: *airflow-common
  container_name: af_worker
  command: >
    bash -lc "
      export NO_PROXY='localhost,127.0.0.1,web,af_web,airflow.local,*.local' &&
      export no_proxy='$NO_PROXY' &&
      export HTTP_PROXY= HTTPS_PROXY= http_proxy= https_proxy= &&
      export AIRFLOW__API__API_SERVER_HOST='web' &&
      export AIRFLOW__API__API_SERVER_PORT='8080' &&
      export AIRFLOW__API__API_SERVER_PROTOCOL='http' &&
      airflow worker
    "
  depends_on:
    web:
      condition: service_healthy
    scheduler:
      condition: service_started

Решение 3: Правильная настройка LocalExecutor

Установите тип исполнителя как LocalExecutor и настройте параллелизм:

yaml
environment:
  # ... существующие переменные ...
  AIRFLOW__CORE__EXECUTOR: LocalExecutor
  AIRFLOW__CORE__PARALLELISM: 4
  AIRFLOW__CORE__DAG_CONCURRENCY: 4

Решение 4: Добавление таймаутов подключения

Настройте параметры таймаута для предотвращения зависания:

yaml
environment:
  # ... существующие переменные ...
  AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT: "300"  # 5 минут
  AIRFLOW__API__API_SERVER_READ_TIMEOUT: "300"
  AIRFLOW__API__API_SERVER_WRITE_TIMEOUT: "300"

Решение 5: Исправление конфигурации сети

Обновите псевдонимы сервисов и конфигурацию сети:

yaml
networks:
  airflow_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

services:
  web:
    # ... существующая конфигурация ...
    networks:
      airflow_net:
        aliases: [web, af_web, localhost]
        ipv4_address: 172.20.0.2

Профилактические меры

1. Мониторинг состояния компонентов

Реализуйте правильные проверки состояния и мониторинг:

yaml
healthcheck:
  test: ["CMD-SHELL", "curl -f http://localhost:8080/api/v1/health || exit 1"]
  interval: 30s
  timeout: 10s
  retries: 5
  start_period: 40s

2. Использование Airflow 3.1+

Обновитесь до Airflow 3.1.2 или более поздней версии, которая включает улучшенную синхронизацию состояний и исправления ошибок для этой проблемы.

3. Реализация повторных попыток

Добавьте логику повторных попыток в ваши DAG для обработки временных проблем с подключением:

python
from airflow.utils.email import send_email
from airflow.exceptions import AirflowException

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': send_email,
    # ... другие аргументы ...
}

4. Анализ логов

Включите детальное логирование для раннего выявления проблем с подключением:

yaml
logging:
  level: INFO
  format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

Источники

  1. Устранение неполадок Apache Airflow - Внешнее изменение состояния задачи
  2. GitHub Issue: Airflow 3.0.3 sporadically fails tasks with state mismatch
  3. GitHub Discussion: DAG execution fails with ‘state mismatch’ error when behind an NGINX reverse proxy
  4. Stack Overflow: What does ‘state attribute is queued’ mean for airflow 3?
  5. DrDroid: Apache Airflow Task stuck in ‘queued’ state
  6. AWS re:Post: MWAA Tasks failing while queued (state mismatch)
  7. GitHub Issue: Worker fails with httpx.ConnectError: [Errno 111] Connection refused after upgrading to 3.0.6

Заключение

Ошибка “Finished with state failed, but the task instance’s state attribute is queued” в Apache Airflow 3 в основном вызвана сбоями связи между планировщиком, исполнителем и сервером API компонентов. В вашей Docker-среде это, вероятно, связано с:

  1. Проблемами сетевого подключения между контейнерами
  2. Неправильным обнаружением сервисов и разрешением имен хостов
  3. Отсутствующей конфигурацией сервиса рабочего процесса
  4. Проблемами коммуникации с сервером API

Для решения этой проблемы реализуйте ключевые исправления:

  • Добавьте правильную конфигурацию URL сервера API
  • Включите отдельный сервис рабочего процесса
  • Используйте LocalExecutor для простых Docker-настроек
  • Настройте псевдонимы сети и IP-адреса
  • Установите соответствующие значения таймаута

Предотвратите будущие проблемы, обновившись до Airflow 3.1+, реализовав комплексные проверки состояния и добавив логику повторных попыток в ваши DAG. Проактивно отслеживайте логи компонентов для выявления проблем с подключением до того, как они приведут к сбоям задач.

Авторы
Проверено модерацией
Модерация