Как правильно запускать несколько ботов в asyncio
Решение проблем с дублированием команд и event loop при запуске нескольких ботов в Python с использованием asyncio. Оптимизированная архитектура для управления групповыми ботами.
Как правильно запускать несколько привязанных ботов в Python с использованием asyncio, чтобы основной event loop не ломался и не возникало дублирования команд?
У меня есть следующий код для запуска бота:
def main():
logger.info(f"Starting bot...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(group_manager.setup_group_clones())
loop.create_task(check_old_tokens())
loop.create_task(periodic_check())
loop.create_task(periodic_refresh_groups())
try:
bot.run_forever()
except KeyboardInterrupt:
logger.info("Shutting down")
И класс для управления групповыми ботами:
class GroupCloneManager:
def __init__(self):
self.group_bots: Dict[int, Bot] = {}
self.group_tasks: Dict[int, asyncio.Task] = {}
async def setup_group_clones(self):
try:
async with make_session() as db_session:
groups = await get_connected_groups(db_session)
for group in groups:
await self.add_group_clone(group)
except Exception as e:
logger.debug(f"Error adding groups: {e}")
async def add_group_clone(self, group_data: ConnectedGroups):
group_id = group_data.group_id
token = ssl_crypter.decrypt(group_data.access_token).decode('utf-8')
try:
group_api = API(token=token)
group_bot = Bot(
api=group_api,
labeler=deepcopy(labeler),
state_dispenser=deepcopy(state_dispenser)
)
self.group_bots[group_id] = group_bot
self.group_tasks[group_id] = asyncio.create_task(
self._run_group_bot(group_bot, group_data)
)
logger.debug(f"Group {group_id} added and successfully started")
return True
except Exception as e:
logger.debug(f"Group {group_id} failed to start, unavailable token ({e})")
return False
async def _run_group_bot(self, group_bot: Bot, group_data: ConnectedGroups):
group_id = group_data.group_id
try:
await group_bot.run_polling()
async with make_session() as db_session:
async with make_session() as db_session:
stmt = sa.update(ConnectedGroups).where(ConnectedGroups.group_id == group_id).values(is_active=True)
await db_session.execute(stmt)
await db_session.commit()
except Exception as e:
logger.debug(f"Group {group_id} stopped: {e}")
async def remove_group_clone(self, group_id: int):
if group_id in self.group_bots:
self.group_tasks[group_id].cancel()
if group_id in self.group_bots:
del self.group_bots[group_id]
del self.group_tasks[group_id]
async with make_session() as db_session:
stmt = sa.update(ConnectedGroups).where(ConnectedGroups.group_id == group_id).values(is_active=True)
await db_session.execute(stmt)
await db_session.commit()
logger.debug("Group removed from polling")
async def refresh_group_clones(self):
try:
async with make_session() as db_session:
current_groups = await get_connected_groups(db_session)
current_group_ids = {g['group_id'] for g in current_groups if g['is_active']}
existing_group_ids = set(self.group_bots.keys())
for group_data in current_groups:
if group_data.is_active and group_data.group_id not in existing_group_ids:
await self.add_group_clone(group_data)
for group_id in existing_group_ids:
if group_id not in current_group_ids:
await self.remove_group_clone(group_id)
except Exception as e:
logger.debug(f"Error refreshing groups: {e}")
Проблема в том, что я получаю варнинги в консоли:
DEBUG | 2025-11-15 14:51:44 | bot.control_groups:add_group_clone:46 > Group |id| added and successfully started
DEBUG | 2025-11-15 14:51:44 | bot.control_groups:_run_group_bot:64 > Group |id| stopped: Cannot close a running event loop
Боты (2) запускаются и реагируют, но дважды на одну команду. Как правильно организовать архитектуру, чтобы избежать этих проблем?
Правильная организация запуска нескольких ботов с использованием asyncio требует корректного управления event loop и понимания того, как боты взаимодействуют с циклом событий. Ваша текущая реализация создает несколько event loop, что приводит к проблемам.
Основные проблемы в вашем коде:
- Конфликт event loop: Вы создаете новый event loop вручную, но боты пытаются использовать свои собственные циклы
- Дублирование команд: Каждый бот запускает свой экземпляр polling, обрабатывая одни и те же сообщения
- Некорректное завершение: Попытка закрыть работающий event loop вызывает предупреждение
Contents
- Основные проблемы в текущей реализации
- Правильный подход к запуску нескольких ботов
- Оптимизированная архитектура GroupCloneManager
- Пример исправленного кода
- Практические рекомендации
- Альтернативные решения
Основные проблемы в текущей реализации
Ваш код имеет несколько фундаментальных проблем:
- Несколько event loop: Создание
asyncio.new_event_loop()приводит к конфликтам с внутренними циклами ботов - Дублирование polling: Каждый экземпляр
Botзапускает свой собственныйrun_polling(), что вызывает обработку одних сообщений несколько раз - Некорректное управление задачами: Задачи создаются в разных контекстах event loop
Согласно документации Python, event loop должен быть единым для всех асинхронных операций в приложении.
Правильный подход к запуску нескольких ботов
1. Использование единого event loop
Как рекомендует Real Python, все асинхронные задачи должны работать в рамках одного event loop:
async def main():
# Создаем единый event loop
loop = asyncio.get_running_loop()
# Настраиваем группы
await group_manager.setup_group_clones()
# Запускаем все задачи в одном цикле
tasks = [
check_old_tokens(),
periodic_check(),
periodic_refresh_groups(),
*group_manager.get_all_bot_tasks()
]
await asyncio.gather(*tasks)
2. Централизованное управление ботами
Вместо запуска run_forever() для каждого бота, используйте единую точку входа:
class GroupCloneManager:
def __init__(self):
self.group_bots: Dict[int, Bot] = {}
self.group_tasks: Dict[int, asyncio.Task] = {}
self._main_task = None
async def start_all_bots(self):
"""Запускает всех ботов в рамках единого event loop"""
self._main_task = asyncio.create_task(self._manage_bots())
async def _manage_bots(self):
"""Централизованное управление ботами"""
while True:
try:
# Обновляем список ботов
await self.refresh_group_clones()
# Проверяем состояние каждого бота
for group_id, task in list(self.group_tasks.items()):
if task.done():
# Бот упал, перезапускаем
await self.restart_bot(group_id)
await asyncio.sleep(30) # Период проверки
except Exception as e:
logger.error(f"Error in bot management: {e}")
await asyncio.sleep(10)
Оптимизированная архитектура GroupCloneManager
1. Измененный метод запуска ботов
async def add_group_clone(self, group_data: ConnectedGroups):
group_id = group_data.group_id
token = ssl_crypter.decrypt(group_data.access_token).decode('utf-8')
try:
group_api = API(token=token)
group_bot = Bot(
api=group_api,
labeler=deepcopy(labeler),
state_dispenser=deepcopy(state_dispenser)
)
# Вместо run_forever, используем polling через задачу
self.group_bots[group_id] = group_bot
self.group_tasks[group_id] = asyncio.create_task(
self._run_group_bot_polling(group_bot, group_id)
)
logger.debug(f"Group {group_id} added and successfully started")
return True
except Exception as e:
logger.debug(f"Group {group_id} failed to start, unavailable token ({e})")
return False
async def _run_group_bot_polling(self, group_bot: Bot, group_id: int):
"""Запускает polling для конкретного бота"""
try:
# Используем polling с таймаутом для возможности перезапуска
await groupBot.run_polling(
skip_updates=True, # Пропускаем старые обновления
timeout=20
)
except asyncio.CancelledError:
logger.info(f"Group {group_id} polling cancelled")
raise
except Exception as e:
logger.debug(f"Group {group_id} polling error: {e}")
# Бот упал, будет перезапущен через менеджер
2. Централизованное завершение работы
async def shutdown_all_bots(self):
"""Корректное завершение работы всех ботов"""
logger.info("Shutting down all bots...")
# Отменяем все задачи ботов
for task in self.group_tasks.values():
task.cancel()
# Ждем завершения
await asyncio.gather(*self.group_tasks.values(), return_exceptions=True)
# Закрываем соединения
for bot in self.group_bots.values():
if hasattr(bot, '_connection'):
await bot._connection.close()
logger.info("All bots shutdown completed")
Пример исправленного кода
import asyncio
from typing import Dict
from aiogram import Bot, API
from aiogram.dispatcher import Dispatcher
from aiogram.dispatcher.filters.state import StateDispenser
from aiogram.utils.keyboard import InlineKeyboardBuilder
from copy import deepcopy
class GroupCloneManager:
def __init__(self):
self.group_bots: Dict[int, Bot] = {}
self.group_tasks: Dict[int, asyncio.Task] = {}
self.labeler = InlineKeyboardBuilder()
self.state_dispenser = StateDispenser()
async def start(self):
"""Запускает менеджер"""
# Создаем один event loop для всего приложения
loop = asyncio.get_event_loop()
# Настраиваем боты
await self.setup_group_clones()
# Запускаем все задачи в одном цикле
tasks = [
self.check_old_tokens(),
self.periodic_check(),
self.periodic_refresh_groups(),
self._monitor_bots()
]
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
await self.shutdown()
async def _monitor_bots(self):
"""Мониторинг состояния ботов"""
while True:
try:
# Проверяем упавшие боты
for group_id in list(self.group_tasks.keys()):
task = self.group_tasks[group_id]
if task.done() and not task.cancelled():
try:
await task # Проверяем исключение
except Exception as e:
logger.error(f"Bot {group_id} failed: {e}")
await self.restart_bot(group_id)
await asyncio.sleep(30)
except Exception as e:
logger.error(f"Monitor error: {e}")
await asyncio.sleep(10)
async def restart_bot(self, group_id: int):
"""Перезапускает конкретного бота"""
if group_id in self.group_tasks:
self.group_tasks[group_id].cancel()
if group_id in self.group_bots:
del self.group_bots[group_id]
# Получаем данные группы из БД
group_data = await get_group_by_id(group_id)
if group_data:
await self.add_group_clone(group_data)
async def main():
"""Основная функция запуска"""
manager = GroupCloneManager()
try:
await manager.start()
except KeyboardInterrupt:
await manager.shutdown()
except Exception as e:
logger.error(f"Fatal error: {e}")
await manager.shutdown()
if __name__ == "__main__":
# Используем стандартный запуск asyncio
asyncio.run(main())
Практические рекомендации
-
Избегайте нескольких event loop: Как указано в документации, используйте единый event loop для всего приложения
-
Используйте asyncio.run(): Для новых приложений предпочтительнее использовать
asyncio.run()вместо ручного создания циклов -
Централизованное управление: Все боты должны управляться через единый менеджер
-
Обработка ошибок: Реализуйте обработку ошибок для каждого бота индивидуально
-
Graceful shutdown: Обеспечьте корректное завершение работы всех компонентов
Альтернативные решения
1. Использование multiprocessing
Если вам нужна истинная параллельность, используйте multiprocessing как рекомендует Feng’s Notes:
from multiprocessing import Process
def run_bot(token: str):
"""Запускает бота в отдельном процессе"""
asyncio.run(bot_main(token))
def main():
processes = []
for token in tokens:
p = Process(target=run_bot, args=(token,))
p.start()
processes.append(p)
for p in processes:
p.join()
2. Использование библиотек-оберток
Рассмотрите использование библиотек вроде aiogram-redis для распределенной работы с несколькими ботами.
Заключение
- Используйте единый event loop для всего приложения
- Централизованно управляйте всеми ботами через менеджер
- Реализуйте мониторинг и автоматический перезапуск упавших ботов
- Обеспечьте graceful shutdown для корректного завершения работы
- Избегайте дублирования polling - каждый бот должен обрабатывать обновления только один раз
Правильная организация архитектуры позволит избежать конфликтов event loop и дублирования команд при работе с несколькими ботами в asyncio.
Источники
- Event Loop — Python 3.14.0 documentation
- Asyncio two loops for different I/O tasks? - Stack Overflow
- Python’s asyncio: A Hands-On Walkthrough – Real Python
- Run Multiple Asyncio Frameworks, is it possible? - Feng’s Notes
- Asyncio Run Multiple Concurrent Event Loops - Super Fast Python
- Mastering Python’s Asyncio: A Practical Guide - Medium