书生大模型实战营-进阶关-Lagent 自定义你的 Agent 智能体

Lagent 自定义你的 Agent 智能体

  • Lagent 介绍
  • 环境配置
  • Lagent Web体验
    • 第1步,启动大模型API服务
    • 第2步,启动 Lagent 的 Web页面
  • 基于 Lagent 自定义智能体

Lagent 介绍

Lagent 是一个轻量级、开源的基于大语言模型的智能体(agent)框架,支持用户快速地将一个大语言模型转变为多种类型的智能体,并提供了一些典型工具为大语言模型赋能。

环境配置

# 创建环境
conda create -n agent_camp3 python=3.10 -y
# 激活环境
conda activate agent_camp3
# 安装 torch
conda install pytorch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 pytorch-cuda=12.1 -c pytorch -c nvidia -y
# 安装其他依赖包
pip install termcolor==2.4.0
pip install lmdeploy==0.5.2

# 安装 lagent
cd /root/demo
git clone https://github.com/InternLM/lagent.git
cd lagent && git checkout 81e7ace && pip install -e .

Lagent Web体验

第1步,启动大模型API服务

首先,我们先使用 LMDeploy 部署 InternLM2.5-7B-Chat,并启动一个 API Server。

conda activate agent_demo
lmdeploy serve api_server /share/new_models/Shanghai_AI_Laboratory/internlm2_5-7b-chat --model-name internlm2_5-7b-chat

第2步,启动 Lagent 的 Web页面

修改examples/internlm2_agent_web_demo.py脚本中的模型名称与模型地址,如下:

import copy
import hashlib
import json
import os

import streamlit as st

from lagent.actions import ActionExecutor, ArxivSearch, IPythonInterpreter
from lagent.agents.internlm2_agent import INTERPRETER_CN, META_CN, PLUGIN_CN, Internlm2Agent, Internlm2Protocol
from lagent.llms.lmdeploy_wrapper import LMDeployClient
from lagent.llms.meta_template import INTERNLM2_META as META
from lagent.schema import AgentStatusCode

# from streamlit.logger import get_logger


class SessionState:

    def init_state(self):
        """Initialize session state variables."""
        st.session_state['assistant'] = []
        st.session_state['user'] = []

        action_list = [
            ArxivSearch(),
        ]
        st.session_state['plugin_map'] = {
            action.name: action
            for action in action_list
        }
        st.session_state['model_map'] = {}
        st.session_state['model_selected'] = None
        st.session_state['plugin_actions'] = set()
        st.session_state['history'] = []

    def clear_state(self):
        """Clear the existing session state."""
        st.session_state['assistant'] = []
        st.session_state['user'] = []
        st.session_state['model_selected'] = None
        st.session_state['file'] = set()
        if 'chatbot' in st.session_state:
            st.session_state['chatbot']._session_history = []


class StreamlitUI:

    def __init__(self, session_state: SessionState):
        self.init_streamlit()
        self.session_state = session_state

    def init_streamlit(self):
        """Initialize Streamlit's UI settings."""
        st.set_page_config(
            layout='wide',
            page_title='lagent-web',
            page_icon='./docs/imgs/lagent_icon.png')
        st.header(':robot_face: :blue[Lagent] Web Demo ', divider='rainbow')
        st.sidebar.title('模型控制')
        st.session_state['file'] = set()
        st.session_state['ip'] = None

    def setup_sidebar(self):
        """Setup the sidebar for model and plugin selection."""
        # model_name = st.sidebar.selectbox('模型选择:', options=['internlm'])
        model_name = st.sidebar.text_input('模型名称:', value='internlm2_5-7b-chat')
        meta_prompt = st.sidebar.text_area('系统提示词', value=META_CN)
        da_prompt = st.sidebar.text_area('数据分析提示词', value=INTERPRETER_CN)
        plugin_prompt = st.sidebar.text_area('插件提示词', value=PLUGIN_CN)
        model_ip = st.sidebar.text_input('模型IP:', value='127.0.0.1:23333')
        if model_name != st.session_state[
                'model_selected'] or st.session_state['ip'] != model_ip:
            st.session_state['ip'] = model_ip
            model = self.init_model(model_name, model_ip)
            self.session_state.clear_state()
            st.session_state['model_selected'] = model_name
            if 'chatbot' in st.session_state:
                del st.session_state['chatbot']
        else:
            model = st.session_state['model_map'][model_name]

        plugin_name = st.sidebar.multiselect(
            '插件选择',
            options=list(st.session_state['plugin_map'].keys()),
            default=[],
        )
        da_flag = st.sidebar.checkbox(
            '数据分析',
            value=False,
        )
        plugin_action = [
            st.session_state['plugin_map'][name] for name in plugin_name
        ]

        if 'chatbot' in st.session_state:
            if len(plugin_action) > 0:
                st.session_state['chatbot']._action_executor = ActionExecutor(
                    actions=plugin_action)
            else:
                st.session_state['chatbot']._action_executor = None
            if da_flag:
                st.session_state[
                    'chatbot']._interpreter_executor = ActionExecutor(
                        actions=[IPythonInterpreter()])
            else:
                st.session_state['chatbot']._interpreter_executor = None
            st.session_state['chatbot']._protocol._meta_template = meta_prompt
            st.session_state['chatbot']._protocol.plugin_prompt = plugin_prompt
            st.session_state[
                'chatbot']._protocol.interpreter_prompt = da_prompt
        if st.sidebar.button('清空对话', key='clear'):
            self.session_state.clear_state()
        uploaded_file = st.sidebar.file_uploader('上传文件')

        return model_name, model, plugin_action, uploaded_file, model_ip

    def init_model(self, model_name, ip=None):
        """Initialize the model based on the input model name."""
        model_url = f'http://{ip}'
        st.session_state['model_map'][model_name] = LMDeployClient(
            model_name=model_name,
            url=model_url,
            meta_template=META,
            max_new_tokens=1024,
            top_p=0.8,
            top_k=100,
            temperature=0,
            repetition_penalty=1.0,
            stop_words=['<|im_end|>'])
        return st.session_state['model_map'][model_name]

    def initialize_chatbot(self, model, plugin_action):
        """Initialize the chatbot with the given model and plugin actions."""
        return Internlm2Agent(
            llm=model,
            protocol=Internlm2Protocol(
                tool=dict(
                    begin='{start_token}{name}\n',
                    start_token='<|action_start|>',
                    name_map=dict(
                        plugin='<|plugin|>', interpreter='<|interpreter|>'),
                    belong='assistant',
                    end='<|action_end|>\n',
                ), ),
            max_turn=7)

    def render_user(self, prompt: str):
        with st.chat_message('user'):
            st.markdown(prompt)

    def render_assistant(self, agent_return):
        with st.chat_message('assistant'):
            for action in agent_return.actions:
                if (action) and (action.type != 'FinishAction'):
                    self.render_action(action)
            st.markdown(agent_return.response)

    def render_plugin_args(self, action):
        action_name = action.type
        args = action.args
        import json
        parameter_dict = dict(name=action_name, parameters=args)
        parameter_str = '```json\n' + json.dumps(
            parameter_dict, indent=4, ensure_ascii=False) + '\n```'
        st.markdown(parameter_str)

    def render_interpreter_args(self, action):
        st.info(action.type)
        st.markdown(action.args['text'])

    def render_action(self, action):
        st.markdown(action.thought)
        if action.type == 'IPythonInterpreter':
            self.render_interpreter_args(action)
        elif action.type == 'FinishAction':
            pass
        else:
            self.render_plugin_args(action)
        self.render_action_results(action)

    def render_action_results(self, action):
        """Render the results of action, including text, images, videos, and
        audios."""
        if (isinstance(action.result, dict)):
            if 'text' in action.result:
                st.markdown('```\n' + action.result['text'] + '\n```')
            if 'image' in action.result:
                # image_path = action.result['image']
                for image_path in action.result['image']:
                    image_data = open(image_path, 'rb').read()
                    st.image(image_data, caption='Generated Image')
            if 'video' in action.result:
                video_data = action.result['video']
                video_data = open(video_data, 'rb').read()
                st.video(video_data)
            if 'audio' in action.result:
                audio_data = action.result['audio']
                audio_data = open(audio_data, 'rb').read()
                st.audio(audio_data)
        elif isinstance(action.result, list):
            for item in action.result:
                if item['type'] == 'text':
                    st.markdown('```\n' + item['content'] + '\n```')
                elif item['type'] == 'image':
                    image_data = open(item['content'], 'rb').read()
                    st.image(image_data, caption='Generated Image')
                elif item['type'] == 'video':
                    video_data = open(item['content'], 'rb').read()
                    st.video(video_data)
                elif item['type'] == 'audio':
                    audio_data = open(item['content'], 'rb').read()
                    st.audio(audio_data)
        if action.errmsg:
            st.error(action.errmsg)


def main():
    # logger = get_logger(__name__)
    # Initialize Streamlit UI and setup sidebar
    if 'ui' not in st.session_state:
        session_state = SessionState()
        session_state.init_state()
        st.session_state['ui'] = StreamlitUI(session_state)

    else:
        st.set_page_config(
            layout='wide',
            page_title='lagent-web',
            page_icon='./docs/imgs/lagent_icon.png')
        st.header(':robot_face: :blue[Lagent] Web Demo ', divider='rainbow')
    _, model, plugin_action, uploaded_file, _ = st.session_state[
        'ui'].setup_sidebar()

    # Initialize chatbot if it is not already initialized
    # or if the model has changed
    if 'chatbot' not in st.session_state or model != st.session_state[
            'chatbot']._llm:
        st.session_state['chatbot'] = st.session_state[
            'ui'].initialize_chatbot(model, plugin_action)
        st.session_state['session_history'] = []

    for prompt, agent_return in zip(st.session_state['user'],
                                    st.session_state['assistant']):
        st.session_state['ui'].render_user(prompt)
        st.session_state['ui'].render_assistant(agent_return)

    if user_input := st.chat_input(''):
        with st.container():
            st.session_state['ui'].render_user(user_input)
        st.session_state['user'].append(user_input)
        # Add file uploader to sidebar
        if (uploaded_file
                and uploaded_file.name not in st.session_state['file']):

            st.session_state['file'].add(uploaded_file.name)
            file_bytes = uploaded_file.read()
            file_type = uploaded_file.type
            if 'image' in file_type:
                st.image(file_bytes, caption='Uploaded Image')
            elif 'video' in file_type:
                st.video(file_bytes, caption='Uploaded Video')
            elif 'audio' in file_type:
                st.audio(file_bytes, caption='Uploaded Audio')
            # Save the file to a temporary location and get the path

            postfix = uploaded_file.name.split('.')[-1]
            # prefix = str(uuid.uuid4())
            prefix = hashlib.md5(file_bytes).hexdigest()
            filename = f'{prefix}.{postfix}'
            file_path = os.path.join(root_dir, filename)
            with open(file_path, 'wb') as tmpfile:
                tmpfile.write(file_bytes)
            file_size = os.stat(file_path).st_size / 1024 / 1024
            file_size = f'{round(file_size, 2)} MB'
            # st.write(f'File saved at: {file_path}')
            user_input = [
                dict(role='user', content=user_input),
                dict(
                    role='user',
                    content=json.dumps(dict(path=file_path, size=file_size)),
                    name='file')
            ]
        if isinstance(user_input, str):
            user_input = [dict(role='user', content=user_input)]
        st.session_state['last_status'] = AgentStatusCode.SESSION_READY
        for agent_return in st.session_state['chatbot'].stream_chat(
                st.session_state['session_history'] + user_input):
            if agent_return.state == AgentStatusCode.PLUGIN_RETURN:
                with st.container():
                    st.session_state['ui'].render_plugin_args(
                        agent_return.actions[-1])
                    st.session_state['ui'].render_action_results(
                        agent_return.actions[-1])
            elif agent_return.state == AgentStatusCode.CODE_RETURN:
                with st.container():
                    st.session_state['ui'].render_action_results(
                        agent_return.actions[-1])
            elif (agent_return.state == AgentStatusCode.STREAM_ING
                  or agent_return.state == AgentStatusCode.CODING):
                # st.markdown(agent_return.response)
                # 清除占位符的当前内容,并显示新内容
                with st.container():
                    if agent_return.state != st.session_state['last_status']:
                        st.session_state['temp'] = ''
                        placeholder = st.empty()
                        st.session_state['placeholder'] = placeholder
                    if isinstance(agent_return.response, dict):
                        action = f"\n\n {agent_return.response['name']}: \n\n"
                        action_input = agent_return.response['parameters']
                        if agent_return.response[
                                'name'] == 'IPythonInterpreter':
                            action_input = action_input['command']
                        response = action + action_input
                    else:
                        response = agent_return.response
                    st.session_state['temp'] = response
                    st.session_state['placeholder'].markdown(
                        st.session_state['temp'])
            elif agent_return.state == AgentStatusCode.END:
                st.session_state['session_history'] += (
                    user_input + agent_return.inner_steps)
                agent_return = copy.deepcopy(agent_return)
                agent_return.response = st.session_state['temp']
                st.session_state['assistant'].append(
                    copy.deepcopy(agent_return))
            st.session_state['last_status'] = agent_return.state


if __name__ == '__main__':
    root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    root_dir = os.path.join(root_dir, 'tmp_dir')
    os.makedirs(root_dir, exist_ok=True)
    main()
cd /root/agent_demo/lagent
conda activate agent_demo
streamlit run examples/internlm2_agent_web_demo.py #启动前先修改模型名称与模型地址

此处可能有以下报错:

ModuleNotFoundError: No module named 'griffe.enumerations'

解决办法:将lagent/actions/base_action.py文件中的16行from griffe.enumerations import DocstringSectionKind修改为from griffe import DocstringSectionKind

在这里插入图片描述

基于 Lagent 自定义智能体

水墨画工具脚本:

import json
import requests

from lagent.actions.base_action import BaseAction, tool_api
from lagent.actions.parser import BaseParser, JsonParser
from lagent.schema import ActionReturn, ActionStatusCode


class MagicMaker(BaseAction):
    styles_option = [
        'dongman',  # 动漫
        'guofeng',  # 国风
        'xieshi',   # 写实
        'youhua',   # 油画
        'manghe',   # 盲盒
    ] #风格
    aspect_ratio_options = [
        '16:9', '4:3', '3:2', '1:1',
        '2:3', '3:4', '9:16'
    ] #尺寸

    def __init__(self,
                 style='guofeng',
                 aspect_ratio='4:3'):
        super().__init__()
        if style in self.styles_option:
            self.style = style
        else:
            raise ValueError(f'The style must be one of {self.styles_option}')
        
        if aspect_ratio in self.aspect_ratio_options:
            self.aspect_ratio = aspect_ratio
        else:
            raise ValueError(f'The aspect ratio must be one of {aspect_ratio}')
    
    @tool_api
    def generate_image(self, keywords: str) -> dict:
        """Run magicmaker and get the generated image according to the keywords.

        Args:
            keywords (:class:`str`): the keywords to generate image

        Returns:
            :class:`dict`: the generated image
                * image (str): path to the generated image
        """
        try:
            response = requests.post(
                url='https://magicmaker.openxlab.org.cn/gw/edit-anything/api/v1/bff/sd/generate',
                data=json.dumps({
                    "official": True,
                    "prompt": keywords,
                    "style": self.style,
                    "poseT": False,
                    "aspectRatio": self.aspect_ratio
                }),
                headers={'content-type': 'application/json'}
            )
        except Exception as exc:
            return ActionReturn(
                errmsg=f'MagicMaker exception: {exc}',
                state=ActionStatusCode.HTTP_ERROR)
        image_url = response.json()['data']['imgUrl']
        return {'image': image_url}

展示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/871340.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

家里猫毛到处飞怎么办?如何清理?用宠物空气净化器去除猫毛

我家三只布偶原住民&#xff0c;都是掉毛怪&#xff0c;刚好还是不同的颜色&#xff0c;黑的灰的白的...家里和画板似的&#xff0c;每天都被猫毛上色&#xff0c;清扫时超级崩溃。沙发上、床上、地板上这些常见的地方就不用说了&#xff0c;甚至水杯和碗筷边偶尔也能看见猫毛&…

微服务的基本理解和使用

目录​​​​​​​ 一、微服务基础知识 1、系统架构的演变 &#xff08;1&#xff09;单体应用架构 &#xff08;2&#xff09;垂直应用架构 &#xff08;3&#xff09;分布式SOA架构 &#xff08;4&#xff09;微服务架构 &#xff08;5&#xff09;SOA与微服务的关系…

wincc报警如何通过短信发送给手机

单位使用WINCC上位机监控现场&#xff0c;需要把报警信息发送到指定手机上&#xff0c;能否实现&#xff1f;通过巨控GRMOPC系列远程智能控制终端&#xff0c;简单配置即可实现wincc报警短信传送到手机。配置过程无需任何通讯程序&#xff0c;也不要写任何触发脚本。 GRMOPC模…

Java中接口

接口的定义和使用 练习 public abstract class Animal {private String name;private int age;public Animal() {}public Animal(String name, int age) {this.name name;this.age age;}public String getName() {return name;}public void setName(String name) {this.name…

LIN通讯

目录 1 PLinApi.h 2 TLINFrameEntry 结构体 3 自定义函数getTLINFrameEntry 4 TLINScheduleSlot 结构体 5 自定义函数 getTLINScheduleSlot 6 自定义LIN_SetScheduleInit函数 7 自定义 LIN_StartSchedule 8 发送函数 9 线程接收函数 1 PLinApi.h 这是官方头文件 ///…

开源原型设计工具Penpot

Penpot是一个现代化、开源的协同设计平台&#xff0c;专为跨职能团队打造&#xff0c;提供了强大的在线设计和原型制作功能。 以下是对Penpot的详细介绍&#xff1a; 一、平台特点 开源与免费&#xff1a;Penpot是一个完全免费且开放源代码的项目&#xff0c;允许社区贡献和定…

20:【stm32】定时器一:时基单元

时基单元 1、什么是定时器2、时基单元的基本结构2.1&#xff1a;脉冲的来源2.2&#xff1a;预分频器PSC2.3&#xff1a;计数器CNT2.4&#xff1a;update事件与预加载 3、标准库编程3.1&#xff1a;通过定时器中断来设置延迟函数 1、什么是定时器 定时器是一种专门负责定时功能…

45.【C语言】指针(重难点)(H)

目录&#xff1a; 22.函数指针变量 *创建 *使用 *两段代码分析 23.函数指针数组 *基本用法 *作用 往期推荐 22.函数指针变量 *创建 类比数组指针变量的定义&#xff1a;存放数组地址的指针变量&#xff0c;同理函数指针变量存放函数的地址 格式 函数的返回类型 (*指针变量的…

产品帮助中心如何搭建?五步让客户满意度提升100%

一、引言 创建帮助文章的好处是节省了招募大量客户联系代理的昂贵成本。它们现在通过解决客户的早期问题而无需支持干预&#xff0c;并为自助提供逐步指导&#xff0c;从而取代了支持代理。 当您创建帮助文章时&#xff0c;您会构建知识库并为将来保留它。这些帮助文章充当新…

作业帮 TiDB 7.5.x 使用经验

作者&#xff1a; 是我的海 原文来源&#xff1a; https://tidb.net/blog/5f9784d3 近期在使用 TiDB 时遇到的一些小问题的梳理总结&#xff0c;大部分版本都在6.5.6和7.5.2 1、limit 导致的扫描量过大的优化 研发定时任务每天需要扫描大量数据&#xff0c;到时机器网卡被…

开放式耳机好还是入耳式耳机好?2024五款热销开放式耳机推荐!

开放式耳机与入耳式耳机各有优缺点&#xff0c;适合不同的使用场景和用户需求。 开放式耳机的优点主要包括&#xff1a; 1. 佩戴舒适性好&#xff0c;由于设计宽松&#xff0c;不会给耳朵带来压迫感&#xff0c;适合长时间使用 。 2. 透气性能好&#xff0c;尤其在夏天或运动…

《江南:在爱开始的地方等你》将上映 赖雨濛刘冬沁演绎刻骨之恋

在等到你之前&#xff0c;我们的故事一直未完待续。 用这句话来诠释电影《江南&#xff1a;在爱开始的地方等你》最为精准不过&#xff0c;该片改编自康锐原创小说《月落姑苏》&#xff0c;由康锐导演、编剧&#xff0c;赖雨濛、刘冬沁领衔主演、朱丹妮、王沛为、金巧巧、阎青…

Linux/Windows下线程间通信机制及其API总结

线程间通信&#xff08;Thread Inter-Communication, TIC&#xff09;是指在一个进程内的多个线程之间进行数据交换和同步的方法。与进程间通信相比&#xff0c;线程间通信通常更简单、更高效&#xff0c;因为它们共享相同的内存空间。下面是一些常见的线程间通信机制及其相关A…

vue-element-admin解决三级目录的KeepAlive缓存问题(详情版)

vue-element-admin解决三级目录的KeepAlive缓存问题&#xff08;详情版&#xff09; 本文章将从问题出现的角度看看KeepAlive的缓存问题&#xff0c;然后提出两种解决方法。本文章比较详细&#xff0c;如果只是看怎么解决&#xff0c;代码怎么改&#xff0c;请前往配置版。 一…

【原创教程】电气电工07:网线的制作方法

电气电工经常会遇到做网线,我们做网线需要网线钳与测试仪。需要了解网线的两种接线标准。 我们来看一下网线钳的操作步骤: 这种压线钳也同时具有剥线、剪线功能。 用这种网线钳能制作RJ45网络线接头。RJ11电话线接头、4P电话线接头。适用于RJ45,RJ11型网线 做网线的时候我…

Temu全托管和半托管的区别:一文说清temu全托和半托的差异

TEMU在今年3月再出王炸&#xff0c;上线半托管模式。这是TEMU继全托管模式爆火跨境圈之后的又一个大动作。那么&#xff0c;TEMU全托管和TEMU半托管有哪些不同&#xff1f;卖家朋友应该如何选择呢&#xff1f;今天给大家详细拆解一下。 TEMU全托管和半托管有什么区别 首先是定…

汇编语言lea指令取数组偏移地址

最近看到一条指令&#xff0c;x86汇编&#xff0c; LEA BX, 6[DI] 根据资料&#xff0c;它的含义是&#xff0c;某数组含20个元素&#xff0c;每个元素占一个字节&#xff0c;序号为0~19。设DI指向数组开头处&#xff0c;把序号为6的元素的偏移地址送到BX中&#xff1b; lea指令…

如何进行长截图的两种方法

前言 本文主要讲2种截图方式&#xff0c;分别是谷歌和QQ。 谷歌分为Web端 和 移动端&#xff0c;选一种即可。 第一种&#xff1a;谷歌浏览器控制台自带的 1.先把控制台语言更改为中文&#xff0c;方便查看 ①.按F12&#xff0c;点击设置面板 ②.修改语言为中文并关闭 ③.点击…

BFS解决单源最短路问题

目录 迷宫中离入口最近的出口 最小基因变化 单词接龙 为高尔夫比赛砍树 迷宫中离入口最近的出口 题目 思路 使用宽度优先遍历解决这道题&#xff0c;需要一个二维数组标记是否被遍历过&#xff0c;也需要一个队列辅助完成宽度优先遍历&#xff0c;类似于水波纹一样&#x…

java接口 controller层接收list集合传参,postman 调用接口时required parameter XXX is not present

开发过程中开发一个java接口 controller层接收list集合传参&#xff0c;然后postman调用一直不成功&#xff0c;报错 使用RequestParam方式&#xff0c;如果postman 调用接口时报错required parameter XXX is not present 可能是&#xff08;value“items”&#xff09;跟你输…