НейроАгент

Исправление ошибки 'Stream was already consumed' в Azure Functions с YARP

Решение ошибки 'Stream was already consumed' в Azure Functions с обратным прокси YARP. Узнайте правильные методы обработки потоков и конфигурации промежуточного ПО.

Azure Functions CreateLessonPlan API завершается с ошибкой “Поток уже был использован” при перенаправлении через прокси YARP

Я столкнулся с конкретной проблемой с конечной точкой CreateLessonPlan в моем приложении Azure Function, которая завершается с ошибкой “Поток уже был использован” при проксировании с хоста на воркер. Все остальные API в том же проекте работают корректно.

Сведения о конечной точке:

Тело запроса:

json
{
  "SubjectId": 1,
  "TeacherId": "628d0f85-6915-4df6-b769-c636e433abab",
  "School": "Trường THPT Kim Sơn B",
  "Class": "12A1",
  "LessonTitle": "Bài 1. Định nghĩa – Quy tắc tính đạo hàm",
  "Time": "1 tiết"
}

Наблюдаемое поведение:

  1. Функциональный хост вызывает Functions.CreateLessonPlan
  2. Аутентификация запроса успешна, но userId и role равны null
  3. Прокси пытается повторить 10 раз перед отказом
  4. В ошибке указано, что поток запроса уже был использован

Логи ошибок:

System.Net.Http.HttpRequestException: An error occurred while sending the request.
 ---> System.InvalidOperationException: Stream was already consumed.
  at Yarp.ReverseProxy.Forwarder.StreamCopyHttpContent.SerializeToStreamAsync(...)

Ожидаемое поведение:
API должен выполняться нормально и возвращать сгенерированный ответ с планом урока, аналогично другим конечным точкам в проекте.

Дополнительный контекст:

  • Сообщения о предупреждениях об отсутствии десятичной точности (HasColumnType, HasPrecision и т.д.) кажутся не связанными с крахом
  • Ошибка прокси возникает сразу после аутентификации и до того, как EF Core запишет какие-либо данные
  • Эта проблема может быть вызвана повторным использованием потока тела запроса или промежуточным программным обеспечением, которое считывает поток дважды

Как можно решить ошибку “Поток уже был использован” специально для конечной точки CreateLessonPlan в Azure Functions с настройкой обратного прокси YARP?

НейроАгент

Ошибка “Stream was already consumed” в Azure Functions с YARP reverse proxy

Ошибка “Stream was already consumed” в вашем конечном точке Azure Functions CreateLessonPlan возникает, когда обратный прокси YARP пытается переслать запрос, но поток тела запроса уже был прочитан промежуточным программным обеспечением или логикой аутентификации перед проксированием. Это обычно происходит, когда пользовательское промежуточное программное обеспечение читает тело запроса для ведения журнала, проверки или целей аутентификации без правильной буферизации или клонирования потока для последующего использования. Решение включает в себя реализацию правильных методов обработки потоков в вашем промежуточном программном обеспечении и обеспечение доступности потоков запросов для процесса проксирования YARP.

Содержание

Понимание проблемы потребления потока

Ошибка “Stream was already consumed” указывает на то, что System.Net.Http.HttpRequestContent.SerializeToStreamAsync() вызывается для потока, который уже был прочитан или освобожден. В контексте работы YARP reverse proxy с Azure Functions это обычно происходит, когда:

  1. Промежуточное программное обеспечение читает тело запроса: Любой компонент промежуточного программного обеспечения, который обращается к context.Request.Body без правильной обработки потока, потребляет поток, делая его недоступным для процесса проксирования YARP.

  2. Логика аутентификации потребляет поток: Если ваше промежуточное программное обеспечение аутентификации читает тело запроса для проверки токенов или извлечения информации о пользователе, оно может оставить поток в состоянии потребления.

  3. Поведение потока в Azure Functions: Изолированная модель процессов в Azure Functions имеет специфические требования обработки потоков, которые отличаются от традиционных приложений ASP.NET Core.

Как объясняется в документации YARP от Microsoft, YARP маршрутизирует и преобразует URL-адреса и заголовки запросов, но поток тела запроса должен оставаться доступным для прозрачного проксирования. Когда поток потребляется до проксирования, YARP не может переслать данные запроса в нижестоящий сервис.


Распространенные причины в Azure Functions с YARP

Потребление потока промежуточным программным обеспечением аутентификации

Ваше наблюдение, что “userId и role равны null”, указывает на то, что логика аутентификации выполняется, но, возможно, потребляет поток запроса. Промежуточное программное обеспечение аутентификации часто необходимо читать тела запросов для проверки токенов, особенно в пользовательских схемах аутентификации.

Промежуточное программное обеспечение для чтения тела запроса для ведения журнала или проверки

Промежуточное программное обеспечение, предназначенное для ведения журналов тел запросов или выполнения проверки, часто сталкивается с этой проблемой. Как указано в руководстве Microsoft, когда инициализаторы телеметрии или промежуточное программное обеспечение читают тела запросов, они могут оставлять потоки в нерабочем состоянии.

Потребление данных формы

Как отмечено в проблеме YARP #1412, доступ к context.Request.Form потребляет тело запроса, и поскольку тело по умолчанию не буферизируется, YARP не может правильно проксировать запрос afterward.

Обработка потоков в изолированном процессе Azure Functions

Изолированная модель процессов в Azure Functions имеет уникальные характеристики потоков. Как показано в проблеме GitHub #1636, доступ к HttpRequestData.Body в промежуточном программном обеспечении требует специальной обработки для избежания проблем потребления потоков.


Шаги диагностики для выявления основной причины

1. Включите подробное ведение журнала

Добавьте комплексное ведение журнала для отслеживания времени и места доступа к телу запроса:

csharp
app.Use(async (context, next) =>
{
    var originalBody = context.Request.Body;
    Log.Information("Поток тела запроса доступен в: {StackTrace}", Environment.StackTrace);
    
    try
    {
        await next();
    }
    finally
    {
        // Сброс позиции потока, если возможно
        if (originalBody.CanSeek)
        {
            originalBody.Position = 0;
        }
    }
});

2. Проверьте порядок промежуточного программного обеспечения

Проверьте конфигурацию конвейера вашего промежуточного программного обеспечения. Промежуточное программное обеспечение аутентификации и любое промежуточное программное обеспечение для чтения тела запроса должны быть правильно расположены относительно промежуточного программного обеспечения проксирования YARP.

3. Проверка состояния потока

Добавьте промежуточное программное обеспечение для проверки состояния потока перед проксированием:

csharp
app.Use(async (context, next) =>
{
    var canRead = context.Request.Body.CanRead;
    var canSeek = context.Request.Body.CanSeek;
    var position = context.Request.Body.Position;
    
    Log.Information("Состояние потока - CanRead: {CanRead}, CanSeek: {CanSeek}, Position: {Position}", 
        canRead, canSeek, position);
    
    await next();
});

4. Тест клонирования тела запроса

Временно реализуйте клонирование тела запроса для изоляции проблемы:

csharp
app.Use(async (context, next) =>
{
    // Клонируем тело запроса
    var clonedBody = await CloneRequestBody(context.Request.Body);
    
    // Заменяем исходное тело на клон
    context.Request.Body = clonedBody;
    
    await next();
});

async Task<Stream> CloneRequestBody(Stream originalBody)
{
    var memoryStream = new MemoryStream();
    await originalBody.CopyToAsync(memoryStream);
    memoryStream.Position = 0;
    return memoryStream;
}

Решения и обходные пути

Решение 1: Реализация правильной буферизации потока

Создайте класс промежуточного программного обеспечения, который буферизирует тело запроса без его потребления:

csharp
public class StreamBufferingMiddleware
{
    private readonly RequestDelegate _next;

    public StreamBufferingMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task Invoke(HttpContext context)
    {
        // Буферизуем только POST/PUT запросы с содержимым
        if (context.Request.Method == "POST" || context.Request.Method == "PUT")
        {
            context.Request.EnableBuffering();
            
            // Сохраняем исходную позицию потока
            var originalPosition = context.Request.Body.Position;
            
            try
            {
                await _next(context);
            }
            finally
            {
                // Восстанавливаем исходную позицию
                context.Request.Body.Position = originalPosition;
            }
        }
        else
        {
            await _next(context);
        }
    }
}

Решение 2: Использование элементов контекста для передачи данных

Вместо чтения тела запроса в промежуточном программном обеспечении храните данные в элементах контекста, как рекомендуется в проблеме GitHub Azure Functions #1636:

csharp
app.Use(async (context, next) =>
{
    if (context.Request.Method == "POST")
    {
        var requestBody = await new StreamReader(context.Request.Body).ReadToEndAsync();
        context.Items["RequestBody"] = requestBody;
        
        // Сбрасываем поток для последующего потребления
        context.Request.Body = new MemoryStream(Encoding.UTF8.GetBytes(requestBody));
        context.Request.Body.Position = 0;
    }
    
    await next();
});

// В вашей функции Azure
public class CreateLessonPlan
{
    [Function("CreateLessonPlan")]
    public HttpResponseData Run([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req)
    {
        var requestBody = req.HttpContext.Items["RequestBody"]?.ToString();
        // Используем сохраненное тело запроса
    }
}

Решение 3: Специфическая обработка потоков для YARP

Настройте YARP для более плавной обработки потребления потоков с помощью ForwarderRequestConfig:

csharp
services.AddReverseProxy()
    .LoadFromConfig(Configuration.GetSection("ReverseProxy"))
    .AddTransforms(transformBuilderContext =>
    {
        transformBuilderContext.RequestTransforms.Add(transformContext =>
        {
            // Убеждаемся, что тело запроса доступно для проксирования
            if (transformContext.ProxyRequest.Content != null)
            {
                transformContext.ProxyRequest.Content = new StreamCopyHttpContent(transformContext.HttpContext.Request.Body);
            }
            return ValueTask.CompletedTask;
        });
    });

Решение 4: Пользовательский обертка потока

Реализуйте обертку потока, которая позволяет множественное чтение:

csharp
public class ReusableStream : Stream
{
    private readonly Stream _originalStream;
    private readonly MemoryStream _buffer;
    private bool _buffered = false;

    public ReusableStream(Stream originalStream)
    {
        _originalStream = originalStream;
        _buffer = new MemoryStream();
    }

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (!_buffered)
        {
            _buffered = true;
            await _originalStream.CopyToAsync(_buffer);
            _buffer.Position = 0;
        }

        return await _buffer.ReadAsync(buffer, offset, count, cancellationToken);
    }

    // Реализуйте другие методы Stream...
}

Лучшие практики предотвращения

1. Руководства по проектированию промежуточного программного обеспечения

  • Избегайте чтения тел запросов, если это абсолютно необходимо
  • Используйте элементы контекста для передачи данных вместо чтения потока
  • Реализуйте правильное управление освобождением потока и позицией

2. Лучшие практики конфигурации YARP

  • Настройте соответствующие таймауты и политики повторных попыток
  • Используйте EnableBuffering() для запросов, которые могут требовать манипуляции с потоком
  • Учитывайте ограничения размера запросов для буферизации в памяти

3. Интеграция с Azure Functions

  • Используйте встроенные возможности промежуточного программного обеспечения Azure Functions
  • Используйте функции обработки потоков изолированной модели процессов
  • Рассмотрите использование новых улучшений Azure Functions в .NET 8 для лучшего управления потоками

4. Стратегия тестирования

  • Реализуйте комплексное тестирование состояния потоков
  • Добавьте модульные тесты для компонентов промежуточного программного обеспечения, взаимодействующих с потоками
  • Используйте интеграционные тесты для проверки поведения проксирования YARP с разными типами запросов

Примеры реализации

Полное решение промежуточного программного обеспечения

Вот полная реализация промежуточного программного обеспечения, которая обрабатывает потребление потоков:

csharp
public class SafeStreamMiddleware
{
    private readonly RequestDelegate _next;

    public SafeStreamMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task Invoke(HttpContext context)
    {
        // Обрабатываем только POST/PUT запросы с JSON-содержимым
        if (ShouldBufferRequest(context))
        {
            await BufferAndProcessRequest(context);
        }
        else
        {
            await _next(context);
        }
    }

    private bool ShouldBufferRequest(HttpContext context)
    {
        return (context.Request.Method == "POST" || context.Request.Method == "PUT") &&
               context.Request.ContentType?.Contains("application/json") == true;
    }

    private async Task BufferAndProcessRequest(HttpContext context)
    {
        // Включаем буферизацию
        context.Request.EnableBuffering();
        
        // Читаем тело
        var requestBody = await new StreamReader(context.Request.Body).ReadToEndAsync();
        
        // Сохраняем в элементах контекста
        context.Items["RequestBody"] = requestBody;
        
        // Сбрасываем позицию потока
        context.Request.Body.Position = 0;
        
        try
        {
            await _next(context);
        }
        catch
        {
            // Очищаем ресурсы
            context.Request.Body.Dispose();
            throw;
        }
    }
}

// Регистрация в Program.cs
app.UseMiddleware<SafeStreamMiddleware>();

Функция Azure с безопасностью потоков

Реализуйте вашу функцию Azure с учетом соображений безопасности потоков:

csharp
public class CreateLessonPlan
{
    [Function("CreateLessonPlan")]
    public HttpResponseData Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req)
    {
        try
        {
            // Получаем тело запроса из элементов контекста (установлено промежуточным ПО)
            var requestBody = req.HttpContext.Items["RequestBody"]?.ToString();
            
            if (string.IsNullOrEmpty(requestBody))
            {
                // Запасной вариант: чтение напрямую, если промежуточное ПО не выполнялось
                requestBody = new StreamReader(req.Body).ReadToEnd();
            }

            var lessonPlan = JsonSerializer.Deserialize<LessonPlan>(requestBody);
            
            // Обрабатываем план урока
            var result = ProcessLessonPlan(lessonPlan);
            
            var response = req.CreateResponse(HttpStatusCode.OK);
            response.Headers.Add("Content-Type", "application/json; charset=utf-8");
            response.WriteString(JsonSerializer.Serialize(result));
            
            return response;
        }
        catch (Exception ex)
        {
            var response = req.CreateResponse(HttpStatusCode.InternalServerError);
            response.WriteString($"Ошибка обработки плана урока: {ex.Message}");
            return response;
        }
    }

    private LessonPlanResult ProcessLessonPlan(LessonPlan plan)
    {
        // Ваша бизнес-логика здесь
        return new LessonPlanResult { 
            Id = Guid.NewGuid(),
            Success = true,
            Message = "План урока успешно создан"
        };
    }
}

public class LessonPlan
{
    public int SubjectId { get; set; }
    public string TeacherId { get; set; }
    public string School { get; set; }
    public string Class { get; set; }
    public string LessonTitle { get; set; }
    public string Time { get; set; }
}

public class LessonPlanResult
{
    public Guid Id { get; set; }
    public bool Success { get; set; }
    public string Message { get; set; }
}

Конфигурация YARP с обработкой потоков

Настройте YARP для лучшей обработки потребления потоков:

json
{
  "ReverseProxy": {
    "Routes": {
      "lessonplan-route": {
        "ClusterId": "lessonplan-cluster",
        "Match": {
          "Path": "/api/lessonplans",
          "Methods": ["POST"]
        },
        "Transforms": [
          { "RequestHeaderRemove": "Content-Length" },
          { "RequestHeaderRemove": "Transfer-Encoding" }
        ]
      }
    },
    "Clusters": {
      "lessonplan-cluster": {
        "Destinations": {
          "destination1": {
            "Address": "https://localhost:7071/"
          }
        }
      }
    }
  }
}

Этот комплексный подход должен решить ошибку “Stream was already consumed” путем реализации правильной обработки потоков, использования элементов контекста для передачи данных и обеспечения доступности потока тела запроса для YARP при необходимости.

Источники

  1. YARP Extensibility - Request and Response Transforms | Microsoft Learn
  2. Stream was already consumed errors on unstable network (HTTP 502) · Issue #2022 · microsoft/reverse-proxy
  3. Timeout when request body is consumed before proxying · Issue #1412 · microsoft/reverse-proxy
  4. Accessing HttpRequestData.Body in middleware and triggered function · Issue #1636 · Azure/azure-functions-dotnet-worker
  5. Log request body in Azure Functions · Issue #1280 · microsoft/ApplicationInsights-dotnet
  6. Reading (https) response body in yarp’s middleware - Stack Overflow
  7. Azure Function Middleware: Request and Response Body Retrieval

Заключение

Ошибка “Stream was already consumed” в вашем конечном точке Azure Functions CreateLessonPlan с обратным прокси YARP является распространенной проблемой потоков, которая может быть решена за счет правильного проектирования промежуточного программного обеспечения и обработки потоков. Ключевые выводы включают:

  1. Выявите основную причину, добавив подробное ведение журнала для отслеживания времени и места потребления потоков запросов.

  2. Реализуйте правильную буферизацию потоков с помощью context.Request.EnableBuffering() и убедитесь, что позиции потоков сбрасываются после чтения.

  3. Используйте элементы контекста для передачи данных вместо прямого чтения тел запросов в промежуточном программном обеспечении, как рекомендовано лучшими практиками Azure Functions.

  4. Правильно настройте YARP, удалив заголовки Content-Length и Transfer-Encoding, когда потоки могут быть изменены.

  5. Тщательно тестируйте с разными сценариями запросов, чтобы убедиться, что обработка потоков работает корректно для всех конечных точек.

Следуя этим рекомендациям, вы можете решить проблему потребления потока, сохраняя функциональность вашего API CreateLessonPlan и обеспечивая правильную работу всех остальных конечных точек. Решение сочетает правильное управление потоками со специфическими требованиями Azure Functions и интеграции с обратным прокси YARP.