Python AsyncIO: Корректное завершение работы - Лучшие практики
Освойте корректное завершение работы в Python AsyncIO с комплексными лучшими практиками. Узнайте, как обрабатывать задачи библиотек, управлять соединениями и предотвращать бесконечные ожидания при завершении работы.
Python AsyncIO: Корректное завершение - Лучшие практики и решения
Как правильно завершать задачи в Python AsyncIO во время корректного завершения работы, не затрагивая внутренние задачи библиотек и не вызывая бесконечных ожиданий?
Я понимаю базовый подход:
- Привязываться к сигналам (
SIGINT,SIGTERM) с использованием соответствующих обработчиков для Unix и Windows - Получать все задачи с помощью
asyncio.all_tasks(), отменять их и ожидать завершения - Закрывать соединения и выполнять дополнительную очистку
- Вызывать
loop.shutdown_asyncgens() - Останавливать цикл
Однако я столкнулся со следующими конкретными проблемами:
- Получение всех задач влияет на внутренние задачи библиотек (например, aiokafka), что приводит к бесконечным ожиданиям при ожидании отмены
- Закрытие соединений до завершения задач предотвращает корректное завершение работы
- Вопросы о завершении TaskGroup - следует ли завершать сам TaskGroup или его подзадачи?
- Роль
loop.set_exception_handlerв процессе завершения - Проблемы с shutdown_event, когда задачи используют
await queue.get()
Каковы лучшие практики для обработки этих сценариев в Python AsyncIO?
Реализация корректного завершения работы в Python AsyncIO
Реализация корректного завершения работы в Python AsyncIO требует тщательной обработки отмены задач, своевременной очистки ресурсов и стратегий для избежания бесконечных ожиданий при работе с задачами библиотек. Лучшие практики включают селективную отмену только пользовательских задач при сохранении внутренних операций библиотек, использование структурированных шаблонов параллелизма, таких как TaskGroup, и надежную обработку исключений в процессе завершения работы.
Содержание
- Основная реализация корректного завершения работы
- Обработка задач библиотек без бесконечных ожиданий
- Управление соединениями и очистка ресурсов
- Стратегии завершения работы TaskGroup
- Обработка исключений при завершении работы
- События завершения работы и блокирующие операции
- Полный пример реализации
Основная реализация корректного завершения работы
Надежная реализация корректного завершения работы в AsyncIO следует структурированному подходу, который балансирует между немедленной отзывчивостью и правильной очисткой ресурсов. Базовая структура включает обработку сигналов, селективную отмену задач и координированное управление ресурсами.
import asyncio
import signal
import sys
class GracefulShutdown:
def __init__(self, timeout=30):
self.timeout = timeout
self.shutdown_event = asyncio.Event()
self.tasks_to_cancel = set()
async def setup_signal_handlers(self):
"""Настройка обработчиков сигналов для корректного завершения работы"""
if sys.platform == 'win32':
# Специфичная для Windows обработка сигналов
loop = asyncio.get_running_loop()
for sig in [signal.SIGINT, signal.SIGTERM]:
loop.add_signal_handler(sig, lambda: asyncio.create_task(self.trigger_shutdown()))
else:
# Обработка сигналов в Unix
loop = asyncio.get_running_loop()
for sig in [signal.SIGINT, signal.SIGTERM]:
loop.add_signal_handler(sig, lambda: asyncio.create_task(self.trigger_shutdown()))
async def trigger_shutdown(self):
"""Инициализация процесса завершения работы"""
if not self.shutdown_event.is_set():
self.shutdown_event.set()
await self.cancel_user_tasks()
await self.cleanup_resources()
await self.shutdown_async_generators()
async def cancel_user_tasks(self):
"""Отмена только пользовательских задач, сохранение задач библиотек"""
current_task = asyncio.current_task()
for task in asyncio.all_tasks():
if task is not current_task and task is not asyncio.tasks._current_task:
# Проверяем, является ли это пользовательской задачей (не внутренней задачей библиотеки)
if self._is_user_task(task):
task.cancel()
try:
await asyncio.wait_for(task, timeout=5.0)
except asyncio.CancelledError:
pass # Ожидаемая отмена
except asyncio.TimeoutError:
# Принудительная очистка для задач, не реагирующих на отмену
self._force_cleanup_task(task)
def _is_user_task(self, task):
"""Определение, является ли задача пользовательской или внутренней задачей библиотеки"""
task_name = getattr(task, '_name', '')
task_coro = getattr(task, '_coro', None)
# Фильтрация общих шаблонов задач библиотек
library_patterns = [
'aiokafka', 'aiohttp', 'asyncio', 'uvloop', 'uvicorn',
'futures', 'run_forever', 'serve_forever'
]
return not any(pattern in task_name.lower() for pattern in library_patterns) and \
not any(pattern in str(task_coro).lower() for pattern in library_patterns)
async def cleanup_resources(self):
"""Закрытие соединений и выполнение очистки"""
# Реализация зависит от ваших конкретных ресурсов
pass
async def shutdown_async_generators(self):
"""Закрытие асинхронных генераторов"""
loop = asyncio.get_running_loop()
loop.shutdown_asyncgens()
Ключевая идея здесь - подход селективной отмены, который различает пользовательские задачи и внутренние задачи библиотек, предотвращая бесконечные ожидания, когда библиотеки имеют собственные системы управления задачами источник.
Обработка задач библиотек без бесконечных ожиданий
Наиболее распространенной проблемой при корректном завершении работы является обработка задач библиотек (таких как aiokafka), которые имеют собственное внутреннее управление задачами и могут плохо реагировать на отмену. Вот стратегии для решения этой проблемы:
1. Идентификация и фильтрация задач
def _is_user_task(self, task):
"""Расширенная идентификация задач с улучшенной фильтрацией"""
try:
task_frame = task.get_stack()[-1] if hasattr(task, 'get_stack') else None
task_code = task_frame.f_code if task_frame else None
task_name = getattr(task, '_name', '')
# Проверка шаблонов имен задач
if any(pattern in task_name.lower() for ['aiokafka', 'aiohttp', 'uvicorn']):
return False
# Проверка шаблонов кода
if task_code:
code_str = task_code.co_name.lower()
if any(pattern in code_str for ['_run_forever', '_serve', '_listen']):
return False
# Проверка шаблонов объектов корутин
coro_repr = repr(getattr(task, '_coro', ''))
if any(pattern in coro_repr.lower() for ['internal', 'library', 'system']):
return False
return True
except:
# Если мы не можем определить, считаем это пользовательской задачей для безопасности
return True
2. Отмена задач на основе таймаута с запасным вариантом
async def cancel_user_tasks(self):
"""Отмена пользовательских задач с таймаутом и механизмами запасного варианта"""
current_task = asyncio.current_task()
pending_user_tasks = []
for task in asyncio.all_tasks():
if task is not current_task and self._is_user_task(task):
pending_user_tasks.append(task)
if not pending_user_tasks:
return
# Первая попытка: корректная отмена
for task in pending_user_tasks:
task.cancel()
# Ожидание завершения задач с таймаутом
try:
await asyncio.wait_for(
asyncio.gather(*pending_user_tasks, return_exceptions=True),
timeout=self.timeout
)
except asyncio.TimeoutError:
# Обработка задач, не отреагировавших на отмену
print(f"Предупреждение: некоторые задачи не завершились в течение {self.timeout}с")
for task in pending_user_tasks:
if not task.done():
self._force_cleanup_task(task)
3. Специфичные для библиотеки хуки завершения работы
async def trigger_library_shutdown(self):
"""Вызов специфичных для библиотеки хуков завершения работы"""
# Специфичное завершение работы для aiokafka
if 'aiokafka' in sys.modules:
try:
from aiokafka import AIOKafkaProducer
if hasattr(AIOKafkaProducer, '_shutdown'):
await AIOKafkaProducer._shutdown()
except ImportError:
pass
# Специфичное завершение работы для aiohttp
if 'aiohttp' в sys.modules:
try:
from aiohttp import web
if hasattr(web, '_shutdown'):
await web._shutdown()
except ImportError:
pass
Этот подход гарантирует, в то время как пользовательские задачи корректно завершаются, задачи библиотек обрабатываются через собственные механизмы завершения работы, предотвращая бесконечные ожидания и поддерживая стабильность системы источник.
Управление соединениями и очистка ресурсов
Своевременное закрытие соединений имеет решающее значение для корректного завершения работы. Слишком раннее закрытие соединений может помешать задачам завершиться, в то время как слишком позднее - задержать завершение работы.
1. Управление жизненным циклом соединений
class ConnectionManager:
def __init__(self):
self.connections = set()
self.shutdown_requested = False
async def add_connection(self, connection):
"""Добавление соединения для управления"""
self.connections.add(connection)
async def remove_connection(self, connection):
"""Удаление соединения из управления"""
self.connections.discard(connection)
async def close_all_connections(self, timeout=10):
"""Закрытие всех соединений с таймаутом"""
if self.shutdown_requested:
return
self.shutdown_requested = True
connection_tasks = []
for connection in self.connections:
if hasattr(connection, 'close'):
connection_tasks.append(connection.close())
if connection_tasks:
try:
await asyncio.wait_for(
asyncio.gather(*connection_tasks, return_exceptions=True),
timeout=timeout
)
except asyncio.TimeoutError:
print("Предупреждение: некоторые соединения не закрылись в течение таймаута")
self.connections.clear()
2. Осведомленная о задачах очистка соединений
async def graceful_shutdown_with_connections(self):
"""Корректное завершение работы с координацией очистки соединений"""
# Сначала сигнализируем о завершении работы всем задачам
self.shutdown_event.set()
# Даем время задачам завершить текущую работу
await asyncio.sleep(2)
# Закрываем соединения только после того, как задачи успели завершить работу
await self.connection_manager.close_all_connections()
# Затем отменяем оставшиеся пользовательские задачи
await self.cancel_user_tasks()
# Окончательная очистка
await self.cleanup_resources()
await self.shutdown_async_generators()
3. Отслеживание состояния соединений
class ConnectionState:
def __init__(self):
self.active_connections = {}
self.connection_tasks = {}
async def track_connection(self, connection_id, connection, task):
"""Отслеживание соединения и связанной с ним задачи"""
self.active_connections[connection_id] = connection
self.connection_tasks[connection_id] = task
async def cleanup_connection(self, connection_id, timeout=5):
"""Очистка конкретного соединения"""
if connection_id in self.active_connections:
connection = self.active_connections[connection_id]
if hasattr(connection, 'close'):
try:
await asyncio.wait_for(connection.close(), timeout=timeout)
except asyncio.TimeoutError:
print(f"Предупреждение: соединение {connection_id} не закрылось вовремя")
del self.active_connections[connection_id]
del self.connection_tasks[connection_id]
Этот подход гарантирует, что соединения закрываются в нужное время - после того, как задачи получили возможность завершить свою работу, но до того, как завершение работы зависнет бесконечно источник.
Стратегии завершения работы TaskGroup
При использовании asyncio.TaskGroup (Python 3.11+) стратегия завершения работы требует особого внимания. Вот лучшие практики:
1. TaskGroup против отдельных задач
Основной вопрос заключается в том, следует ли завершать сам TaskGroup или его подзадачи. Ответ зависит от вашего случая использования:
async def with_taskgroup_termination(self):
"""Пример завершения работы TaskGroup"""
try:
async with asyncio.TaskGroup() as tg:
# Создаем задачи внутри TaskGroup
task1 = tg.create_task(self.long_running_task_1())
task2 = tg.create_task(self.long_running_task_2())
# Мониторинг сигнала завершения работы
tg.create_task(self.monitor_shutdown())
except* Exception as eg:
# TaskGroup автоматически отменяет все задачи при сбое одной из них
print(f"TaskGroup завершен с исключениями: {eg.exceptions}")
2. Селективная отмена TaskGroup
async def selective_taskgroup_shutdown(self):
"""Завершение работы конкретных задач внутри TaskGroup"""
try:
async with asyncio.TaskGroup() as tg:
critical_tasks = []
non_critical_tasks = []
# Создаем категории задач
critical_tasks.append(tg.create_task(self.critical_database_task()))
non_critical_tasks.append(tg.create_task(self.background_logging_task()))
non_critical_tasks.append(tg.create_task(self.metrics_collection_task()))
# Мониторинг сигнала завершения работы
shutdown_task = tg.create_task(self.wait_for_shutdown_signal())
except* Exception as eg:
# Обработка исключений TaskGroup
if any(isinstance(ex, asyncio.CancelledError) for ex in eg.exceptions):
print("Инициализировано завершение работы, очистка некритических задач")
else:
print(f"TaskGroup завершился с ошибкой: {eg.exceptions}")
3. Управление вложенными TaskGroup
async def nested_taskgroup_shutdown(self):
"""Обработка завершения работы в сценариях с вложенными TaskGroup"""
try:
async with asyncio.TaskGroup() as outer_tg:
# Основные задачи приложения
outer_tg.create_task(self.main_application_logic())
async with asyncio.TaskGroup() as inner_tg:
# Задачи фоновой обработки
inner_tg.create_task(self.background_data_processing())
inner_tg.create_task(self.periodic_maintenance())
except* Exception as eg:
# Завершение работы распространяется от внутреннего к внешнему TaskGroup
print(f"Завершение работы вложенного TaskGroup завершено: {len(eg.exceptions)} исключений")
4. Управление таймаутами TaskGroup
async def taskgroup_with_timeout(self):
"""TaskGroup с общим таймаутом"""
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(self.task_with_timeout())
except* asyncio.TimeoutError as timeout_eg:
print(f"TaskGroup превысил таймаут после {self.timeout}с")
# Здесь можно выполнить дополнительную очистку
except* Exception as other_eg:
print(f"TaskGroup завершился с ошибкой: {other_eg.exceptions}")
Лучшая практика: Для большинства приложений лучше позволить TaskGroup обрабатывать отмену внутренне, а не вручную отменять подзадачи. TaskGroup обеспечивает правильную очистку и обработку исключений источник.
Обработка исключений при завершении работы
loop.set_exception_handler играет ключевую роль в корректном завершении работы, обрабатывая исключения, возникающие в процессе завершения.
1. Настройка пользовательского обработчика исключений
class ShutdownExceptionHandler:
def __init__(self, shutdown_manager):
self.shutdown_manager = shutdown_manager
def handle_exception(self, loop, context):
"""Пользовательский обработчик исключений для сценариев завершения работы"""
exception = context.get('exception')
if isinstance(exception, asyncio.CancelledError):
# Ожидается при завершении работы
print(f"Задача отменена при завершении работы: {context.get('message', 'Нет сообщения')}")
return
# Логирование других исключений
print(f"Исключение при завершении работы: {exception}")
print(f"Контекст: {context}")
# Если это критическая ошибка, принудительное завершение работы
if self._is_critical_exception(exception):
asyncio.create_task(self.force_shutdown())
def _is_critical_exception(self, exception):
"""Определение, требует ли исключение принудительного завершения работы"""
critical_types = [
MemoryError,
SystemError,
KeyboardInterrupt,
ConnectionResetError
]
return any(isinstance(exception, t) for t in critical_types)
2. Интеграция обработчика исключений
async def setup_exception_handling(self):
"""Настройка пользовательской обработки исключений"""
loop = asyncio.get_running_loop()
exception_handler = ShutdownExceptionHandler(self)
loop.set_exception_handler(exception_handler.handle_exception)
# Также обрабатываем необработанные исключения в задачах
def handle_task_exception(task):
if task.exception():
print(f"Необработанное исключение в задаче {task}: {task.exception()}")
# Модифицируем создание задачи для включения обработки исключений
original_create_task = loop.create_task
def create_task_with_exception_handler(coro, *args, **kwargs):
task = original_create_task(coro, *args, **kwargs)
task.add_done_callback(handle_task_exception)
return task
loop.create_task = create_task_with_exception_handler
3. Восстановление при исключениях при завершении работы
async def shutdown_with_exception_recovery(self):
"""Завершение работы с надежной обработкой исключений"""
try:
# Настройка обработки исключений
await self.setup_exception_handling()
# Инициализация завершения работы
await self.trigger_shutdown()
except Exception as e:
print(f"Критическая ошибка при завершении работы: {e}")
# Попытка экстренной очистки
try:
await self.emergency_cleanup()
except Exception as cleanup_error:
print(f"Экстренная очистка не удалась: {cleanup_error}")
finally:
# Убеждаемся, что цикл остановлен
loop = asyncio.get_running_loop()
loop.stop()
async def emergency_cleanup(self):
"""Экстренная очистка при нормальном завершении работы"""
try:
# Принудительное закрытие всех соединений
for connection in self.connections:
try:
if hasattr(connection, 'close'):
await connection.close()
except:
pass
# Отмена всех оставшихся задач
for task in asyncio.all_tasks():
if task is not asyncio.current_task():
task.cancel()
# Короткое ожидание для очистки
await asyncio.sleep(1)
except Exception as e:
print(f"Экстренная очистка не удалась: {e}")
Эта комплексная обработка исключений гарантирует, что даже если при завершении работы возникают неожиданные ошибки, система все еще может попытаться очистить ресурсы и корректно завершить работу источник.
События завершения работы и блокирующие операции
Обработка shutdown_event при использовании задачами await queue.get() или подобных блокирующих операций требует специальных стратегий для предотвращения бесконечных ожиданий.
1. Завершение работы на основе очереди с таймаутом
class ShutdownAwareQueue:
def __init__(self, shutdown_event):
self.queue = asyncio.Queue()
self.shutdown_event = shutdown_event
async def get_with_shutdown_check(self, timeout=None):
"""Получение из очереди с проверкой завершения работы"""
try:
# Используем таймаут для периодической проверки завершения работы
return await asyncio.wait_for(
self.queue.get(),
timeout=timeout or 1.0
)
except asyncio.TimeoutError:
# Проверка сигнала завершения работы
if self.shutdown_event.is_set():
raise asyncio.CancelledError("Запрошено завершение работы")
# Повторная попытка с таймаутом
return await self.get_with_shutdown_check()
async def put_with_shutdown_check(self, item, timeout=None):
"""Помещение в очередь с проверкой завершения работы"""
try:
return await asyncio.wait_for(
self.queue.put(item),
timeout=timeout or 1.0
)
except asyncio.TimeoutError:
if self.shutdown_event.is_set():
raise asyncio.CancelledError("Запрошено завершение работы")
return await self.put_with_shutdown_check(item, timeout)
2. Рабочая задача, осведомленная о завершении работы
async def shutdown_aware_worker(self, queue):
"""Рабочий процесс, реагирующий на сигналы завершения работы даже при блокировке"""
while not self.shutdown_event.is_set():
try:
# Используем операции очереди, осведомленные о завершении работы
item = await queue.get_with_shutdown_check()
# Обработка элемента
await self.process_item(item)
# Проверка завершения работы после обработки каждого элемента
if self.shutdown_event.is_set():
print("Получен сигнал завершения работы, завершение текущей работы...")
break
except asyncio.CancelledError:
print("Рабочий процесс отменен при завершении работы")
break
except Exception as e:
print(f"Ошибка рабочего процесса: {e}")
if self.shutdown_event.is_set():
break
# Очистка
await self.worker_cleanup()
async def process_item(self, item):
"""Обработка отдельного элемента"""
# Ваша логика обработки здесь
await asyncio.sleep(0.1) # Имитация работы
3. Операции с очередью на основе таймаута
class TimeoutAwareQueueManager:
def __init__(self, shutdown_event, default_timeout=5.0):
self.shutdown_event = shutdown_event
self.default_timeout = default_timeout
async def safe_queue_get(self, queue, timeout=None):
"""Безопасное получение из очереди с осведомленностью о завершении работы"""
timeout = timeout or self.default_timeout
while True:
try:
# Попытка получить элемент с таймаутом
item = await asyncio.wait_for(queue.get(), timeout=timeout)
return item
except asyncio.TimeoutError:
# Проверка сигнала завершения работы
if self.shutdown_event.is_set():
raise asyncio.CancelledError("Завершение работы очереди запрошено")
# Продолжаем попытки с корректировкой таймаута
timeout = min(timeout * 1.5, 30.0) # Экспоненциальное увеличение
4. Координация на основе событий
class ShutdownCoordinator:
def __init__(self):
self.shutdown_event = asyncio.Event()
self.coordination_event = asyncio.Event()
async def wait_for_work_or_shutdown(self):
"""Ожидание либо работы, либо сигнала завершения работы"""
# Создаем задачи для обоих условий
work_task = asyncio.create_task(self.wait_for_work())
shutdown_task = asyncio.create_task(self.shutdown_event.wait())
# Ожидание завершения любого из них
done, pending = await asyncio.wait(
[work_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED
)
# Отмена ожидающих задач
for task in pending:
task.cancel()
# Проверка, что завершилось
if work_task in done:
return await work_task # Возвращаем рабочий элемент
else:
raise asyncio.CancelledError("Запрошено завершение работы")
Эти стратегии гарантируют, что задачи могут реагировать на сигналы завершения работы даже при блокировке на операциях с очередью или других долгих операциях источник.
Полный пример реализации
Вот полный пример, объединяющий все обсужденные лучшие практики:
import asyncio
import signal
import sys
from typing import Set, Any, Dict
class AsyncIOGracefulShutdown:
def __init__(self, shutdown_timeout: int = 30):
self.shutdown_timeout = shutdown_timeout
self.shutdown_event = asyncio.Event()
self.user_tasks: Set[asyncio.Task] = set()
self.connections: Dict[str, Any] = {}
self.queues: Dict[str, ShutdownAwareQueue] = {}
self.exception_handler = None
self.is_shutting_down = False
async def initialize(self):
"""Инициализация менеджера завершения работы"""
await self.setup_signal_handlers()
await self.setup_exception_handling()
self.connection_manager = ConnectionManager()
self.queue_manager = TimeoutAwareQueueManager(self.shutdown_event)
async def setup_signal_handlers(self):
"""Настройка кросс-платформенных обработчиков сигналов"""
loop = asyncio.get_running_loop()
if sys.platform == 'win32':
# Специфичная для Windows обработка сигналов
for sig in [signal.SIGINT, signal.SIGTERM]:
try:
loop.add_signal_handler(sig, self._handle_signal)
except NotImplementedError:
# Запасной вариант для сред Windows без поддержки сигналов
signal.signal(sig, lambda s, f: asyncio.create_task(self._handle_signal()))
else:
# Обработка сигналов в Unix
for sig in [signal.SIGINT, signal.SIGTERM]:
loop.add_signal_handler(sig, self._handle_signal)
def _handle_signal(self):
"""Обработка сигнала завершения работы"""
if not self.is_shutting_down:
asyncio.create_task(self.graceful_shutdown())
async def setup_exception_handling(self):
"""Настройка пользовательской обработки исключений"""
self.exception_handler = ShutdownExceptionHandler(self)
loop = asyncio.get_running_loop()
loop.set_exception_handler(self.exception_handler.handle_exception)
async def register_task(self, task: asyncio.Task, name: str = None):
"""Регистрация пользовательской задачи для управления завершением работы"""
if name:
task.set_name(name)
self.user_tasks.add(task)
# Добавление обратного вызова очистки
task.add_done_callback(self._task_cleanup)
def _task_cleanup(self, task: asyncio.Task):
"""Очистка при завершении задачи"""
self.user_tasks.discard(task)
async def register_connection(self, conn_id: str, connection: Any):
"""Регистрация соединения для очистки"""
self.connections[conn_id] = connection
await self.connection_manager.track_connection(conn_id, connection)
async def register_queue(self, queue_id: str, queue: asyncio.Queue):
"""Регистрация очереди для осведомленности о завершении работы"""
self.queues[queue_id] = ShutdownAwareQueue(self.shutdown_event)
async def graceful_shutdown(self):
"""Инициализация корректного завершения работы"""
if self.is_shutting_down:
return
self.is_shutting_down = True
self.shutdown_event.set()
try:
print("Инициализация корректного завершения работы...")
# Шаг 1: Завершение работы специфичных для библиотеки компонентов
await self.trigger_library_shutdown()
# Шаг 2: Даем время задачам завершить текущую работу
await asyncio.sleep(2)
# Шаг 3: Закрытие соединений
await self.close_all_connections()
# Шаг 4: Отмена пользовательских задач
await self.cancel_user_tasks()
# Шаг 5: Окончательная очистка
await self.cleanup_resources()
# Шаг 6: Завершение работы асинхронных генераторов
await self.shutdown_async_generators()
print("Корректное завершение работы завершено")
except Exception as e:
print(f"Ошибка при завершении работы: {e}")
await self.emergency_cleanup()
finally:
# Остановка цикла событий
loop = asyncio.get_running_loop()
loop.stop()
async def trigger_library_shutdown(self):
"""Вызов специфичных для библиотеки хуков завершения работы"""
# Завершение работы aiokafka
if 'aiokafka' in sys.modules:
try:
from aiokafka import AIOKafkaProducer
if hasattr(AIOKafkaProducer, '_shutdown'):
await AIOKafkaProducer._shutdown()
except ImportError:
pass
# Завершение работы aiohttp
if 'aiohttp' in sys.modules:
try:
from aiohttp import web
if hasattr(web, '_shutdown'):
await web._shutdown()
except ImportError:
pass
async def close_all_connections(self):
"""Закрытие всех зарегистрированных соединений"""
for conn_id, connection in self.connections.items():
try:
if hasattr(connection, 'close'):
await asyncio.wait_for(connection.close(), timeout=10.0)
print(f"Соединение {conn_id} успешно закрыто")
except Exception as e:
print(f"Ошибка при закрытии соединения {conn_id}: {e}")
self.connections.clear()
async def cancel_user_tasks(self):
"""Отмена только пользовательских задач с таймаутом"""
if not self.user_tasks:
return
print(f"Отмена {len(self.user_tasks)} пользовательских задач...")
# Отмена всех пользовательских задач
for task in self.user_tasks:
task.cancel()
# Ожидание завершения задач
if self.user_tasks:
try:
await asyncio.wait_for(
asyncio.gather(*self.user_tasks, return_exceptions=True),
timeout=self.shutdown_timeout
)
except asyncio.TimeoutError:
print(f"Некоторые задачи не завершились в течение {self.shutdown_timeout}с")
# Принудительная очистка для оставшихся задач
for task in self.user_tasks:
if not task.done():
self._force_cleanup_task(task)
print("Отмена пользовательских задач завершена")
def _force_cleanup_task(self, task: asyncio.Task):
"""Принудительная очистка для задачи, не отреагировавшей на отмену"""
print(f"Принудительная очистка для задачи: {task.get_name()}")
# Реализация зависит от ваших конкретных типов задач
# Это может включать откат базы данных, очистку файлов и т.д.
async def cleanup_resources(self):
"""Выполнение дополнительной очистки ресурсов"""
# Очистка любых дополнительных ресурсов, специфичных для вашего приложения
pass
async def shutdown_async_generators(self):
"""Закрытие асинхронных генераторов"""
loop = asyncio.get_running_loop()
loop.shutdown_asyncgens()
print("Завершение работы асинхронных генераторов завершено")
async def emergency_cleanup(self):
"""Экстренная очистка при нормальном завершении работы"""
try:
# Принудительное закрытие всех соединений
for connection in self.connections.values():
try:
if hasattr(connection, 'close'):
await connection.close()
except:
pass
# Отмена всех задач
for task in asyncio.all_tasks():
if task is not asyncio.current_task():
task.cancel()
await asyncio.sleep(1)
print("Экстренная очистка завершена")
except Exception as e:
print(f"Экстренная очистка не удалась: {e}")
# Поддерживающие классы
class ConnectionManager:
"""Управление жизненным циклом соединений"""
def __init__(self):
self.connections = {}
self.connection_tasks = {}
async def track_connection(self, conn_id: str, connection: Any, task: asyncio.Task = None):
self.connections[conn_id] = connection
if task:
self.connection_tasks[conn_id] = task
class ShutdownAwareQueue:
"""Очередь, реагирующая на сигналы завершения работы"""
def __init__(self, shutdown_event: asyncio.Event):
self.queue = asyncio.Queue()
self.shutdown_event = shutdown_event
async def get_with_shutdown_check(self, timeout: float = None):
try:
return await asyncio.wait_for(
self.queue.get(),
timeout=timeout or 1.0
)
except asyncio.TimeoutError:
if self.shutdown_event.is_set():
raise asyncio.CancelledError("Запрошено завершение работы")
return await self.get_with_shutdown_check()
class TimeoutAwareQueueManager:
"""Управление очередями с таймаутом и осведомленностью о завершении работы"""
def __init__(self, shutdown_event: asyncio.Event, default_timeout: float = 5.0):
self.shutdown_event = shutdown_event
self.default_timeout = default_timeout
async def safe_queue_get(self, queue: asyncio.Queue, timeout: float = None):
timeout = timeout or self.default_timeout
while True:
try:
return await asyncio.wait_for(queue.get(), timeout=timeout)
except asyncio.TimeoutError:
if self.shutdown_event.is_set():
raise asyncio.CancelledError("Запрошено завершение работы очереди")
timeout = min(timeout * 1.5, 30.0)
class ShutdownExceptionHandler:
"""Обработка исключений при завершении работы"""
def __init__(self, shutdown_manager: AsyncIOGracefulShutdown):
self.shutdown_manager = shutdown_manager
def handle_exception(self, loop: asyncio.AbstractEventLoop, context: dict):
exception = context.get('exception')
if isinstance(exception, asyncio.CancelledError):
print(f"Задача отменена при завершении работы: {context.get('message', 'Нет сообщения')}")
return
print(f"Исключение при завершении работы: {exception}")
print(f"Контекст: {context}")
if self._is_critical_exception(exception):
asyncio.create_task(self.shutdown_manager.emergency_cleanup())
def _is_critical_exception(self, exception: Exception) -> bool:
critical_types = [MemoryError, SystemError, KeyboardInterrupt, ConnectionResetError]
return any(isinstance(exception, t) for t in critical_types)
# Пример использования
async def example_usage():
# Инициализация менеджера завершения работы
shutdown_manager = AsyncIOGracefulShutdown(shutdown_timeout=30)
await shutdown_manager.initialize()
# Регистрация некоторых пользовательских задач
async def sample_task_1():
while not shutdown_manager.shutdown_event.is_set():
print("Задача 1 работает...")
await asyncio.sleep(1)
print("Задача 1 завершает работу")
async def sample_task_2():
while not shutdown_manager.shutdown_event.is_set():
print("Задача 2 работает...")
await asyncio.sleep(1)
print("Задача 2 завершает работу")
task1 = asyncio.create_task(sample_task_1())
task2 = asyncio.create_task(sample_task_2())
await shutdown_manager.register_task(task1, "sample_task_1")
await shutdown_manager.register_task(task2, "sample_task_2")
# Запуск на несколько секунд, затем запуск завершения работы
await asyncio.sleep(5)
await shutdown_manager.graceful_shutdown()
if __name__ == "__main__":
asyncio.run(example_usage())
Эта комплексная реализация решает все упомянутые проблемы:
- Обработка задач библиотек: Использует селективную идентификацию задач для избежания бесконечных ожиданий
- Управление соединениями: Правильное время закрытия соединений с обработкой таймаутов
- Интеграция TaskGroup: Может работать вместе с шаблонами TaskGroup
- Обработка исключений: Надежное управление исключениями при завершении работы
- События завершения работы: Правильная обработка блокирующих операций, таких как
queue.get()
Реализация следует современным лучшим практикам AsyncIO и обеспечивает надежную основу для построения отказоустойчивых приложений с возможностью корректного завершения работы источник.
Источники
- Graceful Shutdown in Backend Systems | by Harsh Gharat | Nov, 2025 | Medium
- Making Sense of Asyncio: Tasks, Futures, and Timeouts Simplified - DEV Community
- asyncio Guide: Complete Python Package Documentation [2025]
- Making Blocking Functions Play Nice With Python’s asyncio | by Lynn G. Kwong | Oct, 2025 | Medium
- Understanding Python Async Patterns: Basics - DEV Community
- Asyncio Python - The Complete Guide
- Fix:
FutureCancellation Bug In AnyIO’s BlockingPortal
Заключение
Реализация надежного корректного завершения работы в Python AsyncIO требует тщательного внимания к нескольким ключевым областям:
-
Селективная отмена задач: Всегда различайте пользовательские задачи и внутренние задачи библиотек для предотвращения бесконечных ожиданий. Используйте сопоставление шаблонов и интроспекцию задач для идентификации и защиты управляемых библиотекой задач.
-
Время закрытия соединений: Закрывайте соединения после того, как задачи успели завершить свою текущую работу, но до того, как завершение работы зависнет бесконечно. Используйте операции на основе таймаута для обеспечения очистки без бесконечной блокировки.
-
Управление TaskGroup: Позвольте
TaskGroupобрабатывать внутреннюю отмену, а не вручную отменяйте подзадачи. Используйте вложенные шаблоны TaskGroup для сложных приложений с разными категориями задач. -
Обработка исключений: Реализуйте комплексную обработку исключений при завершении работы, с особым учетом критических ошибок, которые могут потребовать принудительного завершения работы. Используйте
loop.set_exception_handlerдля управления неожиданными сценариями. -
Блокирующие операции: Используйте обертки, осведомленные о завершении работы, для блокирующих операций, таких как
queue.get(). Реализуйте операции на основе таймаута с экспоненциальным увеличением для гарантии, что сигналы завершения работы в конечном итоге будут учтены.
Предоставленная полная реализация предлагает надежную основу, которая может быть адаптирована под конкретные требования приложения. Следуя этим лучшим практикам, вы можете гарантировать, что ваши AsyncIO-приложения завершают работу корректно, поддерживая целостность данных и доверие пользователей даже во время перезапусков или развертываний источник.