Руководство по логированию потоковых ответов Spring AI
Узнайте, как правильно логировать потоковые ответы Spring AI с помощью ChatClientMessageAggregator. Исправьте проблемы с потоками и захватывайте содержимое ответов в реальном времени с помощью нашего подробного руководства.
Как правильно логировать потоковые ответы в Spring AI при использовании ChatClientMessageAggregator? Я реализую механизм логирования, похожий на SimpleLoggerAdvisor, но когда я устанавливаю точку останова в методе adviseStream, я замечаю, что фронтенд все еще получает вывод. Это указывает на то, что ответ, отправляемый фронтенду, и код, где я установил точку останова, работают в разных потоках. Вот моя текущая реализация:
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
String conversationId = getConversationId(chatClientRequest.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
String prompt = chatClientRequest.prompt().getContents();
Instant start = Instant.now();
logger.atInfo()
.addKeyValue("event", "ai_stream_session")
.addKeyValue("conversationId", conversationId)
.addKeyValue("prompt", prompt)
.log("AI stream request logged");
Flux<ChatClientResponse> chatClientResponses = streamAdvisorChain
.nextStream(chatClientRequest);
return new ChatClientMessageAggregator()
.aggregateChatClientResponse(chatClientResponses, resp -> logResponse(conversationId, resp, start));
}
private void logResponse(String conversationId, ChatClientResponse response, Instant startTime) {
try {
ChatResponse chatResponse = response.chatResponse();
String responseText = chatResponse.getResult().getOutput().getText();
Instant endTime = Instant.now();
long durationMs = Duration.between(startTime, endTime).toMillis();
LoggingEventBuilder logBuilder = logger.atInfo()
.addKeyValue("response", responseText)
.addKeyValue("durationMs", durationMs);
if (chatResponse != null && chatResponse.getMetadata() != null) {
ChatResponseMetadata meta = chatResponse.getMetadata();
Usage usage = meta.getUsage();
logBuilder
.addKeyValue("model", meta.getModel())
.addKeyValue("promptTokens", usage != null ? usage.getPromptTokens() : 0)
.addKeyValue("completionTokens", usage != null ? usage.getCompletionTokens() : 0)
.addKeyValue("totalTokens", usage != null ? usage.getTotalTokens() : 0);
}
logBuilder.log("AI response logged");
} catch (Exception e) {
logger.atError()
.addKeyValue("conversationId", conversationId)
.addKeyValue("error", e.getMessage())
.setCause(e)
.log("Failed to log AI response");
}
}
Как я могу обеспечить корректное захват содержимого потокового ответа в логировании, и что может вызывать наблюдаемую мной проблему с потоками?
Проблема с потоками, с которой вы столкнулись, является распространенной в сценариях потоковой передачи Spring AI из-за реактивного nature конвейера потоковой передачи. При использовании ChatClientMessageAggregator с логированием, аналогичным SimpleLoggerAdvisor, операции агрегации и логирования происходят на разных потоках, чем доставка потоковой передачи, что приводит к получению фронтендом данных до того, как будет достигнута точка останова вашего логирования.
Содержание
- Понимание проблемы с потоками
- Правильное логирование потоковых ответов
- Поведение ChatClientMessageAggregator
- Альтернативные подходы к логированию
- Лучшие практики для логирования потоковой передачи
- Полное решение для реализации
Понимание проблемы с потоками
В сценариях потоковой передачи Spring AI доставка ответов работает на реактивном конвейере, где каждый фрагмент обрабатывается независимо. Проблема с потоками, которую вы наблюдаете, возникает по следующим причинам:
- Поток потоковой передачи: Фактические фрагменты потокового ответа обрабатываются на потоке, управляемом реактивным конвейером (обычно виртуальный поток в современных приложениях Spring Boot)
- Поток агрегации:
ChatClientMessageAggregatorработает в другом пуле потоков для сбора и объединения фрагментов - Поток логирования: Ваш метод
logResponseвыполняется на еще одном потоке
Как объясняется в Spring AI Reference, потоковые ответы используют концепции реактивного программирования, такие как Flux для ответов, что inherently involves переключение потоков. Когда вы устанавливаете точку останова в методе adviseStream, фронтенд уже начал получать данные с потока потоковой передачи, в то время как ваша логика агрегации/логирования все еще обрабатывается на отдельном потоке.
Ключевое понимание: “Фронтенд все еще получает вывод” потому что потоковая передача доставляет фрагменты напрямую, не дожидаясь завершения агрегации, как отмечено в обсуждении на Stack Overflow.
Правильное логирование потоковых ответов
Для правильного захвата потоковых ответов вам нужно перехватывать потоковые данные в нескольких точках:
1. Логирование на уровне потока
Логируйте каждый фрагмент по мере его получения, а не только агрегированный результат:
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
String conversationId = getConversationId(chatClientRequest.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
String prompt = chatClientRequest.prompt().getContents();
Instant start = Instant.now();
logger.atInfo()
.addKeyValue("event", "ai_stream_session")
.addKeyValue("conversationId", conversationId)
.addKeyValue("prompt", prompt)
.log("AI stream request logged");
return streamAdvisorChain.nextStream(chatClientRequest)
.doOnNext(response -> {
// Логируем каждый потоковый фрагмент
String chunk = extractChunkFromResponse(response);
logger.atDebug()
.addKeyValue("event", "ai_stream_chunk")
.addKeyValue("conversationId", conversationId)
.addKeyValue("chunk", chunk)
.log("Streaming chunk received");
})
.doOnComplete(() -> {
// Логируем завершение
Instant endTime = Instant.now();
long durationMs = Duration.between(startTime, endTime).toMillis();
logger.atInfo()
.addKeyValue("event", "ai_stream_complete")
.addKeyValue("conversationId", conversationId)
.addKeyValue("durationMs", durationMs)
.log("AI stream completed");
})
.doOnError(error -> {
logger.atError()
.addKeyValue("event", "ai_stream_error")
.addKeyValue("conversationId", conversationId)
.setCause(error)
.log("AI stream failed");
});
}
2. Логирование агрегированного ответа
Сохраняйте вашу агрегацию, но логируйте с правильным контекстом:
private void logResponse(String conversationId, ChatClientResponse response, Instant startTime) {
try {
ChatResponse chatResponse = response.chatResponse();
String responseText = chatResponse.getResult().getOutput().getText();
Instant endTime = Instant.now();
long durationMs = Duration.between(startTime, endTime).toMillis();
// MDC (Mapped Diagnostic Context) для потокобезопасного логирования
MDC.put("conversationId", conversationId);
MDC.put("event", "aggregated_response");
LoggingEventBuilder logBuilder = logger.atInfo()
.addKeyValue("response", responseText)
.addKeyValue("durationMs", durationMs);
// Добавляем метаданные, если доступны
if (chatResponse != null && chatResponse.getMetadata() != null) {
ChatResponseMetadata meta = chatResponse.getMetadata();
Usage usage = meta.getUsage();
logBuilder
.addKeyValue("model", meta.getModel())
.addKeyValue("promptTokens", usage != null ? usage.getPromptTokens() : 0)
.addKeyValue("completionTokens", usage != null ? usage.getCompletionTokens() : 0)
.addKeyValue("totalTokens", usage != null ? usage.getTotalTokens() : 0);
}
logBuilder.log("Aggregated AI response logged");
// Очищаем MDC
MDC.clear();
} catch (Exception e) {
MDC.put("conversationId", conversationId);
MDC.put("event", "logging_error");
logger.atError()
.addKeyValue("error", e.getMessage())
.setCause(e)
.log("Failed to log AI response");
MDC.clear();
}
}
Поведение ChatClientMessageAggregator
ChatClientMessageAggregator разработан для сбора потоковых фрагментов и предоставления полного ответа. Согласно исследованиям, он ведет себя следующим образом:
- Собирает фрагменты: Собирает все потоковые фрагменты ответа
- Агрегирует содержимое: Объединяет фрагменты в полный ответ
- Выполняет обратный вызов: Вызывает ваш предоставленный обратный вызов с агрегированным результатом
- Переключение потоков: Обычно выполняет обратный вызов на другом потоке, чем поток потоковой передачи
Важное замечание: Как упоминается в документации Spring AI, “Потоковые советники обрабатывают запросы и ответы как непрерывные потоки, используя концепции реактивного программирования (например, Flux для ответов)”.
Это переключение потоков является нормальным поведением в реактивном программировании, но требует особого внимания для логирования.
Альтернативные подходы к логированию
1. Прямое использование SimpleLoggerAdvisor
Самый простой подход - использовать встроенный в Spring AI SimpleLoggerAdvisor:
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
return builder
.defaultAdvisors(
new SimpleLoggerAdvisor(),
// Добавьте другие советники
new QuestionAnswerAdvisor(vectorStore, SearchRequest.defaults())
)
.build();
}
Затем настройте логирование в вашем application.yml:
logging:
level:
org.springframework.ai.chat.client.advisor: DEBUG
org.springframework.ai.chat.client: DEBUG
2. Пользовательский потоковый советник с осведомленным о потоках логированием
@Component
public class StreamAwareLoggingAdvisor implements StreamAroundAdvisor {
private static final Logger logger = LoggerFactory.getLogger(StreamAwareLoggingAdvisor.class);
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest request, StreamAdvisorChain chain) {
String conversationId = getConversationId(request.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
String prompt = request.prompt().getContents();
Instant start = Instant.now();
// Захватываем контекст потока
String threadName = Thread.currentThread().getName();
return chain.nextStream(request)
.map(response -> {
// Логируем на том же потоке, что и потоковая передача
logStreamingChunk(conversationId, response, threadName);
return response;
})
.doOnComplete(() -> {
logStreamCompletion(conversationId, start, threadName);
})
.doOnError(error -> {
logStreamError(conversationId, error, threadName);
});
}
private void logStreamingChunk(String conversationId, ChatClientResponse response, String threadName) {
try {
String chunk = extractChunkFromResponse(response);
logger.atDebug()
.addKeyValue("thread", threadName)
.addKeyValue("conversationId", conversationId)
.addKeyValue("event", "stream_chunk")
.addKeyValue("chunk", chunk)
.log("Streaming chunk processed");
} catch (Exception e) {
logger.atError()
.addKeyValue("thread", threadName)
.addKeyValue("conversationId", conversationId)
.addKeyValue("error", e.getMessage())
.log("Error logging streaming chunk");
}
}
// Другие методы логирования...
}
3. Интеграция с реактивным конвейером
Для более сложных сценариев интегрируйте логирование непосредственно в реактивный конвейер:
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest, StreamAdvisorChain streamAdvisorChain) {
String conversationId = getConversationId(chatClientRequest.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
String prompt = chatClientRequest.prompt().getContents();
Instant start = Instant.now();
return streamAdvisorChain.nextStream(chatClientRequest)
.checkpoint("Stream logging checkpoint")
.materialize() // Преобразуем в Signal для лучшего контроля
.map(signal -> {
if (signal.isOnNext()) {
ChatClientResponse response = signal.get();
logStreamingChunk(conversationId, response);
return response;
} else if (signal.isOnError()) {
logError(conversationId, signal.getThrowable());
throw signal.getThrowable();
} else {
logCompletion(conversationId, start);
return signal.get(); // Это не будет вызвано для завершения
}
})
.dematerialize();
}
Лучшие практики для логирования потоковой передачи
1. Осведомленное о потоках логирование
- Используйте MDC (Mapped Diagnostic Context) для потокобезопасного логирования
- Включайте информацию о потоке в сообщения логирования для отладки
- Рассмотрите использование SLF4J MDC для корреляции логов по потокам
2. Вопросы производительности
- Уровни логирования: Используйте DEBUG для потоковых фрагментов, INFO для агрегированных ответов
- Асинхронное логирование: Рассмотрите асинхронные аппендеры для высокочастотного логирования потоков
- Сэмплирование: В продакшене рассмотрите сэмплирование потоковых логов для избежания влияния на производительность
3. Обработка ошибок
- Реализуйте надежную обработку ошибок в обратных вызовах потоковой передачи
- Логируйте ошибки с соответствующим контекстом (ID диалога, запрос и т.д.)
- Рассмотрите механизмы отката для сбоев логирования
4. Конфигурация
- Используйте конфигурации логирования, специфичные для профилей
- Включайте/отключайте логирование потоковой передачи на основе окружения
- Рассмотрите структурированное логирование (JSON формат) для более легкого парсинга
5. Мониторинг
- Добавьте метрики для производительности потоковой передачи
- Мониторьте влияние производительности логирования
- Настройте оповещения для сбоев логирования
Полное решение для реализации
Вот комплексная реализация, которая решает проблему с потоками:
@Component
public class StreamingLoggingAdvisor implements StreamAroundAdvisor {
private static final Logger logger = LoggerFactory.getLogger(StreamingLoggingAdvisor.class);
private final ThreadLocal<String> conversationThreadLocal = new ThreadLocal<>();
@Override
public Flux<ChatClientResponse> adviseStream(ChatClientRequest request, StreamAdvisorChain chain) {
String conversationId = getConversationId(request.context(), ChatMemory.DEFAULT_CONVERSATION_ID);
String prompt = request.prompt().getContents();
Instant start = Instant.now();
// Устанавливаем контекст потока
conversationThreadLocal.set(conversationId);
logger.atInfo()
.addKeyValue("conversationId", conversationId)
.addKeyValue("prompt", prompt)
.addKeyValue("thread", Thread.currentThread().getName())
.log("AI streaming request started");
return chain.nextStream(request)
.doOnNext(response -> {
logStreamingChunk(response);
})
.doOnComplete(() -> {
logStreamCompletion(start);
})
.doOnError(error -> {
logStreamError(error);
})
.doFinally(signalType -> {
conversationThreadLocal.remove();
});
}
private void logStreamingChunk(ChatClientResponse response) {
String conversationId = conversationThreadLocal.get();
if (conversationId == null) return;
try {
String chunk = extractChunkFromResponse(response);
logger.atDebug()
.addKeyValue("conversationId", conversationId)
.addKeyValue("thread", Thread.currentThread().getName())
.addKeyValue("event", "stream_chunk")
.addKeyValue("chunk", chunk)
.log("Streaming chunk received");
} catch (Exception e) {
logger.atError()
.addKeyValue("conversationId", conversationId)
.addKeyValue("thread", Thread.currentThread().getName())
.addKeyValue("error", e.getMessage())
.log("Error processing streaming chunk");
}
}
private void logStreamCompletion(Instant startTime) {
String conversationId = conversationThreadLocal.get();
if (conversationId == null) return;
Instant endTime = Instant.now();
long durationMs = Duration.between(startTime, endTime).toMillis();
logger.atInfo()
.addKeyValue("conversationId", conversationId)
.addKeyValue("thread", Thread.currentThread().getName())
.addKeyValue("durationMs", durationMs)
.addKeyValue("event", "stream_complete")
.log("AI streaming completed");
}
private void logStreamError(Throwable error) {
String conversationId = conversationThreadLocal.get();
if (conversationId == null) return;
logger.atError()
.addKeyValue("conversationId", conversationId)
.addKeyValue("thread", Thread.currentThread().getName())
.addKeyValue("error", error.getMessage())
.setCause(error)
.addKeyValue("event", "stream_error")
.log("AI streaming failed");
}
private String extractChunkFromResponse(ChatClientResponse response) {
return response.chatResponse()
.getResult()
.getOutput()
.getText();
}
private String getConversationId(Context context, String defaultId) {
// Ваша реализация для получения ID диалога
return defaultId;
}
}
Для использования этого с ChatClientMessageAggregator для логирования полного ответа:
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
return builder
.defaultAdvisors(
new StreamingLoggingAdvisor(),
new QuestionAnswerAdvisor(vectorStore, SearchRequest.defaults())
)
.build();
}
// В вашем сервисном слое
public Mono<String> getStreamingResponse(String userQuery) {
return chatClient.prompt()
.user(userQuery)
.stream()
.collectList() // Собираем все фрагменты
.map(chunks -> {
String fullResponse = chunks.stream()
.map(response -> response.chatResponse().getResult().getOutput().getText())
.collect(Collectors.joining());
// Логируем полный ответ
streamingLoggingAdvisor.logAggregatedResponse(fullResponse);
return fullResponse;
});
}
Заключение
Проблема с потоками, с которой вы столкнулись, является естественным следствием реактивной архитектуры потоковой передачи Spring AI. Для правильного логирования потоковых ответов:
- Понимайте реактивный конвейер: Потоковая передача использует
Fluxс inherent переключением потоков - Логируйте в нескольких точках: Как во время потоковой передачи, так и после агрегации
- Используйте осведомленное о потоках логирование: Реализуйте MDC или потоковое хранилище для корреляции контекста
- Учитывайте производительность: Используйте соответствующие уровни логирования и рассмотрите асинхронное логирование
- Используйте встроенные инструменты: Рассмотрите использование
SimpleLoggerAdvisorдля стандартных потребностей в логировании
Ключевое - понимать, что потоковая передача доставляет данные инкрементально, и ваш механизм логирования должен учитывать этот асинхронный характер, перехватывая поток в нескольких точках и правильно обрабатывая контекст потоков.
Источники
- Spring AI Reference - Chat Client API
- Spring AI Reference - Advisors API
- Stack Overflow - Logging streaming responses from Spring AI
- Spring AI SimpleLoggerAdvisor Documentation
- Baeldung - Streaming Response in Spring AI ChatClient
- BootcampToProd - Spring AI Log Model Requests and Responses
- Medium - Streaming LLM response in Spring AI