通过使用 MyScale 和 LangChain 创建 AI 助手来克服 RAG 的限制,以提高数据检索过程的准确性和效率。
让我们讨论一下如何使用 [MyScale]创建 AI 助手来克服这些 RAG 限制,从而提高数据检索过程的准确性和效率。我们将抓取 Hacker News 的最新故事,同时指导您完成该过程,以演示如何使用高级 SQL 向量查询增强您的 RAG 应用程序。
工具和技术
设置环境
在开始编写代码之前,我们必须确保安装了所有必需的库和依赖项。您可以使用以下方法安装它们:
pip install requests clickhouse-connect transformers openai langchain
此 pip
命令应安装此项目中所需的所有依赖项。
导入库并定义辅助函数
首先,我们将导入必要的库并定义用于从 Hacker News 获取和处理数据的辅助函数。
import requests
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
# Fetch story IDs from a specific endpoint
def fetch_story_ids(endpoint):
url = f'https://hacker-news.firebaseio.com/v0/{endpoint}.json'
response = requests.get(url)
return response.json()
# Get details of a specific item by ID
def get_item_details(item_id):
item_url = f'https://hacker-news.firebaseio.com/v0/item/{item_id}.json'
item_response = requests.get(item_url)
return item_response.json()
# Recursively fetch comments for a story
def fetch_comments(comment_ids, depth=0):
comments = []
for comment_id in comment_ids:
comment_details = get_item_details(comment_id)
if comment_details and comment_details.get('type') == 'comment':
comment_text = comment_details.get('text', '[deleted]')
comment_by = comment_details.get('by', 'Anonymous')
indent = ' ' * depth * 2
comments.append(f"{indent}Comment by {comment_by}: {comment_text}")
if 'kids' in comment_details:
comments.extend(fetch_comments(comment_details['kids'], depth + 1))
return comments
# Convert list of comments to a single string
def create_comment_string(comments):
return ' '.join(comments)
# Set the time limit to 12 hours ago
time_limit = datetime.utcnow() - timedelta(hours=12)
unix_time_limit = int(time_limit.timestamp())
这些函数获取故事 ID,获取特定项目的详细信息,递归获取评论并将评论转换为单个字符串。
获取和处理故事
接下来,我们从 Hacker News 获取最新和最热门的故事,并处理它们以提取相关数据。
# Fetch latest and top stories
latest_stories_ids = fetch_story_ids('newstories')
top_stories_ids = fetch_story_ids('topstories')
# Fetch top 20 stories
top_stories = [get_item_details(story_id) for story_id in top_stories_ids[:20]]
# Fetch all latest stories from the last 12 hours
latest_stories = [get_item_details(story_id) for story_id in latest_stories_ids if get_item_details(story_id).get('time', 0) >= unix_time_limit]
# Prepare data for DataFrame
data = []
def process_stories(stories):
for story in stories:
if story:
story_time = datetime.utcfromtimestamp(story.get('time', 0))
if story_time >= time_limit:
story_data = {
'Title': story.get('title', 'No Title'),
'URL': story.get('url', 'No URL'),
'Score': story.get('score', 0),
'Time': convert_unix_to_datetime(story.get('time', 0)),
'Writer': story.get('by', 'Anonymous'),
'Comments': story.get('descendants', 0) # Correctly handle the number of comments
}
# Fetch comments if any
if 'kids' in story:
comments = fetch_comments(story['kids'])
story_data['Comments_String'] = create_comment_string(comments)
else:
story_data['Comments_String'] = ""
data.append(story_data)
# Process latest and top stories
process_stories(latest_stories)
process_stories(top_stories)
# Create DataFrame
df = pd.DataFrame(data)
# Ensure correct data types
df['Score'] = df['Score'].astype(np.uint64)
df['Comments'] = df['Comments'].astype(np.uint64)
df['Time'] = pd.to_datetime(df['Time'])
我们使用上述定义的辅助函数从 Hacker News 获取最新和最热门的故事。我们处理获取的故事以提取相关信息,如标题、URL、分数、时间、作者和评论。我们还将评论列表转换为单个字符串。
初始化用于嵌入的 Hugging Face 模型
我们现在将使用预训练模型为故事标题和评论生成嵌入。此步骤对于创建 RAG 系统至关重要。
import torch
from transformers import AutoTokenizer, AutoModel
# Initialize the tokenizer and model for embeddings
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
# Generate embeddings after DataFrame is created
empty_embedding = np.zeros(384, dtype=np.float32) # Assuming the embedding size is 384
def generate_embeddings(texts):
inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt", max_length=512)
with torch.no_grad():
outputs = model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1)
return embeddings.numpy().astype(np.float32).flatten()
我们将使用 Hugging Face transformers 库加载用于生成嵌入的预训练模型,并为故事标题和评论生成嵌入。
处理长评论
为了处理超出模型最大令牌长度的长评论,我们将它们拆分为可管理的部分。
# Function to handle long comments
def handle_long_comments(comments, max_length):
parts = [' '.join(comments[i:i + max_length]) for i in range(0, len(comments), max_length)]
return parts
此函数将长评论拆分为适合模型最大令牌长度的部分。
处理故事以进行嵌入
最后,我们将处理每个故事以生成标题和评论的嵌入,并创建一个最终的 Pandas DataFrame。
# Process each story for embeddings
final_data = []
for story in data:
title_embedding = generate_embeddings([story['Title']]).tolist()
comments_string = story['Comments_String']
if comments_string and isinstance(comments_string, str):
max_length = tokenizer.model_max_length # Use the model's max token length
if len(comments_string.split()) > max_length:
parts = handle_long_comments(comments_string.split(), max_length)
for part in parts:
part_comments_string = ' '.join(part)
comments_embeddings = generate_embeddings([part_comments_string]).tolist() if part_comments_string else empty_embedding.tolist()
final_data.append({
'Title': story['Title'],
'URL': story['URL'],
'Score': story['Score'],
'Time': story['Time'],
'Writer': story['Writer'],
'Comments': story['Comments'],
'Comments_String': part_comments_string,
'Title_Embedding': title_embedding,
'Comments_Embedding': comments_embeddings
})
else:
comments_embeddings = generate_embeddings([comments_string]).tolist() if comments_string else empty_embedding.tolist()
final_data.append({
'Title': story['Title'],
'URL': story['URL'],
'Score': story['Score'],
'Time': story['Time'],
'Writer': story['Writer'],
'Comments': story['Comments'],
'Comments_String': comments_string,
'Title_Embedding': title_embedding,
'Comments_Embedding': comments_embeddings
})
else:
story['Title_Embedding'] = title_embedding
story['Comments_Embedding'] = empty_embedding.tolist()
final_data.append(story)
# Create final DataFrame
final_df = pd.DataFrame(final_data)
# Ensure correct data types in the final DataFrame
final_df['Score'] = final_df['Score'].astype(np.uint64)
final_df['Comments'] = final_df['Comments'].astype(np.uint64)
final_df['Time'] = pd.to_datetime(final_df['Time'])
在此步骤中,我们处理每个故事以生成标题和评论的嵌入,在必要时处理长评论,并使用所有处理后的数据创建一个最终的 DataFrame。
连接到 MyScaleDB 并创建表
我们将使用 ClickHouse-Connect 连接到 MyScaleDB 并在其中创建一张表用来存储爬取到的故事。
import clickhouse_connect
client = clickhouse_connect.get_client(
host='your-host',
port=443,
username='your-username',
password='your-password'
)
client.command("DROP TABLE IF EXISTS default.posts")
client.command("""
CREATE TABLE default.posts (
id UInt64,
Title String,
URL String,
Score UInt64,
Time DateTime64,
Writer String,
Comments UInt64,
Title_Embedding Array(Float32),
Comments_Embedding Array(Float32),
CONSTRAINT check_data_length CHECK length(Title_Embedding) = 384
) ENGINE = MergeTree()
ORDER BY id
""")
此代码导入 clickhouse-connect 库并使用提供的凭证建立与 MyScaleDB 的连接。如果已有默认表格 default.posts ,将删除它们,并使用指定架构创建一个新表格。
注:MyScaleDB 提供一个用于保存 500 万向量的免费 pod,以便您可以立即在 RAG 应用程序中使用 MyScaleDB,无需预先付款。
高效处理复杂查询
我们将使用 clickhouse-connect 连接到 MyScaleDB,并创建一个表来存储抓取的故事。
此代码导入 clickhouse-connect 库,并使用提供的凭据建立与 MyScaleDB 的连接。如果存在,它将删除现有的表 default.posts,并使用指定架构创建一个新表。
注意: MyScaleDB 为 500 万个向量的向量存储提供了一个免费的 pod。因此,你可以在你的 RAG 应用程序中开始使用 MyScaleDB,而无需任何初始付款。
插入数据和创建向量索引
现在,我们将处理后的数据插入 MyScaleDB 表,并创建一个索引以实现高效的数据检索。
batch_size = 20 # Adjust based on your needs
num_batches = len(final_df) // batch_size
for i in range(num_batches):
start_idx = i * batch_size
end_idx = start_idx + batch_size
batch_data = final_df[start_idx:end_idx]
client.insert("default.posts", batch_data, column_names=['Title', 'URL', 'Score', "Time",'Writer', 'Comments','Title_Embedding','Comments_Embedding'])
print(f"Batch {i+1}/{num_batches} inserted.")
client.command("""
ALTER TABLE default.posts
ADD VECTOR INDEX photo_embed_index Title_Embedding
TYPE MSTG
('metric_type=Cosine')
""")
此代码将数据批量插入 default.posts 表中,以高效管理大量数据。向量索引在 Title_Embedding 列上创建。
设置查询生成提示模板
我们将设置一个提示模板,将自然语言查询转换为 MyScaleDB SQL 查询。
prompt_template = """
You are a MyScaleDB expert. Given an input question, first create a syntactically correct MyScaleDB query to run, then look at the results of the query and return the answer to the input question.
MyScaleDB queries have a vector distance function called `DISTANCE(column, array)` to compute relevance to the user's question and sort the feature array column by relevance. The `DISTANCE(column, array)` function only accepts an array column as its first argument and a `Embeddings(entity)` as its second argument. You also need a user-defined function called `Embeddings(entity)` to retrieve the entity's array.
When the query is asking for the closest rows based on a certain keyword (e.g., "AI field" or "criticizing"), you have to use this distance function to calculate the distance to the entity's array in the vector column and order by the distance to retrieve relevant rows. If the question involves time constraints (e.g., "last 7 hours"), use the `today()` function to get the current date and time.
If the question specifies the number of examples to obtain, use that number; otherwise, query for at most {top_k} results using the LIMIT clause as per MyScale. Only order according to the distance function when necessary. Never query for all columns from a table; query only the columns needed to answer the question, and wrap each column name in double quotes (") to denote them as delimited identifiers.
Be careful to use only the column names present in the tables below and ensure you know which column belongs to which table. The `ORDER BY` clause should always be after the `WHERE` clause. Do not add a semicolon to the end of the SQL.
Pay attention to the following steps when constructing the query:
1. Identify keywords in the input question (e.g., "most voted articles," "last 7 hours," "AI field").
2. Map keywords to specific query components (e.g., "most voted" maps to "Score DESC").
3. If the question involves relevance to a keyword (e.g., "criticizing"), use the distance function. Otherwise, use standard SQL clauses.
4. If the question mentions the title or comments specifically, calculate the distance accordingly. By default, calculate distance with the title.
5. Use `Embeddings(keyword)` to get embeddings for keywords and use them in the `DISTANCE` function only when the query involves a keyword relevance search.
6. Ensure to consider the comments column if explicitly mentioned in the question.
7. Don't use dist in a query where you haven't found any distance and make sure to use order by dist with other columns as well where the distance is calculated
Example questions and how to handle them:
1. "What are the most voted articles during the last 7 hours in the AI field?"
- Extract keywords: "most voted articles," "last 7 hours," "AI field."
- Map "most voted" to "Score DESC."
- Construct query for the most voted articles in the last 7 hours:
- `SELECT DISTINCT "Title", "URL", "Score", DISTANCE("Title_Embedding", Embeddings('AI field')) FROM posts1 WHERE Time >= today() - INTERVAL 7 HOUR ORDER BY Score DESC LIMIT {top_k}`
2. "Give me some comments where people are criticizing the content?"
- Extract keywords: "comments," "criticizing."
- Map "criticizing" to DISTANCE function.
- Construct query for relevant comments:
- `SELECT DISTINCT "Comments", "Score", DISTANCE("Comments_Embedding", Embeddings('criticizing')) as dist FROM posts1 ORDER BY dist LIMIT {top_k}`
3. "What were the top voted stories during the last 6 hours?"
- Extract keywords: "top voted stories," "last 6 hours."
- Map "top voted" to "Score DESC."
- Construct a simple query for the top voted stories in the last 6 hours:
- `SELECT DISTINCT "Title", "URL", "Score" FROM posts1 WHERE Time >= today() - INTERVAL 6 HOUR ORDER BY Score DESC LIMIT {top_k}`
4. "What are the trending stories in the AI field?"
- Extract keywords: "trending stories," "AI field."
- Map "trending" to "Score DESC."
- Construct query for trending stories in the AI field:
- `SELECT DISTINCT "Title", "URL", "Score", DISTANCE("Title_Embedding", Embeddings('AI field')) as dist FROM posts1 ORDER BY dist,Score DESC LIMIT {top_k}`
5. "Give me some comments that are discussing about latest trends of LLMs?"
- Extract keywords: "comments," "latest trends of LLMs."
- Map "latest trends of LLMs" to DISTANCE function.
- Construct query for comments discussing latest trends of LLMs:
- `SELECT DISTINCT "Comments", "Score", DISTANCE("Comments_Embedding", Embeddings('latest trends of LLMs')) as dist FROM posts1 ORDER BY dist LIMIT {top_k}`
Now, let's create the query based on the provided input.
======== table info ========
{table_info}
Question: {input}
SQLQuery: "
Remove \n,, " or any kind of redundant letter from the query and Be careful to not query for columns that do not exist.
"""
def generate_final_prompt(input, table_info, top_k=5):
final_prompt = prompt_template.format(input=input, table_info=table_info, top_k=top_k)
return final_prompt
此代码设置了一个提示模板,指导 LLM 根据输入问题生成正确的 MyScaleDB 查询。
设置查询参数
我们将设置查询生成的 parameters。
top_k = 5
table_info = """
posts1 (
id UInt64,
Title String,
URL String,
Score UInt64,
Time DateTime64,
Writer String,
Comments UInt64,
Title_Embedding Array(Float32),
Comments_Embedding Array(Float32)
)
"""
此代码设置要检索的 top 结果数 (top_k),定义表信息 (table_info),并为问题设置一个空输入字符串 (input)。
设置模型
在此步骤中,我们将设置 OpenAI 模型,以将用户输入转换为 SQL 查询。
from langchain.chat_models import ChatOpenAI
model = ChatOpenAI(openai_api_key="open-ai-api-key")
将文本转换为 SQL
此方法首先根据用户输入和表信息生成一个最终提示,然后使用 OpenAI 模型将文本转换为 SQL 向量查询。
def get_query(user_input):
final_prompt = generate_final_prompt(user_input, table_info, top_k)
response_text = model.predict(final_prompt)
return response_text
user_input="What are the most voted stories?"
response=get_query(user_input)
在此步骤之后,我们将获得如下查询:
'SELECT DISTINCT "Title", "URL", "Score", DISTANCE("Title_Embedding", Embeddings('AI domain')) as dist FROM posts1 ORDER BY dist, Score DESC LIMIT 5'
但 MyScaleDB DISTANCE 预期 DISTANCE(column, array)。因此,我们需要将 Embeddings(‘AI domain’) 部分转换为向量嵌入。
处理和替换查询字符串中的嵌入
此方法将用于将 Embeddings(“Extracted keywords”) 替换为 float32 数组。
import re
def process_query(query):
pattern = re.compile(r'Embeddings(([^)]+))')
matches = pattern.findall(query)
for match in matches:
processed_embedding = str(list(generate_embeddings(match)))
query = query.replace(f'Embeddings({match})', processed_embedding)
return query
query=process_query1(f"""{response}""")
此方法将查询作为 input,如果查询字符串中存在任何 Embeddings 方法,则返回更新后的查询。
执行查询
最后,我们将执行查询以从向量数据库中检索相关故事。
query=query.replace("\n","")
results = client.query(f"""{query}""")
for row in results.named_results():
print("Title ", row["Title"])
此外,你可以获取模型返回的查询,提取指定列并使用它们来获取列,如上所示。然后可以将这些结果传递回聊天模型,创建一个完整的 AI 聊天助手。这样,助手可以动态地使用直接从结果中提取的相关数据来响应用户查询,确保无缝且交互式的体验。
结论
简单的 RAG 由于专注于直接相似性搜索而用途有限。但是,当与 MyScaleDB、LangChain 等高级工具结合使用时,RAG 应用程序不仅可以满足大规模大数据管理的需求,还可以超越这些需求。它们可以处理更广泛的查询,包括基于时间和复杂的关联查询,从而显著提高当前系统的性能和效率。
那么,我们该如何学习大模型?
作为一名热心肠的互联网老兵,我决定把宝贵的AI知识分享给大家。 至于能学习到多少就看你的学习毅力和能力了 。我已将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。
一、大模型全套的学习路线
学习大型人工智能模型,如GPT-3、BERT或任何其他先进的神经网络模型,需要系统的方法和持续的努力。既然要系统的学习大模型,那么学习路线是必不可少的,下面的这份路线能帮助你快速梳理知识,形成自己的体系。
L1级别:AI大模型时代的华丽登场
L2级别:AI大模型API应用开发工程
L3级别:大模型应用架构进阶实践
L4级别:大模型微调与私有化部署
一般掌握到第四个级别,市场上大多数岗位都是可以胜任,但要还不是天花板,天花板级别要求更加严格,对于算法和实战是非常苛刻的。建议普通人掌握到L4级别即可。
以上的AI大模型学习路线,不知道为什么发出来就有点糊,高清版可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费
】
二、640套AI大模型报告合集
这套包含640份报告的合集,涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师,还是对AI大模型感兴趣的爱好者,这套报告合集都将为您提供宝贵的信息和启示。
三、大模型经典PDF籍
随着人工智能技术的飞速发展,AI大模型已经成为了当今科技领域的一大热点。这些大型预训练模型,如GPT-3、BERT、XLNet等,以其强大的语言理解和生成能力,正在改变我们对人工智能的认识。 那以下这些PDF籍就是非常不错的学习资源。
四、AI大模型商业化落地方案
作为普通人,入局大模型时代需要持续学习和实践,不断提高自己的技能和认知水平,同时也需要有责任感和伦理意识,为人工智能的健康发展贡献力量。