Другое

Руководство по логированию потоковых ответов Spring AI

Узнайте, как правильно логировать потоковые ответы Spring AI с помощью ChatClientMessageAggregator. Исправьте проблемы с потоками и захватывайте содержимое ответов в реальном времени с помощью нашего подробного руководства.

Как правильно логировать потоковые ответы в Spring AI при использовании ChatClientMessageAggregator? Я реализую механизм логирования, похожий на SimpleLoggerAdvisor, но когда я устанавливаю точку останова в методе adviseStream, я замечаю, что фронтенд все еще получает вывод. Это указывает на то, что ответ, отправляемый фронтенду, и код, где я установил точку останова, работают в разных потоках. Вот моя текущая реализация:

java
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
    String conversationId = getConversationId(chatClientRequest.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
    String prompt = chatClientRequest.prompt().getContents();
    Instant start = Instant.now();

    logger.atInfo()
            .addKeyValue("event", "ai_stream_session")
            .addKeyValue("conversationId", conversationId)
            .addKeyValue("prompt", prompt)
            .log("AI stream request logged");
    
    Flux<ChatClientResponse> chatClientResponses = streamAdvisorChain
            .nextStream(chatClientRequest);

    return new ChatClientMessageAggregator()
            .aggregateChatClientResponse(chatClientResponses, resp -> logResponse(conversationId, resp, start));
}

private void logResponse(String conversationId, ChatClientResponse response, Instant startTime) {
    try {
        ChatResponse chatResponse = response.chatResponse();
        String responseText = chatResponse.getResult().getOutput().getText();
        Instant endTime = Instant.now();
        long durationMs = Duration.between(startTime, endTime).toMillis();

        LoggingEventBuilder logBuilder = logger.atInfo()
                .addKeyValue("response", responseText)
                .addKeyValue("durationMs", durationMs);

        if (chatResponse != null && chatResponse.getMetadata() != null) {
            ChatResponseMetadata meta = chatResponse.getMetadata();
            Usage usage = meta.getUsage();

            logBuilder
                    .addKeyValue("model", meta.getModel())
                    .addKeyValue("promptTokens", usage != null ? usage.getPromptTokens() : 0)
                    .addKeyValue("completionTokens", usage != null ? usage.getCompletionTokens() : 0)
                    .addKeyValue("totalTokens", usage != null ? usage.getTotalTokens() : 0);
        }
        logBuilder.log("AI response logged");
    } catch (Exception e) {
        logger.atError()
                .addKeyValue("conversationId", conversationId)
                .addKeyValue("error", e.getMessage())
                .setCause(e)
                .log("Failed to log AI response");
    }
}

Как я могу обеспечить корректное захват содержимого потокового ответа в логировании, и что может вызывать наблюдаемую мной проблему с потоками?

Проблема с потоками, с которой вы столкнулись, является распространенной в сценариях потоковой передачи Spring AI из-за реактивного nature конвейера потоковой передачи. При использовании ChatClientMessageAggregator с логированием, аналогичным SimpleLoggerAdvisor, операции агрегации и логирования происходят на разных потоках, чем доставка потоковой передачи, что приводит к получению фронтендом данных до того, как будет достигнута точка останова вашего логирования.


Содержание


Понимание проблемы с потоками

В сценариях потоковой передачи Spring AI доставка ответов работает на реактивном конвейере, где каждый фрагмент обрабатывается независимо. Проблема с потоками, которую вы наблюдаете, возникает по следующим причинам:

  1. Поток потоковой передачи: Фактические фрагменты потокового ответа обрабатываются на потоке, управляемом реактивным конвейером (обычно виртуальный поток в современных приложениях Spring Boot)
  2. Поток агрегации: ChatClientMessageAggregator работает в другом пуле потоков для сбора и объединения фрагментов
  3. Поток логирования: Ваш метод logResponse выполняется на еще одном потоке

Как объясняется в Spring AI Reference, потоковые ответы используют концепции реактивного программирования, такие как Flux для ответов, что inherently involves переключение потоков. Когда вы устанавливаете точку останова в методе adviseStream, фронтенд уже начал получать данные с потока потоковой передачи, в то время как ваша логика агрегации/логирования все еще обрабатывается на отдельном потоке.

Ключевое понимание: “Фронтенд все еще получает вывод” потому что потоковая передача доставляет фрагменты напрямую, не дожидаясь завершения агрегации, как отмечено в обсуждении на Stack Overflow.


Правильное логирование потоковых ответов

Для правильного захвата потоковых ответов вам нужно перехватывать потоковые данные в нескольких точках:

1. Логирование на уровне потока

Логируйте каждый фрагмент по мере его получения, а не только агрегированный результат:

java
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
    String conversationId = getConversationId(chatClientRequest.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
    String prompt = chatClientRequest.prompt().getContents();
    Instant start = Instant.now();

    logger.atInfo()
            .addKeyValue("event", "ai_stream_session")
            .addKeyValue("conversationId", conversationId)
            .addKeyValue("prompt", prompt)
            .log("AI stream request logged");
    
    return streamAdvisorChain.nextStream(chatClientRequest)
            .doOnNext(response -> {
                // Логируем каждый потоковый фрагмент
                String chunk = extractChunkFromResponse(response);
                logger.atDebug()
                        .addKeyValue("event", "ai_stream_chunk")
                        .addKeyValue("conversationId", conversationId)
                        .addKeyValue("chunk", chunk)
                        .log("Streaming chunk received");
            })
            .doOnComplete(() -> {
                // Логируем завершение
                Instant endTime = Instant.now();
                long durationMs = Duration.between(startTime, endTime).toMillis();
                logger.atInfo()
                        .addKeyValue("event", "ai_stream_complete")
                        .addKeyValue("conversationId", conversationId)
                        .addKeyValue("durationMs", durationMs)
                        .log("AI stream completed");
            })
            .doOnError(error -> {
                logger.atError()
                        .addKeyValue("event", "ai_stream_error")
                        .addKeyValue("conversationId", conversationId)
                        .setCause(error)
                        .log("AI stream failed");
            });
}

2. Логирование агрегированного ответа

Сохраняйте вашу агрегацию, но логируйте с правильным контекстом:

java
private void logResponse(String conversationId, ChatClientResponse response, Instant startTime) {
    try {
        ChatResponse chatResponse = response.chatResponse();
        String responseText = chatResponse.getResult().getOutput().getText();
        Instant endTime = Instant.now();
        long durationMs = Duration.between(startTime, endTime).toMillis();

        // MDC (Mapped Diagnostic Context) для потокобезопасного логирования
        MDC.put("conversationId", conversationId);
        MDC.put("event", "aggregated_response");
        
        LoggingEventBuilder logBuilder = logger.atInfo()
                .addKeyValue("response", responseText)
                .addKeyValue("durationMs", durationMs);

        // Добавляем метаданные, если доступны
        if (chatResponse != null && chatResponse.getMetadata() != null) {
            ChatResponseMetadata meta = chatResponse.getMetadata();
            Usage usage = meta.getUsage();

            logBuilder
                    .addKeyValue("model", meta.getModel())
                    .addKeyValue("promptTokens", usage != null ? usage.getPromptTokens() : 0)
                    .addKeyValue("completionTokens", usage != null ? usage.getCompletionTokens() : 0)
                    .addKeyValue("totalTokens", usage != null ? usage.getTotalTokens() : 0);
        }
        logBuilder.log("Aggregated AI response logged");
        
        // Очищаем MDC
        MDC.clear();
    } catch (Exception e) {
        MDC.put("conversationId", conversationId);
        MDC.put("event", "logging_error");
        logger.atError()
                .addKeyValue("error", e.getMessage())
                .setCause(e)
                .log("Failed to log AI response");
        MDC.clear();
    }
}

Поведение ChatClientMessageAggregator

ChatClientMessageAggregator разработан для сбора потоковых фрагментов и предоставления полного ответа. Согласно исследованиям, он ведет себя следующим образом:

  1. Собирает фрагменты: Собирает все потоковые фрагменты ответа
  2. Агрегирует содержимое: Объединяет фрагменты в полный ответ
  3. Выполняет обратный вызов: Вызывает ваш предоставленный обратный вызов с агрегированным результатом
  4. Переключение потоков: Обычно выполняет обратный вызов на другом потоке, чем поток потоковой передачи

Важное замечание: Как упоминается в документации Spring AI, “Потоковые советники обрабатывают запросы и ответы как непрерывные потоки, используя концепции реактивного программирования (например, Flux для ответов)”.

Это переключение потоков является нормальным поведением в реактивном программировании, но требует особого внимания для логирования.


Альтернативные подходы к логированию

1. Прямое использование SimpleLoggerAdvisor

Самый простой подход - использовать встроенный в Spring AI SimpleLoggerAdvisor:

java
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
    return builder
            .defaultAdvisors(
                    new SimpleLoggerAdvisor(),
                    // Добавьте другие советники
                    new QuestionAnswerAdvisor(vectorStore, SearchRequest.defaults())
            )
            .build();
}

Затем настройте логирование в вашем application.yml:

yaml
logging:
  level:
    org.springframework.ai.chat.client.advisor: DEBUG
    org.springframework.ai.chat.client: DEBUG

2. Пользовательский потоковый советник с осведомленным о потоках логированием

java
@Component
public class StreamAwareLoggingAdvisor implements StreamAroundAdvisor {

    private static final Logger logger = LoggerFactory.getLogger(StreamAwareLoggingAdvisor.class);

    @Override
    public Flux<ChatClientResponse> adviseStream(ChatClientRequest request, StreamAdvisorChain chain) {
        String conversationId = getConversationId(request.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
        String prompt = request.prompt().getContents();
        Instant start = Instant.now();
        
        // Захватываем контекст потока
        String threadName = Thread.currentThread().getName();
        
        return chain.nextStream(request)
                .map(response -> {
                    // Логируем на том же потоке, что и потоковая передача
                    logStreamingChunk(conversationId, response, threadName);
                    return response;
                })
                .doOnComplete(() -> {
                    logStreamCompletion(conversationId, start, threadName);
                })
                .doOnError(error -> {
                    logStreamError(conversationId, error, threadName);
                });
    }

    private void logStreamingChunk(String conversationId, ChatClientResponse response, String threadName) {
        try {
            String chunk = extractChunkFromResponse(response);
            logger.atDebug()
                    .addKeyValue("thread", threadName)
                    .addKeyValue("conversationId", conversationId)
                    .addKeyValue("event", "stream_chunk")
                    .addKeyValue("chunk", chunk)
                    .log("Streaming chunk processed");
        } catch (Exception e) {
            logger.atError()
                    .addKeyValue("thread", threadName)
                    .addKeyValue("conversationId", conversationId)
                    .addKeyValue("error", e.getMessage())
                    .log("Error logging streaming chunk");
        }
    }
    
    // Другие методы логирования...
}

3. Интеграция с реактивным конвейером

Для более сложных сценариев интегрируйте логирование непосредственно в реактивный конвейер:

java
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
    String conversationId = getConversationId(chatClientRequest.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
    String prompt = chatClientRequest.prompt().getContents();
    Instant start = Instant.now();

    return streamAdvisorChain.nextStream(chatClientRequest)
            .checkpoint("Stream logging checkpoint")
            .materialize() // Преобразуем в Signal для лучшего контроля
            .map(signal -> {
                if (signal.isOnNext()) {
                    ChatClientResponse response = signal.get();
                    logStreamingChunk(conversationId, response);
                    return response;
                } else if (signal.isOnError()) {
                    logError(conversationId, signal.getThrowable());
                    throw signal.getThrowable();
                } else {
                    logCompletion(conversationId, start);
                    return signal.get(); // Это не будет вызвано для завершения
                }
            })
            .dematerialize();
}

Лучшие практики для логирования потоковой передачи

1. Осведомленное о потоках логирование

  • Используйте MDC (Mapped Diagnostic Context) для потокобезопасного логирования
  • Включайте информацию о потоке в сообщения логирования для отладки
  • Рассмотрите использование SLF4J MDC для корреляции логов по потокам

2. Вопросы производительности

  • Уровни логирования: Используйте DEBUG для потоковых фрагментов, INFO для агрегированных ответов
  • Асинхронное логирование: Рассмотрите асинхронные аппендеры для высокочастотного логирования потоков
  • Сэмплирование: В продакшене рассмотрите сэмплирование потоковых логов для избежания влияния на производительность

3. Обработка ошибок

  • Реализуйте надежную обработку ошибок в обратных вызовах потоковой передачи
  • Логируйте ошибки с соответствующим контекстом (ID диалога, запрос и т.д.)
  • Рассмотрите механизмы отката для сбоев логирования

4. Конфигурация

  • Используйте конфигурации логирования, специфичные для профилей
  • Включайте/отключайте логирование потоковой передачи на основе окружения
  • Рассмотрите структурированное логирование (JSON формат) для более легкого парсинга

5. Мониторинг

  • Добавьте метрики для производительности потоковой передачи
  • Мониторьте влияние производительности логирования
  • Настройте оповещения для сбоев логирования

Полное решение для реализации

Вот комплексная реализация, которая решает проблему с потоками:

java
@Component
public class StreamingLoggingAdvisor implements StreamAroundAdvisor {

    private static final Logger logger = LoggerFactory.getLogger(StreamingLoggingAdvisor.class);
    
    private final ThreadLocal<String> conversationThreadLocal = new ThreadLocal<>();

    @Override
    public Flux<ChatClientResponse> adviseStream(ChatClientRequest request, StreamAdvisorChain chain) {
        String conversationId = getConversationId(request.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
        String prompt = request.prompt().getContents();
        Instant start = Instant.now();

        // Устанавливаем контекст потока
        conversationThreadLocal.set(conversationId);
        
        logger.atInfo()
                .addKeyValue("conversationId", conversationId)
                .addKeyValue("prompt", prompt)
                .addKeyValue("thread", Thread.currentThread().getName())
                .log("AI streaming request started");

        return chain.nextStream(request)
                .doOnNext(response -> {
                    logStreamingChunk(response);
                })
                .doOnComplete(() -> {
                    logStreamCompletion(start);
                })
                .doOnError(error -> {
                    logStreamError(error);
                })
                .doFinally(signalType -> {
                    conversationThreadLocal.remove();
                });
    }

    private void logStreamingChunk(ChatClientResponse response) {
        String conversationId = conversationThreadLocal.get();
        if (conversationId == null) return;

        try {
            String chunk = extractChunkFromResponse(response);
            logger.atDebug()
                    .addKeyValue("conversationId", conversationId)
                    .addKeyValue("thread", Thread.currentThread().getName())
                    .addKeyValue("event", "stream_chunk")
                    .addKeyValue("chunk", chunk)
                    .log("Streaming chunk received");
        } catch (Exception e) {
            logger.atError()
                    .addKeyValue("conversationId", conversationId)
                    .addKeyValue("thread", Thread.currentThread().getName())
                    .addKeyValue("error", e.getMessage())
                    .log("Error processing streaming chunk");
        }
    }

    private void logStreamCompletion(Instant startTime) {
        String conversationId = conversationThreadLocal.get();
        if (conversationId == null) return;

        Instant endTime = Instant.now();
        long durationMs = Duration.between(startTime, endTime).toMillis();

        logger.atInfo()
                .addKeyValue("conversationId", conversationId)
                .addKeyValue("thread", Thread.currentThread().getName())
                .addKeyValue("durationMs", durationMs)
                .addKeyValue("event", "stream_complete")
                .log("AI streaming completed");
    }

    private void logStreamError(Throwable error) {
        String conversationId = conversationThreadLocal.get();
        if (conversationId == null) return;

        logger.atError()
                .addKeyValue("conversationId", conversationId)
                .addKeyValue("thread", Thread.currentThread().getName())
                .addKeyValue("error", error.getMessage())
                .setCause(error)
                .addKeyValue("event", "stream_error")
                .log("AI streaming failed");
    }

    private String extractChunkFromResponse(ChatClientResponse response) {
        return response.chatResponse()
                .getResult()
                .getOutput()
                .getText();
    }

    private String getConversationId(Context context, String defaultId) {
        // Ваша реализация для получения ID диалога
        return defaultId;
    }
}

Для использования этого с ChatClientMessageAggregator для логирования полного ответа:

java
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
    return builder
            .defaultAdvisors(
                    new StreamingLoggingAdvisor(),
                    new QuestionAnswerAdvisor(vectorStore, SearchRequest.defaults())
            )
            .build();
}

// В вашем сервисном слое
public Mono<String> getStreamingResponse(String userQuery) {
    return chatClient.prompt()
            .user(userQuery)
            .stream()
            .collectList() // Собираем все фрагменты
            .map(chunks -> {
                String fullResponse = chunks.stream()
                        .map(response -> response.chatResponse().getResult().getOutput().getText())
                        .collect(Collectors.joining());
                
                // Логируем полный ответ
                streamingLoggingAdvisor.logAggregatedResponse(fullResponse);
                return fullResponse;
            });
}

Заключение

Проблема с потоками, с которой вы столкнулись, является естественным следствием реактивной архитектуры потоковой передачи Spring AI. Для правильного логирования потоковых ответов:

  1. Понимайте реактивный конвейер: Потоковая передача использует Flux с inherent переключением потоков
  2. Логируйте в нескольких точках: Как во время потоковой передачи, так и после агрегации
  3. Используйте осведомленное о потоках логирование: Реализуйте MDC или потоковое хранилище для корреляции контекста
  4. Учитывайте производительность: Используйте соответствующие уровни логирования и рассмотрите асинхронное логирование
  5. Используйте встроенные инструменты: Рассмотрите использование SimpleLoggerAdvisor для стандартных потребностей в логировании

Ключевое - понимать, что потоковая передача доставляет данные инкрементально, и ваш механизм логирования должен учитывать этот асинхронный характер, перехватывая поток в нескольких точках и правильно обрабатывая контекст потоков.


Источники

  1. Spring AI Reference - Chat Client API
  2. Spring AI Reference - Advisors API
  3. Stack Overflow - Logging streaming responses from Spring AI
  4. Spring AI SimpleLoggerAdvisor Documentation
  5. Baeldung - Streaming Response in Spring AI ChatClient
  6. BootcampToProd - Spring AI Log Model Requests and Responses
  7. Medium - Streaming LLM response in Spring AI
Авторы
Проверено модерацией
Модерация