本文翻译整理自 Hierarchical Agent Teams
- 一、前言
- 二、设置
- 三、创建工具
- 四、Helper Utilities
- 五、定义代理 Team
- 研究 Team
- 文档写作Team
- 六、添加图层
在前面的示例(Agent Supervisor)中,我们引入了单个主管节点的概念,用于在不同的工作节点之间路由工作。
AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation
- Define the agents’ tools to access the web and write files
- 定义代理访问Web和写入文件的工具
- Define some utilities to help create the graph and agents
- 定义一些实用程序来帮助创建图和代理
- Create and define each team (web research + doc writing)
- 创建和定义每个Team (网络研究+文档编写)
- Compose everything together.
- 把一切组合在一起。
%%capture --no-stderr
%pip install -U langgraph langchain langchain_openai langchain_experimental
import getpass
import os
def _set_if_undefined(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"Please provide your {var}")
注册LangSmith以快速发现问题并提高LangGraph项目的性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序 - 在此处阅读有关如何开始的更多信息。
每个Team 将由一个或多个代理组成,每个代理都有一个或多个工具。下面,定义您的不同Team 要使用的所有工具。
我们将从研究Team 开始。
ResearchTeam 工具
研究Team 工具
研究Team 可以使用搜索引擎和url刮刀在网络上查找信息。请随时在下面添加其他功能以提高Team 绩效!
from typing import Annotated, List
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
tavily_tool = TavilySearchResults(max_results=5)
def scrape_webpages(urls: List[str]) -> str:
"""Use requests and bs4 to scrape the provided web pages for detailed information."""
loader = WebBaseLoader(urls)
docs = loader.load()
return "\n\n".join(
f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
for doc in docs
API Reference: WebBaseLoaderTavilySearchResultstool
文档编写Team 工具
接下来,我们将提供一些工具供文档编写Team 使用。我们在下面定义了一些基本的文件访问工具。
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, Optional
from langchain_experimental.utilities import PythonREPL
from typing_extensions import TypedDict
_TEMP_DIRECTORY = TemporaryDirectory()
def create_outline(
points: Annotated[List[str], "List of main points or sections."],
file_name: Annotated[str, "File path to save the outline."],
) -> Annotated[str, "Path of the saved outline file."]:
"""Create and save an outline."""
with (WORKING_DIRECTORY / file_name).open("w") as file:
for i, point in enumerate(points):
file.write(f"{i + 1}. {point}\n")
return f"Outline saved to {file_name}"
def read_document(
file_name: Annotated[str, "File path to save the document."],
start: Annotated[Optional[int], "The start line. Default is 0"] = None,
end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
"""Read the specified document."""
with (WORKING_DIRECTORY / file_name).open("r") as file:
lines = file.readlines()
if start is not None:
start = 0
return "\n".join(lines[start:end])
def write_document(
content: Annotated[str, "Text content to be written into the document."],
file_name: Annotated[str, "File path to save the document."],
) -> Annotated[str, "Path of the saved document file."]:
"""Create and save a text document."""
with (WORKING_DIRECTORY / file_name).open("w") as file:
return f"Document saved to {file_name}"
def edit_document(
file_name: Annotated[str, "Path of the document to be edited."],
inserts: Annotated[
Dict[int, str],
"Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
) -> Annotated[str, "Path of the edited document file."]:
"""Edit a document by inserting text at specific line numbers."""
with (WORKING_DIRECTORY / file_name).open("r") as file:
lines = file.readlines()
sorted_inserts = sorted(inserts.items())
for line_number, text in sorted_inserts:
if 1 <= line_number <= len(lines) + 1:
lines.insert(line_number - 1, text + "\n")
return f"Error: Line number {line_number} is out of range."
with (WORKING_DIRECTORY / file_name).open("w") as file:
return f"Document edited and saved to {file_name}"
# Warning: This executes code locally, which can be unsafe when not sandboxed
repl = PythonREPL()
def python_repl(
code: Annotated[str, "The python code to execute to generate your chart."],
"""Use this to execute python code. If you want to see the output of a value,
you should print it out with `print(...)`. This is visible to the user."""
result = repl.run(code)
except BaseException as e:
return f"Failed to execute. Error: {repr(e)}"
return f"Successfully executed:\n\`\`\`python\n{code}\n\`\`\`\nStdout: {result}"
API Reference: PythonREPL
四、Helper Utilities
- 创建一个工作代理。
- 为子图创建主管。
from typing import List, Optional
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph, START
from langchain_core.messages import HumanMessage, trim_messages
llm = ChatOpenAI(model="gpt-4o-mini")
trimmer = trim_messages(
def agent_node(state, agent, name):
result = agent.invoke(state)
return {
"messages": [HumanMessage(content=result["messages"][-1].content, name=name)]
def create_team_supervisor(llm: ChatOpenAI, system_prompt, members) -> str:
"""An LLM-based router."""
options = ["FINISH"] + members
function_def = {
"name": "route",
"description": "Select the next role.",
"parameters": {
"title": "routeSchema",
"type": "object",
"properties": {
"next": {
"title": "Next",
"anyOf": [
{"enum": options},
"required": ["next"],
prompt = ChatPromptTemplate.from_messages(
("system", system_prompt),
"Given the conversation above, who should act next?"
" Or should we FINISH? Select one of: {options}",
).partial(options=str(options), team_members=", ".join(members))
return (
| trimmer
| llm.bind_functions(functions=[function_def], function_call="route")
| JsonOutputFunctionsParser()
API Reference: JsonOutputFunctionsParserChatPromptTemplateMessagesPlaceholderChatOpenAIHumanMessagetrim_messagesENDStateGraphSTART
五、定义代理 Team
现在我们可以定义我们的等级Team 了。“选择你的球员!”
研究 Team
研究Team 将有一个搜索代理和一个网页抓取“research_agent”作为两个工作节点。让我们创建这些,以及Team 主管。
import functools
import operator
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai.chat_models import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# ResearchTeam graph state
class ResearchTeamState(TypedDict):
# A message is added after each team member finishes
messages: Annotated[List[BaseMessage], operator.add]
# The team members are tracked so they are aware of
# the others' skill-sets
team_members: List[str]
# Used to route work. The supervisor calls a function
# that will update this every time it makes a decision
next: str
llm = ChatOpenAI(model="gpt-4o")
search_agent = create_react_agent(llm, tools=[tavily_tool])
search_node = functools.partial(agent_node, agent=search_agent, name="Search")
research_agent = create_react_agent(llm, tools=[scrape_webpages])
research_node = functools.partial(agent_node, agent=research_agent, name="WebScraper")
supervisor_agent = create_team_supervisor(
"You are a supervisor tasked with managing a conversation between the"
" following workers: Search, WebScraper. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["Search", "WebScraper"],
API Reference: BaseMessageHumanMessageChatOpenAIcreate_react_agent
research_graph = StateGraph(ResearchTeamState)
research_graph.add_node("Search", search_node)
research_graph.add_node("WebScraper", research_node)
research_graph.add_node("supervisor", supervisor_agent)
# Define the control flow
research_graph.add_edge("Search", "supervisor")
research_graph.add_edge("WebScraper", "supervisor")
lambda x: x["next"],
{"Search": "Search", "WebScraper": "WebScraper", "FINISH": END},
research_graph.add_edge(START, "supervisor")
chain = research_graph.compile()
# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str):
results = {
"messages": [HumanMessage(content=message)],
return results
research_chain = enter_chain | chain
from IPython.display import Image, display
for s in research_chain.stream(
"when is Taylor Swift's next tour?", {"recursion_limit": 100}
if "__end__" not in s:
import operator
from pathlib import Path
# Document writing team graph state
class DocWritingState(TypedDict):
# This tracks the team's conversation internally
messages: Annotated[List[BaseMessage], operator.add]
# This provides each worker with context on the others' skill sets
team_members: str
# This is how the supervisor tells langgraph who to work next
next: str
# This tracks the shared directory state
current_files: str
# This will be run before each worker agent begins work
# It makes it so they are more aware of the current state
# of the working directory.
def prelude(state):
written_files = []
if not WORKING_DIRECTORY.exists():
written_files = [
f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
except Exception:
if not written_files:
return {**state, "current_files": "No files written."}
return {
"current_files": "\nBelow are files your team has written to the directory:\n"
+ "\n".join([f" - {f}" for f in written_files]),
llm = ChatOpenAI(model="gpt-4o")
doc_writer_agent = create_react_agent(
llm, tools=[write_document, edit_document, read_document]
# Injects current directory working state before each call
context_aware_doc_writer_agent = prelude | doc_writer_agent
doc_writing_node = functools.partial(
agent_node, agent=context_aware_doc_writer_agent, name="DocWriter"
note_taking_agent = create_react_agent(llm, tools=[create_outline, read_document])
context_aware_note_taking_agent = prelude | note_taking_agent
note_taking_node = functools.partial(
agent_node, agent=context_aware_note_taking_agent, name="NoteTaker"
chart_generating_agent = create_react_agent(llm, tools=[read_document, python_repl])
context_aware_chart_generating_agent = prelude | chart_generating_agent
chart_generating_node = functools.partial(
agent_node, agent=context_aware_note_taking_agent, name="ChartGenerator"
doc_writing_supervisor = create_team_supervisor(
"You are a supervisor tasked with managing a conversation between the"
" following workers: {team_members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["DocWriter", "NoteTaker", "ChartGenerator"],
# Create the graph here:
# Note that we have unrolled the loop for the sake of this doc
authoring_graph = StateGraph(DocWritingState)
authoring_graph.add_node("DocWriter", doc_writing_node)
authoring_graph.add_node("NoteTaker", note_taking_node)
authoring_graph.add_node("ChartGenerator", chart_generating_node)
authoring_graph.add_node("supervisor", doc_writing_supervisor)
# Add the edges that always occur
authoring_graph.add_edge("DocWriter", "supervisor")
authoring_graph.add_edge("NoteTaker", "supervisor")
authoring_graph.add_edge("ChartGenerator", "supervisor")
# Add the edges where routing applies
lambda x: x["next"],
"DocWriter": "DocWriter",
"NoteTaker": "NoteTaker",
"ChartGenerator": "ChartGenerator",
authoring_graph.add_edge(START, "supervisor")
chain = authoring_graph.compile()
# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str, members: List[str]):
results = {
"messages": [HumanMessage(content=message)],
"team_members": ", ".join(members),
return results
# We reuse the enter/exit functions to wrap the graph
authoring_chain = (
functools.partial(enter_chain, members=authoring_graph.nodes)
| authoring_graph.compile()
from IPython.display import Image, display
for s in authoring_chain.stream(
"Write an outline for poem and then write the poem to disk.",
{"recursion_limit": 100},
if "__end__" not in s:
{'supervisor': {'next': 'NoteTaker'}}
{'NoteTaker': {'messages': [HumanMessage(content='The poem has been written and saved to "poem.txt".', name='NoteTaker')]}}
{'supervisor': {'next': 'FINISH'}}
from langchain_core.messages import BaseMessage
from langchain_openai.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
supervisor_node = create_team_supervisor(
"You are a supervisor tasked with managing a conversation between the"
" following teams: {team_members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["ResearchTeam", "PaperWritingTeam"],
API Reference: BaseMessageChatOpenAI
# Top-level graph state
class State(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
next: str
def get_last_message(state: State) -> str:
return state["messages"][-1].content
def join_graph(response: dict):
return {"messages": [response["messages"][-1]]}
# Define the graph.
super_graph = StateGraph(State)
# First add the nodes, which will do the work
super_graph.add_node("ResearchTeam", get_last_message | research_chain | join_graph)
"PaperWritingTeam", get_last_message | authoring_chain | join_graph
super_graph.add_node("supervisor", supervisor_node)
# Define the graph connections, which controls how the logic
# propagates through the program
super_graph.add_edge("ResearchTeam", "supervisor")
super_graph.add_edge("PaperWritingTeam", "supervisor")
lambda x: x["next"],
"PaperWritingTeam": "PaperWritingTeam",
"ResearchTeam": "ResearchTeam",
super_graph.add_edge(START, "supervisor")
super_graph = super_graph.compile()
from IPython.display import Image, display
for s in super_graph.stream(
"messages": [
content="Write a brief research report on the North American sturgeon. Include a chart."
{"recursion_limit": 150},
if "__end__" not in s:
{'supervisor': {'next': 'ResearchTeam'}}
{'ResearchTeam': {'messages': [HumanMessage(content="Unfortunately, ... documents.", name='WebScraper')]}}
{'supervisor': {'next': 'PaperWritingTeam'}}