本文翻译整理自 Hierarchical Agent Teams
https://langchain-ai.github.io/langgraph/tutorials/multi_agent/hierarchical_agent_teams/
文章目录
- 一、前言
- 二、设置
- 三、创建工具
- 四、Helper Utilities
- 五、定义代理 Team
- 研究 Team
- 文档写作Team
- 六、添加图层
一、前言
在前面的示例(Agent Supervisor)中,我们引入了单个主管节点的概念,用于在不同的工作节点之间路由工作。
但是如果一个工人的工作变得太复杂怎么办?如果工人数量太多怎么办?
对于某些应用程序,如果工作按层次分布,系统可能会更有效。
您可以通过组合不同的子图并创建顶级主管以及中级主管来做到这一点。
为此,让我们构建一个简单的研究助手!该图如下所示:
AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation
本笔记本的灵感来自Wu等人的论文AutoGen:通过多代理对话启用下一代LLM应用程序。在本笔记本的其余部分,您将:
- Define the agents’ tools to access the web and write files
- 定义代理访问Web和写入文件的工具
- Define some utilities to help create the graph and agents
- 定义一些实用程序来帮助创建图和代理
- Create and define each team (web research + doc writing)
- 创建和定义每个Team (网络研究+文档编写)
- Compose everything together.
- 把一切组合在一起。
二、设置
首先,让我们安装所需的包并设置API密钥
%%capture --no-stderr
%pip install -U langgraph langchain langchain_openai langchain_experimental
import getpass
import os
def _set_if_undefined(var: str):
    """Prompt for environment variable ``var`` unless it already has a value.

    Args:
        var: Name of the environment variable to check and, if needed, set.

    The value is read with ``getpass.getpass`` and stored in ``os.environ``
    for the current process.
    """
    # An unset variable and an empty string are both treated as "missing".
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please provide your {var}")
# Make sure both API keys are present in the environment; the user is only
# prompted for a key that is not already set.
_set_if_undefined("OPENAI_API_KEY")
_set_if_undefined("TAVILY_API_KEY")
设置LangSmith用于LangGraph开发
注册LangSmith以快速发现问题并提高LangGraph项目的性能。LangSmith允许您使用跟踪数据来调试、测试和监控使用LangGraph构建的LLM应用程序 - 在此处阅读有关如何开始的更多信息。
三、创建工具
每个Team 将由一个或多个代理组成,每个代理都有一个或多个工具。下面,定义您的不同Team 要使用的所有工具。
我们将从研究Team 开始。
ResearchTeam 工具
研究Team 工具
研究Team 可以使用搜索引擎和 URL 抓取工具在网络上查找信息。请随时在下面添加其他功能以提高Team 绩效!
from typing import Annotated, List
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
# Tavily search tool instance, constructed with max_results=5.
tavily_tool = TavilySearchResults(max_results=5)
# Tool: load every URL with WebBaseLoader and return the page texts, each
# wrapped in a <Document name="..."> element (name = the page's "title"
# metadata, or "" when absent) and separated by blank lines.
# The docstring is kept verbatim because the @tool decorator exposes it to the
# model as the tool description.
@tool
def scrape_webpages(urls: List[str]) -> str:
    """Use requests and bs4 to scrape the provided web pages for detailed information."""
    loader = WebBaseLoader(urls)
    docs = loader.load()
    return "\n\n".join(
        [
            f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
            for doc in docs
        ]
    )
API Reference: WebBaseLoader | TavilySearchResults | tool
文档编写Team 工具
接下来,我们将提供一些工具供文档编写Team 使用。我们在下面定义了一些基本的文件访问工具。
请注意,这使代理可以访问您的文件系统,这可能是不安全的。我们也没有优化工具描述以提高性能。
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, Optional
from langchain_experimental.utilities import PythonREPL
from typing_extensions import TypedDict
# Scratch directory used by the file tools below. The TemporaryDirectory
# object is kept in a module-level name so it stays referenced for the whole
# session; WORKING_DIRECTORY is its location as a pathlib.Path.
_TEMP_DIRECTORY = TemporaryDirectory()
WORKING_DIRECTORY = Path(_TEMP_DIRECTORY.name)
# Tool: write `points` to WORKING_DIRECTORY / file_name as a numbered list
# ("1. ...", "2. ..."), one point per line, overwriting any existing file.
# The docstring and Annotated descriptions are kept verbatim because they are
# part of the tool schema shown to the model.
@tool
def create_outline(
    points: Annotated[List[str], "List of main points or sections."],
    file_name: Annotated[str, "File path to save the outline."],
) -> Annotated[str, "Path of the saved outline file."]:
    """Create and save an outline."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        # Numbering is 1-based to read naturally as an outline.
        for number, point in enumerate(points, start=1):
            file.write(f"{number}. {point}\n")
    return f"Outline saved to {file_name}"
# Tool: return lines [start:end] of WORKING_DIRECTORY / file_name as text.
# The docstring and Annotated descriptions are kept verbatim because they are
# part of the tool schema shown to the model.
@tool
def read_document(
    file_name: Annotated[str, "File path to save the document."],
    start: Annotated[Optional[int], "The start line. Default is 0"] = None,
    end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
    """Read the specified document."""
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()
    # Bug fix: the check used to read `if start is not None`, which reset every
    # caller-supplied start to 0. Only the missing value falls back to 0 now.
    if start is None:
        start = 0
    # readlines() keeps each line's trailing newline, so joining with "\n"
    # leaves a blank line between lines; kept to preserve the output format.
    return "\n".join(lines[start:end])
# Tool: write `content` to WORKING_DIRECTORY / file_name, replacing any
# existing file of that name, and report the file name back.
# The docstring and Annotated descriptions are kept verbatim because they are
# part of the tool schema shown to the model.
@tool
def write_document(
    content: Annotated[str, "Text content to be written into the document."],
    file_name: Annotated[str, "File path to save the document."],
) -> Annotated[str, "Path of the saved document file."]:
    """Create and save a text document."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.write(content)
    return f"Document saved to {file_name}"
# Tool: insert text into WORKING_DIRECTORY / file_name at 1-indexed line
# numbers and write the result back.
# The docstring and Annotated descriptions are kept verbatim because they are
# part of the tool schema shown to the model.
@tool
def edit_document(
    file_name: Annotated[str, "Path of the document to be edited."],
    inserts: Annotated[
        Dict[int, str],
        "Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
    ],
) -> Annotated[str, "Path of the edited document file."]:
    """Edit a document by inserting text at specific line numbers."""
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()

    # Apply inserts in ascending line order. Each insert shifts the lines after
    # it, so later line numbers refer to the already-updated list.
    for line_number, text in sorted(inserts.items()):
        # len(lines) + 1 is allowed so text can be appended after the last line.
        if not 1 <= line_number <= len(lines) + 1:
            # Bail out before anything is written; the file on disk is untouched.
            return f"Error: Line number {line_number} is out of range."
        lines.insert(line_number - 1, text + "\n")

    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.writelines(lines)

    return f"Document edited and saved to {file_name}"
# Warning: This executes code locally, which can be unsafe when not sandboxed
repl = PythonREPL()


# Tool: run `code` in the shared `repl` instance and return a text report with
# the executed code and whatever repl.run() returned.
# The docstring is kept verbatim because the @tool decorator exposes it to the
# model as the tool description.
@tool
def python_repl(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        # BaseException (not just Exception) so that any failure raised while
        # running the code is reported back as text instead of propagating.
        return f"Failed to execute. Error: {repr(e)}"
    # Fix: the fence used to be written as "\`\`\`", an invalid escape sequence
    # that left literal backslashes in the message. Plain backticks are used.
    return f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
API Reference: PythonREPL
四、Helper Utilities
我们将创建一些实用函数,以便在需要完成以下工作时让代码更加简洁:
- 创建一个工作代理。
- 为子图创建主管。
这些将为我们简化最后的图形组合代码,以便更容易看到发生了什么。
from typing import List, Optional
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph, START
from langchain_core.messages import HumanMessage, trim_messages
# Chat model passed to the trimmer below as its token counter.
llm = ChatOpenAI(model="gpt-4o-mini")
# Message trimmer configured by keyword only (no messages are passed here):
# strategy="last" with a budget of max_tokens=100000, counting tokens with
# `llm`, and include_system=True. It is used as a stage of the supervisor
# chain built in create_team_supervisor.
trimmer = trim_messages(
max_tokens=100000,
strategy="last",
token_counter=llm,
include_system=True,
)
def agent_node(state, agent, name):
    """Run ``agent`` on ``state`` and repackage its final message for the graph.

    Args:
        state: Graph state handed to ``agent.invoke`` unchanged.
        agent: Object exposing ``invoke(state)`` whose result contains a
            ``"messages"`` list.
        name: Name attached to the returned message so later steps can tell
            which worker produced it.

    Returns:
        ``{"messages": [HumanMessage]}`` holding only the content of the last
        message the agent produced, tagged with ``name``.
    """
    result = agent.invoke(state)
    last_message = result["messages"][-1]
    return {"messages": [HumanMessage(content=last_message.content, name=name)]}
def create_team_supervisor(llm: ChatOpenAI, system_prompt, members):
    """Build an LLM-based router that picks which team member acts next.

    Args:
        llm: Chat model that makes the routing decision.
        system_prompt: System message placed at the start of the prompt. It
            may reference ``{team_members}``, which is filled with the
            comma-separated member names.
        members: Names of the workers the supervisor can choose from.

    Returns:
        The composed chain: prompt, then the module-level ``trimmer``, then
        ``llm`` forced to call the ``route`` function, then
        ``JsonOutputFunctionsParser()``. The ``-> str`` annotation this
        function used to carry was removed because the returned object is
        that chain, not a string.
    """
    # "FINISH" is always a valid choice in addition to the named members.
    options = ["FINISH"] + members
    # Schema of the function the model must call; its single required argument
    # "next" is restricted to the values in `options`.
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "title": "Next",
                    "anyOf": [
                        {"enum": options},
                    ],
                },
            },
            "required": ["next"],
        },
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                " Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options), team_members=", ".join(members))
    return (
        prompt
        | trimmer
        # function_call="route" forces the model to answer through the schema.
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
    )
API Reference: JsonOutputFunctionsParser | ChatPromptTemplate | MessagesPlaceholder | ChatOpenAI | HumanMessage | trim_messages | END | StateGraph | START
五、定义代理 Team
现在我们可以定义我们的层级化Team 了。“选择你的球员!”
研究 Team
研究Team 将有一个搜索代理和一个网页抓取“research_agent”作为两个工作节点。让我们创建这些,以及Team 主管。
import functools
import operator
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai.chat_models import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# ResearchTeam graph state
class ResearchTeamState(TypedDict):
    # A message is added after each team member finishes; operator.add makes
    # each update append to the existing list instead of replacing it.
    messages: Annotated[List[BaseMessage], operator.add]
    # The team members are tracked so they are aware of
    # the others' skill-sets
    team_members: List[str]
    # Used to route work. The supervisor calls a function
    # that will update this every time it makes a decision
    next: str
# Rebind `llm` to the model used by the research team's agents and supervisor.
llm = ChatOpenAI(model="gpt-4o")
# Worker 1: agent that can call the Tavily search tool, exposed as "Search".
search_agent = create_react_agent(llm, tools=[tavily_tool])
search_node = functools.partial(agent_node, agent=search_agent, name="Search")
# Worker 2: agent that can call scrape_webpages, exposed as "WebScraper".
research_agent = create_react_agent(llm, tools=[scrape_webpages])
research_node = functools.partial(agent_node, agent=research_agent, name="WebScraper")
# Router that chooses between the two workers or "FINISH".
supervisor_agent = create_team_supervisor(
llm,
"You are a supervisor tasked with managing a conversation between the"
" following workers: Search, WebScraper. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["Search", "WebScraper"],
)
API Reference: BaseMessage | HumanMessage | ChatOpenAI | create_react_agent
现在我们已经创建了必要的组件,定义它们的交互很容易。将节点添加到团队图中,并定义确定转换标准的边。
# Build the research team graph: two worker nodes plus the supervisor.
research_graph = StateGraph(ResearchTeamState)
research_graph.add_node("Search", search_node)
research_graph.add_node("WebScraper", research_node)
research_graph.add_node("supervisor", supervisor_agent)
# Define the control flow
# Every worker hands control back to the supervisor when it is done.
research_graph.add_edge("Search", "supervisor")
research_graph.add_edge("WebScraper", "supervisor")
# The supervisor's "next" value selects the following node; "FINISH" ends the run.
research_graph.add_conditional_edges(
"supervisor",
lambda x: x["next"],
{"Search": "Search", "WebScraper": "WebScraper", "FINISH": END},
)
# Execution always starts at the supervisor.
research_graph.add_edge(START, "supervisor")
chain = research_graph.compile()
# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str):
    """Wrap a plain text request into the research sub-graph's input state.

    Args:
        message: The request text.

    Returns:
        ``{"messages": [HumanMessage(content=message)]}``.
    """
    results = {
        "messages": [HumanMessage(content=message)],
    }
    return results
# Entry point for the research team: a plain string goes through enter_chain
# and the resulting state is fed to the compiled research graph.
research_chain = enter_chain | chain
from IPython.display import Image, display
# Draw the compiled graph (requested with xray=True) as a Mermaid PNG and show it inline.
display(Image(chain.get_graph(xray=True).draw_mermaid_png()))
我们可以直接给这个团队工作。在下面试试。
# Stream the research team's steps for a sample question and print each one,
# followed by a separator. Items containing "__end__" are skipped.
for s in research_chain.stream(
    "when is Taylor Swift's next tour?", {"recursion_limit": 100}
):
    if "__end__" not in s:
        print(s)
        print("---")
文档写作Team
使用类似的方法在下面创建文档编写团队。这一次,我们将授予每个代理访问不同文件编写工具的权限。
请注意,我们在这里向代理提供了文件系统访问权限,这并非在所有情况下都是安全的。
import operator
from pathlib import Path
# Document writing team graph state
class DocWritingState(TypedDict):
    # This tracks the team's conversation internally; operator.add makes each
    # update append to the existing list instead of replacing it.
    messages: Annotated[List[BaseMessage], operator.add]
    # This provides each worker with context on the others' skill sets
    team_members: str
    # This is how the supervisor tells langgraph who to work next
    next: str
    # This tracks the shared directory state
    current_files: str
# This will be run before each worker agent begins work
# It makes it so they are more aware of the current state
# of the working directory.
def prelude(state):
    """Return a copy of ``state`` with ``current_files`` describing the working directory.

    Args:
        state: Mapping holding the current graph state.

    Returns:
        A new dict with every key of ``state`` plus ``"current_files"``: either
        ``"No files written."`` or a bulleted list of the paths found under
        ``WORKING_DIRECTORY`` (relative to it).
    """
    written_files = []
    if not WORKING_DIRECTORY.exists():
        WORKING_DIRECTORY.mkdir()
    try:
        # rglob("*") walks the directory recursively, so nested entries are
        # listed as well.
        written_files = [
            f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
        ]
    except Exception:
        # Deliberately best-effort: if the listing fails, fall through with an
        # empty list and report that nothing has been written.
        pass
    if not written_files:
        return {**state, "current_files": "No files written."}
    return {
        **state,
        "current_files": "\nBelow are files your team has written to the directory:\n"
        + "\n".join([f" - {f}" for f in written_files]),
    }
# Model used by the document-writing team's agents and supervisor.
llm = ChatOpenAI(model="gpt-4o")
# Writer agent: can write, edit and read documents.
doc_writer_agent = create_react_agent(
llm, tools=[write_document, edit_document, read_document]
)
# Injects current directory working state before each call
context_aware_doc_writer_agent = prelude | doc_writer_agent
doc_writing_node = functools.partial(
agent_node, agent=context_aware_doc_writer_agent, name="DocWriter"
)
# Note-taking agent: can create outlines and read documents. It is wrapped
# with prelude and exposed as "NoteTaker".
note_taking_agent = create_react_agent(llm, tools=[create_outline, read_document])
context_aware_note_taking_agent = prelude | note_taking_agent
note_taking_node = functools.partial(
agent_node, agent=context_aware_note_taking_agent, name="NoteTaker"
)
# Chart agent: can read documents and run Python code through python_repl.
chart_generating_agent = create_react_agent(llm, tools=[read_document, python_repl])
context_aware_chart_generating_agent = prelude | chart_generating_agent
# Fix: this node used to wrap context_aware_note_taking_agent, so the
# "ChartGenerator" step actually ran the note taker, which has no python_repl
# tool. It now wraps the chart-generating agent built just above.
chart_generating_node = functools.partial(
    agent_node, agent=context_aware_chart_generating_agent, name="ChartGenerator"
)
# Router for the document-writing team. {team_members} in the prompt is filled
# by create_team_supervisor with the comma-separated member names given here.
doc_writing_supervisor = create_team_supervisor(
llm,
"You are a supervisor tasked with managing a conversation between the"
" following workers: {team_members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["DocWriter", "NoteTaker", "ChartGenerator"],
)
对象本身创建完成之后,我们就可以构建图了。
# Create the graph here:
# Note that we have unrolled the loop for the sake of this doc
authoring_graph = StateGraph(DocWritingState)
authoring_graph.add_node("DocWriter", doc_writing_node)
authoring_graph.add_node("NoteTaker", note_taking_node)
authoring_graph.add_node("ChartGenerator", chart_generating_node)
authoring_graph.add_node("supervisor", doc_writing_supervisor)
# Add the edges that always occur
# (every worker hands control back to the supervisor when it is done)
authoring_graph.add_edge("DocWriter", "supervisor")
authoring_graph.add_edge("NoteTaker", "supervisor")
authoring_graph.add_edge("ChartGenerator", "supervisor")
# Add the edges where routing applies
# (the supervisor's "next" value selects the following node; "FINISH" ends the run)
authoring_graph.add_conditional_edges(
"supervisor",
lambda x: x["next"],
{
"DocWriter": "DocWriter",
"NoteTaker": "NoteTaker",
"ChartGenerator": "ChartGenerator",
"FINISH": END,
},
)
# Execution always starts at the supervisor.
authoring_graph.add_edge(START, "supervisor")
# NOTE: this rebinds `chain`, which earlier held the compiled research graph;
# research_chain was composed before this point and keeps the earlier object.
chain = authoring_graph.compile()
# The following functions interoperate between the top level graph state
# and the state of the document-writing sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str, members: List[str]):
    """Wrap a plain text request into the document-writing sub-graph's input state.

    Args:
        message: The request text.
        members: Iterable of member names; joined with ", " into one string.

    Returns:
        A dict with ``"messages"`` (one ``HumanMessage`` carrying ``message``)
        and ``"team_members"`` (the joined names).
    """
    results = {
        "messages": [HumanMessage(content=message)],
        "team_members": ", ".join(members),
    }
    return results
# We reuse the enter/exit functions to wrap the graph
# `members` is bound to authoring_graph.nodes, whose names enter_chain joins
# into the team_members string.
# NOTE(review): presumably this includes the "supervisor" node too — confirm.
# The graph is compiled again here rather than reusing `chain`.
authoring_chain = (
functools.partial(enter_chain, members=authoring_graph.nodes)
| authoring_graph.compile()
)
from IPython.display import Image, display
# Draw the compiled authoring graph as a Mermaid PNG and show it inline.
display(Image(chain.get_graph().draw_mermaid_png()))

# Stream the document-writing team's steps for a sample request and print each
# one, followed by a separator. Items containing "__end__" are skipped.
for s in authoring_chain.stream(
    "Write an outline for poem and then write the poem to disk.",
    {"recursion_limit": 100},
):
    if "__end__" not in s:
        print(s)
        print("---")
{'supervisor': {'next': 'NoteTaker'}}
---
{'NoteTaker': {'messages': [HumanMessage(content='The poem has been written and saved to "poem.txt".', name='NoteTaker')]}}
---
{'supervisor': {'next': 'FINISH'}}
---
六、添加图层
在这个设计中,我们强制采用自上而下的规划策略。我们已经创建了两个图,但是我们必须决定如何在两者之间分配工作。
我们将创建第三个图来协调前两个图,并添加一些连接器来定义如何在不同图之间共享这个顶级状态。
from langchain_core.messages import BaseMessage
from langchain_openai.chat_models import ChatOpenAI
# Model used by the top-level supervisor.
llm = ChatOpenAI(model="gpt-4o")
# Top-level router: chooses between the two teams or "FINISH". {team_members}
# in the prompt is filled by create_team_supervisor with the names given here.
supervisor_node = create_team_supervisor(
llm,
"You are a supervisor tasked with managing a conversation between the"
" following teams: {team_members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["ResearchTeam", "PaperWritingTeam"],
)
API Reference: BaseMessage | ChatOpenAI
# Top-level graph state
class State(TypedDict):
    # Conversation shared across teams; operator.add makes each update append
    # to the existing list instead of replacing it.
    messages: Annotated[List[BaseMessage], operator.add]
    # Name of the node the supervisor selected to run next.
    next: str
def get_last_message(state: State) -> str:
    """Return the content of the most recent message in ``state["messages"]``.

    Used to turn the top-level state into the plain-text request a team chain
    expects. Raises ``IndexError`` if the message list is empty.
    """
    return state["messages"][-1].content
def join_graph(response: dict):
    """Reduce a sub-graph result to the single message passed back to the top level.

    Args:
        response: Final state of a team graph; must contain a non-empty
            ``"messages"`` list.

    Returns:
        ``{"messages": [last_message]}``. Only the final message is kept, so
        the team's internal conversation does not leak into the top-level state.
    """
    return {"messages": [response["messages"][-1]]}
# Define the graph.
super_graph = StateGraph(State)
# First add the nodes, which will do the work
# Each team node extracts the latest message text, runs the team's chain on
# it, and keeps only the team's final message (join_graph).
super_graph.add_node("ResearchTeam", get_last_message | research_chain | join_graph)
super_graph.add_node(
"PaperWritingTeam", get_last_message | authoring_chain | join_graph
)
super_graph.add_node("supervisor", supervisor_node)
# Define the graph connections, which controls how the logic
# propagates through the program
super_graph.add_edge("ResearchTeam", "supervisor")
super_graph.add_edge("PaperWritingTeam", "supervisor")
# The supervisor's "next" value selects the following team; "FINISH" ends the run.
super_graph.add_conditional_edges(
"supervisor",
lambda x: x["next"],
{
"PaperWritingTeam": "PaperWritingTeam",
"ResearchTeam": "ResearchTeam",
"FINISH": END,
},
)
super_graph.add_edge(START, "supervisor")
# NOTE: `super_graph` is rebound from the graph builder to its compiled form.
super_graph = super_graph.compile()
from IPython.display import Image, display
# Draw the compiled top-level graph as a Mermaid PNG and show it inline.
display(Image(super_graph.get_graph().draw_mermaid_png()))

# Stream the whole hierarchy's steps for a sample request and print each one,
# followed by a separator. Items containing "__end__" are skipped.
for s in super_graph.stream(
    {
        "messages": [
            HumanMessage(
                content="Write a brief research report on the North American sturgeon. Include a chart."
            )
        ],
    },
    {"recursion_limit": 150},
):
    if "__end__" not in s:
        print(s)
        print("---")
{'supervisor': {'next': 'ResearchTeam'}}
---
{'ResearchTeam': {'messages': [HumanMessage(content="Unfortunately, ... documents.", name='WebScraper')]}}
---
{'supervisor': {'next': 'PaperWritingTeam'}}
2024-10-18(五)