LangGraph自适应RAG

LangGraph自适应RAG

    • 介绍
    • 索引
    • LLMs
    • web 搜索工具
    • graph
      • graph state
      • graph flow
      • build graph
        • 执行

介绍

自适应 RAG 是一种 RAG 策略,它将 (1) 查询分析 (2) 主动/自校正 RAG 结合起来。
在文章中,他们报告了查询分析到路由获取:

  • No Retrieval
  • Single-shot RAG
  • Iterative RAG

让我们使用 LangGraph 在此基础上进行构建。
在我们的实现中,我们将在以下之间进行路由:

  • 网络搜索:与最近事件相关的问题
  • 自校正 RAG:针对与我们的索引相关的问题
    在这里插入图片描述

索引

from typing import List
import requests
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Vearch

from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import (
    BaseModel
)
from langchain_text_splitters import RecursiveCharacterTextSplitter

from common.constant import VEARCH_ROUTE_URL, BGE_M3_EMB_URL


class Bgem3Embeddings(BaseModel, Embeddings):
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        print(texts)
        return []

    def embed_query(self, text: str) -> List[float]:
        if not text:
            return []
        return cop_embeddings(text)


"""
bg3m3转向量
"""


def cop_embeddings(input: str) -> list:
    if not input.strip():
        return []

    headers = {
        "Content-Type": "application/json"
    }

    params = {
        "sentences": [input],
        "type": "dense"
    }

    response = requests.post(BGE_M3_EMB_URL, headers=headers, json=params)
    if response.status_code == 200:
        cop_embeddings_result = response.json()
        if not cop_embeddings_result or 'embeddings' not in cop_embeddings_result or not cop_embeddings_result[
            'embeddings']:
            return []

        original_vector = cop_embeddings_result['embeddings'][0]
        original_size = len(original_vector)

        # 将1024的向量兼容为1536,适配openai向量接口
        adaptor_vector = [0.0] * 1536
        for i in range(min(original_size, 1536)):
            adaptor_vector[i] = original_vector[i]

        return adaptor_vector
    else:
        print(f"cop_embeddings error: {response.text}")
        return []


# Docs to index
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# 加载文档
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

# 文档分块
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

# 数据存储到向量库
embeddings_model = Bgem3Embeddings()
# embeddings_model, VEARCH_ROUTE_URL,"lanchain_autogpt","lanchain_autogpt_db", 3,
vectorstore = Vearch.from_documents(
    documents=doc_splits,
    embedding=embeddings_model,
    path_or_url=VEARCH_ROUTE_URL,
    table_name="lanchain_autogpt",
    db_name="lanchain_autogpt_db",
    flag=3
)
retriever = vectorstore.as_retriever()

LLMs

### Router

from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

from common.common import PROXY_URL, API_KEY
from index1 import retriever


# Data model
class RouteQuery(BaseModel):
    """将用户查询路由到最相关的数据源。"""

    datasource: Literal["vectorstore", "web_search"] = Field(
        ...,
        description="给定一个用户问题,选择将其发送到web_search或vectorstore。",
    )


# LLM with function call
llm = ChatOpenAI(model_name="gpt-4o", api_key=API_KEY, base_url=PROXY_URL, temperature=0)
structured_llm_router = llm.with_structured_output(RouteQuery)

# Prompt
system = """你是将用户问题传送到vectorstore或web_search的专家。
vectorstore包含与agents、prompt engineering和adversarial attacks相关的文档。
使用向量库回答有关这些主题的问题。否则,请使用web_search。"""
route_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

question_router = route_prompt | structured_llm_router

# 案例1:数据源选择
# print(
#     question_router.invoke(
#         {"question": "谁将成为NFL选秀的第一人?"}
#     )
# )
# print(question_router.invoke({"question": "Agent memory有哪些类型?"}))

# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="文档与问题相关, 'yes' or 'no'"
    )


structured_llm_grader = llm.with_structured_output(GradeDocuments)

# Prompt
system = """你是一个评估检索到的文档和用户问题的相关性的分级员。 \n 
    如果文档包含与用户问题相关的关键字或语义,则将其评为相关。 \n
    它不需要是一个严格的测试。目标是过滤掉错误的检索。 \n
    给出二进制分数 'yes' or 'no' 表示文档是否与问题相关。"""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "检索到的文档: \n\n {document} \n\n 用户问题: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader
question = "agent memory"
docs = retriever.get_relevant_documents(question)
doc_txt = docs[1].page_content
# 案例2:检索评估
# print(retrieval_grader.invoke({"question": question, "document": doc_txt}))


from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
prompt = hub.pull("rlm/rag-prompt")

# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
# generation = rag_chain.invoke({"context": docs, "question": question})
# 案例3:prompt = hub.pull("rlm/rag-prompt") 生成
# print(generation)


# Data model
class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""

    binary_score: str = Field(
        description="答案以事实为基础, 'yes' 或 'no'"
    )

structured_llm_grader = llm.with_structured_output(GradeHallucinations)

# Prompt
system = """你是一名评估LLM生成是否以一组检索到的事实为基础/支持的分级员。 \n 
     给一个二进制分数 'yes' 或 'no'. 'Yes' 意味着答案以一系列事实为基础。"""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "设置事实: \n\n {documents} \n\n 大模型生成: {generation}"),
    ]
)

hallucination_grader = hallucination_prompt | structured_llm_grader
# hgr = hallucination_grader.invoke({"documents": docs, "generation": generation})
# 案例4:幻觉评估
# print(hgr)


### Answer Grader


# Data model
class GradeAnswer(BaseModel):
    """Binary score to assess answer addresses question."""

    binary_score: str = Field(
        description="答案是否解决了问题, 'yes' 或 'no'"
    )


structured_llm_grader = llm.with_structured_output(GradeAnswer)

# Prompt你是一名评估答案是否能解决问题的评分员
system = """你是一名评估答案是否能解决问题的评分员 \n 
     给一个二进制分数 'yes' 或 'no'。 'Yes' 意味着答案解决了问题。"""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "用户问题: \n\n {question} \n\n 大模型生成: {generation}"),
    ]
)

answer_grader = answer_prompt | structured_llm_grader
# agr = answer_grader.invoke({"question": question, "generation": generation})
# 案例5:答复评估
# print(agr)


# Prompt
system = """你是一个问题重写器,可以将输入问题转换为更好的版本,该版本针对向量库检索进行了优化。
查看输入并尝试推理潜在的语义意图/含义。"""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "下面是原始问题: \n\n {question} \n 提出一个改进的问题。",
        ),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()
# qrr = question_rewriter.invoke({"question": question})
# 案例6:问题重写
# print(qrr)

web 搜索工具

import os

from langchain_community.tools.tavily_search import TavilySearchResults

from common.common import TAVILY_API_KEY

# 提前通过 https://app.tavily.com/home 申请
os.environ["TAVILY_API_KEY"] = TAVILY_API_KEY

tavily_tool = TavilySearchResults(k=3)

graph

将流程捕获为图表。

graph state

from typing import List

from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    generation: str
    documents: List[str]

graph flow

from langchain.schema import Document
from index1 import retriever
from llm2 import rag_chain, retrieval_grader, question_rewriter, question_router, hallucination_grader, answer_grader
from webstool3 import web_search_tool


# 检索向量库中的doc
def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}


# 大模型生成
def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


# 文档评估
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}


# 输入重写
def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}


# web搜索
def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """

    print("---WEB SEARCH---")
    question = state["question"]

    # Web search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)

    return {"documents": web_results, "question": question}


### Edges ###


def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    source = question_router.invoke({"question": question})
    if source.datasource == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "web_search"
    elif source.datasource == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"


# 生成答案 还是 重新生成问题
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"


# 确定生成是否基于文档并回答问题。
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

build graph

from langgraph.graph import END, StateGraph

from pprint import pprint

from common.common import show_img
from gflow5 import web_search, retrieve, grade_documents, generate, transform_query, route_question, decide_to_generate, \
    grade_generation_v_documents_and_question
from gstate4 import GraphState

# 定义工作流
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("web_search", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generatae
workflow.add_node("transform_query", transform_query)  # transform_query

# Build graph
workflow.set_conditional_entry_point(
    route_question,
    {
        "web_search": "web_search",
        "vectorstore": "retrieve",
    },
)
workflow.add_edge("web_search", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)

# Compile
app = workflow.compile()

在这里插入图片描述

执行
# Run
inputs = {
    "question": "熊队的哪位球员有望在2024年的NFL选秀中获得第一名?"
}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/719448.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

示例:WPF中应用Grid的SharedSizeGroup设置整齐的布局

一、目的&#xff1a;应用Grid的SharedSizeGroup设置整齐的布局 二、实现 <ItemsControl ItemsSource"{local:GetStudents Count5}"><ItemsControl.ItemTemplate><DataTemplate><Grid ShowGridLines"True"><Grid.ColumnDefinit…

无代码爬虫八爪鱼采集器-如何采集携程网指定酒店差评信息

场景描述&#xff1a;有一些酒店会分析同行的差评原因&#xff0c;以便提前做预案&#xff0c;避免自己酒店也放同样的错误。他们通过采集携程网指定酒店的提取中差评&#xff0c;使用的采集工具为无代码爬虫软件八爪鱼采集器免费版&#xff0c;下载链接&#xff1a;1.软件分享…

龙芯的 新世界 与 旧世界

但是基本可以 确定 旧世界应该是 有 mips 的代码的。 新世界 应该是 loongarch . 这是 龙芯派 2k300 的连接。 6.Github相关仓库 龙芯派相关源码仓库&#xff1a;https://github.com/LoongsonDotNETCommunity/LoongsonPI 龙芯派Cookbook仓库&#xff1a;https://github.com/L…

LangChain入门学习笔记(一)——Hello World

什么是LangChain LangChain是一个开源&#xff08;github repo&#xff09;的大语言模型应用开发框架&#xff0c;提供了一整套的工具、方法和接口去帮助程序员构建基于大语言模型的端到端应用。LangChain是长链&#xff08;long chain&#xff09;的意思&#xff0c;它的一个…

二叉树专题

94. 二叉树的中序遍历 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。递归实现【左->根->右】 import java.util.*; /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* …

中国最厉害的改名大师颜廷利:食物的真正人生意义是识悟

在探索人生意义的深邃征途中&#xff0c;我们本应以“识悟”为航标&#xff0c;不断扬帆远航&#xff0c;以实现自我的升华。然而&#xff0c;当回望人世繁华&#xff0c;古往今来&#xff0c;无论男女老少&#xff0c;似乎都在“食物”的陪伴下&#xff0c;徘徊往复&#xff0…

nc网络收发测试-tcp客户端\TCP服务器\UDP\UDP广播

netcat&#xff08;nc&#xff09;&#xff1a; 作用&#xff1a;一个功能强大的网络工具&#xff0c;提供了简单的网络测试和网络编程功能。工作原理&#xff1a;可以用于建立TCP或UDP连接&#xff0c;并发送和接收数据。示例用法&#xff1a; 监听TCP端口&#xff1a;nc -l 1…

在低侧电流检测中使用单端放大器:误差源和布局技巧

低侧检测的主要优点是可以使用相对简单的配置来放大分流电阻器两端的电压。例如&#xff0c;通用运算放大器的同相配置可能是需要能够在消费市场领域竞争的成本敏感型电机控制应用的有效选择。 基于同相配置的电路图如图1所示。 图1。 然而&#xff0c;这种低成本解决方案可能…

java实现图片水印添加并自动上传七牛云

图片左下角水印添加 满足需求&#xff1a;可以对不同类型尺寸的照片、图片进行水印的添加&#xff0c;实现尺寸自适应添加水印。 水印效果 代码实现 Controller package com.wlh.zetc.restore.controller;import cn.hutool.core.date.DateUtil; import com.alibaba.nacos.c…

前端可观测性系统建设

一. 背景 随着前端业务的日趋庞大&#xff0c;及时发现和解决业务中的问题、优化用户体验、实时监控业务健康度变得愈发重要。在业务层面&#xff0c;我们希望能够监控每次发布版本后&#xff0c;核心功能是否有显著提升或至少没有负面影响&#xff0c;核心接口是否正常运作&a…

太速科技-基于XCVU9P+ C6678的100G光纤的加速卡

基于XCVU9P C6678的100G光纤的加速卡 一、板卡概述 二、技术指标 • 板卡为自定义结构&#xff0c;板卡大小332mmx260mm; • FPGA采用Xilinx Virtex UltralSCALE 系列芯片 XCVU9P; • FPGA挂载4组FMC HPC 连接器; • 板载4路QSPF&#xff0c;每路数据速…

【深度学习驱动流体力学】剖析流体力学可视化paraview原理

目录 1.paraview版本2.配置过程检查插件库文件配置 ParaView 环境变量启动 ParaView 并检查插件3.可视化测试插件功能 3.加载数据进行可视化第一步: 导入案例第二步:查看当前目录未更新前的内容第三步:使用 blockMesh 命令生成腔体案例的网格第四步:运行仿真icoFoam第五步:使用…

ESP32蓝牙串口通讯

文章目录 一、前言二、代码三、运行 一、前言 ESP32支持经典蓝牙和低功耗蓝牙&#xff08;BLE&#xff09;,经典蓝牙可在计算机上模拟出一个串口&#xff0c;使得ESP32可以以串口的方式和计算机通信。 二、代码 #include "BluetoothSerial.h"String device_name …

以CMDB为基础构建DevOps平台体系

在当今数字化转型的浪潮中&#xff0c;企业IT运维模式正从传统的资产管理向现代化的资源管理转变。配置管理数据库&#xff08;CMDB&#xff09;作为IT运维的核心组成部分&#xff0c;其在DevOps平台中的重要性愈加凸显。通过国信证券和招商银行的实际案例&#xff0c;我们将详…

Redis缓存与数据库双写不一致及解决方法

1.缓存与数据库双写不一致 在大并发下&#xff0c;同时操作数据库与缓存会存在数据不一致性问题 1.1 双写不一致情况 1.2 读写并发不一致 2.解决方法 对于并发几率很小的数据(如个人维度的订单数据、用户数据等)&#xff0c;这种几乎不用考虑这个问题&#xff0c;很少会发生…

Stable Diffusion 3 开源了,完全不输 Midjourney

前段时间我介绍过一款文字生视频的 AI 工具&#xff1a; SadTalker&#xff0c; 当时咱们是作为 Stable Diffusion 的插件来安装的。 那基于 Stable Diffusion 呢&#xff0c;咱们今天就来聊聊新开源的 Stable Diffusion 3。 在文字生成图片这个领域&#xff0c;一直是有三个…

springSecurity(二):实现登入获取token与解析token

登入生成token 主要思想 springSecurity使用UsernamePasswordAuthenticationToken类来封装用户名和密码的认证信息 代码实现 发起登入请求后&#xff0c;进入到login()方法 /*** 在接口中我们通过AuthenticationManager的authenticate方法来进行用户认证,* 所以需要在Secur…

MySQL----表级锁行级锁排它锁和共享锁意向锁

MySQL的锁机制 锁&#xff08;Locking&#xff09;是数据库在并发访问时保证数据一致性和完整性的主要机制。在 MySQL 中&#xff0c;不同存储引擎使用不同的加锁方式&#xff1b;我们以 InnoDB 存储引擎为例介绍 MySQL 中的锁机制&#xff0c;其他存储引擎中的锁相对简单一些…

隐藏element的DateTimePicker组件自带的清空按钮

管理台页面使用到el-date-picker&#xff0c;type datetimerange 但是组件自带了清空按钮&#xff0c;实际上这个控件业务上代表开始时间和结束时间是一个必填选项&#xff0c;所有想要把清空按钮隐藏掉。 查看了文档https://element.eleme.io/#/zh-CN/component/datetime-p…

C++内联函数-auto关键字-for循环-空指针

内联函数 1.1概念&#xff1a; 以 inline 修饰 的函数叫做内联函数&#xff0c; 编译时 C 编译器会在 调用内联函数的地方展开 &#xff0c;没有函数调 用建立栈帧的开销&#xff0c;内联函数提升程序运行的效率。 #define _CRT_SECURE_NO_WARNINGSint ADD(int left, int rig…