Другое

Как правильно запускать несколько ботов в asyncio

Решение проблем с дублированием команд и event loop при запуске нескольких ботов в Python с использованием asyncio. Оптимизированная архитектура для управления групповыми ботами.

Как правильно запускать несколько привязанных ботов в Python с использованием asyncio, чтобы основной event loop не ломался и не возникало дублирования команд?

У меня есть следующий код для запуска бота:

python
def main():
    logger.info(f"Starting bot...")

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    loop.run_until_complete(group_manager.setup_group_clones())
    loop.create_task(check_old_tokens())
    loop.create_task(periodic_check())
    loop.create_task(periodic_refresh_groups())

    try:
        bot.run_forever()
    except KeyboardInterrupt:
        logger.info("Shutting down")

И класс для управления групповыми ботами:

python
class GroupCloneManager:
    def __init__(self):
        self.group_bots: Dict[int, Bot] = {}
        self.group_tasks: Dict[int, asyncio.Task] = {}

    async def setup_group_clones(self):
        try:
            async with make_session() as db_session:
                groups = await get_connected_groups(db_session)
                for group in groups:
                    await self.add_group_clone(group)
        except Exception as e:
            logger.debug(f"Error adding groups: {e}")

    async def add_group_clone(self, group_data: ConnectedGroups):
        group_id = group_data.group_id
        token = ssl_crypter.decrypt(group_data.access_token).decode('utf-8')

        try:
            group_api = API(token=token)
            group_bot = Bot(
                api=group_api,
                labeler=deepcopy(labeler),
                state_dispenser=deepcopy(state_dispenser)
            )

            self.group_bots[group_id] = group_bot
            self.group_tasks[group_id] = asyncio.create_task(
                self._run_group_bot(group_bot, group_data)
            )

            logger.debug(f"Group {group_id} added and successfully started")
            return True
        except Exception as e:
            logger.debug(f"Group {group_id} failed to start, unavailable token ({e})")
            return False

    async def _run_group_bot(self, group_bot: Bot, group_data: ConnectedGroups):
        group_id = group_data.group_id
        try:
            await group_bot.run_polling()
            async with make_session() as db_session:
                async with make_session() as db_session:
                    stmt = sa.update(ConnectedGroups).where(ConnectedGroups.group_id == group_id).values(is_active=True)

                    await db_session.execute(stmt)
                    await db_session.commit()

        except Exception as e:
            logger.debug(f"Group {group_id} stopped: {e}")

    async def remove_group_clone(self, group_id: int):
        if group_id in self.group_bots:
            self.group_tasks[group_id].cancel()

        if group_id in self.group_bots:
            del self.group_bots[group_id]
            del self.group_tasks[group_id]

        async with make_session() as db_session:
            stmt = sa.update(ConnectedGroups).where(ConnectedGroups.group_id == group_id).values(is_active=True)

            await db_session.execute(stmt)
            await db_session.commit()

        logger.debug("Group removed from polling")

    async def refresh_group_clones(self):
        try:
            async with make_session() as db_session:
                current_groups = await get_connected_groups(db_session)
                current_group_ids = {g['group_id'] for g in current_groups if g['is_active']}
                existing_group_ids = set(self.group_bots.keys())

                for group_data in current_groups:
                    if group_data.is_active and group_data.group_id not in existing_group_ids:
                        await self.add_group_clone(group_data)

                for group_id in existing_group_ids:
                    if group_id not in current_group_ids:
                        await self.remove_group_clone(group_id)
        except Exception as e:
            logger.debug(f"Error refreshing groups: {e}")

Проблема в том, что я получаю варнинги в консоли:

DEBUG    | 2025-11-15 14:51:44 | bot.control_groups:add_group_clone:46 > Group |id| added and successfully started
DEBUG    | 2025-11-15 14:51:44 | bot.control_groups:_run_group_bot:64 > Group |id| stopped: Cannot close a running event loop

Боты (2) запускаются и реагируют, но дважды на одну команду. Как правильно организовать архитектуру, чтобы избежать этих проблем?

Правильная организация запуска нескольких ботов с использованием asyncio требует корректного управления event loop и понимания того, как боты взаимодействуют с циклом событий. Ваша текущая реализация создает несколько event loop, что приводит к проблемам.

Основные проблемы в вашем коде:

  1. Конфликт event loop: Вы создаете новый event loop вручную, но боты пытаются использовать свои собственные циклы
  2. Дублирование команд: Каждый бот запускает свой экземпляр polling, обрабатывая одни и те же сообщения
  3. Некорректное завершение: Попытка закрыть работающий event loop вызывает предупреждение

Contents

Основные проблемы в текущей реализации

Ваш код имеет несколько фундаментальных проблем:

  1. Несколько event loop: Создание asyncio.new_event_loop() приводит к конфликтам с внутренними циклами ботов
  2. Дублирование polling: Каждый экземпляр Bot запускает свой собственный run_polling(), что вызывает обработку одних сообщений несколько раз
  3. Некорректное управление задачами: Задачи создаются в разных контекстах event loop

Согласно документации Python, event loop должен быть единым для всех асинхронных операций в приложении.

Правильный подход к запуску нескольких ботов

1. Использование единого event loop

Как рекомендует Real Python, все асинхронные задачи должны работать в рамках одного event loop:

python
async def main():
    # Создаем единый event loop
    loop = asyncio.get_running_loop()
    
    # Настраиваем группы
    await group_manager.setup_group_clones()
    
    # Запускаем все задачи в одном цикле
    tasks = [
        check_old_tokens(),
        periodic_check(),
        periodic_refresh_groups(),
        *group_manager.get_all_bot_tasks()
    ]
    
    await asyncio.gather(*tasks)

2. Централизованное управление ботами

Вместо запуска run_forever() для каждого бота, используйте единую точку входа:

python
class GroupCloneManager:
    def __init__(self):
        self.group_bots: Dict[int, Bot] = {}
        self.group_tasks: Dict[int, asyncio.Task] = {}
        self._main_task = None

    async def start_all_bots(self):
        """Запускает всех ботов в рамках единого event loop"""
        self._main_task = asyncio.create_task(self._manage_bots())

    async def _manage_bots(self):
        """Централизованное управление ботами"""
        while True:
            try:
                # Обновляем список ботов
                await self.refresh_group_clones()
                
                # Проверяем состояние каждого бота
                for group_id, task in list(self.group_tasks.items()):
                    if task.done():
                        # Бот упал, перезапускаем
                        await self.restart_bot(group_id)
                
                await asyncio.sleep(30)  # Период проверки
            except Exception as e:
                logger.error(f"Error in bot management: {e}")
                await asyncio.sleep(10)

Оптимизированная архитектура GroupCloneManager

1. Измененный метод запуска ботов

python
async def add_group_clone(self, group_data: ConnectedGroups):
    group_id = group_data.group_id
    token = ssl_crypter.decrypt(group_data.access_token).decode('utf-8')

    try:
        group_api = API(token=token)
        group_bot = Bot(
            api=group_api,
            labeler=deepcopy(labeler),
            state_dispenser=deepcopy(state_dispenser)
        )

        # Вместо run_forever, используем polling через задачу
        self.group_bots[group_id] = group_bot
        self.group_tasks[group_id] = asyncio.create_task(
            self._run_group_bot_polling(group_bot, group_id)
        )

        logger.debug(f"Group {group_id} added and successfully started")
        return True
    except Exception as e:
        logger.debug(f"Group {group_id} failed to start, unavailable token ({e})")
        return False

async def _run_group_bot_polling(self, group_bot: Bot, group_id: int):
    """Запускает polling для конкретного бота"""
    try:
        # Используем polling с таймаутом для возможности перезапуска
        await groupBot.run_polling(
            skip_updates=True,  # Пропускаем старые обновления
            timeout=20
        )
    except asyncio.CancelledError:
        logger.info(f"Group {group_id} polling cancelled")
        raise
    except Exception as e:
        logger.debug(f"Group {group_id} polling error: {e}")
        # Бот упал, будет перезапущен через менеджер

2. Централизованное завершение работы

python
async def shutdown_all_bots(self):
    """Корректное завершение работы всех ботов"""
    logger.info("Shutting down all bots...")
    
    # Отменяем все задачи ботов
    for task in self.group_tasks.values():
        task.cancel()
    
    # Ждем завершения
    await asyncio.gather(*self.group_tasks.values(), return_exceptions=True)
    
    # Закрываем соединения
    for bot in self.group_bots.values():
        if hasattr(bot, '_connection'):
            await bot._connection.close()
    
    logger.info("All bots shutdown completed")

Пример исправленного кода

python
import asyncio
from typing import Dict
from aiogram import Bot, API
from aiogram.dispatcher import Dispatcher
from aiogram.dispatcher.filters.state import StateDispenser
from aiogram.utils.keyboard import InlineKeyboardBuilder
from copy import deepcopy

class GroupCloneManager:
    def __init__(self):
        self.group_bots: Dict[int, Bot] = {}
        self.group_tasks: Dict[int, asyncio.Task] = {}
        self.labeler = InlineKeyboardBuilder()
        self.state_dispenser = StateDispenser()
    
    async def start(self):
        """Запускает менеджер"""
        # Создаем один event loop для всего приложения
        loop = asyncio.get_event_loop()
        
        # Настраиваем боты
        await self.setup_group_clones()
        
        # Запускаем все задачи в одном цикле
        tasks = [
            self.check_old_tokens(),
            self.periodic_check(),
            self.periodic_refresh_groups(),
            self._monitor_bots()
        ]
        
        try:
            await asyncio.gather(*tasks)
        except KeyboardInterrupt:
            await self.shutdown()
    
    async def _monitor_bots(self):
        """Мониторинг состояния ботов"""
        while True:
            try:
                # Проверяем упавшие боты
                for group_id in list(self.group_tasks.keys()):
                    task = self.group_tasks[group_id]
                    if task.done() and not task.cancelled():
                        try:
                            await task  # Проверяем исключение
                        except Exception as e:
                            logger.error(f"Bot {group_id} failed: {e}")
                            await self.restart_bot(group_id)
                
                await asyncio.sleep(30)
            except Exception as e:
                logger.error(f"Monitor error: {e}")
                await asyncio.sleep(10)
    
    async def restart_bot(self, group_id: int):
        """Перезапускает конкретного бота"""
        if group_id in self.group_tasks:
            self.group_tasks[group_id].cancel()
        
        if group_id in self.group_bots:
            del self.group_bots[group_id]
        
        # Получаем данные группы из БД
        group_data = await get_group_by_id(group_id)
        if group_data:
            await self.add_group_clone(group_data)

async def main():
    """Основная функция запуска"""
    manager = GroupCloneManager()
    
    try:
        await manager.start()
    except KeyboardInterrupt:
        await manager.shutdown()
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        await manager.shutdown()

if __name__ == "__main__":
    # Используем стандартный запуск asyncio
    asyncio.run(main())

Практические рекомендации

  1. Избегайте нескольких event loop: Как указано в документации, используйте единый event loop для всего приложения

  2. Используйте asyncio.run(): Для новых приложений предпочтительнее использовать asyncio.run() вместо ручного создания циклов

  3. Централизованное управление: Все боты должны управляться через единый менеджер

  4. Обработка ошибок: Реализуйте обработку ошибок для каждого бота индивидуально

  5. Graceful shutdown: Обеспечьте корректное завершение работы всех компонентов

Альтернативные решения

1. Использование multiprocessing

Если вам нужна истинная параллельность, используйте multiprocessing как рекомендует Feng’s Notes:

python
from multiprocessing import Process

def run_bot(token: str):
    """Запускает бота в отдельном процессе"""
    asyncio.run(bot_main(token))

def main():
    processes = []
    for token in tokens:
        p = Process(target=run_bot, args=(token,))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()

2. Использование библиотек-оберток

Рассмотрите использование библиотек вроде aiogram-redis для распределенной работы с несколькими ботами.


Заключение

  1. Используйте единый event loop для всего приложения
  2. Централизованно управляйте всеми ботами через менеджер
  3. Реализуйте мониторинг и автоматический перезапуск упавших ботов
  4. Обеспечьте graceful shutdown для корректного завершения работы
  5. Избегайте дублирования polling - каждый бот должен обрабатывать обновления только один раз

Правильная организация архитектуры позволит избежать конфликтов event loop и дублирования команд при работе с несколькими ботами в asyncio.

Источники

  1. Event Loop — Python 3.14.0 documentation
  2. Asyncio two loops for different I/O tasks? - Stack Overflow
  3. Python’s asyncio: A Hands-On Walkthrough – Real Python
  4. Run Multiple Asyncio Frameworks, is it possible? - Feng’s Notes
  5. Asyncio Run Multiple Concurrent Event Loops - Super Fast Python
  6. Mastering Python’s Asyncio: A Practical Guide - Medium
Авторы
Проверено модерацией
Модерация