Другое

Как поддерживать синхронные подключения MCP в LangChain

Узнайте, как создавать постоянные синхронные подключения к серверам MCP в LangChain. Полное руководство с примерами кода для поддержания открытых подключений и немедленного доступа к инструментам.

Как я могу поддерживать постоянное соединение с сервером Model Context Protocol (MCP) в синхронном режиме при работе с LangChain? У меня запущен MCP-сервер на localhost:3000, который предоставляет инструменты, и я хочу создать обёрточный класс, который поддерживает открытое соединение с сервером, чтобы инструменты были доступны в любой момент. В документации приведены только примеры с использованием асинхронных менеджеров контекста с streamablehttp_client, но мне нужен синхронный подход для реализации моего класса. Какой рекомендуемый способ достичь этого шаблона с постоянным соединением?

Создание постоянного синхронного подключения к серверу MCP с помощью LangChain

Создание постоянного синхронного подключения к серверу MCP с помощью LangChain требует понимания доступных вариантов транспорта и адаптации асинхронных паттернов к синхронным реализациям. Хотя документация LangChain MCP в основном демонстрирует асинхронные подходы, существует несколько стратегий, которые можно использовать для достижения вашей цели.

Содержание

Понимание типов подключений MCP

Адаптеры LangChain MCP поддерживают два основных метода транспорта для подключения к серверам MCP:

  1. Транспорт STDIO: Запускает сервер как подпроцесс и общается через стандартный ввод/вывод
  2. Транспортируемый HTTP-транспорт: Подключается к автономному HTTP-серверу, работающему независимо

Для вашего случая использования localhost:3000 наиболее подходящим выбором, скорее всего, будет транспортируемый HTTP-транспорт, так как вы работаете с существующим сервером, работающим на сетевом порту. Согласно документации LangChain, HTTP-транспорт поддерживает удаленные подключения и разработан для большей надежности при постоянных подключениях, чем stdio.

Стратегии синхронного подключения

Хотя протокол MCP и адаптеры LangChain в основном асинхронны по своей природе, у вас есть несколько вариантов создания синхронных подключений:

1. Использование синхронных оберток на основе HTTP

Класс MultiServerMCPClient можно настроить с HTTP-подключениями, которые могут быть более пригодны для синхронных паттернов. На основе результатов исследования, его можно настроить следующим образом:

python
from langchain_mcp_adapters.client import MultiServerMCPClient

# Настройка HTTP-подключения к вашему серверу localhost:3000
client_config = {
    "local_server": {
        "transport": "streamable_http",
        "url": "http://localhost:3000/mcp",
        "headers": {
            "Authorization": "Bearer YOUR_TOKEN",
        },
    }
}

2. Реализация адаптеров “асинхронно-синхронного” преобразования

Вы можете создавать классы-обертки, которые связывают асинхронные и синхронные паттерны с помощью asyncio.run() или asyncio.new_event_loop() для обработки асинхронных операций синхронно.

3. Использование постоянных HTTP-сессий

Для действительно постоянных подключений рассмотрите возможность использования библиотек, таких как aiohttp (с синхронной оберткой) или requests с управлением сессиями для поддержания постоянных подключений к вашему серверу MCP.

Паттерны реализации постоянного подключения

Вот рекомендуемые паттерны для поддержания постоянных подключений:

Паттерн 1: Класс одиночного (singleton) MCP-клиента

python
import asyncio
from threading import Lock
from langchain_mcp_adapters.client import MultiServerMCPClient

class PersistentMCPClient:
    _instance = None
    _lock = Lock()
    
    def __new__(cls, config=None):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance
    
    def __init__(self, config=None):
        if self._initialized:
            return
        self.config = config or {}
        self._client = None
        self._session = None
        self._initialized = True
    
    def get_client(self):
        """Получить или создать MCP-клиент синхронно"""
        if self._client is None:
            self._client = self._create_client_sync()
        return self._client
    
    def _create_client_sync(self):
        """Создать MCP-клиент с использованием преобразования асинхронного в синхронный"""
        async def _create_async():
            return MultiServerMCPClient(self.config)
        
        # Запуск асинхронного создания в текущем цикле событий
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # Если цикл запущен, создать новый клиент в существующем цикле
                return asyncio.run_coroutine_threadsafe(
                    _create_async(), loop
                ).result()
            else:
                # Если цикл не запущен, создать новый
                return asyncio.run(_create_async())
        except RuntimeError:
            # Цикл событий не существует, создать новый
            return asyncio.run(_create_async())
    
    def get_session(self, server_name="default"):
        """Получить постоянную сессию для конкретного сервера"""
        if self._session is None:
            self._session = self.get_client()
        return self._session

Паттерн 2: Пул подключений с асинхронным управлением

python
import threading
from queue import Queue
from langchain_mcp_adapters.client import MultiServerMCPClient

class MCPConnectionPool:
    def __init__(self, config, max_connections=5):
        self.config = config
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self._lock = threading.Lock()
        
    def get_connection(self):
        """Получить подключение из пула или создать новое"""
        try:
            return self.pool.get_nowait()
        except:
            return self._create_connection()
    
    def _create_connection(self):
        """Создать новое MCP-подключение синхронно"""
        async def _create_async():
            return MultiServerMCPClient(self.config)
        
        # Запуск асинхронного создания
        return asyncio.run(_create_async())
    
    def return_connection(self, connection):
        """Вернуть подключение в пул"""
        try:
            self.pool.put_nowait(connection)
        except:
            # Пул заполнен, закрыть избыточное подключение
            self._close_connection(connection)
    
    def _close_connection(self, connection):
        """Закрыть MCP-подключение"""
        if hasattr(connection, 'close'):
            try:
                if asyncio.iscoroutinefunction(connection.close):
                    asyncio.run(connection.close())
                else:
                    connection.close()
            except:
                pass

Лучшие практики для синхронной интеграции с MCP

1. Управление циклом событий

  • Тщательно обрабатывайте циклы событий, чтобы избежать конфликтов
  • Используйте asyncio.run() для автономных синхронных приложений
  • Используйте asyncio.get_event_loop() для интеграции с существующим асинхронным кодом

2. Управление жизненным циклом подключения

  • Реализуйте правильную очистку подключений в методах __del__
  • Используйте менеджеры контекста, где это возможно, даже в синхронном коде
  • Мониторьте состояние подключения и реализуйте логику переподключения

3. Обработка ошибок

  • Корректно обрабатывайте таймауты подключения и сетевые ошибки
  • Реализуйте логику повторных попыток для временных сбоев
  • Ведите журнал состояния подключения для отладки

4. Вопросы производительности

  • Повторно используйте подключения вместо создания новых для каждого запроса
  • Рассмотрите возможность использования пула подключений для приложений с высокой пропускной способностью
  • Мониторьте использование памяти с постоянными подключениями

Полный пример реализации

Вот полный класс-обертка, который поддерживает постоянные подключения к вашему серверу MCP localhost:3000:

python
import asyncio
import threading
from contextlib import contextmanager
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

class PersistentMCPWrapper:
    """
    Синхронная обертка для постоянных подключений MCP.
    Поддерживает открытые подключения к серверам MCP для немедленного доступа к инструментам.
    """
    
    def __init__(self, server_url="http://localhost:3000/mcp", server_name="mcp_server"):
        self.server_url = server_url
        self.server_name = server_name
        self._client = None
        self._tools = None
        self._agent = None
        self._lock = threading.Lock()
        self._initialized = False
        
        # Конфигурация для MCP-клиента
        self.client_config = {
            server_name: {
                "transport": "streamable_http",
                "url": server_url,
                "headers": {
                    "Content-Type": "application/json",
                },
            }
        }
    
    def initialize(self):
        """Инициализировать подключение MCP и загрузить инструменты синхронно"""
        if self._initialized:
            return
            
        with self._lock:
            if self._initialized:
                return
                
            try:
                # Создать MCP-клиент синхронно
                self._client = self._create_client_sync()
                
                # Загрузить инструменты синхронно
                self._tools = self._load_tools_sync()
                
                # Инициализировать агента
                llm = ChatOpenAI(model="gpt-4o")
                self._agent = create_react_agent(llm, self._tools)
                
                self._initialized = True
                print(f"Успешно подключено к MCP-серверу по адресу {self.server_url}")
                
            except Exception as e:
                print(f"Не удалось инициализировать подключение MCP: {e}")
                raise
    
    def _create_client_sync(self):
        """Создать MCP-клиент синхронно, запуская асинхронный код"""
        async def _create_async():
            return MultiServerMCPClient(self.client_config)
        
        try:
            # Попытаться получить существующий цикл событий
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # Если цикл запущен, использовать потокобезопасное выполнение
                future = asyncio.run_coroutine_threadsafe(_create_async(), loop)
                return future.result(timeout=30)
            else:
                # Создать новый цикл событий
                return asyncio.run(_create_async())
        except RuntimeError:
            # Цикл событий не существует
            return asyncio.run(_create_async())
    
    def _load_tools_sync(self):
        """Загрузить инструменты MCP синхронно"""
        async def _load_async():
            async with self._client.session(self.server_name) as session:
                return await load_mcp_tools(session)
        
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                future = asyncio.run_coroutine_threadsafe(_load_async(), loop)
                return future.result(timeout=30)
            else:
                return asyncio.run(_load_async())
        except RuntimeError:
            return asyncio.run(_load_async())
    
    @contextmanager
    def get_client(self):
        """Менеджер контекста для доступа к MCP-клиенту"""
        if not self._initialized:
            self.initialize()
        try:
            yield self._client
        finally:
            pass  # Клиент остается постоянным
    
    @contextmanager
    def get_tools(self):
        """Менеджер контекста для доступа к инструментам"""
        if not self._initialized:
            self.initialize()
        try:
            yield self._tools
        finally:
            pass  # Инструменты остаются загруженными
    
    @contextmanager
    def get_agent(self):
        """Менеджер контекста для доступа к агенту"""
        if not self._initialized:
            self.initialize()
        try:
            yield self._agent
        finally:
            pass  # Агент остается инициализированным
    
    def invoke_agent(self, messages):
        """Вызвать агента с сообщениями синхронно"""
        if not self._initialized:
            self.initialize()
            
        async def _invoke_async():
            async with self.get_agent() as agent:
                return await agent.ainvoke({"messages": messages})
        
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                future = asyncio.run_coroutine_threadsafe(_invoke_async(), loop)
                return future.result(timeout=60)
            else:
                return asyncio.run(_invoke_async())
        except RuntimeError:
            return asyncio.run(_invoke_async())
    
    def __del__(self):
        """Очистка подключений при уничтожении объекта"""
        if hasattr(self, '_client') and self._client:
            try:
                if hasattr(self._client, 'close'):
                    if asyncio.iscoroutinefunction(self._client.close):
                        asyncio.run(self._client.close())
                    else:
                        self._client.close()
            except:
                pass


# Пример использования
if __name__ == "__main__":
    # Инициализация обертки с вашим MCP-сервером
    mcp_wrapper = PersistentMCPWrapper(
        server_url="http://localhost:3000/mcp",
        server_name="local_mcp"
    )
    
    # Использование обертки синхронно
    try:
        # Вызов агента
        response = mcp_wrapper.invoke_agent("Какая погода сегодня?")
        print("Ответ агента:", response)
        
        # Или использование менеджеров контекста для тонкого контроля
        with mcp_wrapper.get_tools() as tools:
            print("Доступные инструменты:", [name for name in tools.keys()])
            
    except Exception as e:
        print(f"Ошибка: {e}")

Эта реализация предоставляет надежный способ поддержания постоянных подключений к вашему серверу MCP при предоставлении синхронного интерфейса. Обертка обрабатывает преобразование “асинхронно-синхронное” внутренне и правильно управляет жизненным циклом подключения.

Для получения более подробной информации о протоколе MCP и интеграции с LangChain обратитесь к официальной документации и репозиторию GitHub langchain-mcp-adapters.

Источники

  1. Документация LangChain Model Context Protocol
  2. Репозиторий GitHub LangChain MCP Adapters
  3. Документация транспортируемого HTTP-транспорта
  4. Справочник по API MultiServerMCPClient
  5. Руководство по интеграции MCP клиент-сервер
  6. Пакет PyPI LangChain MCP Adapters

Заключение

Поддержание постоянных синхронных подключений к серверам MCP с помощью LangChain требует объединения асинхронной природы протокола с потребностями синхронных приложений. Ключевые выводы:

  1. Используйте HTTP-транспорт: Транспортируемый HTTP-транспорт более подходит для постоянных подключений, чем stdio, для сетевых серверов, таких как ваша настройка localhost:3000.

  2. Реализуйте мост “асинхронно-синхронный”: Создавайте классы-обертки, которые используют asyncio.run() или asyncio.run_coroutine_threadsafe() для преобразования асинхронных операций в синхронные.

  3. Паттерн Singleton: Используйте паттерны singleton или пула подключений для поддержания постоянных подключений на протяжении жизненного цикла вашего приложения.

  4. Правильное управление ресурсами: Реализуйте методы правильной очистки для обеспечения корректного закрытия подключений, когда они больше не нужны.

  5. Обработка ошибок: Создавайте надежную обработку ошибок для сетевых таймаутов и сбоев подключения для обеспечения надежности.

Следуя этим паттернам, вы можете создать надежную синхронную обертку для вашего сервера MCP, которая поддерживает постоянные подключения и предоставляет немедленный доступ к инструментам в любой момент, точно как требуется для вашей реализации класса.

Авторы
Проверено модерацией
Модерация