Другое

Исправление ошибки 'Broken pipe' в Airflow SSH: Полное руководство

Полное руководство по исправлению ошибок 'client_loop: send disconnect: Broken pipe' в DAGs Airflow при использовании gcloud compute ssh для долгосрочных задач обучения ML.

Как предотвратить ошибку ‘client_loop: send disconnect: Broken pipe’ в моем Airflow DAG при использовании gcloud compute ssh для длительного обучения моделей машинного обучения?

У меня есть Airflow DAG с BashOperator, который подключается к GCP VM через SSH для обучения модели:

python
training = BashOperator(
    task_id="training",
    bash_command="echo 'wait for VM initialization' && sleep 60 && "
                "gcloud config set account xx && "
                "gcloud compute ssh {SERVICE_ACCOUNT.split('@')[0]}@{x} "
                "--tunnel-through-iap "
                "--ssh-flag='-o ServerAliveInterval=60' "
                "--ssh-flag='-o ServerAliveCountMax=5' "
                "--command {train_cmd}",
    retry_delay=timedelta(minutes=2),
)

Код обучения модели:

python
print("fitting model")
tr.model_fit()

def model_fit(self):
    self.model = YouTubeRanking(
        task="ranking",
        data_info=self.data_info,
        embed_size=16,
        n_epochs=7,
        lr=1e-4,
        batch_size=512,
        use_bn=True,
        hidden_units=(128, 64, 32),
    )
    
    self.model.fit(
        self.train_data,
        neg_sampling=True,
        verbose=0,
        shuffle=True,
        eval_data=self.test_data,
        metrics=["loss"],
    )

Ошибка ‘client_loop: send disconnect: Broken pipe’ возникает периодически - иногда сразу после запуска, иногда через несколько часов. В большинстве случаев все работает нормально, и нет значительной разницы в объеме данных. Я уже добавил флаги SSH для keepalive, но это не решило проблему. Какие дополнительные конфигурации или подходы я могу реализовать для поддержания стабильных SSH-соединений во время длительных заданий обучения?

Введение

Ошибка “client_loop: send disconnect: Broken pipe” обычно возникает из-за таймаутов SSH-соединений или сетевых прерываний во время длительно выполняющихся задач. Для решения этой проблемы в вашем DAG Airflow требуется комбинация конфигураций SSH keepalive, инструментов управления сессиями, таких как screen/tmux, и надежных механизмов обработки ошибок.

Содержание

Причины возникновения ошибки

Ошибка “client_loop: send disconnect: Broken pipe” возникает, когда SSH-соединение неожиданно прерывается, обычно из-за:

  • Сетевые таймауты: промежуточные маршрутизаторы или файрволы сбрасывают неактивные соединения
  • Прерывания со стороны клиента: ваш локальный компьютер теряет сетевое подключение
  • Таймауты со стороны сервера: SSH-сервер закрывает соединения, которые кажутся неактивными
  • Проблемы, специфичные для GCP: туннелирование через IAP или конфигурации файрволов

Как объясняется в руководстве ReliablePenguin, это обычно вызвано таймаутами простоя или нестабильными сетевыми условиями, что делает особенно проблематичным длительные обучающие задания, которые могут не производить вывод в течение длительного времени.

Исправления конфигурации SSH

Хотя вы уже пробовали некоторые флаги SSH, вот дополнительные конфигурации, которые помогут поддерживать стабильные соединения:

Улучшенные настройки SSH Keepalive

python
training = BashOperator(
    task_id="training",
    bash_command="echo 'wait for VM initialization' && sleep 60 && "
                "gcloud config set account xx && "
                "gcloud compute ssh {SERVICE_ACCOUNT.split('@')[0]}@{x} "
                "--tunnel-through-iap "
                "--ssh-flag='-o ServerAliveInterval=60' "
                "--ssh-flag='-o ServerAliveCountMax=5' "
                "--ssh-flag='-o TCPKeepAlive=yes' "
                "--ssh-flag='-o ServerAliveInterval=120' "
                "--ssh-flag='-o ServerAliveCountMax=3' "
                "--ssh-flag='-o IPQoS=throughput' "
                "--ssh-flag='-o ConnectTimeout=30' "
                "--command {train_cmd}",
    retry_delay=timedelta(minutes=2),
)

Ключевые дополнительные флаги для рассмотрения:

  • TCPKeepAlive yes: Отправляет TCP-пакеты keepalive для поддержания соединения
  • IPQoS throughput: Приоритизирует сетевой трафик для SSH-соединений
  • ConnectTimeout 30: Устанавливает разумный таймаут подключения

Как сообщает TecMint, включение TCPKeepAlive решило аналогичные проблемы для пользователей, испытывающих отключения после перезагрузки сервера.

Постоянная конфигурация SSH

Для более постоянных решений создайте или измените ваш файл ~/.ssh/config:

Host *
    ServerAliveInterval 120
    ServerAliveCountMax 3
    TCPKeepAlive yes
    IPQoS throughput
    ControlMaster auto
    ControlPath ~/.ssh/master-%r@%h:%p
    ControlPersist 600

Эта конфигурация обеспечивает последовательное поведение SSH для всех соединений и поддерживает сокеты управления для более быстрого повторного подключения.

Управление сессиями с помощью Screen/Tmux

Наиболее надежный подход для длительно выполняющихся процессов - использование инструментов управления сессиями, которые поддерживают работу ваших процессов независимо от SSH-соединения.

Использование Screen

python
training = BashOperator(
    task_id="training",
    bash_command="echo 'wait for VM initialization' && sleep 60 && "
                "gcloud config set account xx && "
                "gcloud compute ssh {SERVICE_ACCOUNT.split('@')[0]}@{x} "
                "--tunnel-through-iap "
                "--ssh-flag='-o ServerAliveInterval=60' "
                "--ssh-flag='-o ServerAliveCountMax=5' "
                "--command 'screen -dmS training bash -c \"" 
                "echo \"Starting training at $(date)\" && "
                "{train_cmd} && echo \"Training completed at $(date)\" || echo \"Training failed at $(date)\"'\"",
    retry_delay=timedelta(minutes=2),
)

Использование Tmux (Рекомендуется)

python
training = BashOperator(
    task_id="training",
    bash_command="echo 'wait for VM initialization' && sleep 60 && "
                "gcloud config set account xx && "
                "gcloud compute ssh {SERVICE_ACCOUNT.split('@')[0]}@{x} "
                "--tunnel-through-iap "
                "--ssh-flag='-o ServerAliveInterval=60' "
                "--ssh-flag='-o ServerAliveCountMax=5' "
                "--command 'tmux new-session -d -s training bash -c \"" 
                "echo \"Starting training at $(date)\" && "
                "{train_cmd} && echo \"Training completed at $(date)\" || echo \"Training failed at $(date)\"'\"",
    retry_delay=timedelta(minutes=2),
)

Почему управление сессиями работает: Как рекомендуется в документации TensorFlow Google Cloud, “запускайте сеанс обучения с помощью мультиплексора терминала или инструмента управления сессиями, такого как tmux или screen”. Эти инструменты поддерживают работу ваших процессов даже при обрыве SSH-соединения, позволяя вам подключиться снова и отслеживать прогресс позже.

Мониторинг и управление сессиями

После запуска сеанса обучения вы можете отслеживать его с помощью:

bash
# Список активных сессий
gcloud compute ssh user@instance --command "tmux ls"

# Подключение к сессии
gcloud compute ssh user@instance --command "tmux attach -t training"

Альтернативные подходы к SSH

Использование встроенного удаленного выполнения GCP

Рассмотрите возможность использования gcloud compute ssh с флагом --dry-run для настройки, а затем прямого выполнения команд:

python
training = BashOperator(
    task_id="training",
    bash_command="echo 'wait for VM initialization' && sleep 60 && "
                "gcloud config set account xx && "
                "gcloud compute ssh --dry-run {SERVICE_ACCOUNT.split('@')[0]}@{x} "
                "--tunnel-through-iap "
                "--ssh-flag='-o ServerAliveInterval=60' "
                "--ssh-flag='-o ServerAliveCountMax=5' "
                "--command 'echo \"SSH setup successful\"' && "
                "gcloud compute instances add-metadata {x} "
                "--metadata startup-script='#!/bin/bash\n"
                "tmux new-session -d -s training bash -c \\\"{train_cmd}\\\"'",
    retry_delay=timedelta(minutes=2),
)

Использование удаленного выполнения команд Cloud SDK

python
from google.cloud import storage
from google.oauth2 import service_account

def run_remote_training():
    client = storage.Client()
    bucket = client.bucket('your-training-scripts')
    blob = bucket.blob('training_script.sh')
    blob.upload_from_string(f'#!/bin/bash\n{train_cmd}')
    
    # Удаленное выполнение
    !gcloud compute scp training_script.sh {x}:~/ --zone=your-zone
    !gcloud compute ssh {x} --zone=your-zone --command "chmod +x ~/training_script.sh && tmux new-session -d -s training bash -c '~/training_script.sh'"

Специфические решения для Airflow

Улучшенная обработка ошибок и повторные попытки

python
from airflow.providers.google.cloud.operators.compute import ComputeEngineStartInstanceOperator
from airflow.providers.google.cloud.operators.compute import ComputeEngineStopInstanceOperator

def training_failure_callback(context):
    """Обработка ошибок обучения"""
    task_instance = context['task_instance']
    if task_instance.state == 'failed':
        # Логируем ошибку и, возможно, перезапускаем инстанс
        print(f"Training failed for {task_instance.task_id}")
        # Добавьте здесь вашу логику обработки ошибок

training = BashOperator(
    task_id="training",
    bash_command="echo 'wait for VM initialization' && sleep 60 && "
                "gcloud config set account xx && "
                "gcloud compute ssh {SERVICE_ACCOUNT.split('@')[0]}@{x} "
                "--tunnel-through-iap "
                "--ssh-flag='-o ServerAliveInterval=60' "
                "--ssh-flag='-o ServerAliveCountMax=5' "
                "--ssh-flag='-o TCPKeepAlive=yes' "
                "--command 'tmux new-session -d -s training bash -c \\\"{train_cmd}\\\"'",
    retry_delay=timedelta(minutes=2),
    retries=3,
    on_failure_callback=training_failure_callback,
)

Использование SSH-оператора Airflow

Рассмотрите возможность использования SSH-провайдера Airflow для лучшего управления соединениями:

python
from airflow.providers.ssh.operators.ssh import SSHOperator

training = SSHOperator(
    task_id="training",
    ssh_conn_id="gcp_ssh_conn",
    command="tmux new-session -d -s training bash -c '{train_cmd}'",
    retries=3,
    retry_delay=timedelta(minutes=2),
)

С SSH-соединением, настроенным в UI Airflow:

Connection Id: gcp_ssh_conn
Host: {instance_ip}
Username: {SERVICE_ACCOUNT.split('@')[0]}
SSH Key File: /path/to/your/key.pem

Сетевые и инфраструктурные аспекты

VPN и сетевая стабильность

  • Используйте стабильное VPN-соединение при удаленной работе
  • Рассмотрите возможность запуска планировщика Airflow на VM GCP для уменьшения количества сетевых переходов
  • Включите сетевую премиум-уровня Google Cloud для лучшей надежности

Конфигурация инстанса

python
# Настройте ваш инстанс GCP с соответствующими параметрами
gcloud compute instances create {x} \
    --machine-type=n1-standard-4 \
    --image-family=ubuntu-2004-lts \
    --image-project=ubuntu-os-cloud \
    --boot-disk-size=100GB \
    --boot-disk-type=pd-ssd \
    --maintenance-policy=MIGRATE \
    --restart-policy=ALWAYS

Мониторинг и оповещения

python
from airflow.providers.google.cloud.operators.cloud_monitoring import (
    CloudMonitoringWriteMetricOperator
)

monitor_training = CloudMonitoringWriteMetricOperator(
    task_id="monitor_training",
    project_id="{{ var.value.gcp_project }}",
    metric_type="custom.googleapis.com/training/progress",
    value=0.5,
    metric_labels={
        "task_id": "{{ task.task_id }}",
        "instance": "{{ task_instance.xcom_pull(task_ids='get_instance')[0] }}"
    }
)

Заключение

Для предотвращения ошибки “client_loop: send disconnect: Broken pipe” в вашем DAG Airflow реализуйте следующие ключевые стратегии:

  1. Используйте инструменты управления сессиями, такие как tmux или screen, для поддержания работы процессов обучения независимо от SSH-соединений, что является наиболее надежным подходом для длительных задач.

  2. Улучшите конфигурацию SSH с помощью комплексных настроек keepalive, включая TCPKeepAlive, несколько значений ServerAliveInterval и приоритизацию IPQoS.

  3. Реализуйте надежную обработку ошибок в вашем DAG Airflow с помощью соответствующих механизмов повторных попыток и обратных вызовов обработки ошибок для корректного обработки периодических проблем с соединением.

  4. Рассмотрите альтернативные подходы, такие как использование встроенного удаленного выполнения GCP или Cloud SDK для более надежного выполнения команд.

  5. Оптимизируйте вашу инфраструктуру, используя стабильные сетевые соединения, правильные конфигурации инстансов и мониторинг для своевременного обнаружения и решения проблем.

Подход с управлением сессиями с помощью tmux или screen обеспечивает наиболее надежное решение, как рекомендуется в документации Google Cloud, гарантируя, что ваши обучающие задания продолжают выполняться даже при потере SSH-соединений. Сочетайте это с правильными настройками SSH keepalive и механизмами повторных попыток Airflow для комплексного решения проблем стабильности соединения.

Авторы
Проверено модерацией
Модерация
Исправление ошибки 'Broken pipe' в Airflow SSH: Полное руководство