LangChain is cumbersome to use: in the time it takes to read its API docs you could have written the code yourself. But some open-source APIs now expose LangChain interfaces, so it is still worth knowing. Reference: the official how-to docs: https://www.langchain.com.cn/docs/how_to/
1. LLM and LangServe example
Taking the OpenAI interface as an example, there are three steps: define the model, call the invoke method, and parse the result. A so-called chain just pipes together objects that implement invoke:
from fastapi import FastAPI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langserve import add_routes
# 1. Create prompt template
prompt_template = ChatPromptTemplate.from_messages([
    ('system', "Translate the following into {language}:"),
    ('user', '{text}'),
])

# 2. Create model
model = ChatOpenAI()

# 3. Create parser
parser = StrOutputParser()

# 4. Create chain
chain = prompt_template | model | parser

# 5. App definition
app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple API server using LangChain's Runnable interfaces",
)

# 6. Adding chain route
add_routes(
    app,
    chain,
    path="/chain",
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="localhost", port=8000)
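Invoked locally (without the server), the chain takes a dict whose keys match the prompt variables:

chain.invoke({"language": "italian", "text": "hi"})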
The server can be called with plain requests, or through the langserve client interface:
from langserve import RemoteRunnable
remote_chain = RemoteRunnable("http://localhost:8000/chain/")
remote_chain.invoke({"language": "italian", "text": "hi"})
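For reference, the same call over plain HTTP goes to the /chain/invoke endpoint that add_routes registers, with the payload wrapped in an "input" field:

import requests

response = requests.post(
    "http://localhost:8000/chain/invoke",
    json={"input": {"language": "italian", "text": "hi"}},
)
print(response.json()["output"])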
To wrap your own LLM, implement the _call and _llm_type methods. Below is an example; we skip assembling the prompt from message classes and simply hard-code it inside the custom LLM (even the parser could be folded in there too).
import json
import requests
from langchain_core.output_parsers import StrOutputParser
from langchain_core.language_models.llms import LLM

requests.packages.urllib3.disable_warnings()

class Qwen(LLM):
    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs):
        headers = {'accept': 'application/json', 'Content-Type': 'application/json'}
        data = json.dumps({
            "messages": [
                {'role': 'system', 'content': 'Translate the following into Chinese:'},
                {'role': 'user', 'content': prompt},
            ],
            "model": 'Qwen/Qwen2.5-72B-Instruct',
            "temperature": 0,
            "max_tokens": 1024,
        })
        res = requests.post('https://localhost/v1/chat/completions',
                            headers=headers, data=data, verify=False).json()
        return res['choices'][0]['message']['content']

    @property
    def _llm_type(self):
        return "Qwen"

chain = Qwen() | StrOutputParser()
chain.invoke("hi")
2. Document loaders
A document loader returns Document objects:
from langchain_community.document_loaders import UnstructuredMarkdownLoader
data = UnstructuredMarkdownLoader(file_path, mode='elements').load()
content = data[0].page_content

from langchain_community.document_loaders import PyPDFLoader
pages = PyPDFLoader(file_path).load()  # lazy_load() returns a generator, which cannot be indexed directly
content = pages[0].page_content
An example of a custom document loader:
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document

class CustomDocumentLoader(BaseLoader):
    def __init__(self, file_path: str):
        self.file_path = file_path

    def lazy_load(self):
        with open(self.file_path, encoding="utf-8") as f:
            for line_number, line in enumerate(f):
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": self.file_path},
                )

d = CustomDocumentLoader('data/biology/contents/m44386.md')
for di in d.lazy_load():
    print(di)
Loading can also be implemented through the Blob interface:
from typing import Iterator
from langchain_core.document_loaders import BaseBlobParser, Blob
from langchain_core.documents import Document

class MyParser(BaseBlobParser):
    """A simple parser that creates a document from each line."""
    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parse a blob into a document line by line."""
        line_number = 0
        with blob.as_bytes_io() as f:
            for line in f:
                line_number += 1
                yield Document(
                    page_content=line,
                    metadata={"line_number": line_number, "source": blob.source},
                )

parser = MyParser()
blob = Blob(data=b"some data from memory\nmeow")
list(parser.lazy_parse(blob))
Blob.from_path("./meow.txt") reads a file in as a Blob.
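A blob parser can also be paired with a filesystem blob loader through GenericLoader; a minimal sketch reusing MyParser from above (the data/ path and glob pattern are placeholder assumptions):

from langchain_community.document_loaders.generic import GenericLoader

# Walks the directory, turns each matching file into a Blob, and parses it with MyParser
loader = GenericLoader.from_filesystem("data/", glob="**/*.md", parser=MyParser())
for doc in loader.lazy_load():
    print(doc.metadata)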
3. Text splitters
- CharacterTextSplitter: the simplest splitter; splits purely by character count.
- RecursiveCharacterTextSplitter: splits on an ordered list of separator characters, recursing until the chunks are small enough.
- HTML/Markdown splitters: split by headers or by sections.
- SpacyTextSplitter: splits with a spaCy model that segments text by sentence; NLTK offers something similar. Models can be downloaded from https://github.com/explosion/spacy-models/releases or installed directly via pip.
from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import TextLoader
text_splitter = CharacterTextSplitter(chunk_size=100, chunk_overlap=0)
loader = TextLoader("./sidamingzhu.txt", encoding="utf-8")
documents = loader.load()
docs = text_splitter.split_documents(documents)
from langchain_text_splitters import RecursiveCharacterTextSplitter

with open("data/biology/contents/m44386.md") as f:
    state_of_the_union = f.read()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100,chunk_overlap=20)
texts = text_splitter.create_documents([state_of_the_union])
print(texts[10])
print(texts[11])
from langchain_text_splitters import HTMLHeaderTextSplitter
html_string = """
<!DOCTYPE html>
<html>
<body>
<div>
<h1>Foo</h1>
<p>Some intro text about Foo.</p>
<div>
<h2>Bar main section</h2>
<p>Some intro text about Bar.</p>
<h3>Bar subsection 1</h3>
<p>Some text about the first subtopic of Bar.</p>
<h3>Bar subsection 2</h3>
<p>Some text about the second subtopic of Bar.</p>
</div>
<div>
<h2>Baz</h2>
<p>Some text about Baz</p>
</div>
<br>
<p>Some concluding text about Foo</p>
</div>
</body>
</html>
"""
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
html_splitter = HTMLHeaderTextSplitter(headers_to_split_on)
html_header_splits = html_splitter.split_text(html_string)
html_header_splits
from langchain_text_splitters import MarkdownHeaderTextSplitter
markdown_document = "# Foo\n\n ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ### Boo \n\n Hi this is Lance \n\n ## Baz\n\n Hi this is Molly"
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
md_header_splits = markdown_splitter.split_text(markdown_document)
md_header_splits
from langchain_text_splitters import SpacyTextSplitter

text_splitter = SpacyTextSplitter(pipeline="zh_core_web_sm")
A custom splitter must implement the following interface (shown here as the TypeScript signature from the LangChain.js docs):
interface TextSplitter {
  chunkSize: number;
  chunkOverlap: number;

  createDocuments(
    texts: string[],
    metadatas?: Record<string, any>[],
    chunkHeaderOptions: TextSplitterChunkHeaderOptions = {}
  ): Promise<Document[]>;

  splitDocuments(
    documents: Document[],
    chunkHeaderOptions: TextSplitterChunkHeaderOptions = {}
  ): Promise<Document[]>;
}
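In Python, the equivalent is to subclass TextSplitter and implement split_text; create_documents and split_documents are then inherited. A minimal sketch, with a naive sentence regex chosen purely for illustration:

import re
from typing import List
from langchain_text_splitters import TextSplitter

class SentenceSplitter(TextSplitter):
    """Split on sentence-ending punctuation (illustrative only)."""
    def split_text(self, text: str) -> List[str]:
        # Split after Chinese/Western sentence enders, dropping empty pieces
        sentences = re.split(r"(?<=[。!?.!?])\s*", text)
        return [s for s in sentences if s.strip()]

splitter = SentenceSplitter(chunk_size=100, chunk_overlap=0)
docs = splitter.create_documents(["第一句。第二句!第三句?"])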
4. Embedding
A VectorStore stores documents that have been vectorized with an embedding model.
Two embedding approaches are covered here. The first loads a local model directly into memory:
from langchain.embeddings.sentence_transformer import SentenceTransformerEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
embedding_function = SentenceTransformerEmbeddings(model_name="embedding/")
The second is a custom embedding class:
from typing import List
from langchain_core.embeddings import Embeddings

class ParrotLinkEmbeddings(Embeddings):
    def __init__(self, model: str):
        self.model = model

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed search docs (here: fixed dummy vectors)."""
        return [[0.5, 0.6, 0.7] for _ in texts]

    def embed_query(self, text: str) -> List[float]:
        """Embed query text."""
        return self.embed_documents([text])[0]
An embedding model exposes the embed_documents and embed_query methods:
embeddings = embeddings_model.embed_documents([
    "Hi there!",
    "Oh, hello!",
    "What's your name?",
    "My friends call me World",
    "Hello World!",
])
len(embeddings), len(embeddings[0])

embedded_query = embeddings_model.embed_query("What was the name mentioned in the conversation?")
embedded_query[:5]
5. VectorStore and retrievers
5.1 Common vector stores
Three vector stores are covered here (in-memory, Chroma, FAISS). The basic steps: 1. build the store with from_documents; 2. search with similarity_search or similarity_search_by_vector.
vector_store = InMemoryVectorStore.from_documents(pages, embedding_function)
docs = vector_store.similarity_search(" Humans have inhabited this planet for how long?", k=2)
for doc in docs:
    print(f'Page {doc.metadata["page_number"]}: {doc.page_content[:300]}\n')
from langchain_community.vectorstores import Chroma
from langchain_community.vectorstores.utils import filter_complex_metadata
pages = filter_complex_metadata(pages)
db = Chroma.from_documents(pages, embedding_function)
db.similarity_search(" Humans have inhabited this planet for how long?", k=2)
from langchain_community.vectorstores import FAISS
db = FAISS.from_documents(pages, embedding_function)
db.similarity_search(" Humans have inhabited this planet for how long?", k=2)
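similarity_search_by_vector takes an already-embedded query instead of raw text; a short sketch using the embedding_function from above:

query_vec = embedding_function.embed_query("Humans have inhabited this planet for how long?")
db.similarity_search_by_vector(query_vec, k=2)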
5.2 Converting to a retriever
A vector store can be used directly as a retriever, which then supports the invoke method:
retriever = vectorstore.as_retriever()
retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.5}
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 1})
docs = retriever.invoke("what did the president say about ketanji brown jackson?")
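"mmr" (maximal marginal relevance) is another built-in search_type; it trades off query relevance against diversity among the returned documents:

retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 3})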
5.3 MultiQueryRetriever
MultiQueryRetriever is worth highlighting: built with MultiQueryRetriever.from_llm, it uses an LLM to rewrite a question into several similar queries and merges the unique results:
import logging
from langchain.retrievers.multi_query import MultiQueryRetriever

logging.basicConfig()
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)

# llm and db are assumed to be defined as in the earlier sections
retriever_from_llm = MultiQueryRetriever.from_llm(retriever=db.as_retriever(), llm=llm)
question = "你是谁?"
unique_docs = retriever_from_llm.invoke(question)
len(unique_docs)
5.4 Custom retrievers
To customize the query-generation step, implement a parser and a prompt template:
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.prompts import PromptTemplate

class LineListOutputParser(BaseOutputParser):
    """Split the LLM output into a list of non-empty lines."""
    def parse(self, text: str):
        lines = text.strip().split("\n")
        return list(filter(None, lines))

output_parser = LineListOutputParser()

QUERY_PROMPT = PromptTemplate(
    input_variables=["question"],
    template="""You are an AI language model assistant. Your task is to generate five
different versions of the given user question to retrieve relevant documents from a vector
database. By generating multiple perspectives on the user question, your goal is to help
the user overcome some of the limitations of the distance-based similarity search.
Provide these alternative questions separated by newlines.
Original question: {question}""",
)

llm_chain = QUERY_PROMPT | llm | output_parser
retriever = MultiQueryRetriever(
    retriever=db.as_retriever(), llm_chain=llm_chain, parser_key="lines"
)  # "lines" is the key (attribute name) of the parsed output
retriever.invoke("What does the course say about regression?")
import uuid
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryByteStore

# The storage layer for the parent documents
store = InMemoryByteStore()
id_key = "doc_id"

# The retriever (empty to start)
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    byte_store=store,
    id_key=id_key,
)
doc_ids = [str(uuid.uuid4()) for _ in docs]

# Split each parent document into child chunks tagged with the parent's id
child_text_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
sub_docs = []
for i, doc in enumerate(docs):
    _id = doc_ids[i]
    _sub_docs = child_text_splitter.split_documents([doc])
    for _doc in _sub_docs:
        _doc.metadata[id_key] = _id
    sub_docs.extend(_sub_docs)

retriever.vectorstore.add_documents(sub_docs)
retriever.docstore.mset(list(zip(doc_ids, docs)))
retriever.vectorstore.similarity_search("justice breyer")[0]
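The vector store search above returns a small child chunk, while invoking the retriever itself returns the corresponding full parent document:

retriever.invoke("justice breyer")[0]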
A fully custom retriever subclasses BaseRetriever and implements _get_relevant_documents:
from typing import List
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever

class ToyRetriever(BaseRetriever):
    documents: List[Document]
    """List of documents to retrieve from."""
    k: int
    """Number of top results to return"""

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        matching_documents = []
        for document in self.documents:
            if len(matching_documents) > self.k:
                return matching_documents
            if query.lower() in document.page_content.lower():
                matching_documents.append(document)
        return matching_documents

retriever = ToyRetriever(documents=documents, k=3)
retriever.invoke("that")
await retriever.ainvoke("that")
retriever.batch(["dog", "cat"])
async for event in retriever.astream_events("bar", version="v1"):
    print(event)
5.5 Structured queries over metadata
Below is a structured query over metadata using SelfQueryRetriever, which has an LLM translate the natural-language question into a vector query plus a metadata filter:
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
docs = [
    Document(
        page_content="A bunch of scientists bring back dinosaurs and mayhem breaks loose",
        metadata={"year": 1993, "rating": 7.7, "genre": "science fiction"},
    ),
    Document(
        page_content="Leo DiCaprio gets lost in a dream within a dream within a dream within a ...",
        metadata={"year": 2010, "director": "Christopher Nolan", "rating": 8.2},
    ),
    Document(
        page_content="A psychologist / detective gets lost in a series of dreams within dreams within dreams and Inception reused the idea",
        metadata={"year": 2006, "director": "Satoshi Kon", "rating": 8.6},
    ),
    Document(
        page_content="A bunch of normal-sized women are supremely wholesome and some men pine after them",
        metadata={"year": 2019, "director": "Greta Gerwig", "rating": 8.3},
    ),
    Document(
        page_content="Toys come alive and have a blast doing so",
        metadata={"year": 1995, "genre": "animated"},
    ),
    Document(
        page_content="Three men walk into the Zone, three men walk out of the Zone",
        metadata={
            "year": 1979,
            "director": "Andrei Tarkovsky",
            "genre": "thriller",
            "rating": 9.9,
        },
    ),
]
vectorstore = Chroma.from_documents(docs, OpenAIEmbeddings())
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI
metadata_field_info = [
    AttributeInfo(
        name="genre",
        description="The genre of the movie. One of ['science fiction', 'comedy', 'drama', 'thriller', 'romance', 'action', 'animated']",
        type="string",
    ),
    AttributeInfo(
        name="year",
        description="The year the movie was released",
        type="integer",
    ),
    AttributeInfo(
        name="director",
        description="The name of the movie director",
        type="string",
    ),
    AttributeInfo(
        name="rating", description="A 1-10 rating for the movie", type="float"
    ),
]
document_content_description = "Brief summary of a movie"
llm = ChatOpenAI(temperature=0)
retriever = SelfQueryRetriever.from_llm(
llm,
vectorstore,
document_content_description,
metadata_field_info,
)
retriever.invoke("I want to watch a movie rated higher than 8.5")
Setting enable_limit=True lets the query also constrain the number of documents fetched.
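A sketch with the limit enabled (same setup as above), so a query like "two movies" caps the result count:

retriever = SelfQueryRetriever.from_llm(
    llm,
    vectorstore,
    document_content_description,
    metadata_field_info,
    enable_limit=True,
)
retriever.invoke("What are two movies about dinosaurs")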
5.6 BM25 retrieval
BM25 is a traditional retrieval algorithm based on term frequency and inverse document frequency (the TF-IDF family), which makes it well suited to keyword matching. BM25Retriever.from_texts creates the BM25 retriever; below it is combined with a FAISS retriever in an EnsembleRetriever for hybrid search:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import FAISS
# First set of documents, used by the BM25 retriever
doc_list_1 = [
"这是一个测试句子",
"温格高赢得了2023环法冠军",
"波士顿马拉松是历史悠久的一项比赛",
"何杰即将出战巴黎奥运会的马拉松项目",
"珍宝将不再赞助温格高所在的车队",
]
# Second set of documents, used by the FAISS retriever
doc_list_2 = [
"波加查擅长陡坡进攻,而温格高则更擅长长坡",
"温格高的最大摄氧量居然有97!",
"北京奥运会在2008年8月8日开幕",
"基普乔格是东京马拉松的金牌得主",
]
bm25_retriever = BM25Retriever.from_texts(
doc_list_1, metadatas=[{"source": 1}] * len(doc_list_1)
)
bm25_retriever.k = 2  # number of documents the BM25 retriever returns
faiss_vectorstore = FAISS.from_texts(
doc_list_2, embedding_function, metadatas=[{"source": 2}] * len(doc_list_2)
)
faiss_retriever = faiss_vectorstore.as_retriever(search_kwargs={"k": 2})
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, faiss_retriever], weights=[0.5, 0.5]
)
docs = ensemble_retriever.invoke("温格高")
print(docs)
page_contents = [doc.page_content for doc in docs]
print(page_contents)
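Under the hood, EnsembleRetriever merges the individual rankings with Reciprocal Rank Fusion; the weights control how much each retriever contributes to the fused score.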
6. Result compression
A contextual compression retriever passes the query to a base retriever, takes the initial documents, and hands them to a document compressor, which shortens the list by reducing the content of each document or dropping documents entirely.
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=retriever)
compressed_docs = compression_retriever.invoke("Humans have inhabited this planet for how long?")
An embeddings filter can be added on top: it embeds the documents and the query, and returns only the documents whose embeddings are sufficiently similar to the query:
from langchain.retrievers.document_compressors import EmbeddingsFilter
embeddings_filter = EmbeddingsFilter(embeddings=embedding_function, similarity_threshold=0.6)
compression_retriever = ContextualCompressionRetriever(base_compressor=embeddings_filter, base_retriever=retriever)
compression_retriever.invoke("Humans have inhabited this planet for how long?")
With a document compressor pipeline, we can also chain multiple compressors together in sequence:
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain_text_splitters import CharacterTextSplitter
splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=0, separator=". ")
redundant_filter = EmbeddingsRedundantFilter(embeddings=embedding_function)
relevant_filter = EmbeddingsFilter(embeddings=embedding_function, similarity_threshold=0.76)
pipeline_compressor = DocumentCompressorPipeline(transformers=[splitter, redundant_filter, relevant_filter])
compression_retriever = ContextualCompressionRetriever(base_compressor=pipeline_compressor, base_retriever=retriever)
compressed_docs = compression_retriever.invoke("What did the president say about Ketanji Jackson Brown")