НейроАгент

Отладка зависания asyncio задач через 1 час

Отладка зависания asyncio задач через час с комплексными решениями для утечек ресурсов, проблем с циклом событий и многопоточности в Python-приложениях.

Длительно работающие задачи asyncio зависают примерно через час: Как отладить эту проблему?

У меня есть длительно работающее Python-приложение, построенное на asyncio, которое запускает несколько фоновых задач, работающих в бесконечном цикле. Эти задачи периодически выполняют CPU-работу с использованием asyncio.to_thread. Приложение работает корректно примерно час, после чего все асинхронные задачи полностью прекращают выполнение. Исключения не выбрасываются, использование CPU и памяти остается нормальным, но приложение полностью зависает.

Эта проблема возникала в нескольких проектах, которые активно используют асинхронность, хотя я никогда не мог воспроизвести ее в минимальном примере, пока не создал следующий.

Вот минимальный пример, который в конечном итоге проявляет зависание:

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

async def dummy_task(i):
    await asyncio.sleep(0.1)
    return i

async def batch_eval(trades):
    results = await asyncio.gather(*(dummy_task(t) for t in trades))
    return results

def sync_batch_eval(trades):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    results = loop.run_until_complete(batch_eval(trades))
    loop.close()
    return results

async def main():
    i = 0
    while True:
        trades = list(range(10))
        res = await asyncio.get_running_loop().run_in_executor(
            executor, sync_batch_eval, trades
        )
        if i % 50 == 0:
            print("итерация", i, "ok")
        i += 1
        await asyncio.sleep(0.05)

asyncio.run(main())

После запуска в течение некоторого времени (10-60 минут в зависимости от системы) вывод прекращается, и ошибки не появляются. Использование процессора падает почти до нуля, но процесс продолжает жить.

Мой реальный код следует аналогичному шаблону, где я вызываю функцию вроде:

python
def sync_batch_eval(trades, settings, allow_edge):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    async def run_batch():
        tasks = [batch_evaluate_trade(trade, settings, allow_edge) for trade in trades]
        return await asyncio.gather(*tasks)
    results = loop.run_until_complete(run_batch())
    loop.close()
    return results

Почему все задачи asyncio в конечном итоге зависают, несмотря на отсутствие исключений и нормальное использование системных ресурсов?

НейроАгент

Долгосрочные задачи asyncio “зависают” примерно через час: причины и решения

Проблема “зависания” долгосрочных задач asyncio примерно через час обычно вызвана утечками ресурсов, истощением цикла событий или проблемами синхронизации в многопоточной реализации Python. Наиболее распространенные виновники - неправильно закрываемые подключения к базам данных, накопление ресурсов в ThreadPoolExecutor или проблемы с созданием нескольких циклов событий в разных потоках без надлежащей синхронизации.

Содержание

Понимание основных причин

“Зависание”, которое вы наблюдаете примерно через час, - это хорошо задокументированная проблема в приложениях asyncio, особенно при использовании run_in_executor с многопоточными операциями. Согласно исследованиям Stack Overflow, это, вероятно, связано с проблемами многопоточного asyncio в Python до версии 3.13.

Основная проблема часто заключается в следующем:

  1. Истощение ресурсов: подключения к базам данных, файловые дескрипторы или другие ресурсы накапливаются без надлежащей очистки
  2. Повреждение цикла событий: несколько циклов событий создаются в разных потоках без надлежащей синхронизации
  3. Ограничения исполнителя потоков: ThreadPoolExecutor по умолчанию перегружен долгосрочными операциями
  4. Утечки памяти: неправильные ссылки на задачи и корутины предотвращают сборку мусора

“Кратко говоря, вы, вероятно, столкнулись с проблемой многопоточного asyncio в Python до версии 3.13, и если вы еще не сделали этого, первое, что следует попробовать - это перейти на Python 3.14.” - Анализ Stack Overflow

Распространенные паттерны утечек ресурсов

Ваш минимальный пример демонстрирует несколько паттернов, которые часто приводят к утечкам ресурсов и в конечном итоге к “зависанию”:

Утечки подключений к базам данных

В вашем реальном коде функция batch_evaluate_trade, вероятно, создает подключения к базам данных, которые не закрываются должным образом. Это классический паттерн, который приводит к истощению пула подключений:

python
# ПРОБЛЕМНЫЙ ПАТТЕРН - Отсутствие очистки
def sync_batch_eval(trades, settings, allow_edge):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    async def run_batch():
        tasks = [batch_evaluate_trade(trade, settings, allow_edge) for trade in trades]
        return await asyncio.gather(*tasks)
    results = loop.run_until_complete(run_batch())
    # loop.close() вызывается здесь, но подключения внутри задач могут не закрываться
    return results

Как показывает опыт отладки Markaicode:

“Единственное отсутствующее await-выражение вызывало утечку более 50 подключений к базе данных каждый час в часы пик. К тому моменту, как мы заметили проблему, наш пул подключений был полностью исчерпан, а новые запросы зависали бесконечно.”

Накопление ресурсов ThreadPoolExecutor

Ваше использование run_in_executor с общим ThreadPoolExecutor может привести к истощению потоков. Каждый вызов sync_batch_eval создает новый цикл событий и может порождать дополнительные потоки в исполнителе.

Неправильные ссылки на задачи

Использование create_task в режиме “запустил и забыл” без сохранения ссылок может вызывать тонкие утечки памяти:

python
# ПРОБЛЕМА: Ссылка на задачу не сохраняется
async def some_function():
    get_event_loop().create_task(coroutine(*args))
    # Задача может быть собрана мусором до завершения

Согласно исследованиям Stack Overflow:

“При использовании create_task в режиме ‘запустил и забыл’, мы должны сохранять ссылки активными для надежного выполнения.”

Проблемы управления циклом событий

Создание новых циклов событий в каждом потоке проблематично. Вызов asyncio.new_event_loop() в вашем коде может привести к нескольким проблемам:

Проблемы с несколькими циклами событий

Создание нескольких циклов событий в разных потоках без надлежащей синхронизации - известная проблема. Как отмечено в трекере проблем Python:

“BaseEventLoop.close() завершает исполнителя без ожидания, вызывая утечку висящих потоков”

Истощение цикла событий

Повторное создание и уничтожение циклов событий может исчерпать системные ресурсы. Документация Python рекомендует:

“Для смягчения этой проблемы рассмотрите возможность использования настраиваемого исполнителя для других пользовательских задач или установки исполнителя по умолчанию с большим количеством рабочих потоков.” - Документация Python 3.14

Проблемы потокобезопасности

Сам asyncio не является потокобезопасным, и при работе с исполнителями пула потоков требуется ручная синхронизация:

“Сам asyncio не является потокобезопасным, и при работе с исполнителями пула потоков или процессов требуется ручная синхронизация общих ресурсов.” - Блог Codilime

Стратегии отладки

Когда ваше приложение “зависает” через час, вот эффективные подходы к отладке:

1. Обновитесь до Python 3.14

Наиболее немедленным решением является обновление до Python 3.14, который решает многие проблемы многопоточного asyncio, присутствовавшие в более ранних версиях.

2. Мониторьте использование ресурсов

Отслеживайте следующие метрики со временем:

  • Количество активных потоков
  • Подключения к базе данных в использовании
  • Паттерны использования памяти
  • Количество задач в цикле событий

3. Используйте инструменты обнаружения утечек

Инструменты вроде pyleak могут помочь выявить утечки:

“pyleak использует внешний мониторинговый поток для обнаружения, когда цикл событий действительно становится неотвечающим, независимо от причины, затем захватывает стек вызовов, показывающий именно где произошло блокирование. Кроме того, pyleak также обнаруживает утечки asyncio-задач и потоков с полным стеком вызовов”

4. Ручное отслеживание задач

Реализуйте ручное отслеживание долгосрочных задач для определения, какие из них “зависли”:

“Вы можете найти все зависшие долгосрочные задачи в asyncio, вручную отслеживая, как долго каждая задача активна, и сообщая детали задачи, если превышается пороговое значение ‘слишком долгого’ времени. Этот подход можно использовать для поиска всех зависших, повисших и зомби-задач asyncio в цикле событий.” - Super Fast Python

5. Режим отладки с расширенным логированием

Включите режим отладки asyncio и добавьте всестороннее логирование:

python
import asyncio
asyncio.get_event_loop().set_debug(True)

Профилактика и решения

1. Реализуйте надлежащую очистку ресурсов

Убедитесь, что все ресурсы правильно закрываются с помощью контекстных менеджеров или блоков try/finally:

python
async def cleanup_user_session(session_id):
    connection = await get_db_connection()
    try:
        await connection.execute("DELETE FROM sessions WHERE id = ?", session_id)
    finally:
        # Всегда очищайте ресурсы в asyncio - цикл событий не сделает это за вас
        await connection.close()

2. Используйте постоянные циклы событий

Вместо создания новых циклов событий для каждой пакетной операции поддерживайте постоянный цикл событий:

python
class BatchProcessor:
    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.executor = ThreadPoolExecutor(max_workers=10)
    
    async def process_batch(self, trades, settings, allow_edge):
        tasks = [batch_evaluate_trade(trade, settings, allow_edge) for trade in trades]
        return await asyncio.gather(*tasks)
    
    def sync_batch_eval(self, trades, settings, allow_edge):
        try:
            return self.loop.run_until_complete(
                self.process_batch(trades, settings, allow_edge)
            )
        except Exception as e:
            # Обрабатывайте исключения соответствующим образом
            raise

3. Ограничьте размер пула потоков

Настройте ваш ThreadPoolExecutor с соответствующими ограничениями:

python
import concurrent.futures

# Используйте настраиваемый исполнитель с контролируемым размером
executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=min(32, (os.cpu_count() or 1) * 4),
    thread_name_prefix='batch-worker'
)

4. Реализуйте таймауты и автоматические выключатели

Добавьте таймауты для предотвращения бесконечного зависания:

python
async def batch_eval_with_timeout(trades, timeout=30):
    try:
        return await asyncio.wait_for(
            asyncio.gather(*(dummy_task(t) for t in trades)),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        # Обрабатывайте таймаут соответствующим образом
        raise TimeoutError("Пакетная оценка превысила время ожидания")

5. Регулярные проверки состояния

Реализуйте периодические проверки состояния для мониторинга состояния системы:

python
async def health_check():
    while True:
        # Проверьте количество потоков
        thread_count = threading.active_count()
        
        # Проверьте подключения к базе данных
        db_connections = get_active_db_connections()
        
        # Проверьте задачи цикла событий
        loop = asyncio.get_running_loop()
        task_count = len(asyncio.all_tasks(loop))
        
        print(f"Состояние: {thread_count} потоков, {db_connections} подключений к БД, {task_count} задач")
        
        await asyncio.sleep(60)  # Проверять каждую минуту

Продвинутые методы мониторинга

Профилирование памяти

Используйте инструменты профилирования памяти для выявления паттернов роста использования памяти:

python
import tracemalloc

tracemalloc.start()

# Делайте снимки периодически
snapshot1 = tracemalloc.take_snapshot()
# ... запустите ваше приложение ...
snapshot2 = tracemalloc.take_snapshot()

# Сравнивайте снимки
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
for stat in top_stats[:10]:
    print(stat)

Анализ стека потоков

Захватывайте стеки потоков при возникновении проблем для определения блокирующих операций:

python
import threading
import sys

def dump_threads():
    for thread_id, frame in sys._current_frames().items():
        print(f"\nПоток {thread_id}:")
        traceback.print_stack(frame)

Инспекция цикла событий

Периодически проверяйте состояние вашего цикла событий:

python
def inspect_event_loop(loop):
    print(f"Активные задачи: {len(asyncio.all_tasks(loop))}")
    print(f"Ожидающие вызовы: {len(loop._callbacks)}")
    print(f"Готовые вызовы: {len(loop._ready)}")
    print(f"Запланированные вызовы: {len(loop._scheduled)}")

Источники

  1. Stack Overflow - Долгосрочные задачи asyncio зависают через ~1 час. Как можно отладить?
  2. Super Fast Python - Поиск зависших и долгосрочных задач в Asyncio
  3. Markaicode - Отладка проблем параллелизма Python Asyncio
  4. Документация Python 3.14 - Цикл событий
  5. Codilime - Как запускать блокирующие функции в цикле событий
  6. Reddit - pyleak - обнаружение утекших asyncio-задач, потоков и блокировок цикла событий
  7. Проблема Python 41699 - Потенциальная утечка памяти с asyncio и run_in_executor
  8. Проблема Python 34037 - asyncio: BaseEventLoop.close() завершает исполнитель без ожидания, вызывая утечку висящих потоков

Заключение

“Зависание” задач asyncio примерно через час обычно вызвано накоплением ресурсов, проблемами управления циклом событий или проблемами многопоточности. На основе исследований и распространенных паттернов:

  • Обновитесь до Python 3.14 как наиболее немедленное решение для проблем многопоточного asyncio
  • Реализуйте надлежащую очистку ресурсов с помощью блоков try/finally или контекстных менеджеров
  • Избегайте создания новых циклов событий для каждой пакетной операции; вместо этого используйте постоянные циклы
  • Мониторьте и ограничивайте использование пула потоков для предотвращения истощения исполнителя
  • Используйте инструменты отладки такие как pyleak и ручное отслеживание задач для раннего выявления проблем
  • Добавляйте проверки состояния и таймауты для предотвращения бесконечного зависания

Ключевой вывод заключается в том, что asyncio требует тщательного управления ресурсами и циклами событий, особенно в многопоточных средах. “Зависание” через час часто является результатом постепенного накопления ресурсов, а не внезапных сбоев, что делает регулярный мониторинг и проактивное управление ресурсами essential для долгосрочных приложений.