LangGraph+MCP智能体开发
文档:https://langchain-ai.github.io/langgraph/
langgraph是什么?
用编程语言画出一个流程图,AI 沿着图中路径一步一步执行,支持循环、跳转、条件判断、记忆状态等。
pip install langgraph 安装pip install langgraph-checkpoint-postgres 使用postgre作为记忆pip install langchain-mcp-adapters mcp 安装
示例
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import AnyMessage
from langgraph.prebuilt.chat_agent_executor import AgentState
from langgraph.checkpoint.memory import InMemorySaver # 导入内存记忆
#from langgraph.checkpoint.postgres import PostgresSaver # 导入postgre记忆
#checkpointer = PostgresSaver.from_conn_string(os.getenv("DATABASE_URL"))
import os
# 设置模型调用地址,使用本地千问3
os.environ['OPENAI_API_BASE'] = 'http://192.168.1.163:11434/v1'
# 定义内存记忆
checkpointer = InMemorySaver()
# 定义模型
llm = ChatOpenAI(
model_name="qwen3:32b",
temperature=0,
max_tokens=2048
)
# 定义提示词
def prompt(state: AgentState, config: RunnableConfig) -> list[AnyMessage]:
user_name = config["configurable"].get("user_name")
system_msg = f"你是一位乐于助人的助手。称呼用户为 {user_name}."
return [{"role": "system", "content": system_msg}] + state["messages"]
# 定义工具
def get_weather(city: str) -> str:
"""获取指定城市的天气。"""
return f"{city}总是阳光明媚!"
# 定义状态结构
class CustomState(AgentState):
num: int
# 定义智能体
agent = create_react_agent(
model=llm,
tools=[get_weather],
# prompt="你是一个乐于助人的助手"
prompt=prompt,
checkpointer=checkpointer, # 添加内存记忆
state_schema=CustomState, # 状态,可变上下文
)
# 定义配置(静态上下文),自定义了一个user_name,用来在模板中读取,如果使用了记忆则需要添加thread_id区分对话
config = {"configurable": {"user_name": "李坤朋","thread_id": "1"}}
# 运行智能体
result = agent.invoke(
{"messages": [{"role": "user", "content": "旧金山的天气怎么样"}]},
config=config
)
# 提取最后一条AI消息的内容(最终回复)
ai_message = result["messages"][-1].content #这里也可以是0
# 打印模型最终回复
print(f"AI: {ai_message}")
# 第二次运行,会使用上一次的记忆
ny_response = agent.invoke({
"messages": [{"role": "user", "content": "那纽约呢?"}],
"num": 2 # 自定义可变状态
})
print(f"AI: {ny_response['messages'][-1].content}")
状态机,节点,边
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda
from pydantic import BaseModel
# 定义共享状态类型
class GraphState(BaseModel):
"""图状态对象"""
msg: str = ""
# 可以添加更多需要的字段
# 创建 Graph 对象(工作流,共享状态)
workflow = StateGraph(GraphState)
# 添加步骤(节点): 输入 -> 处理 -> 输出
def step1(state: GraphState):
"""第一个节点:设置初始消息"""
print("Step1 输入:", state)
# 返回状态更新(不是直接修改)
return {"msg": "你好"}
def step2(state: GraphState):
"""第二个节点:添加额外信息"""
print("Step2 输入:", state)
# 返回状态更新
return {"msg": state.msg + ",我可以帮助你查询天气"}
workflow.add_node("greeting", RunnableLambda(step1))
workflow.add_node("reply", RunnableLambda(step2))
# 设置边(连接关系)
workflow.set_entry_point("greeting")
workflow.add_edge("greeting", "reply")
workflow.add_edge("reply", END)
# 构建执行图
graph = workflow.compile()
# 运行图并获取最终状态
res = graph.invoke({})
print("最终状态:", res)
MCP服务端
pip install mcp
如果采用本地调用,服务端文件不用运行
langchin_mcp_ser_demo.py:
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("toolser") # MCP服务名:toolser
# 注意这里的函数一定要有"""说明,这将让模型能理解函数作用
# 使用装饰器将add函数注册为MCP工具
@mcp.tool()
def add(a: int, b: int) -> int:
"""将两个数字相加"""
print("执行了相加函数")
return a + b
@mcp.tool()
def get_weather(city: str) -> str:
"""获取指定城市的天气"""
print("执行了获取天气函数")
return f"{city}今天阳光明媚!"
# 这个文件由客户端在新进程中执行,必须存在mcp.run客户端才能收到信息
# 启动MCP服务器,使用本地方式,streamable-http为远程流式
mcp.run(transport="stdio")
langchin_mcp_client_demo.py:
import asyncio
import os
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
os.environ['OPENAI_API_BASE'] = 'http://192.168.1.163:11434/v1'
# 定义模型
llm = ChatOpenAI(
model_name="qwen3:32b",
temperature=0,
max_tokens=2048
)
# 定义异步主函数
async def main():
# 在异步函数内部使用 async with
async with MultiServerMCPClient(
{
"toolser": {
"command": "python", # 使用python命令执行文件,相当于用它去执行了ser中的py文件
# 替换为您的 math_server.py 文件的绝对路径
"args": ["langchin_mcp_ser_demo.py"],
"transport": "stdio",
}
}
) as client:
# 创建一个反应式agent
agent = create_react_agent(
llm,
client.get_tools()
)
math_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "计算 3 + 5等于多少?"}]}
)
print("数学问题响应:", math_response)
weather_response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "纽约市的天气怎么样?"}]}
)
print("天气问题响应:", weather_response)
if __name__ == "__main__":
asyncio.run(main())