【AI Agent系列】【MetaGPT多智能体学习】5. 多智能体案例拆解 - 基于MetaGPT的智能体辩论(附完整代码)

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第三篇笔记。主要是对课程刚开始环境搭建成功后跑通的第一个MetaGPT多智能体案例 - 多智能体辩论,进行学习,并拆解其中的实现步骤和原理。

系列笔记

  • 【AI Agent系列】【MetaGPT多智能体学习】0. 环境准备 - 升级MetaGPT 0.7.2版本及遇到的坑
  • 【AI Agent系列】【MetaGPT多智能体学习】1. 再理解 AI Agent - 经典案例和热门框架综述
  • 【AI Agent系列】【MetaGPT多智能体学习】2. 重温单智能体开发 - 深入源码,理解单智能体运行框架
  • 【AI Agent系列】【MetaGPT多智能体学习】3. 开发一个简单的多智能体系统,兼看MetaGPT多智能体运行机制
  • 【AI Agent系列】【MetaGPT多智能体学习】4. 基于MetaGPT的Team组件开发你的第一个智能体团队

文章目录

  • 系列笔记
  • 0. 辩论Action的定义
  • 1. 辩论智能体的定义
    • 1.1 让两个智能体交替运行的重点
    • 1.2 ~~重写 _observe 函数~~
    • 1.3 重写 _act 函数
    • 1.4 最终Role的代码
  • 2. 实例化智能体
  • 3. 完整代码及运行结果
  • 4. 总结

  • 之前跑通环境的文章:【AI Agent系列】【MetaGPT多智能体学习】0. 环境准备 - 升级MetaGPT 0.7.2版本及遇到的坑

多智能体辩论需求。假设是两个智能体进行辩论。

0. 辩论Action的定义

辩论进行的动作都一样,所以Action可以共用一个。除了 Prompt 需要费点功夫想,其它的流程都和之前一样,平平无奇。

class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""

    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"

    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)

        rsp = await self._aask(prompt)

        return rsp

1. 辩论智能体的定义

智能体的职能也一样,所以可以共用一个Role的构造。

1.1 让两个智能体交替运行的重点

它的Action就是上面的辩论Action,通过 self.set_actions([SpeakAloud]) 设置。

它的动作时机通过 self._watch([UserRequirement, SpeakAloud]) 来设置,可以看到它观察环境中出现 UserRequirementSpeakAloud 来源的消息时,会有所动作。

那么问题来了,现在辩论的智能体Role和Action都只有一个,Role1说完之后放到环境中的信息来源是 SpeakAloud,Role2说完后放到环境中的也是 SpeakAloud,如果还是之前的代码,那当环境中出现 SpeakAloud时,Role1 和 Role2 都会触发动作了。这是不对的。

我们需要的是Role1等待Role2说完再说,Role2等待Role1说完后再说,交替着表达。

怎么实现这一点呢?回想我们之前学的单智能体的运行周期,_observe —> _react(_think + _act) —> publish_message,能阻止智能体不动作的是 _observe 函数。

1.2 重写 _observe 函数

先来看下原_observe函数的实现:

async def _observe(self, ignore_memory=False) -> int:
    """Prepare new messages for processing from the message buffer and other sources."""
    # Read unprocessed messages from the msg buffer.
    news = []
    if self.recovered:
        news = [self.latest_observed_msg] if self.latest_observed_msg else []
    if not news:
        news = self.rc.msg_buffer.pop_all()
    # Store the read messages in your own memory to prevent duplicate processing.
    old_messages = [] if ignore_memory else self.rc.memory.get()
    self.rc.memory.add_batch(news)
    # Filter out messages of interest.
    self.rc.news = [
        n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
    ]
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg

    # Design Rules:
    # If you need to further categorize Message objects, you can do so using the Message.set_meta function.
    # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.
    news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
    if news_text:
        logger.debug(f"{self._setting} observed: {news_text}")
    return len(self.rc.news)

它的结果 len(self.rc.news) 只要大于0,这个智能体就会执行 _react,开始行动。self.rc.news怎么来的?重点看源码中的这句:

# Filter out messages of interest.
self.rc.news = [
    n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]

self.rc.news 是从 news 中过滤出了该智能体所关心的消息 n.cause_by in self.rc.watch 和 指定发送给该智能体的消息 self.name in n.send_to。而 news 其实就是 msg_buffermsg_buffer 来自环境的 publish_message

再来看下环境 publish_message 的代码:

def publish_message(self, message: Message, peekable: bool = True) -> bool:
        """
        Distribute the message to the recipients.
        In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already planned
        in RFC 113 for the entire system, the routing information in the Message is only responsible for
        specifying the message recipient, without concern for where the message recipient is located. How to
        route the message to the message recipient is a problem addressed by the transport framework designed
        in RFC 113.
        """
        logger.debug(f"publish_message: {message.dump()}")
        found = False
        # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
        for role, addrs in self.member_addrs.items():
            if is_send_to(message, addrs):
                role.put_message(message)
                found = True
        if not found:
            logger.warning(f"Message no recipients: {message.dump()}")
        self.history += f"\n{message}"  # For debug

        return True

看里面的 is_send_to 函数:

def is_send_to(message: "Message", addresses: set):
    """Return whether it's consumer"""
    if MESSAGE_ROUTE_TO_ALL in message.send_to:
        return True

    for i in addresses:
        if i in message.send_to:
            return True
    return False

如果指定了 message.send_to,则环境只会将该消息发送给指定的Role

光看上面的代码和我的文字可能不是很好理解,容易绕晕,我给你们画了个图:

在这里插入图片描述

如上图的流程,这对于我们的辩论智能体来说其实就够了。Role1执行完动作后,将结果消息指定发送给Role2(通过message的send_to参数),这样Role1的msg_buffer中就不会出现它自身的这个消息,下次run时msg_buffer就直接是空的,就不运行了,等待Role2的消息。同样的,Role2执行完后将消息指定发送给Role1。这样就实现了交互行动,辩论的过程。

教程中用的0.6.6版本,不知道源码是不是这样。但是它为了实现这个辩论过程,还重写了 _observe 函数:

async def _observe(self) -> int:
    await super()._observe()
    # accept messages sent (from opponent) to self, disregard own messages from the last round
    self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}] # 第二次筛选
    return len(self.rc.news)

可以参考下。但是我用的0.7.2版本,看源码和实践证明,不用重写_observe了。

1.3 重写 _act 函数

这里为什么要重写 _act 函数,最主要的原因是要填执行完结果消息中的 send_to 参数(源码中是没有填的)。还有就是组装辩论的上下文(对手说了什么 context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories))。

async def _act(self) -> Message:
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
    todo = self.rc.todo  # An instance of SpeakAloud

    memories = self.get_memories()
    context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
    print(context)

    rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)

    msg = Message(
        content=rsp,
        role=self.profile,
        cause_by=type(todo),
        sent_from=self.name,
        send_to=self.opponent_name,
    )
    self.rc.memory.add(msg)

    return msg

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name,
这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。

给大家再看一下组装的上下文的示例,有个更直观的认识(Trump发言时,收到的上文是Biden及其之前发言的内容):

在这里插入图片描述

1.4 最终Role的代码

class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""

    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])

	## 0.7.2 版本可以不用重写 _observe,具体原因分析见上文
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud

        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)

        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)

        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)

        return msg

2. 实例化智能体

实例化两个智能体,Team组件的用法前面已经学习过,可参考我的上篇文章。

其中值得注意的是,team.run_project(idea, send_to="Biden") 通过 send_to 参数指定谁先发言。

async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)

3. 完整代码及运行结果

完整代码:


import asyncio
import platform
from typing import Any

import fire

from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.team import Team


class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""

    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"

    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)

        rsp = await self._aask(prompt)

        return rsp


class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""

    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])

    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud

        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)

        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)

        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)

        return msg


async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)


def main(idea: str, investment: float = 3.0, n_round: int = 10):
    """
    :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting"
                 or "Trump: Climate change is a hoax"
    :param investment: contribute a certain dollar amount to watch the debate
    :param n_round: maximum rounds of the debate
    :return:
    """
    if platform.system() == "Windows":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(debate(idea, investment, n_round))


if __name__ == "__main__":
    fire.Fire(main("Topic: The U.S. should commit more in climate change fighting"))

运行结果(交替发言):

在这里插入图片描述

4. 总结

这节内容最主要的是让我们了解和实践了多智能体间更多的消息交互和订阅机制,主要是 send_to 参数的使用。

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name, 这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。


站内文章一览

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/421469.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AI Word Helper (Chorme Extentions) AI单词助手(谷歌浏览器插件)

AI Word Helper (Chorme Extentions) AI单词助手(谷歌浏览器插件) 英文网站,划词查单词,还是看不懂?因为单词意思那么多,词性搞不清,上下文搞不清,出来的意思就没法用,G…

【Spring整合H2内嵌数据库】

目录 H2介绍整合引入依赖配置相关参数编写sql文件启动spring boot项目,登录控制台 jpa实践entityservicedao 《饮酒》.(陶渊明) 结庐在人境,而无车马喧。 问君何能尔,心远地自偏。 采菊东篱下,悠然见南山…

【模型复现】自制数据集上复现目标检测域自适应 SSDA-YOLO

【模型复现】自制数据集上复现目标检测域自适应 SSDA-YOLO 1. 环境安装2. 数据集制作2.1 数据准备2.2 数据结构 3. 模型训练3.1 数据文件配置3.2 训练超参数配置3.3 模型训练 4. 模型验证4.1 验证超参数配置4.2 模型验证 5. 模型推理5.1 推理超参数配置5.2 模型推理 6. 踩坑记录…

计算机网络-典型网络组网架构

前面基本网络知识已经能够满足中小企业的需要了,今天来看下一些基本网络组网架构。 首先网络是分层架构,从接入层到汇聚层再到核心层,然后接入运营商出口。内部包括有线网络、无线网络,出口一般可以使用路由器或者防火墙进行安全防…

Docker:开启访问端口 || IDEA开发工具快速部署到服务器

参考文档:Docker开启远程安全访问-腾讯云开发者社区-腾讯云 一、Linux服务器环境 1.1、安装Linux 1.2、安装docker 二、Linux服务器配置docker开放端口 2.1、首先进入docker配置文件打开 /usr/lib/systemd/system/docker.service 或运行以下命令 vi /usr…

智能咖啡厅助手:人形机器人 +融合大模型,行为驱动的智能咖啡厅机器人(机器人大模型与具身智能挑战赛)

智能咖啡厅助手:人形机器人 融合大模型,行为驱动的智能咖啡厅机器人(机器人大模型与具身智能挑战赛) “机器人大模型与具身智能挑战赛”的参赛作品。的目标是结合前沿的大模型技术和具身智能技术,开发能在模拟的咖啡厅场景中承担服务员角色并…

蓝桥杯:真题讲解3(C++版)附带解析

报纸页数 来自:2016年七届省赛大学C组真题(共8道题) 分析: --画出报纸长的样子,如果我们在上面多画一张报纸,那么就符合题意的5,6,11,12。 观察这张图:观察3&#xf…

React之数据绑定以及表单处理

一、表单元素 像<input>、<textarea>、<option>这样的表单元素不同于其他元素&#xff0c;因为他们可以通过用户交互发生变化。这些元素提供的界面使响应用户交互的表单数据处理更加容易 交互属性&#xff0c;用户对一下元素交互时通过onChange回调函数来监听…

阿尔卡特Adixen ADP/ADS 系列 2 干泵使用说明

阿尔卡特Adixen ADP/ADS 系列 2 干泵使用说明

JVM 第二部分-2(堆,方法区)

4.堆 堆 一个Java程序&#xff08;main方法&#xff09;对应一个jvm实例&#xff0c;一个jvm实例只有一个堆空间堆是jvm启动的时候就被创建&#xff0c;大小也确定了。大小可以用参数设置。堆是jvm管理的一块最大的内存空间 核心区域&#xff0c;是垃圾回收的重点区域堆可以位…

基于SSM的高校竞赛和考级查询系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的高校竞赛和考级查询系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Sp…

带着问题阅读源码——Spring MVC是如何将url注册到RequestMappingHandlerMapping?

背景 在 Spring MVC 中&#xff0c;DispatcherServlet 是前端控制器&#xff08;front controller&#xff09;&#xff0c;它负责接收所有的 HTTP 请求并将它们映射到相应的处理器&#xff08;handler&#xff09;。为了实现这一点&#xff0c;Spring MVC 使用了适配器模式将…

Atcoder ABC342 E - Last Train

Last Train&#xff08;最后一班火车&#xff09; 时间限制&#xff1a;2s 内存限制&#xff1a;1024MB 【原题地址】 所有图片源自Atcoder&#xff0c;题目译文源自脚本Atcoder Better! 点击此处跳转至原题 【问题描述】 【输入格式】 【输出格式】 【样例1】 【样例输入…

<网络安全>《61 微课堂<第1课 南北向流量是什么?>》

1 形象化解释 在网络安全中&#xff0c;经常听到南北向流量这个词。那究竟是什么意思呢&#xff1f; 这里的南北&#xff0c;就是地图上的东西南北&#xff0c;是方向。我们在画网络架构图时&#xff0c;往往是由上到下依次是web层、应用层、数据层&#xff0c;流量从web层到…

数据结构——跳表

简单介绍跳表 跳表&#xff08;Skip List&#xff09;是一种可以进行对数级别查找的数据结构&#xff0c;它通过在数据中构建多级索引来提高查询效率。跳表是一种基于链表的随机化数据结构&#xff0c;其本质是由多个链表组成&#xff0c;每个链表中的元素都是原始链表中的元素…

图神经网络导论 - 刘知远

一、神经网络基础 近年来&#xff0c;机器学习领域的发展迅速&#xff0c;主要表现在多种神经网络架构的出现。尽管不同的神经网络架构相差甚远&#xff0c;但现有的神经网络架构可以分为几个类别&#xff1a; 卷积神经网路是前馈神经网路的特殊形式&#xff0c;FNN通常是全…

RISC-V特权架构 - 中断与异常概述

RISC-V特权架构 - 中断与异常概述 1 中断概述2 异常概述3 广义上的异常3.1 同步异常3.2 异步异常3.3 常见同步异常和异步异常 本文属于《 RISC-V指令集基础系列教程》之一&#xff0c;欢迎查看其它文章。 1 中断概述 中断&#xff08;Interrupt&#xff09;机制&#xff0c;即…

java实现图片转pdf,并通过流的方式进行下载(前后端分离)

首先需要导入相关依赖&#xff0c;由于具体依赖本人也不是记得很清楚了&#xff0c;所以简短的说一下。 iText&#xff1a;PDF 操作库&#xff0c;用于创建和操作 PDF 文件。可通过 Maven 或 Gradle 引入 iText 依赖。 MultipartFile&#xff1a;Spring 框架中处理文件上传的类…

MyBatis 学习(四)之 SQL 映射文件

目录 1 SQL 映射文件介绍 2 select 元素 3 insert 元素 4 update 和 delete 元素 5 sql 元素 6 parameterType 元素 7 resultType 元素 8 resultMap 元素&#xff08;重要&#xff09; 9 参考文档 1 SQL 映射文件介绍 映射器是 MyBatis 中最复杂并且是最重要的…

机器学习 -- 梯度下降算法加深

梯度下降算法 在机器学习中&#xff0c;梯度下降算法常用于最小化代价函数&#xff08;或损失函数&#xff09;&#xff0c;以此来优化模型的参数。代价函数衡量的是模型预测值与实际值之间的差异。通过最小化这个函数&#xff0c;我们可以找到模型预测最准确的参数。 代价函…