Как поддерживать синхронные подключения MCP в LangChain
Узнайте, как создавать постоянные синхронные подключения к серверам MCP в LangChain. Полное руководство с примерами кода для поддержания открытых подключений и немедленного доступа к инструментам.
Как я могу поддерживать постоянное соединение с сервером Model Context Protocol (MCP) в синхронном режиме при работе с LangChain? У меня запущен MCP-сервер на localhost:3000, который предоставляет инструменты, и я хочу создать обёрточный класс, который поддерживает открытое соединение с сервером, чтобы инструменты были доступны в любой момент. В документации приведены только примеры с использованием асинхронных менеджеров контекста с streamablehttp_client, но мне нужен синхронный подход для реализации моего класса. Какой рекомендуемый способ достичь этого шаблона с постоянным соединением?
Создание постоянного синхронного подключения к серверу MCP с помощью LangChain
Создание постоянного синхронного подключения к серверу MCP с помощью LangChain требует понимания доступных вариантов транспорта и адаптации асинхронных паттернов к синхронным реализациям. Хотя документация LangChain MCP в основном демонстрирует асинхронные подходы, существует несколько стратегий, которые можно использовать для достижения вашей цели.
Содержание
- Понимание типов подключений MCP
- Стратегии синхронного подключения
- Паттерны реализации постоянного подключения
- Лучшие практики для синхронной интеграции с MCP
- Полный пример реализации
Понимание типов подключений MCP
Адаптеры LangChain MCP поддерживают два основных метода транспорта для подключения к серверам MCP:
- Транспорт STDIO: Запускает сервер как подпроцесс и общается через стандартный ввод/вывод
- Транспортируемый HTTP-транспорт: Подключается к автономному HTTP-серверу, работающему независимо
Для вашего случая использования localhost:3000 наиболее подходящим выбором, скорее всего, будет транспортируемый HTTP-транспорт, так как вы работаете с существующим сервером, работающим на сетевом порту. Согласно документации LangChain, HTTP-транспорт поддерживает удаленные подключения и разработан для большей надежности при постоянных подключениях, чем stdio.
Стратегии синхронного подключения
Хотя протокол MCP и адаптеры LangChain в основном асинхронны по своей природе, у вас есть несколько вариантов создания синхронных подключений:
1. Использование синхронных оберток на основе HTTP
Класс MultiServerMCPClient можно настроить с HTTP-подключениями, которые могут быть более пригодны для синхронных паттернов. На основе результатов исследования, его можно настроить следующим образом:
from langchain_mcp_adapters.client import MultiServerMCPClient
# Настройка HTTP-подключения к вашему серверу localhost:3000
client_config = {
"local_server": {
"transport": "streamable_http",
"url": "http://localhost:3000/mcp",
"headers": {
"Authorization": "Bearer YOUR_TOKEN",
},
}
}
2. Реализация адаптеров “асинхронно-синхронного” преобразования
Вы можете создавать классы-обертки, которые связывают асинхронные и синхронные паттерны с помощью asyncio.run() или asyncio.new_event_loop() для обработки асинхронных операций синхронно.
3. Использование постоянных HTTP-сессий
Для действительно постоянных подключений рассмотрите возможность использования библиотек, таких как aiohttp (с синхронной оберткой) или requests с управлением сессиями для поддержания постоянных подключений к вашему серверу MCP.
Паттерны реализации постоянного подключения
Вот рекомендуемые паттерны для поддержания постоянных подключений:
Паттерн 1: Класс одиночного (singleton) MCP-клиента
import asyncio
from threading import Lock
from langchain_mcp_adapters.client import MultiServerMCPClient
class PersistentMCPClient:
_instance = None
_lock = Lock()
def __new__(cls, config=None):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, config=None):
if self._initialized:
return
self.config = config or {}
self._client = None
self._session = None
self._initialized = True
def get_client(self):
"""Получить или создать MCP-клиент синхронно"""
if self._client is None:
self._client = self._create_client_sync()
return self._client
def _create_client_sync(self):
"""Создать MCP-клиент с использованием преобразования асинхронного в синхронный"""
async def _create_async():
return MultiServerMCPClient(self.config)
# Запуск асинхронного создания в текущем цикле событий
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Если цикл запущен, создать новый клиент в существующем цикле
return asyncio.run_coroutine_threadsafe(
_create_async(), loop
).result()
else:
# Если цикл не запущен, создать новый
return asyncio.run(_create_async())
except RuntimeError:
# Цикл событий не существует, создать новый
return asyncio.run(_create_async())
def get_session(self, server_name="default"):
"""Получить постоянную сессию для конкретного сервера"""
if self._session is None:
self._session = self.get_client()
return self._session
Паттерн 2: Пул подключений с асинхронным управлением
import threading
from queue import Queue
from langchain_mcp_adapters.client import MultiServerMCPClient
class MCPConnectionPool:
def __init__(self, config, max_connections=5):
self.config = config
self.max_connections = max_connections
self.pool = Queue(maxsize=max_connections)
self._lock = threading.Lock()
def get_connection(self):
"""Получить подключение из пула или создать новое"""
try:
return self.pool.get_nowait()
except:
return self._create_connection()
def _create_connection(self):
"""Создать новое MCP-подключение синхронно"""
async def _create_async():
return MultiServerMCPClient(self.config)
# Запуск асинхронного создания
return asyncio.run(_create_async())
def return_connection(self, connection):
"""Вернуть подключение в пул"""
try:
self.pool.put_nowait(connection)
except:
# Пул заполнен, закрыть избыточное подключение
self._close_connection(connection)
def _close_connection(self, connection):
"""Закрыть MCP-подключение"""
if hasattr(connection, 'close'):
try:
if asyncio.iscoroutinefunction(connection.close):
asyncio.run(connection.close())
else:
connection.close()
except:
pass
Лучшие практики для синхронной интеграции с MCP
1. Управление циклом событий
- Тщательно обрабатывайте циклы событий, чтобы избежать конфликтов
- Используйте
asyncio.run()для автономных синхронных приложений - Используйте
asyncio.get_event_loop()для интеграции с существующим асинхронным кодом
2. Управление жизненным циклом подключения
- Реализуйте правильную очистку подключений в методах
__del__ - Используйте менеджеры контекста, где это возможно, даже в синхронном коде
- Мониторьте состояние подключения и реализуйте логику переподключения
3. Обработка ошибок
- Корректно обрабатывайте таймауты подключения и сетевые ошибки
- Реализуйте логику повторных попыток для временных сбоев
- Ведите журнал состояния подключения для отладки
4. Вопросы производительности
- Повторно используйте подключения вместо создания новых для каждого запроса
- Рассмотрите возможность использования пула подключений для приложений с высокой пропускной способностью
- Мониторьте использование памяти с постоянными подключениями
Полный пример реализации
Вот полный класс-обертка, который поддерживает постоянные подключения к вашему серверу MCP localhost:3000:
import asyncio
import threading
from contextlib import contextmanager
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
class PersistentMCPWrapper:
"""
Синхронная обертка для постоянных подключений MCP.
Поддерживает открытые подключения к серверам MCP для немедленного доступа к инструментам.
"""
def __init__(self, server_url="http://localhost:3000/mcp", server_name="mcp_server"):
self.server_url = server_url
self.server_name = server_name
self._client = None
self._tools = None
self._agent = None
self._lock = threading.Lock()
self._initialized = False
# Конфигурация для MCP-клиента
self.client_config = {
server_name: {
"transport": "streamable_http",
"url": server_url,
"headers": {
"Content-Type": "application/json",
},
}
}
def initialize(self):
"""Инициализировать подключение MCP и загрузить инструменты синхронно"""
if self._initialized:
return
with self._lock:
if self._initialized:
return
try:
# Создать MCP-клиент синхронно
self._client = self._create_client_sync()
# Загрузить инструменты синхронно
self._tools = self._load_tools_sync()
# Инициализировать агента
llm = ChatOpenAI(model="gpt-4o")
self._agent = create_react_agent(llm, self._tools)
self._initialized = True
print(f"Успешно подключено к MCP-серверу по адресу {self.server_url}")
except Exception as e:
print(f"Не удалось инициализировать подключение MCP: {e}")
raise
def _create_client_sync(self):
"""Создать MCP-клиент синхронно, запуская асинхронный код"""
async def _create_async():
return MultiServerMCPClient(self.client_config)
try:
# Попытаться получить существующий цикл событий
loop = asyncio.get_event_loop()
if loop.is_running():
# Если цикл запущен, использовать потокобезопасное выполнение
future = asyncio.run_coroutine_threadsafe(_create_async(), loop)
return future.result(timeout=30)
else:
# Создать новый цикл событий
return asyncio.run(_create_async())
except RuntimeError:
# Цикл событий не существует
return asyncio.run(_create_async())
def _load_tools_sync(self):
"""Загрузить инструменты MCP синхронно"""
async def _load_async():
async with self._client.session(self.server_name) as session:
return await load_mcp_tools(session)
try:
loop = asyncio.get_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(_load_async(), loop)
return future.result(timeout=30)
else:
return asyncio.run(_load_async())
except RuntimeError:
return asyncio.run(_load_async())
@contextmanager
def get_client(self):
"""Менеджер контекста для доступа к MCP-клиенту"""
if not self._initialized:
self.initialize()
try:
yield self._client
finally:
pass # Клиент остается постоянным
@contextmanager
def get_tools(self):
"""Менеджер контекста для доступа к инструментам"""
if not self._initialized:
self.initialize()
try:
yield self._tools
finally:
pass # Инструменты остаются загруженными
@contextmanager
def get_agent(self):
"""Менеджер контекста для доступа к агенту"""
if not self._initialized:
self.initialize()
try:
yield self._agent
finally:
pass # Агент остается инициализированным
def invoke_agent(self, messages):
"""Вызвать агента с сообщениями синхронно"""
if not self._initialized:
self.initialize()
async def _invoke_async():
async with self.get_agent() as agent:
return await agent.ainvoke({"messages": messages})
try:
loop = asyncio.get_event_loop()
if loop.is_running():
future = asyncio.run_coroutine_threadsafe(_invoke_async(), loop)
return future.result(timeout=60)
else:
return asyncio.run(_invoke_async())
except RuntimeError:
return asyncio.run(_invoke_async())
def __del__(self):
"""Очистка подключений при уничтожении объекта"""
if hasattr(self, '_client') and self._client:
try:
if hasattr(self._client, 'close'):
if asyncio.iscoroutinefunction(self._client.close):
asyncio.run(self._client.close())
else:
self._client.close()
except:
pass
# Пример использования
if __name__ == "__main__":
# Инициализация обертки с вашим MCP-сервером
mcp_wrapper = PersistentMCPWrapper(
server_url="http://localhost:3000/mcp",
server_name="local_mcp"
)
# Использование обертки синхронно
try:
# Вызов агента
response = mcp_wrapper.invoke_agent("Какая погода сегодня?")
print("Ответ агента:", response)
# Или использование менеджеров контекста для тонкого контроля
with mcp_wrapper.get_tools() as tools:
print("Доступные инструменты:", [name for name in tools.keys()])
except Exception as e:
print(f"Ошибка: {e}")
Эта реализация предоставляет надежный способ поддержания постоянных подключений к вашему серверу MCP при предоставлении синхронного интерфейса. Обертка обрабатывает преобразование “асинхронно-синхронное” внутренне и правильно управляет жизненным циклом подключения.
Для получения более подробной информации о протоколе MCP и интеграции с LangChain обратитесь к официальной документации и репозиторию GitHub langchain-mcp-adapters.
Источники
- Документация LangChain Model Context Protocol
- Репозиторий GitHub LangChain MCP Adapters
- Документация транспортируемого HTTP-транспорта
- Справочник по API MultiServerMCPClient
- Руководство по интеграции MCP клиент-сервер
- Пакет PyPI LangChain MCP Adapters
Заключение
Поддержание постоянных синхронных подключений к серверам MCP с помощью LangChain требует объединения асинхронной природы протокола с потребностями синхронных приложений. Ключевые выводы:
-
Используйте HTTP-транспорт: Транспортируемый HTTP-транспорт более подходит для постоянных подключений, чем stdio, для сетевых серверов, таких как ваша настройка localhost:3000.
-
Реализуйте мост “асинхронно-синхронный”: Создавайте классы-обертки, которые используют
asyncio.run()илиasyncio.run_coroutine_threadsafe()для преобразования асинхронных операций в синхронные. -
Паттерн Singleton: Используйте паттерны singleton или пула подключений для поддержания постоянных подключений на протяжении жизненного цикла вашего приложения.
-
Правильное управление ресурсами: Реализуйте методы правильной очистки для обеспечения корректного закрытия подключений, когда они больше не нужны.
-
Обработка ошибок: Создавайте надежную обработку ошибок для сетевых таймаутов и сбоев подключения для обеспечения надежности.
Следуя этим паттернам, вы можете создать надежную синхронную обертку для вашего сервера MCP, которая поддерживает постоянные подключения и предоставляет немедленный доступ к инструментам в любой момент, точно как требуется для вашей реализации класса.