Другое

Обработка синхронных операций в event-driven архитектуре

Руководство по обязательным синхронным операциям в event-driven архитектуре. Рассказываем про Saga-паттерн, Outbox, компенсационные транзакции и практические решения.

Как правильно обрабатывать обязательные синхронные операции в event-driven архитектуре?

У меня возник вопрос по event-driven архитектуре, поскольку я изучаю теорию для написания тестового приложения.

Ситуация: у меня есть эндпоинт регистрации. Клиент отправляет запрос, сервис авторизации создает запись с данными авторизации (authRepo), после чего отправляет событие «зарегистрирован пользователь». При этом необходимо гарантировать создание публичного профиля пользователя.

Проблема: как убедиться, что профиль создан? Мне нужно отдать cookie пользователю, но если сервис, отвечающий за публичные профили, упадет или крашнется, пользователь не сможет войти в свой профиль, а повторно зарегистрироваться не сможет, так как он уже зарегистрирован.

Как решать подобные ситуации в event-driven архитектуре, когда требуется синхронная гарантия выполнения определенных операций?

В event‑driven архитектуре обязательные синхронные операции можно реализовать с помощью нескольких надёжных паттернов, таких как Saga с компенсационными транзакциями, Outbox‑паттерн и механизмы идемпотентности. Для вашей ситуации с регистрацией пользователя лучше всего подойдет подход с использованием Saga‑паттерна и гарантированного создания профиля через компенсационные механизмы.


Содержание

Saga‑паттерн для распределённых транзакций

Saga‑паттерн является основным решением для обеспечения согласованности в распределённых системах. Он работает через последовательность локальных транзакций, где каждая транзакция публикует событие для запуска следующей.

Как это работает:

  • Каждая локальная транзакция обновляет базу данных и публикует событие
  • Если транзакция не выполняется из‑за нарушения бизнес‑правил, saga выполняет серию компенсационных транзакций
  • Компенсационные транзакции отменяют изменения, сделанные предыдущими транзакциями

Преимущество этого подхода в том, что он позволяет избежать блокировок и обеспечивает конечную согласованность без необходимости в двухфазном коммите.

“Если участник не может выполнить свою транзакцию, он должен инициировать компенсационную транзакцию, чтобы отменить эффекты неудачной транзакции и вернуть систему в согласованное состояние” — DEV Community


Outbox‑паттерн для надёжной публикации событий

Outbox‑паттерн решает проблему гарантированной публикации событий, что критически важно для вашей ситуации.

Механизм работы:

  1. При создании записи в authRepo в той же транзакции создаётся запись в таблице outbox
  2. Запись outbox содержит данные события для публикации
  3. Отдельный процесс опрашивает таблицу outbox и публикует события
  4. После успешной публикации событие помечается как обработанное

Это гарантирует, что событие о регистрации пользователя не будет потеряно даже если брокер событий временно недоступен.

“Чтобы быть надёжным, приложение должно атомарно обновлять свою базу данных и публиковать событие. Оно не может использовать традиционный механизм распределённой транзакции, охватывающей базу данных и брокер сообщений” — Microservices.io


Компенсационные транзакции

Компенсационные транзакции — это ключ к решению вашей проблемы с созданием профиля.

Пример для вашей ситуации:

  • Основные транзакции:

    1. Создать запись аутентификации
    2. Создать публичный профиль
    3. Опубликовать событие «пользователь зарегистрирован»
  • Компенсационные транзакции:

    1. Удалить профиль пользователя (если шаг 2 не удался)
    2. Удалить запись аутентификации (если шаг 1 не удался)

“Если любая из подтранзакций не выполняется, необходимо реализовать и выполнить процесс компенсации” — IBM Cloud Architecture


Реализация для сценария регистрации

Для вашего конкретного случая регистрации пользователя рекомендуется следующий подход:

Вариант 1: Saga с оркестрацией

python
# Пример оркестрации Saga для регистрации
class RegistrationSaga:
    def start(self, user_data):
        try:
            # Шаг 1: Создать запись аутентификации
            auth_record = auth_repo.create(user_data)

            # Шаг 2: Создать публичный профиль
            profile = profile_service.create(auth_record.user_id)

            # Шаг 3: Опубликовать событие регистрации
            event_bus.publish('user_registered', {
                'user_id': auth_record.user_id,
                'profile_id': profile.id
            })

            return auth_record

        except ProfileCreationError:
            # Компенсация: удалить запись аутентификации
            auth_repo.delete(auth_record.id)
            raise RegistrationError("Не удалось создать профиль")

Вариант 2: Синхронная проверка с отложенным созданием профиля

python
# Альтернативный подход с немедленной проверкой
async def register_user(user_data):
    # Шаг 1: Создать запись аутентификации
    auth_record = await auth_repo.create(user_data)

    try:
        # Шаг 2: Синхронно создать профиль
        profile = await profile_service.create(auth_record.user_id)

        # Шаг 3: Только после успеха выдать cookie
        await issue_session_cookie(auth_record.session_id)

    except ProfileCreationError:
        # Компенсация: удалить запись аутентификации
        await auth_repo.delete(auth_record.id)
        raise RegistrationError("Не удалось создать профиль")

Альтернативные подходы

1. Двухфазный подход с предварительной проверкой

python
async def register_user(user_data):
    # Фаза 1: Проверка доступности сервисов
    if not await profile_service.health_check():
        raise ServiceUnavailableError("Сервис профилей временно недоступен")

    # Фаза 2: Регистрация с гарантированным созданием профиля
    auth_record = await auth_repo.create(user_data)
    profile = await profile_service.create(auth_record.user_id)

    # Только после успеха выдать cookie
    await issue_session_cookie(auth_record.session_id)

2. Паттерн Circuit Breaker

python
# Использование для защиты от падения сервиса профилей
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
@circuit_breaker(failure_threshold=5, recovery_timeout=30)
async def create_profile_with_fallback(user_id):
    try:
        return await profile_service.create(user_id)
    except ServiceUnavailableError:
        # Создать временный профиль
        return await create_temp_profile(user_id)

Практические рекомендации

1. Мониторинг и наблюдаемость

Внедрите comprehensive мониторинг для отслеживания:

  • Состояния Saga транзакций
  • Задержек в обработке событий
  • Успешности компенсационных транзакций

“Мониторинг и автоскейлинг: запускать автоматическое масштабирование при отставании потребителя или глубине очереди” — DZone

2. Обработка ошибок и повторные попытки

python
# Пример с обработкой ошибок и повторными попытками
async def reliable_profile_creation(user_id, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await profile_service.create(user_id)
        except ServiceUnavailableError as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Экспоненциальная задержка

3. Идемпотентность операций

Убедитесь, что операции идемпотентны, чтобы безопасно повторять их:

python
# Идемпотентное создание профиля
async def create_profile_idempotent(user_id):
    existing_profile = await profile_repo.get_by_user_id(user_id)
    if existing_profile:
        return existing_profile

    return await profile_service.create(user_id)

4. Тестирование сценариев сбоя

Тестируйте все возможные сценарии сбоя:

  • Отказ сервиса профилей
  • Таймаут сети
  • Частичные сбои
  • Восстановление после сбоя

“Схемы предохранителей и механизмы отката: предотвращают распространение проблемы одного потребителя” — DZone


Источники

  1. Understanding the Saga Pattern in Event-Driven Architecture - DEV Community
  2. Microservices Pattern: Event-driven architecture
  3. Saga Design Pattern - Azure Architecture Center
  4. Patterns in Event-Driven Architectures - IBM
  5. Architecture Patterns for Reliable Event-Driven Systems - DZone
  6. Saga and Process Manager - Event-Driven.io
  7. Saga choreography pattern - AWS Prescriptive Guidance
  8. IBM Garage Event-Driven Reference Architecture - Saga Pattern
  9. What do you mean by “Event-Driven”? - Martin Fowler
  10. Event-Driven Architecture - System Design - GeeksforGeeks

Заключение

  1. Используйте Saga‑паттерн для управления распределёнными транзакциями с гарантированным откатом через компенсационные транзакции
  2. Реализуйте Outbox‑паттерн для надёжной публикации событий и избежания потери данных
  3. Внедрите идемпотентность операций для безопасного повторного выполнения
  4. Добавьте механизмы мониторинга и обработки ошибок с повторными попытками
  5. Тестируйте сценарии сбоя для обеспечения надёжности системы

Для вашей ситуации с регистрацией пользователя рекомендуется подход с Saga‑оркестрацией, где создание профиля является частью транзакции, а в случае сбоя выполняется компенсация через удаление записи аутентификации. Это гарантирует, что пользователь не останется в «подвешенном» состоянии с созданной записью аутентификации, но без профиля.

Авторы
Проверено модерацией
Модерация