【MCP】Exploring MCP
# 0 Pre-Talk

To get up to speed with MCP quickly, the learning path is:

1. Read the original MCP documentation and related commentary.
2. Learn FastMCP and experiment with its interfaces.
3. Study other people's MCP implementations: the Baidu Map MCP server and the official example code.
4. Write my own code to exercise the core features.
5. Compare MCP against other protocols.

# 1 The original MCP documentation and commentary

The write-ups online are already good, so here I only keep what helps my own understanding. Learning with concrete questions in mind is more efficient; before developing an MCP server, the core questions I could think of were:

- How does an MCP call differ from a function call or a RESTful call?
- How does an agent invoke tools through the MCP protocol? The model receives the tool list returned by the MCP server and decides whether to use a tool based on the detailed tool and resource descriptions. If a tool's description is unclear or wrong, the LLM may be unable to call it correctly. And because MCP is, in essence, a protocol by which the model invokes capabilities (data, tools, workflows) through its context, parsing that structured information and producing output zero-shot still tests the model: if it misunderstands the context structure, or hallucinates about a tool description, the final output can be inaccurate. As the protocol's author, Anthropic has likely trained its own models specifically for it. (A sketch of a vague versus an explicit tool description follows at the end of this section.)
  - MCP Host: The AI application that coordinates and manages one or multiple MCP clients
  - MCP Client: A component that maintains a connection to an MCP server and obtains context from an MCP server for the MCP host to use
  - MCP Server: A program that provides context to MCP clients
- How does the model decide between answering from its own capability and calling a tool to get the result? The model interprets the scenario through the prompt. Some behavior can be forced directly: a prompt can include a hard requirement like "for calculations beyond four digits, you must use the calculator". Tool-use ability can also be trained into the model with reinforcement learning. In short, whether a model calls a tool varies between models: it depends both on the model's own capability and on the descriptions the MCP server provides.
- How does the LLM know which tool to call? Through the descriptions, plus its own understanding of the scenario.
- What are the relationships among the agent, the MCP client, the MCP server, and the user?
- Besides MCP, other vendors have developed protocols such as ACP, A2A and ANP. Why are other protocols needed, how do they differ, and where does MCP fall short?

Reading the source code and the protocol itself resolved these questions and clarified the relationships. After that, I use the Baidu Map MCP code and the official code as best practices to see what they built, and finally write a small toy of my own as a test.
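Since tool selection hinges almost entirely on descriptions, it is worth making that concrete. Below is a minimal sketch of my own (not from the MCP docs) showing the same tool with a vague versus an explicit description, using the FastMCP API introduced in section 2:

``` python
from fastmcp import FastMCP

mcp = FastMCP("description-demo")

# Vague: the LLM has almost nothing to match the user's request against,
# so it may never call this tool, or call it for the wrong task.
@mcp.tool
def calc(a: int, b: int) -> int:
    """Do math."""
    return a + b

# Explicit: states what the tool does, when to use it, and what each
# argument means, so the model can match it against the user's question.
@mcp.tool
def add_integers(a: int, b: int) -> int:
    """Add two integers and return their exact sum.

    Use this tool whenever the user asks for integer addition,
    especially for numbers too large to compute reliably in-context.

    Args:
        a: First addend.
        b: Second addend.
    """
    return a + b

if __name__ == "__main__":
    mcp.run()
```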
## 1.1 Reading the original MCP document

> MCP (Model Context Protocol) is an open-source standard for connecting AI applications to external systems. Using MCP, AI applications like Claude or ChatGPT can connect to data sources (e.g. local files, databases), tools (e.g. search engines, calculators) and workflows (e.g. specialized prompts) — enabling them to access key information and perform tasks. Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect electronic devices, MCP provides a standardized way to connect AI applications to external systems.

In short, MCP provides the model with three kinds of information: data (resources), tools, and workflow definitions (specialized prompts). The rest is covered well by this article, so there is no point copying it here: https://zhuanlan.zhihu.com/p/1907460765629251827

## 1.2 MCP security

Because an MCP server exposes data directly to MCP clients, authorization is an important link: it guarantees that users can only obtain authorized information through MCP clients, and that only authorized users can reach an endpoint. The official solution is OAuth 2.1: https://modelcontextprotocol.io/docs/tutorials/security/authorization

- OAuth 2.0 primer: https://dev.mi.com/console/doc/detail?pId=711

# 2 Learning FastMCP and trying its interfaces

Having understood the protocol framework and its requirements, I found actually getting started both simple and not so simple. The first important point: the official repo, mcp python-sdk, corresponds to FastMCP 1.0, while the standalone FastMCP repo that evolved from it corresponds to 2.0. The upgrade changed some interfaces, so take care not to mix the two, or you may run into interface-call problems. Either one will get you started quickly.

``` bash
# pip list | grep mcp
fastmcp   2.13.0.2    # https://github.com/jlowin/fastmcp
mcp       1.20.0      # https://github.com/modelcontextprotocol/python-sdk
```

## 2.1 A minimal example (fastmcp 2.0)

- MCP server

As the example below shows, seven lines of code are enough to build a minimal MCP server with one integer-addition tool. With no arguments it runs over stdio by default and must be invoked over stdio; SSE and HTTP are also supported. For debugging there is the officially recommended Inspector (https://modelcontextprotocol.io/docs/tools/inspector) — to my mind roughly what Postman is to RESTful APIs. You can of course also write an MCP client of your own to test with.

``` python
# server.py
from fastmcp import FastMCP

mcp = FastMCP("Demo 🚀")

@mcp.tool
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b

if __name__ == "__main__":
    mcp.run()
    # run as http
    # mcp.run(transport="http", host="127.0.0.1", port=8000)
    # run as stdio
    # mcp.run(transport="stdio")
    # run as sse
    # mcp.run(transport="sse", host="127.0.0.1", port=8000)
```

``` bash
python server2.0.py
╭──────────────────────────────────────────────────────────────────────────────╮
│   ▄▀▀ ▄▀█ █▀▀ ▀█▀ █▀▄▀█ █▀▀ █▀█                                              │
│   █▀  █▀█ ▄▄█  █  █ ▀ █ █▄▄ █▀▀                                              │
│                                                                              │
│   FastMCP  2.13.0.2                                                          │
│                                                                              │
│   🖥  Server name:  Demo 🚀                                                   │
│   📦 Transport:     STDIO                                                    │
│                                                                              │
│   📚 Docs:     https://gofastmcp.com                                         │
│   🚀 Hosting:  https://fastmcp.cloud                                         │
╰──────────────────────────────────────────────────────────────────────────────╯
[11/07/25 11:48:37] INFO  Starting MCP server 'Demo 🚀' with transport 'stdio'  server.py:1966
```

- MCP client

``` python
from fastmcp import Client
import asyncio

async def main():
    # Connect via stdio to a local script
    # async with Client("server.py") as client:
    #     tools = await client.list_tools()
    #     print(f"Available tools: {tools}")
    #     result = await client.call_tool("add", {"a": 5, "b": 3})
    #     print(f"Result: {result.content[0].text}")

    # Connect via SSE
    # async with Client("http://localhost:8000/sse") as client:
    #     tools = await client.list_tools()
    #     print(f"Available tools: {tools}")
    #     result = await client.call_tool("add", {"a": 5, "b": 3})
    #     print(f"Result: {result.content[0].text}")

    # Connect via streamable HTTP
    async with Client("http://localhost:8000/mcp") as client:
        tools = await client.list_tools()
        print(f"Available tools: {tools}")
        print()
        result = await client.call_tool("add", {"a": 5, "b": 3})
        print(f"Result: {result.content[0].text}")

if __name__ == '__main__':
    asyncio.run(main())
```

``` bash
python client2.0.py
Available tools: [Tool(name='add', title=None, description='Add two numbers', inputSchema={'properties': {'a': {'type': 'integer'}, 'b': {'type': 'integer'}}, 'required': ['a', 'b'], 'type': 'object'}, outputSchema={'properties': {'result': {'type': 'integer'}}, 'required': ['result'], 'type': 'object', 'x-fastmcp-wrap-result': True}, icons=None, annotations=None, meta={'_fastmcp': {'tags': []}})]

Result: 8
```
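To poke at this server with the Inspector mentioned above — a sketch assuming Node.js is installed; see the Inspector docs linked above for the exact options:

``` bash
# Opens the Inspector UI in a browser and spawns server.py over stdio
npx @modelcontextprotocol/inspector python server.py
```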
## 2.2 After the minimal example: exploring security

This podcast, released for the first anniversary of the MCP protocol, explains some security requirements and cases: https://www.youtube.com/watch?v=z6XWYCM3Q8s

The official MCP site devotes a separate chapter to authorization, which says enough about how important the AI-security side is: https://modelcontextprotocol.io/docs/tutorials/security/authorization. My reading is that this is the most essential piece of security work in MCP server development. The core is the following five conditions: when any of them holds — that is, whenever user data is involved or the server would expose something sensitive — MCP authorization should be considered.

- Your server accesses user-specific data (emails, documents, databases)
- You need to audit who performed which actions
- Your server grants access to its APIs that require user consent
- You're building for enterprise environments with strict access controls
- You want to implement rate limiting or usage tracking per user

The flow diagram in the official docs shows the full authorization flow. Because MCP adopts the OAuth 2.1 standard directly, it looks much like the OAuth 2.0 flow from section 1.2 — 2.1 only optimizes and simplifies 2.0. (From what I've seen online, a dedicated group is already working on an OAuth 3.0 standard.)
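To make the flow concrete before wiring it into MCP, here is the client-credentials token request in isolation — a sketch against a local Keycloak, using the test realm/client values from the code below (not anything canonical):

``` bash
curl -s -X POST \
  "http://localhost:8080/realms/master/protocol/openid-connect/token" \
  -d "grant_type=client_credentials" \
  -d "client_id=test" \
  -d "client_secret=<client secret>" \
  -d "scope=mcp:tools"
# => {"access_token":"eyJ...","expires_in":60,"token_type":"Bearer",...}
```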
""" if not self.server_url or not self.resource_url: return False aud: list[str] | str | None = token_data.get("aud") if isinstance(aud, list): return any(self._is_valid_resource(a) for a in aud) if isinstance(aud, str): return self._is_valid_resource(aud) return False def _is_valid_resource(self, resource: str) -> bool: """Check if the given resource matches our server.""" return check_resource_allowed(self.resource_url, resource) ``` ``` python """Configuration settings for the MCP auth server.""" import os from typing import Optional class Config: """Configuration class that loads from environment variables with sensible defaults.""" # MCP server settings HOST: str = os.getenv("HOST", "localhost") PORT: int = int(os.getenv("PORT", "3001")) # Auth server settings AUTH_HOST: str = os.getenv("AUTH_HOST", "localhost") AUTH_PORT: int = int(os.getenv("AUTH_PORT", "8080")) AUTH_REALM: str = os.getenv("AUTH_REALM", "master") # OAuth client settings OAUTH_CLIENT_ID: str = os.getenv("OAUTH_CLIENT_ID", "test") OAUTH_CLIENT_SECRET: str = os.getenv("OAUTH_CLIENT_SECRET", "xSj1YwiwGp6WA8I5aFfzB3QjIkQYhdJo") # Server settings MCP_SCOPE: str = os.getenv("MCP_SCOPE", "mcp:tools") OAUTH_STRICT: bool = os.getenv("OAUTH_STRICT", "false").lower() in ("true", "1", "yes") TRANSPORT: str = os.getenv("TRANSPORT", "streamable-http") @property def server_url(self) -> str: """Build the server URL.""" return f"http://{self.HOST}:{self.PORT}" @property def auth_base_url(self) -> str: """Build the auth server base URL.""" return f"http://{self.AUTH_HOST}:{self.AUTH_PORT}/realms/{self.AUTH_REALM}/" def validate(self) -> None: """Validate configuration.""" if self.TRANSPORT not in ["sse", "streamable-http"]: raise ValueError(f"Invalid transport: {self.TRANSPORT}. Must be 'sse' or 'streamable-http'") # Global configuration instance config = Config() ``` ``` python # server.py # from fastmcp import FastMCP, Context import datetime import logging from typing import Any from pydantic import AnyHttpUrl from mcp.server.auth.settings import AuthSettings from mcp.server.fastmcp.server import FastMCP, Context from config import config from token_verifier import IntrospectionTokenVerifier logger = logging.getLogger(__name__) def create_oauth_urls() -> dict[str, str]: """Create OAuth URLs based on configuration (Keycloak-style).""" from urllib.parse import urljoin auth_base_url = config.auth_base_url return { "issuer": auth_base_url, "introspection_endpoint": urljoin(auth_base_url, "protocol/openid-connect/token/introspect"), "authorization_endpoint": urljoin(auth_base_url, "protocol/openid-connect/auth"), "token_endpoint": urljoin(auth_base_url, "protocol/openid-connect/token"), } def create_server() -> FastMCP: """Create and configure the FastMCP server.""" config.validate() oauth_urls = create_oauth_urls() token_verifier = IntrospectionTokenVerifier( introspection_endpoint=oauth_urls["introspection_endpoint"], server_url=config.server_url, client_id=config.OAUTH_CLIENT_ID, client_secret=config.OAUTH_CLIENT_SECRET, ) app = FastMCP( name="🚀 MCP Resource Server", instructions="Resource Server that validates tokens via Authorization Server introspection", host=config.HOST, port=config.PORT, debug=True, streamable_http_path="/", token_verifier=token_verifier, auth=AuthSettings( issuer_url=AnyHttpUrl(oauth_urls["issuer"]), required_scopes=[config.MCP_SCOPE], resource_server_url=AnyHttpUrl(config.server_url), ), ) @app.tool(title="Addition Tool") def add(a: int, b: int) -> int: """ Add two integers. 
``` python
# server.py
# from fastmcp import FastMCP, Context
import datetime
import logging

from pydantic import AnyHttpUrl

from mcp.server.auth.settings import AuthSettings
from mcp.server.fastmcp.server import FastMCP, Context

from config import config
from token_verifier import IntrospectionTokenVerifier

logger = logging.getLogger(__name__)


def create_oauth_urls() -> dict[str, str]:
    """Create OAuth URLs based on configuration (Keycloak-style)."""
    from urllib.parse import urljoin

    auth_base_url = config.auth_base_url
    return {
        "issuer": auth_base_url,
        "introspection_endpoint": urljoin(auth_base_url, "protocol/openid-connect/token/introspect"),
        "authorization_endpoint": urljoin(auth_base_url, "protocol/openid-connect/auth"),
        "token_endpoint": urljoin(auth_base_url, "protocol/openid-connect/token"),
    }


def create_server() -> FastMCP:
    """Create and configure the FastMCP server."""
    config.validate()
    oauth_urls = create_oauth_urls()

    token_verifier = IntrospectionTokenVerifier(
        introspection_endpoint=oauth_urls["introspection_endpoint"],
        server_url=config.server_url,
        client_id=config.OAUTH_CLIENT_ID,
        client_secret=config.OAUTH_CLIENT_SECRET,
    )

    app = FastMCP(
        name="🚀 MCP Resource Server",
        instructions="Resource Server that validates tokens via Authorization Server introspection",
        host=config.HOST,
        port=config.PORT,
        debug=True,
        streamable_http_path="/",
        token_verifier=token_verifier,
        auth=AuthSettings(
            issuer_url=AnyHttpUrl(oauth_urls["issuer"]),
            required_scopes=[config.MCP_SCOPE],
            resource_server_url=AnyHttpUrl(config.server_url),
        ),
    )

    @app.tool(title="Addition Tool")
    def add(a: int, b: int) -> dict:
        """Add two integers.

        This tool demonstrates basic arithmetic operations with OAuth authentication.

        Args:
            a (int): First integer.
            b (int): Second integer.
        """
        result = a + b
        return {
            "operation": "addition",
            "operand_a": a,
            "operand_b": b,
            "result": result,
            "timestamp": datetime.datetime.now().isoformat(),
        }

    @app.tool(title="process_data")
    async def process_data(uri: str, ctx: Context):
        """Log a message to the client."""
        await ctx.info(f"Processing {uri}...")
        # Read a resource from the server
        data = await ctx.read_resource(uri)
        # Ask the client LLM to summarize the data
        summary = await ctx.sample(f"Summarize: {data.content[:500]}")
        # Return the summary
        return summary.text

    # Static resource
    @app.resource("config://version")
    def get_version():
        return "2.0.1"

    # Dynamic resource template
    @app.resource("users://{user_id}/profile")
    def get_profile(user_id: int):
        # Fetch profile for user_id...
        return {"name": f"User {user_id}", "status": "active"}

    @app.prompt()
    def summarize_request(text: str) -> str:
        """Generate a prompt asking for a summary."""
        return f"Please summarize the following text:\n\n{text}"

    return app


def main() -> int:
    """Main entry point."""
    logging.basicConfig(level=logging.INFO)
    try:
        config.validate()
        oauth_urls = create_oauth_urls()
    except ValueError as e:
        logger.error("Configuration error: %s", e)
        return 1

    try:
        mcp_server = create_server()
        logger.info("Starting MCP Server on %s:%s", config.HOST, config.PORT)
        logger.info("Authorization Server: %s", oauth_urls["issuer"])
        logger.info("Transport: %s", config.TRANSPORT)
        mcp_server.run(transport=config.TRANSPORT)
        return 0
    except Exception:
        logger.exception("Server error")
        return 1


if __name__ == "__main__":
    exit(main())
```

- The client test code:

``` python
import asyncio
from base64 import b64encode

import httpx
from fastmcp.client.client import Client
from fastmcp.client.transports import StreamableHttpTransport
from fastmcp.client.auth.bearer import BearerAuth


class TokenClient:
    def __init__(self, client_id: str, client_secret: str):
        self.client_id = client_id
        self.client_secret = client_secret

    # Fetch an access token via the client-credentials grant
    def get_token(self, token_url: str) -> str:
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "mcp:tools",
        }
        response = httpx.post(token_url, data=data)
        response.raise_for_status()
        print(response.json())
        return response.json()["access_token"]

    # Ask the auth server whether the token is active (RFC 7662 introspection)
    def check_token_active(self, introspection_url: str, token: str) -> bool:
        auth_header = "Basic " + b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode()
        response = httpx.post(
            introspection_url,
            headers={"Authorization": auth_header},
            data={"token": token},
        )
        if response.status_code != 200:
            raise ValueError(f"Introspection failed: {response.text}")
        return response.json().get("active", False)


async def main():
    url = "http://localhost:3001/"
    client_id = "test"
    client_secret = "xSj1YwiwGp6WA8I5aFfzB3QjIkQYhdJo"
    token_client = TokenClient(client_id, client_secret)

    token_url = "http://localhost:8080/realms/master/protocol/openid-connect/token"
    introspection_url = "http://localhost:8080/realms/master/protocol/openid-connect/token/introspect"
    token = token_client.get_token(token_url)
    is_active = token_client.check_token_active(introspection_url, token)
    print(is_active)

    transport = StreamableHttpTransport(url=url, auth=BearerAuth(token))
    async with Client(transport) as client:
        tools = await client.list_tools()
        print("Tools:", [tool.name for tool in tools])


if __name__ == "__main__":
    asyncio.run(main())
```
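Reproducing this locally needs an authorization server. A sketch of a throwaway Keycloak in dev mode — the image tag and admin env vars are assumptions, check the Keycloak docs for your version:

``` bash
# Dev-mode Keycloak on :8080, the auth server assumed by config.py
docker run --rm -p 8080:8080 \
  -e KEYCLOAK_ADMIN=admin \
  -e KEYCLOAK_ADMIN_PASSWORD=admin \
  quay.io/keycloak/keycloak:latest start-dev
# Then, in the master realm: create a client "test", enable the
# service-accounts (client credentials) flow, and add an "mcp:tools" scope.
```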
## 2.3 Problems encountered while debugging

### 2.3.1 404

Make sure the request URL is correct, the SSO service is up, and the configuration is right; otherwise you will get a 404 Not Found.

### 2.3.2 401

A 401 is trickier: it means authorization verification failed, and the causes are many. Common ones:

1. The grant type is misconfigured — e.g. the username/password flow or the client-id + secret flow is not enabled on the auth server;
2. The expiry time is set too short, so the token expires;
3. The service runs in a Docker container whose clock is not synchronized with the host;
4. The client used for introspection lacks sufficient permissions;
5. The token is not passed or decoded correctly;
6. The token is malformed or has been tampered with;
7. And so on.

The first four are usually honest misconfiguration rather than a deliberate attack. The troubleshooting approach:

1. First check that the token server (e.g. Keycloak) is reachable at all, e.g. with `telnet <ip> <port>`;
2. Once it is reachable, confirm you can obtain a token, and check that its expiry, token type, scope and policies match expectations;
3. If those check out, verify the token is active; if not, find out why, and check the server clock;
4. If all of the above pass, decode the token with https://www.jwt.io/ and compare it against the server configuration at http://10.163.176.59:8080/realms/master/.well-known/openid-configuration — in particular, check that the kid matches.

### JWT payload field reference

The payload fields that matter when comparing a decoded token against the server configuration (the registered claims from RFC 7519 plus the OIDC additions used above):

| Field | Meaning |
|-------|---------|
| iss | Issuer: URL of the realm/authorization server that issued the token |
| sub | Subject: the user or client the token is about |
| aud | Audience: the resource server(s) the token is intended for (checked by `_validate_resource`) |
| exp | Expiration time, Unix seconds (compare against the server clock) |
| iat | Issued-at time, Unix seconds |
| jti | Unique token identifier |
| azp | Authorized party: the client_id the token was issued to |
| scope | Space-separated scopes, e.g. `mcp:tools` |
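The decode-and-compare in step 4 can also be scripted instead of pasting tokens into jwt.io — a sketch using PyJWT (my choice here; any JWT library works):

``` python
# pip install pyjwt
import jwt  # PyJWT

token = "<access token returned by the auth server>"

# Decode without verifying the signature -- for debugging only
header = jwt.get_unverified_header(token)            # carries "kid" and "alg"
payload = jwt.decode(token, options={"verify_signature": False})

print("kid:  ", header.get("kid"))     # compare with the kid in the server's JWKS
print("aud:  ", payload.get("aud"))    # must match the resource server URL
print("exp:  ", payload.get("exp"))    # compare against the (container's) clock
print("scope:", payload.get("scope"))  # should include mcp:tools
```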
# 3 Baidu Map MCP server

https://github.com/baidu-maps/mcp/blob/main/src/baidu-map/python/src/mcp_server_baidu_maps/map.py

# 4 Official example

https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/clients/simple-chatbot/mcp_simple_chatbot/main.py#L196

# 5 Connecting to the Amap MCP server

- Ollama

``` python
import json
import asyncio
import re
from typing import Optional
from contextlib import AsyncExitStack

import httpx  # async HTTP client for the Ollama API
from mcp import ClientSession
from mcp.client.sse import sse_client


def format_tools_for_llm(tool) -> str:
    """Format a tool description for the LLM."""
    args_desc = []
    if "properties" in tool.inputSchema:
        for param_name, param_info in tool.inputSchema["properties"].items():
            arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"
            if param_name in tool.inputSchema.get("required", []):
                arg_desc += " (required)"
            args_desc.append(arg_desc)
    return f"Tool: {tool.name}\nDescription: {tool.description}\nArguments:\n{chr(10).join(args_desc)}"


class Client:
    def __init__(self):
        self._exit_stack: Optional[AsyncExitStack] = None
        self.session: Optional[ClientSession] = None
        self._lock = asyncio.Lock()  # guard against concurrent connect/disconnect
        self.is_connected = False
        self.ollama_url = "http://localhost:11434/api/chat"  # Ollama API endpoint
        self.model = "llama3"  # Ollama model name
        self.messages = []

    async def connect_server(self, server_config):
        async with self._lock:
            url = server_config["mcpServers"]["amap-amap-sse"]["url"]
            print(f"Connecting to: {url}")
            self._exit_stack = AsyncExitStack()
            sse_cm = sse_client(url)
            streams = await self._exit_stack.enter_async_context(sse_cm)
            print("SSE streams acquired.")
            session_cm = ClientSession(streams[0], streams[1])
            self.session = await self._exit_stack.enter_async_context(session_cm)
            print("ClientSession created.")
            await self.session.initialize()
            print("Session initialized.")

            response = await self.session.list_tools()
            self.tools = {tool.name: tool for tool in response.tools}
            print(f"Fetched {len(self.tools)} tools:")
            for name, tool in self.tools.items():
                print(f" - {name}: {tool.description[:50]}...")
            print("Connected and ready.")

            tools_description = "\n".join(format_tools_for_llm(tool) for tool in response.tools)
            system_prompt = (
                "You are a helpful assistant with access to these tools:\n\n"
                f"{tools_description}\n"
                "Choose the appropriate tool based on the user's question. "
                "If no tool is needed, reply directly.\n\n"
                "IMPORTANT: When you need to use a tool, you must ONLY respond with "
                "the exact JSON object format below, nothing else:\n"
                "{\n"
                '    "tool": "tool-name",\n'
                '    "arguments": {\n'
                '        "argument-name": "value"\n'
                "    }\n"
                "}\n\n"
                '"```json" is not allowed. '
                "After receiving a tool's response:\n"
                "1. Transform the raw data into a natural, conversational response\n"
                "2. Keep responses concise but informative\n"
                "3. Focus on the most relevant information\n"
                "4. Use appropriate context from the user's question\n"
                "5. Avoid simply repeating the raw data\n\n"
                "Please use only the tools that are explicitly defined above."
            )
            self.messages.append({"role": "system", "content": system_prompt})

    async def disconnect(self):
        async with self._lock:
            if self._exit_stack:
                await self._exit_stack.aclose()

    async def chat(self, prompt, role="user"):
        self.messages.append({"role": role, "content": prompt})
        # Ollama chat API; disable streaming so the reply arrives as one JSON object
        async with httpx.AsyncClient() as client:
            payload = {"model": self.model, "messages": self.messages, "stream": False}
            resp = await client.post(self.ollama_url, json=payload)
            resp.raise_for_status()
            data = resp.json()
        return data.get("message", {}).get("content", "")

    async def execute_tool(self, llm_response: str):
        """If the LLM replied with a tool call, execute it; otherwise pass through."""
        try:
            # Strip a ```json fence if the model ignored the instruction not to use one
            match = re.search(r"```json\n(.*?)\n?```", llm_response, re.DOTALL)
            if match:
                llm_response = match.group(1)
            tool_call = json.loads(llm_response)
            if "tool" in tool_call and "arguments" in tool_call:
                if tool_call["tool"] in self.tools:
                    try:
                        print(f"[info] Calling tool {tool_call['tool']}")
                        result = await self.session.call_tool(
                            tool_call["tool"], tool_call["arguments"]
                        )
                        if isinstance(result, dict) and "progress" in result:
                            progress = result["progress"]
                            total = result["total"]
                            print(f"Progress: {progress}/{total} ({progress / total * 100:.1f}%)")
                        return f"Tool execution result: {result}"
                    except Exception as e:
                        error_msg = f"Error executing tool: {e}"
                        print(error_msg)
                        return error_msg
                return f"No server found with tool: {tool_call['tool']}"
            return llm_response
        except json.JSONDecodeError:
            return llm_response

    async def chat_loop(self):
        print("MCP client started")
        print("Type /bye to quit")
        while True:
            prompt = input(">>> ").strip()
            if "/bye" in prompt.lower():
                break
            response = await self.chat(prompt)
            self.messages.append({"role": "assistant", "content": response})
            result = await self.execute_tool(response)
            # Keep looping while the model keeps requesting tools
            while result != response:
                response = await self.chat(result, "system")
                self.messages.append({"role": "assistant", "content": response})
                result = await self.execute_tool(response)
            print(response)


def load_server_config(config_file):
    with open(config_file) as f:
        return json.load(f)


async def main():
    client = Client()
    try:
        server_config = load_server_config("servers_config.json")
        await client.connect_server(server_config)
        await client.chat_loop()
    except Exception as e:
        print(f"Error in main: {type(e).__name__}: {e}")
    finally:
        print("\nShutting down client...")
        await client.disconnect()
        print("Client closed.")


if __name__ == '__main__':
    asyncio.run(main())
```
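For reference, this is the shape of reply the system prompt above demands when the model decides to call a tool — the tool name and arguments here are hypothetical; the real names come from the Amap server's tool list:

``` json
{
    "tool": "maps_weather",
    "arguments": {
        "city": "北京"
    }
}
```

`execute_tool()` parses exactly this object, looks the name up in the tool list, and forwards the arguments to `session.call_tool()`.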
- DeepSeek

``` python
import json
import asyncio
import re
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI


def format_tools_for_llm(tool) -> str:
    """Format a tool description for the LLM.

    Returns:
        The formatted tool description.
    """
    args_desc = []
    if "properties" in tool.inputSchema:
        for param_name, param_info in tool.inputSchema["properties"].items():
            arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"
            if param_name in tool.inputSchema.get("required", []):
                arg_desc += " (required)"
            args_desc.append(arg_desc)
    return f"Tool: {tool.name}\nDescription: {tool.description}\nArguments:\n{chr(10).join(args_desc)}"


class Client:
    def __init__(self):
        self._exit_stack: Optional[AsyncExitStack] = None
        self.session: Optional[ClientSession] = None
        self._lock = asyncio.Lock()  # guard against concurrent connect/disconnect
        self.is_connected = False
        self.client = AsyncOpenAI(
            base_url="https://api.deepseek.com",
            api_key="<your API key>",
        )
        self.model = "deepseek-chat"
        self.messages = []

    async def connect_server(self, server_config):
        async with self._lock:  # prevent concurrent connect calls
            url = server_config["mcpServers"]["amap-amap-sse"]["url"]
            print(f"Connecting to: {url}")
            self._exit_stack = AsyncExitStack()
            # 1. Enter the SSE context without exiting it: call __aenter__ via the
            #    exit stack and keep the context manager around for later cleanup
            sse_cm = sse_client(url)
            streams = await self._exit_stack.enter_async_context(sse_cm)
            print("SSE streams acquired.")
            # 2. Enter the session context the same way
            session_cm = ClientSession(streams[0], streams[1])
            self.session = await self._exit_stack.enter_async_context(session_cm)
            print("ClientSession created.")
            # 3. Initialize the session
            await self.session.initialize()
            print("Session initialized.")
            # 4. Fetch and store the tool list
            response = await self.session.list_tools()
            self.tools = {tool.name: tool for tool in response.tools}
            print(f"Fetched {len(self.tools)} tools:")
            for name, tool in self.tools.items():
                print(f" - {name}: {tool.description[:50]}...")
            print("Connected and ready.")

            tools_description = "\n".join(format_tools_for_llm(tool) for tool in response.tools)
            # System prompt constraining tool-call output
            system_prompt = (
                "You are a helpful assistant with access to these tools:\n\n"
                f"{tools_description}\n"
                "Choose the appropriate tool based on the user's question. "
                "If no tool is needed, reply directly.\n\n"
                "IMPORTANT: When you need to use a tool, you must ONLY respond with "
                "the exact JSON object format below, nothing else:\n"
                "{\n"
                '    "tool": "tool-name",\n'
                '    "arguments": {\n'
                '        "argument-name": "value"\n'
                "    }\n"
                "}\n\n"
                '"```json" is not allowed. '
                "After receiving a tool's response:\n"
                "1. Transform the raw data into a natural, conversational response\n"
                "2. Keep responses concise but informative\n"
                "3. Focus on the most relevant information\n"
                "4. Use appropriate context from the user's question\n"
                "5. Avoid simply repeating the raw data\n\n"
                "Please use only the tools that are explicitly defined above."
            )
            self.messages.append({"role": "system", "content": system_prompt})

    async def disconnect(self):
        """Close the session and connection."""
        async with self._lock:
            if self._exit_stack:
                await self._exit_stack.aclose()

    async def chat(self, prompt, role="user"):
        """Send the conversation to the LLM and return its reply."""
        self.messages.append({"role": role, "content": prompt})
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
        )
        return response.choices[0].message.content

    async def execute_tool(self, llm_response: str):
        """Process the LLM response and execute tools if needed.

        Args:
            llm_response: The response from the LLM.

        Returns:
            The result of tool execution or the original response.
        """
        try:
            match = re.search(r"```json\n(.*?)\n?```", llm_response, re.DOTALL)
            if match:
                llm_response = match.group(1)
            tool_call = json.loads(llm_response)
            if "tool" in tool_call and "arguments" in tool_call:
                if tool_call["tool"] in self.tools:
                    try:
                        print(f"[info] Calling tool {tool_call['tool']}")
                        result = await self.session.call_tool(
                            tool_call["tool"], tool_call["arguments"]
                        )
                        if isinstance(result, dict) and "progress" in result:
                            progress = result["progress"]
                            total = result["total"]
                            print(f"Progress: {progress}/{total} ({progress / total * 100:.1f}%)")
                        return f"Tool execution result: {result}"
                    except Exception as e:
                        error_msg = f"Error executing tool: {e}"
                        print(error_msg)
                        return error_msg
                return f"No server found with tool: {tool_call['tool']}"
            return llm_response
        except json.JSONDecodeError:
            return llm_response

    async def chat_loop(self):
        """Run the interactive chat loop."""
        print("MCP client started")
        print("Type /bye to quit")
        while True:
            prompt = input(">>> ").strip()
            if "/bye" in prompt.lower():
                break
            response = await self.chat(prompt)
            self.messages.append({"role": "assistant", "content": response})
            result = await self.execute_tool(response)
            while result != response:
                response = await self.chat(result, "system")
                self.messages.append({"role": "assistant", "content": response})
                result = await self.execute_tool(response)
            print(response)


def load_server_config(config_file):
    with open(config_file) as f:
        return json.load(f)


async def main():
    client = Client()
    try:
        server_config = load_server_config("servers_config.json")
        await client.connect_server(server_config)
        await client.chat_loop()
    except Exception as e:
        print(f"Error in main: {type(e).__name__}: {e}")
    finally:
        # Always attempt to disconnect and clean up
        print("\nShutting down client...")
        await client.disconnect()
        print("Client closed.")


if __name__ == '__main__':
    # Example queries: "我要去济南奥体中心出差,请你查询附近5km的酒店,为我安排行程"
    # / "北京未来三天的天气怎么样"
    asyncio.run(main())
```
- SGLang

``` python
import json
import asyncio
import re
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI


def format_tools_for_llm(tool) -> str:
    """Format a tool description for the LLM."""
    args_desc = []
    if "properties" in tool.inputSchema:
        for param_name, param_info in tool.inputSchema["properties"].items():
            arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"
            if param_name in tool.inputSchema.get("required", []):
                arg_desc += " (required)"
            args_desc.append(arg_desc)
    return f"Tool: {tool.name}\nDescription: {tool.description}\nArguments:\n{chr(10).join(args_desc)}"


class Client:
    def __init__(self):
        self._exit_stack: Optional[AsyncExitStack] = None
        self.session: Optional[ClientSession] = None
        self._lock = asyncio.Lock()
        self.is_connected = False
        # Point this at your SGLang service; adjust host, port and path as deployed
        self.client = AsyncOpenAI(
            base_url="http://192.168.1.102:30000/v1",  # SGLang default address
            api_key="None",  # SGLang may not require a key; fill in per your setup
        )
        self.model = "qwen:7b-chat"  # SGLang model name; use your actual model
        self.messages = []

    async def connect_server(self, server_config):
        async with self._lock:
            url = server_config["mcpServers"]["amap-amap-sse"]["url"]
            print(f"Connecting to: {url}")
            self._exit_stack = AsyncExitStack()
            sse_cm = sse_client(url)
            streams = await self._exit_stack.enter_async_context(sse_cm)
            print("SSE streams acquired.")
            session_cm = ClientSession(streams[0], streams[1])
            self.session = await self._exit_stack.enter_async_context(session_cm)
            print("ClientSession created.")
            await self.session.initialize()
            print("Session initialized.")
            response = await self.session.list_tools()
            self.tools = {tool.name: tool for tool in response.tools}
            print(f"Fetched {len(self.tools)} tools:")
            for name, tool in self.tools.items():
                print(f" - {name}: {tool.description[:50]}...")
            print("Connected and ready.")

            tools_description = "\n".join(format_tools_for_llm(tool) for tool in response.tools)
            system_prompt = (
                "You are a helpful assistant with access to these tools:\n\n"
                f"{tools_description}\n"
                "Choose the appropriate tool based on the user's question. "
                "If no tool is needed, reply directly.\n\n"
                "IMPORTANT: When you need to use a tool, you must ONLY respond with "
                "the exact JSON object format below, nothing else:\n"
                "{\n"
                '    "tool": "tool-name",\n'
                '    "arguments": {\n'
                '        "argument-name": "value"\n'
                "    }\n"
                "}\n\n"
                '"```json" is not allowed. '
                "After receiving a tool's response:\n"
                "1. Transform the raw data into a natural, conversational response\n"
                "2. Keep responses concise but informative\n"
                "3. Focus on the most relevant information\n"
                "4. Use appropriate context from the user's question\n"
                "5. Avoid simply repeating the raw data\n\n"
                "Please use only the tools that are explicitly defined above."
            )
            self.messages.append({"role": "system", "content": system_prompt})

    async def disconnect(self):
        async with self._lock:
            if self._exit_stack:
                await self._exit_stack.aclose()

    async def chat(self, prompt, role="user"):
        self.messages.append({"role": role, "content": prompt})
        # SGLang exposes an OpenAI-compatible API, so the call is identical
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
        )
        return response.choices[0].message.content

    async def execute_tool(self, llm_response: str):
        try:
            match = re.search(r"{[\s\S]*}", llm_response)  # looser JSON matching
            if match:
                tool_call = json.loads(match.group())
                if "tool" in tool_call and "arguments" in tool_call:
                    print(f"[info] Calling tool {tool_call['tool']}")
                    result = await self.session.call_tool(
                        tool_call["tool"], tool_call["arguments"]
                    )
                    # Format the output however you like here
                    return f"Result of tool [{tool_call['tool']}]: {result}"
            return llm_response  # not a tool call; return the reply as-is
        except Exception as e:
            return f"Tool call failed: {e}"

    async def chat_prompt(self, prompt):
        # 1. Ask the model
        response = await self.chat(prompt)
        self.messages.append({"role": "assistant", "content": response})
        # 2. Check whether it requested a tool
        tool_result = await self.execute_tool(response)
        if tool_result != response:
            # 3. A tool was called; ask the model to phrase the result
            organize_prompt = (
                f"User question: {prompt}\n"
                f"Tool result (summarize it in plain natural language): {tool_result}"
            )
            # 4. Let the model produce the final wording
            final_response = await self.chat(organize_prompt, role="system")
            self.messages.append({"role": "assistant", "content": final_response})
            print(final_response)
        else:
            # No tool call; print the model's reply directly
            print(response)


def load_server_config(config_file):
    with open(config_file) as f:
        return json.load(f)


async def main():
    client = Client()
    try:
        server_config = load_server_config("servers_config.json")
        await client.connect_server(server_config)
        # await client.chat_prompt("北京今天的天气怎么样?")
        await client.chat_prompt("陕西省西安市周围200km有哪些重要景点?")
    except Exception as e:
        print(f"Error in main: {type(e).__name__}: {e}")
    finally:
        print("\nShutting down client...")
        await client.disconnect()
        print("Client closed.")


if __name__ == '__main__':
    asyncio.run(main())
```

- servers_config.json

``` json
{
    "mcpServers": {
        "amap-amap-sse": {
            "url": "https://mcp.amap.com/sse?key=[api_key]"
        }
    }
}
```

# Annexe

- https://zhuanlan.zhihu.com/p/1907460765629251827
- https://www.ruanx.net/mcp-protocol/
- https://modelcontextprotocol.io/docs/getting-started/intro
- https://modelcontextprotocol.io/docs/tools/inspector