Программирование

Обработка ошибок в контроллерах с распределенными транзакциями

Руководство по обработке ошибок в контроллерах с вложенными сервисами. Паттерны TCC, Saga и компенсационные транзакции для систем без возможности отката.

4 ответа 1 просмотр

Как обрабатывать ошибки в контроллере с вложенными сервисами, когда некоторые операции невозможно откатить?

Я разрабатываю бэкенд веб-приложения, и у меня возникла проблема с обработкой ошибок в эндпоинте, который выполняет несколько последовательных операций:

  1. Регистрация платежа через внешний API (возвращает объект Payment)
  2. Верификация платежа через внешний API (возвращает номер верификации и URL)
  3. Запись информации о платеже в Google Sheet

Проблема возникает, когда первые две операции уже выполнены (включая записи во внешних базах данных), но третья операция завершается ошибкой. Поскольку первые две операции невозможно откатить, как правильно организовать обработку ошибок в таких сценариях?

Я рассматривал использование вложенных try-except блоков, но не уверен, как правильно обрабатывать ошибки внешних API вызовов, когда нет возможности выполнить откат (rollback).

Вот пример кода моего эндпоинта на Python:

python
@endpoint(route="register-payment", methods=["POST"])
@staticmethod
def register_payment(http_request: Request) -> RestResponse:

 request = RegisterPaymentRequest.model_validate_json(http_request.body)

 payment = InsuranceService.register_payment(
 transaction_id=request.transaction_id,
 payment_id=request.payment_id,
 amount=request.amount,
 invoice_id_list=request.invoice_id_list,
 )
 
 invoices = []
 for invoice_id in request.invoice_id_list:
 invoice = InvoiceRepository.filter(
 idefact=invoice_id,
 ).first()
 if invoice:
 invoices.append(invoice)

 verification = InsuranceService.verify_payment(
 register_id=payment.register_id,
 invoices = invoices
 )

 SheetsService.write_payment(
 invoices=invoices,
 payment=payment,
 numreling=verification.numreling,
 urlrep=verification.urlrep
 )

 return RestResponse(
 status=rest_status.HTTP_200_OK,
 data=Response(
 code="0",
 error=False,
 message="Payment validated successfully",
 data={
 "numreling": verification.numreling,
 "urlrep": verification.urlrep
 },
 ).to_dict(),
 )

Обработка ошибок в контроллерах с вложенными сервисами, когда операции невозможно откатить, требует использования паттернов компенсирующих транзакций, таких как TCC (Try-Confirm-Cancel), Circuit Choreography или Saga. Эти подходы обеспечивают согласованность данных в распределенных системах через механизмы компенсации вместо традиционных отката транзакций, что особенно актуально при работе с внешними API и системами, которые не поддерживают транзакционные операции.

GitHub Octocat representing distributed transaction patterns

Содержание


Анализ проблемы распределенных транзакций

Ваша проблема типична для распределенных систем, где несколько сервисов участвуют в одной бизнес-транзакции. Когда первые операции (регистрация платежа и верификация) уже выполнены, но последующая операция (запись в Google Sheets) завершается ошибкой, возникает ситуация невозможности отката.

Ключевые особенности вашей ситуации:

  1. Операции уже модифицируют внешние системы - платеж зарегистрирован в страховой системе
  2. Внешние API не поддерживают транзакционный откат - нет возможности отменить уже выполненные операции
  3. Последовательный характер операций - каждая следующая операция зависит от результатов предыдущих

Такие сценарии требуют использования распределенных транзакций, которые обеспечивают согласованность данных через механизмы компенсации, а не через традиционный откат. В таких системах важен координатор распределенных транзакций, который отслеживает состояние всех операций и инициирует процессы компенсации при необходимости.


Паттерн TCC (Try-Confirm-Cancel)

Паттерн TCC (Try-Confirm-Cancel) предоставляет структурированный подход к управлению распределенными транзакциями без возможности отката. Этот паттерн разделяет каждую операцию на три фазы:

  1. Try - Резервирование ресурсов без окончательного применения изменений
  2. Confirm - Подтверждение операции и окончательное применение изменений
  3. Cancel - Отмена операции и освобождение зарезервированных ресурсов

Для вашего сценария реализации TCC паттерна:

python
@endpoint(route="register-payment", methods=["POST"])
@staticmethod
def register_payment(http_request: Request) -> RestResponse:
 request = RegisterPaymentRequest.model_validate_json(http_request.body)
 
 # Фаза Try - регистрация платежа без окончательного подтверждения
 try:
 payment_reservation = InsuranceService.try_register_payment(
 transaction_id=request.transaction_id,
 payment_id=request.payment_id,
 amount=request.amount,
 invoice_id_list=request.invoice_id_list,
 )
 
 # Получение счетов для верификации
 invoices = []
 for invoice_id in request.invoice_id_list:
 invoice = InvoiceRepository.filter(
 idefact=invoice_id,
 ).first()
 if invoice:
 invoices.append(invoice)

 # Фаза Try - верификация платежа
 verification_reservation = InsuranceService.try_verify_payment(
 register_id=payment_reservation.register_id,
 invoices=invoices
 )
 
 # Фаза Try - подготовка записи в Google Sheets
 sheets_reservation = SheetsService.try_write_payment(
 invoices=invoices,
 payment=payment_reservation,
 numreling=verification_reservation.numreling,
 urlrep=verification_reservation.urlrep
 )
 
 # Если все операции Try успешны, выполняем Confirm
 InsuranceService.confirm_register_payment(payment_reservation)
 InsuranceService.confirm_verify_payment(verification_reservation)
 SheetsService.confirm_write_payment(sheets_reservation)
 
 return RestResponse(
 status=rest_status.HTTP_200_OK,
 data=Response(
 code="0",
 error=False,
 message="Payment validated successfully",
 data={
 "numreling": verification_reservation.numreling,
 "urlrep": verification_reservation.urlrep
 },
 ).to_dict(),
 )
 
 except Exception as e:
 # При ошибке выполняем Cancel для всех успешных Try операций
 try:
 if 'sheets_reservation' in locals():
 SheetsService.cancel_write_payment(sheets_reservation)
 if 'verification_reservation' in locals():
 InsuranceService.cancel_verify_payment(verification_reservation)
 if 'payment_reservation' in locals():
 InsuranceService.cancel_register_payment(payment_reservation)
 except Exception as cancel_error:
 # Логируем ошибку компенсации, но не прерываем обработку основной ошибки
 log_error(f"Error during compensation: {cancel_error}")
 
 return RestResponse(
 status=rest_status.HTTP_500_INTERNAL_SERVER_ERROR,
 data=Response(
 code="1",
 error=True,
 message=f"Payment processing failed: {str(e)}",
 data={},
 ).to_dict(),
 )

Преимущества TCC подхода:

  • Атомарность обеспечивается через компенсационные операции
  • Изоляция - каждая операция обрабатывается независимо
  • Надежность - даже при сбое системы можно восстановить согласованность

Как отмечает автор проекта NTcc-TransactionCore, реализация TCC паттерна через аннотации [Compensable] позволяет эффективно управлять распределенными транзакциями без поддержки отката в внешних системах.


Паттерн Circuit Choreography

Паттерн Circuit Choreography предлагает более гибкий подход к управлению цепочками транзакций, поддерживая замкнутую цепь с маршрутами вперед (forward), назад (backward) и восстановления (restoration).

Ключевые элементы этого подхода для вашего случая:

  1. Обязательное использование Correlation ID - для отслеживания связей между операциями
  2. Уровни восстановления:
  • Level 1 - полный откат (когда это возможно)
  • Level 2 - частичный откат (когда полный откат невозможен)
  • Level 3 - сохранение транзакции (например, уведомление администратора)

Реализация Circuit Choreography для вашего сценария:

python
class PaymentOrchestrator:
 def __init__(self):
 self.circuit_breaker = CircuitBreaker()
 self.correlation_id = str(uuid.uuid4())
 
 def process_payment(self, request):
 # Создаем цепь транзакций
 circuit = CircuitChain(self.correlation_id)
 
 try:
 # Добавляем операции в цепь
 circuit.add_operation(
 "register_payment",
 lambda: self._register_payment(request),
 compensation=self._compensate_registration
 )
 
 circuit.add_operation(
 "verify_payment", 
 lambda: self._verify_payment(request),
 compensation=self._compensate_verification
 )
 
 circuit.add_operation(
 "write_to_sheets",
 lambda: self._write_to_sheets(request),
 compensation=self._notify_admin_failure
 )
 
 # Выполняем цепь
 circuit.execute()
 
 return self._build_success_response(circuit)
 
 except CircuitBreakerOpenException:
 # Circuit breaker предотвращает каскадные сбои
 log_error(f"Circuit breaker open for correlation_id: {self.correlation_id}")
 return self._build_circuit_breaker_response()
 
 except Exception as e:
 # Запускаем восстановление цепи
 circuit.rollback()
 return self._build_error_response(e)
 
 def _register_payment(self, request):
 # Реализация регистрации платежа
 payment = InsuranceService.register_payment(
 transaction_id=request.transaction_id,
 payment_id=request.payment_id,
 amount=request.amount,
 invoice_id_list=request.invoice_id_list,
 )
 return payment
 
 def _compensate_registration(self, payment):
 # Компенсация регистрации (если возможно)
 try:
 InsuranceService.cancel_registration(payment.register_id)
 except Exception as e:
 # Если компенсация невозможна, сохраняем информацию для ручного восстановления
 self._save_failed_compensation("registration", payment, e)
 
 def _verify_payment(self, request, payment):
 invoices = []
 for invoice_id in request.invoice_id_list:
 invoice = InvoiceRepository.filter(idefact=invoice_id).first()
 if invoice:
 invoices.append(invoice)
 
 verification = InsuranceService.verify_payment(
 register_id=payment.register_id,
 invoices=invoices
 )
 return verification
 
 def _compensate_verification(self, verification):
 try:
 InsuranceService.cancel_verification(verification.verification_id)
 except Exception as e:
 self._save_failed_compensation("verification", verification, e)
 
 def _write_to_sheets(self, request, payment, verification):
 # Запись в Google Sheets
 SheetsService.write_payment(
 invoices=request.invoice_id_list,
 payment=payment,
 numreling=verification.numreling,
 urlrep=verification.urlrep
 )
 return {"status": "success"}
 
 def _notify_admin_failure(self, context):
 # Уведомление администратора о необходимости ручного вмешательства
 admin_notification_service.send_notification(
 subject="Payment processing failure",
 message=f"Payment processing failed at sheets writing step. Correlation ID: {self.correlation_id}",
 context=context
 )

Как подчеркивает архитектор Erwin Kramer, автор Circuit Choreography паттерна, ключевой особенностью является поддержка замкнутой цепи во все времена и интеграция с Circuit Breaker для предотвращения каскадных сбоев в распределенной системе.


Паттерн Saga для асинхронной компенсации

Паттерн Saga предлагает асинхронный подход к управлению распределенными транзакциями, где каждая операция выполняется последовательно, а при сбое запускаются компенсационные транзакции в обратном порядке.

Особенности Saga подхода для вашего сценария:

  1. Идемпотентная обработка - гарантия уникального результата при многократном выполнении
  2. Асинхронное выполнение компенсационных транзакций - не блокирует основной процесс
  3. Последовательное восстановление состояния - компенсации выполняются в обратном порядке

Реализация Saga паттерна:

python
class PaymentSaga:
 def __init__(self):
 self.steps = []
 self.completed_steps = []
 self.saga_id = str(uuid.uuid4())
 
 def add_step(self, name, operation, compensation):
 self.steps.append({
 'name': name,
 'operation': operation,
 'compensation': compensation
 })
 
 def execute(self):
 try:
 for step in self.steps:
 result = step['operation']()
 self.completed_steps.append({
 'name': step['name'],
 'result': result
 })
 return self._build_success_response()
 except Exception as e:
 # Запускаем компенсационные транзакции в обратном порядке
 self.compensate()
 raise e
 
 def compensate(self):
 # Выполняем компенсации в обратном порядке
 for step in reversed(self.completed_steps):
 try:
 step_data = next(s for s in self.steps if s['name'] == step['name'])
 step_data['compensation'](step['result'])
 except Exception as comp_error:
 # Логируем ошибку компенсации, но продолжаем выполнение
 log_error(f"Compensation failed for step {step['name']}: {comp_error}")
 
 def save_state(self):
 # Сохраняем состояние саги для восстановления после сбоя
 saga_state = {
 'saga_id': self.saga_id,
 'completed_steps': self.completed_steps,
 'timestamp': datetime.now()
 }
 saga_repository.save(saga_state)
 
 def restore_and_continue(self):
 # Восстанавливаем состояние саги после сбоя
 saved_state = saga_repository.get_by_id(self.saga_id)
 if saved_state:
 self.completed_steps = saved_state['completed_steps']
 # Продолжаем выполнение с того шага, на котором остановились
 start_index = len(self.completed_steps)
 for i in range(start_index, len(self.steps)):
 try:
 result = self.steps[i]['operation']()
 self.completed_steps.append({
 'name': self.steps[i]['name'],
 'result': result
 })
 except Exception as e:
 self.compensate()
 raise e

# Использование в контроллере
def register_payment(http_request: Request) -> RestResponse:
 request = RegisterPaymentRequest.model_validate_json(http_request.body)
 
 saga = PaymentSaga()
 
 # Добавляем шаги в сагу
 saga.add_step(
 name="register_payment",
 operation=lambda: InsuranceService.register_payment(
 transaction_id=request.transaction_id,
 payment_id=request.payment_id,
 amount=request.amount,
 invoice_id_list=request.invoice_id_list,
 ),
 compensation=lambda payment: InsuranceService.cancel_registration(payment.register_id)
 )
 
 saga.add_step(
 name="verify_payment",
 operation=lambda: _verify_payment(request, saga.completed_steps[0]['result']),
 compensation=lambda verification: InsuranceService.cancel_verification(verification.verification_id)
 )
 
 saga.add_step(
 name="write_to_sheets",
 operation=lambda: SheetsService.write_payment(
 invoices=request.invoice_id_list,
 payment=saga.completed_steps[0]['result'],
 numreling=saga.completed_steps[1]['result'].numreling,
 urlrep=saga.completed_steps[1]['result'].urlrep
 ),
 compensation=lambda: _notify_admin_sheets_failure(saga.completed_steps)
 )
 
 try:
 saga.execute()
 return _build_success_response(saga)
 except Exception as e:
 saga.save_state() # Сохраняем состояние для ручного восстановления
 return _build_error_response(e)

Как отмечает Fabio Martineli, специалист по распределенным транзакциям, паттерн компенсирующих транзакций позволяет выполнять действия для отката предыдущих операций при необходимости, что идеально подходит для сценариев, где традиционный откат невозможен.


Практическая реализация для вашего сценария

На основе рассмотренных паттернов, вот практическая реализация для вашего конкретного сценария с использованием комбинации подходов:

python
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any
import uuid
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TransactionStatus(Enum):
 PENDING = "pending"
 COMPLETED = "completed"
 FAILED = "failed"
 COMPENSATED = "compensated"

@dataclass
class TransactionStep:
 name: str
 operation: Callable
 compensation: Optional[Callable] = None
 result: Optional[Any] = None
 status: TransactionStatus = TransactionStatus.PENDING

class DistributedTransactionOrchestrator:
 def __init__(self, correlation_id: Optional[str] = None):
 self.correlation_id = correlation_id or str(uuid.uuid4())
 self.steps: list[TransactionStep] = []
 self.failed_step: Optional[TransactionStep] = None
 
 def add_step(self, name: str, operation: Callable, compensation: Optional[Callable] = None):
 self.steps.append(TransactionStep(name, operation, compensation))
 
 def execute(self) -> dict:
 try:
 for step in self.steps:
 step.result = step.operation()
 step.status = TransactionStatus.COMPLETED
 logger.info(f"Step {step.name} completed successfully")
 
 return self._build_success_response()
 
 except Exception as e:
 self.failed_step = next(step for step in self.steps if step.status == TransactionStatus.PENDING)
 logger.error(f"Step {self.failed_step.name} failed: {str(e)}")
 
 # Запускаем компенсации для завершенных шагов
 self._execute_compensations()
 
 return self._build_error_response(e)
 
 def _execute_compensations(self):
 # Выполняем компенсации в обратном порядке
 for step in reversed(self.steps):
 if step.status == TransactionStatus.COMPLETED and step.compensation:
 try:
 step.compensation(step.result)
 step.status = TransactionStatus.COMPENSATED
 logger.info(f"Compensation for step {step.name} completed successfully")
 except Exception as comp_error:
 logger.error(f"Compensation failed for step {step.name}: {str(comp_error)}")
 step.status = TransactionStatus.FAILED
 
 def _build_success_response(self) -> dict:
 last_step = self.steps[-1]
 return {
 "status": "success",
 "correlation_id": self.correlation_id,
 "message": "All steps completed successfully",
 "data": last_step.result
 }
 
 def _build_error_response(self, error: Exception) -> dict:
 return {
 "status": "error",
 "correlation_id": self.correlation_id,
 "message": f"Transaction failed at step {self.failed_step.name}: {str(error)}",
 "failed_step": self.failed_step.name,
 "completed_steps": [step.name for step in self.steps if step.status == TransactionStatus.COMPLETED],
 "compensated_steps": [step.name for step in self.steps if step.status == TransactionStatus.COMPENSATED]
 }

# Реализация для вашего сервиса
class PaymentService:
 def __init__(self):
 self.orchestrator = DistributedTransactionOrchestrator()
 
 def process_payment(self, request: RegisterPaymentRequest) -> dict:
 # Настраиваем шаги транзакции
 self.orchestrator.add_step(
 name="register_payment",
 operation=lambda: self._register_payment(request),
 compensation=self._compensate_registration
 )
 
 self.orchestrator.add_step(
 name="verify_payment",
 operation=lambda: self._verify_payment(request),
 compensation=self._compensate_verification
 )
 
 self.orchestrator.add_step(
 name="write_to_sheets",
 operation=lambda: self._write_to_sheets(request),
 compensation=self._notify_admin
 )
 
 # Выполняем транзакцию
 return self.orchestrator.execute()
 
 def _register_payment(self, request: RegisterPaymentRequest):
 # Регистрация платежа
 payment = InsuranceService.register_payment(
 transaction_id=request.transaction_id,
 payment_id=request.payment_id,
 amount=request.amount,
 invoice_id_list=request.invoice_id_list,
 )
 logger.info(f"Payment registered with ID: {payment.register_id}")
 return payment
 
 def _compensate_registration(self, payment):
 try:
 InsuranceService.cancel_registration(payment.register_id)
 logger.info(f"Payment registration compensated: {payment.register_id}")
 except Exception as e:
 logger.error(f"Failed to compensate payment registration: {str(e)}")
 # Сохраняем информацию для ручного восстановления
 self._save_manual_recovery_info("payment_registration", payment, e)
 
 def _verify_payment(self, request: RegisterPaymentRequest):
 # Получение счетов
 invoices = []
 for invoice_id in request.invoice_id_list:
 invoice = InvoiceRepository.filter(idefact=invoice_id).first()
 if invoice:
 invoices.append(invoice)
 
 # Верификация платежа
 verification = InsuranceService.verify_payment(
 register_id=self.orchestrator.steps[0].result.register_id,
 invoices=invoices
 )
 logger.info(f"Payment verified with verification ID: {verification.verification_id}")
 return verification
 
 def _compensate_verification(self, verification):
 try:
 InsuranceService.cancel_verification(verification.verification_id)
 logger.info(f"Payment verification compensated: {verification.verification_id}")
 except Exception as e:
 logger.error(f"Failed to compensate payment verification: {str(e)}")
 self._save_manual_recovery_info("payment_verification", verification, e)
 
 def _write_to_sheets(self, request: RegisterPaymentRequest):
 # Запись в Google Sheets
 SheetsService.write_payment(
 invoices=request.invoice_id_list,
 payment=self.orchestrator.steps[0].result, # Результат регистрации
 numreling=self.orchestrator.steps[1].result.numreling, # Результат верификации
 urlrep=self.orchestrator.steps[1].result.urlrep
 )
 logger.info("Payment information written to Google Sheets")
 return {"status": "sheets_written"}
 
 def _notify_admin(self, context):
 # Уведомление администратора о необходимости ручного вмешательства
 AdminNotificationService.send_notification(
 subject="Payment processing failure",
 message=f"Payment processing failed at sheets writing step. "
 f"Correlation ID: {self.orchestrator.correlation_id}",
 context=context
 )
 logger.info("Admin notification sent for manual intervention")
 
 def _save_manual_recovery_info(self, operation: str, data: Any, error: Exception):
 # Сохранение информации для ручного восстановления
 recovery_info = {
 "operation": operation,
 "correlation_id": self.orchestrator.correlation_id,
 "data": data,
 "error": str(error),
 "timestamp": datetime.now()
 }
 RecoveryRepository.save(recovery_info)

# Использование в контроллере
@endpoint(route="register-payment", methods=["POST"])
@staticmethod
def register_payment(http_request: Request) -> RestResponse:
 request = RegisterPaymentRequest.model_validate_json(http_request.body)
 
 payment_service = PaymentService()
 result = payment_service.process_payment(request)
 
 if result["status"] == "success":
 return RestResponse(
 status=rest_status.HTTP_200_OK,
 data=Response(
 code="0",
 error=False,
 message="Payment validated successfully",
 data=result["data"],
 ).to_dict(),
 )
 else:
 return RestResponse(
 status=rest_status.HTTP_500_INTERNAL_SERVER_ERROR,
 data=Response(
 code="1",
 error=True,
 message=result["message"],
 data={
 "correlation_id": result["correlation_id"],
 "failed_step": result["failed_step"],
 "completed_steps": result["completed_steps"],
 "compensated_steps": result["compensated_steps"]
 },
 ).to_dict(),
 )

Эта реализация использует гибридный подход, объединяющий элементы TCC и Saga паттернов, и обеспечивает:

  1. Четкую компенсацию для операций, которые можно откатить
  2. Уведомление администратора для операций, которые невозможно откатить
  3. Отслеживание состояния всех операций для восстановления после сбоев
  4. Логирование всех действий и ошибок для мониторинга

Рекомендации по обработке ошибок в контроллерах

Основываясь на лучших практиках работы с распределенными транзакциями, вот рекомендации по организации обработки ошибок в контроллерах:

1. Используйте паттерны компенсации вместо традиционного отката

python
# Плохо: попытка откатить то, что нельзя откатить
try:
 # Выполнение операций
 operation1()
 operation2()
 operation3()
except Exception as e:
 # Попытка отката (часто невозможна для внешних API)
 try:
 operation3.rollback()
 operation2.rollback()
 operation1.rollback()
 except:
 pass # Откат не удался
 raise e

# Хорошо: использование компенсации
try:
 # Выполнение операций
 result1 = operation1()
 result2 = operation2(result1)
 result3 = operation3(result2)
except Exception as e:
 # Запуск компенсаций в обратном порядке
 if 'result3' in locals():
 compensate3(result3)
 if 'result2' in locals():
 compensate2(result2)
 if 'result1' in locals():
 compensate1(result1)
 raise e

2. Внедрите механизм идемпотентности

python
def operation_with_idempotency(payment_id: str, operation: Callable):
 # Проверяем, не выполнялась ли уже операция
 if IdempotencyRepository.exists(payment_id):
 return IdempotencyRepository.get_result(payment_id)
 
 # Выполняем операцию
 try:
 result = operation()
 # Сохраняем результат
 IdempotencyRepository.save(payment_id, result, status="completed")
 return result
 except Exception as e:
 # Сохраняем ошибку для повторных попыток
 IdempotencyRepository.save(payment_id, None, status="failed", error=str(e))
 raise e

3. Реализуйте Circuit Breaker для защиты от каскадных сбоев

python
class PaymentProcessingCircuitBreaker:
 def __init__(self, failure_threshold=5, recovery_timeout=60):
 self.failure_threshold = failure_threshold
 self.recovery_timeout = recovery_timeout
 self.failure_count = 0
 self.last_failure_time = None
 self.state = "closed" # closed, open, half-open
 
 def execute(self, operation):
 if self.state == "open":
 if self._should_attempt_reset():
 self.state = "half-open"
 else:
 raise CircuitBreakerOpenException("Circuit breaker is open")
 
 try:
 result = operation()
 self._on_success()
 return result
 except Exception as e:
 self._on_failure()
 raise e
 
 def _should_attempt_reset(self):
 return (datetime.now() - self.last_failure_time).seconds > self.recovery_timeout
 
 def _on_success(self):
 if self.state == "half-open":
 self.state = "closed"
 self.failure_count = 0
 
 def _on_failure(self):
 self.failure_count += 1
 self.last_failure_time = datetime.now()
 
 if self.failure_count >= self.failure_threshold:
 self.state = "open"

# Использование в контроллере
circuit_breaker = PaymentProcessingCircuitBreaker()

@endpoint(route="register-payment", methods=["POST"])
def register_payment(http_request: Request) -> RestResponse:
 try:
 return circuit_breaker.execute(lambda: _process_payment(http_request))
 except CircuitBreakerOpenException:
 return _build_circuit_breaker_response()
 except Exception as e:
 return _build_error_response(e)

4. Внедрите систему мониторинга и оповещений

python
class TransactionMonitor:
 def __init__(self):
 self.metrics = {
 "successful_transactions": 0,
 "failed_transactions": 0,
 "compensations_triggered": 0,
 "average_processing_time": 0
 }
 
 def start_transaction(self, correlation_id: str):
 return TransactionContext(correlation_id, self)
 
 def record_success(self, processing_time: float):
 self.metrics["successful_transactions"] += 1
 self._update_average_time(processing_time)
 self._check_alerts()
 
 def record_failure(self, processing_time: float):
 self.metrics["failed_transactions"] += 1
 self._update_average_time(processing_time)
 self._check_alerts()
 
 def record_compensation(self):
 self.metrics["compensations_triggered"] += 1
 self._check_alerts()
 
 def _update_average_time(self, new_time: float):
 # Простая реализация скользящего среднего
 alpha = 0.1
 self.metrics["average_processing_time"] = (
 alpha * new_time + 
 (1 - alpha) * self.metrics["average_processing_time"]
 )
 
 def _check_alerts(self):
 if self.metrics["failed_transactions"] > 10:
 AlertService.send("High failure rate detected")
 
 if self.metrics["compensations_triggered"] > 5:
 AlertService.send("High compensation rate detected")
 
 if self.metrics["average_processing_time"] > 30:
 AlertService.send("Slow transaction processing detected")

class TransactionContext:
 def __init__(self, correlation_id: str, monitor: TransactionMonitor):
 self.correlation_id = correlation_id
 self.monitor = monitor
 self.start_time = datetime.now()
 
 def __enter__(self):
 return self
 
 def __exit__(self, exc_type, exc_val, exc_tb):
 processing_time = (datetime.now() - self.start_time).total_seconds()
 
 if exc_type is None:
 self.monitor.record_success(processing_time)
 else:
 self.monitor.record_failure(processing_time)

5. Реализуйте механизм ручного восстановления

python
class ManualRecoveryService:
 @staticmethod
 def create_recovery_task(failed_step: str, correlation_id: str, context: dict):
 recovery_task = {
 "task_id": str(uuid.uuid4()),
 "failed_step": failed_step,
 "correlation_id": correlation_id,
 "context": context,
 "status": "pending",
 "created_at": datetime.now(),
 "assigned_to": None
 }
 RecoveryTaskRepository.save(recovery_task)
 return recovery_task["task_id"]
 
 @staticmethod
 def assign_recovery_task(task_id: str, admin_user: str):
 task = RecoveryTaskRepository.get_by_id(task_id)
 if task and task["status"] == "pending":
 task["assigned_to"] = admin_user
 task["status"] = "assigned"
 RecoveryTaskRepository.save(task)
 
 @staticmethod
 def complete_recovery_task(task_id: str, resolution_notes: str):
 task = RecoveryTaskRepository.get_by_id(task_id)
 if task:
 task["status"] = "completed"
 task["resolution_notes"] = resolution_notes
 task["completed_at"] = datetime.now()
 RecoveryTaskRepository.save(task)

# Использование при компенсации
def _notify_admin(self, context):
 # Создаем задачу для ручного восстановления
 task_id = ManualRecoveryService.create_recovery_task(
 failed_step="write_to_sheets",
 correlation_id=self.orchestrator.correlation_id,
 context={
 "payment_info": context,
 "step_results": [step.result for step in self.orchestrator.steps if step.status == TransactionStatus.COMPLETED]
 }
 )
 
 # Уведомляем администратора
 AdminNotificationService.send_notification(
 subject="Manual recovery required",
 message=f"Please complete recovery task {task_id} for failed transaction",
 action_url=f"/admin/recovery-tasks/{task_id}"
 )

Эти рекомендации помогут создать надежную систему обработки ошибок для контроллеров с вложенными сервисами, где традиционный откат транзакций невозможен.


Заключение и лучшие практики

Обработка ошибок в контроллерах с вложенными сервисами, когда операции невозможно откатить, требует перехода от традиционных транзакционных подходов к распределенным транзакциям на основе компенсационных механизмов.

Ключевые выводы:

  1. Выберите подходящий паттерн:
  • TCC (Try-Confirm-Cancel) - подходит для сценариев с четкими фазами операций
  • Circuit Choreography - хорошо для сложных цепочек с разными уровнями восстановления
  • Saga - идеален для асинхронной обработки и долгих транзакций
  1. Внедрите надежную систему компенсации:
  • Идите в обратном порядке при компенсации
  • Обрабатывайте ошибки компенсации отдельно
  • Сохраняйте состояние для ручного восстановления
  1. Защищайте систему от каскадных сбоев:
  • Используйте Circuit Breaker
  • Внедряйте таймауты и повторные попытки
  • Мониторьте метрики производительности
  1. Обеспечьте видимость и возможность восстановления:
  • Используйте Correlation ID для отслеживания
  • Реализуйте систему мониторинга и оповещений
  • Создайте механизм ручного восстановления для сложных сценариев

Как отмечает Erwin Kramer, автор паттерна Circuit Choreography, важно поддерживать замкнутую цепь во все времена и иметь четкий план действий для каждого возможного сценария сбоя.

Для вашего конкретного случая с внешними API и Google Sheets рекомендуется гибридный подход, сочетающий элементы TCC и Saga паттернов, с акцентом на надежную компенсацию операций регистрации и верификации, а уведомление администратора для операций с Google Sheets.

Не забывайте, что в мире распределенных систем согласованность данных достигается не через традиционные механизмы отката, а через продуманную архитектуру компенсационных транзакций и мониторинга состояния системы.


Источники

  1. NTcc-TransactionCore — Реализация шаблона TCC для .NET Core: https://github.com/wzl-bxg/NTcc-TransactionCore
  2. Circuit Choreography — Паттерн хореографии для распределенных транзакций: https://github.com/erwinkramer/circuit-choreography
  3. Pattern Compensating Transaction — Паттерн компенсирующих транзакций: https://github.com/fabiomartineli/pattern-compensating-transaction
  4. Distributed Transactions Patterns — Руководство по паттернам распределенных транзакций: https://docs.microsoft.com/en-us/azure/architecture/patterns/transactional-message-pattern
  5. Saga Pattern Implementation — Практическая реализация паттерна Saga: https://docs.microsoft.com/en-us/azure/architecture/patterns/saga
  6. Circuit Breaker Pattern — Защита от каскадных сбоев в распределенных системах: https://docs.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker
  7. Idempotency Patterns — Механизмы идемпотентности для распределенных систем: https://martinfowler.com/bliki/IdempotentOperation.html
  8. Correlation ID Pattern — Отслеживание запросов в распределенных системах: https://docs.microsoft.com/en-us/azure/architecture/patterns/correlation-id
  9. Compensation Transaction Management — Управление транзакциями компенсации: https://www.enterpriseintegrationpatterns.com/CompensatingTransaction.html
  10. Distributed System Monitoring — Мониторинг распределенных систем: https://www.researchgate.net/publication/305590286_A_Survey_of_Distributed_Systems_Monitoring_Techniques_and_Tools
wzl-bxg / Разработчик

Паттерн TCC (Try-Confirm-Cancel) предоставляет эффективное решение для управления распределенными транзакциями. Реализуется через аннотацию [Compensable], которая определяет методы подтверждения и отмены транзакций. Каждый сервис обрабатывает атомарные операции независимо, а транзакции компенсации выполняются при сбоях. Подход позволяет гарантировать согласованность данных даже при отсутствии возможности отката через механизмы компенсации, что идеально подходит для вашей ситуации с внешними API.

Erwin Kramer / Архитектор

Паттерн Circuit Choreography поддерживает замкнутую цепь с маршрутами вперед (forward), назад (backward) и восстановления (restoration). Ключевые элементы: обязательное использование Correlation ID для отслеживания связей между сервисами, уровни восстановления (Level 1 - полный откат, Level 2 - частичный откат, Level 3 - сохранение транзакции), и длительные процессы для восстановления цепи при сбоях. Интеграция с Circuit Breaker предотвращает каскадные сбои в распределенной системе, что критически важно при работе с несколькими внешними API.

Fabio Martineli Gonçalves / Разработчик

Паттерн компенсирующих транзакций позволяет выполнять действия для отката предыдущих операций при необходимости. Важные аспекты: идемпотентная обработка (гарантия уникального результата при многократном выполнении), асинхронное выполнение компенсационных транзакций, и последовательное восстановление состояния. Каждый сервис самостоятельно отвечает за обработку транзакций компенсации, так как именно он знает, как правильно откатить выполненную операцию. Для вашего случая с Google Sheets можно реализовать механизм компенсации через повторную попытку записи или уведомление администратора.

Авторы
wzl-bxg / Разработчик
Разработчик
Erwin Kramer / Архитектор
Архитектор
Fabio Martineli Gonçalves / Разработчик
Разработчик
Проверено модерацией
НейроОтветы
Модерация