Обработка синхронных операций в event-driven архитектуре
Руководство по обязательным синхронным операциям в event-driven архитектуре. Рассказываем про Saga-паттерн, Outbox, компенсационные транзакции и практические решения.
Как правильно обрабатывать обязательные синхронные операции в event-driven архитектуре?
У меня возник вопрос по event-driven архитектуре, поскольку я изучаю теорию для написания тестового приложения.
Ситуация: у меня есть эндпоинт регистрации. Клиент отправляет запрос, сервис авторизации создает запись с данными авторизации (authRepo), после чего отправляет событие «зарегистрирован пользователь». При этом необходимо гарантировать создание публичного профиля пользователя.
Проблема: как убедиться, что профиль создан? Мне нужно отдать cookie пользователю, но если сервис, отвечающий за публичные профили, упадет или крашнется, пользователь не сможет войти в свой профиль, а повторно зарегистрироваться не сможет, так как он уже зарегистрирован.
Как решать подобные ситуации в event-driven архитектуре, когда требуется синхронная гарантия выполнения определенных операций?
В event‑driven архитектуре обязательные синхронные операции можно реализовать с помощью нескольких надёжных паттернов, таких как Saga с компенсационными транзакциями, Outbox‑паттерн и механизмы идемпотентности. Для вашей ситуации с регистрацией пользователя лучше всего подойдет подход с использованием Saga‑паттерна и гарантированного создания профиля через компенсационные механизмы.
Содержание
- Saga‑паттерн для распределённых транзакций
- Outbox‑паттерн для надёжной публикации событий
- Компенсационные транзакции
- Реализация для сценария регистрации
- Альтернативные подходы
- Практические рекомендации
Saga‑паттерн для распределённых транзакций
Saga‑паттерн является основным решением для обеспечения согласованности в распределённых системах. Он работает через последовательность локальных транзакций, где каждая транзакция публикует событие для запуска следующей.
Как это работает:
- Каждая локальная транзакция обновляет базу данных и публикует событие
- Если транзакция не выполняется из‑за нарушения бизнес‑правил, saga выполняет серию компенсационных транзакций
- Компенсационные транзакции отменяют изменения, сделанные предыдущими транзакциями
Преимущество этого подхода в том, что он позволяет избежать блокировок и обеспечивает конечную согласованность без необходимости в двухфазном коммите.
“Если участник не может выполнить свою транзакцию, он должен инициировать компенсационную транзакцию, чтобы отменить эффекты неудачной транзакции и вернуть систему в согласованное состояние” — DEV Community
Outbox‑паттерн для надёжной публикации событий
Outbox‑паттерн решает проблему гарантированной публикации событий, что критически важно для вашей ситуации.
Механизм работы:
- При создании записи в
authRepoв той же транзакции создаётся запись в таблицеoutbox - Запись
outboxсодержит данные события для публикации - Отдельный процесс опрашивает таблицу
outboxи публикует события - После успешной публикации событие помечается как обработанное
Это гарантирует, что событие о регистрации пользователя не будет потеряно даже если брокер событий временно недоступен.
“Чтобы быть надёжным, приложение должно атомарно обновлять свою базу данных и публиковать событие. Оно не может использовать традиционный механизм распределённой транзакции, охватывающей базу данных и брокер сообщений” — Microservices.io
Компенсационные транзакции
Компенсационные транзакции — это ключ к решению вашей проблемы с созданием профиля.
Пример для вашей ситуации:
-
Основные транзакции:
- Создать запись аутентификации
- Создать публичный профиль
- Опубликовать событие «пользователь зарегистрирован»
-
Компенсационные транзакции:
- Удалить профиль пользователя (если шаг 2 не удался)
- Удалить запись аутентификации (если шаг 1 не удался)
“Если любая из подтранзакций не выполняется, необходимо реализовать и выполнить процесс компенсации” — IBM Cloud Architecture
Реализация для сценария регистрации
Для вашего конкретного случая регистрации пользователя рекомендуется следующий подход:
Вариант 1: Saga с оркестрацией
# Пример оркестрации Saga для регистрации
class RegistrationSaga:
def start(self, user_data):
try:
# Шаг 1: Создать запись аутентификации
auth_record = auth_repo.create(user_data)
# Шаг 2: Создать публичный профиль
profile = profile_service.create(auth_record.user_id)
# Шаг 3: Опубликовать событие регистрации
event_bus.publish('user_registered', {
'user_id': auth_record.user_id,
'profile_id': profile.id
})
return auth_record
except ProfileCreationError:
# Компенсация: удалить запись аутентификации
auth_repo.delete(auth_record.id)
raise RegistrationError("Не удалось создать профиль")
Вариант 2: Синхронная проверка с отложенным созданием профиля
# Альтернативный подход с немедленной проверкой
async def register_user(user_data):
# Шаг 1: Создать запись аутентификации
auth_record = await auth_repo.create(user_data)
try:
# Шаг 2: Синхронно создать профиль
profile = await profile_service.create(auth_record.user_id)
# Шаг 3: Только после успеха выдать cookie
await issue_session_cookie(auth_record.session_id)
except ProfileCreationError:
# Компенсация: удалить запись аутентификации
await auth_repo.delete(auth_record.id)
raise RegistrationError("Не удалось создать профиль")
Альтернативные подходы
1. Двухфазный подход с предварительной проверкой
async def register_user(user_data):
# Фаза 1: Проверка доступности сервисов
if not await profile_service.health_check():
raise ServiceUnavailableError("Сервис профилей временно недоступен")
# Фаза 2: Регистрация с гарантированным созданием профиля
auth_record = await auth_repo.create(user_data)
profile = await profile_service.create(auth_record.user_id)
# Только после успеха выдать cookie
await issue_session_cookie(auth_record.session_id)
2. Паттерн Circuit Breaker
# Использование для защиты от падения сервиса профилей
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
@circuit_breaker(failure_threshold=5, recovery_timeout=30)
async def create_profile_with_fallback(user_id):
try:
return await profile_service.create(user_id)
except ServiceUnavailableError:
# Создать временный профиль
return await create_temp_profile(user_id)
Практические рекомендации
1. Мониторинг и наблюдаемость
Внедрите comprehensive мониторинг для отслеживания:
- Состояния Saga транзакций
- Задержек в обработке событий
- Успешности компенсационных транзакций
“Мониторинг и автоскейлинг: запускать автоматическое масштабирование при отставании потребителя или глубине очереди” — DZone
2. Обработка ошибок и повторные попытки
# Пример с обработкой ошибок и повторными попытками
async def reliable_profile_creation(user_id, max_retries=3):
for attempt in range(max_retries):
try:
return await profile_service.create(user_id)
except ServiceUnavailableError as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Экспоненциальная задержка
3. Идемпотентность операций
Убедитесь, что операции идемпотентны, чтобы безопасно повторять их:
# Идемпотентное создание профиля
async def create_profile_idempotent(user_id):
existing_profile = await profile_repo.get_by_user_id(user_id)
if existing_profile:
return existing_profile
return await profile_service.create(user_id)
4. Тестирование сценариев сбоя
Тестируйте все возможные сценарии сбоя:
- Отказ сервиса профилей
- Таймаут сети
- Частичные сбои
- Восстановление после сбоя
“Схемы предохранителей и механизмы отката: предотвращают распространение проблемы одного потребителя” — DZone
Источники
- Understanding the Saga Pattern in Event-Driven Architecture - DEV Community
- Microservices Pattern: Event-driven architecture
- Saga Design Pattern - Azure Architecture Center
- Patterns in Event-Driven Architectures - IBM
- Architecture Patterns for Reliable Event-Driven Systems - DZone
- Saga and Process Manager - Event-Driven.io
- Saga choreography pattern - AWS Prescriptive Guidance
- IBM Garage Event-Driven Reference Architecture - Saga Pattern
- What do you mean by “Event-Driven”? - Martin Fowler
- Event-Driven Architecture - System Design - GeeksforGeeks
Заключение
- Используйте Saga‑паттерн для управления распределёнными транзакциями с гарантированным откатом через компенсационные транзакции
- Реализуйте Outbox‑паттерн для надёжной публикации событий и избежания потери данных
- Внедрите идемпотентность операций для безопасного повторного выполнения
- Добавьте механизмы мониторинга и обработки ошибок с повторными попытками
- Тестируйте сценарии сбоя для обеспечения надёжности системы
Для вашей ситуации с регистрацией пользователя рекомендуется подход с Saga‑оркестрацией, где создание профиля является частью транзакции, а в случае сбоя выполняется компенсация через удаление записи аутентификации. Это гарантирует, что пользователь не останется в «подвешенном» состоянии с созданной записью аутентификации, но без профиля.