内容目录

前言

我们在上一章《【项目实战】基于Agent的金融问答系统之项目简介》中简单介绍了项目背景以及数据集情况,本章将介绍RAG模块的实现。

功能列表

参考之前所学内容《大模型之初识RAG》,我们需要实现如下功能:

  • 向量库的基础功能
    • 连接向量库
    • 数据入库
  • 文件导入
    • PDF文件的读取
    • PDF文件的切分
    • 调用向量库接口入库
  • 文件检索
    • 连接向量库
    • 检索器检索文件

开发过程

1、规划工程文件

项目开始之后,我们如果能够抑制住直接撸代码的冲动,改为提前做好规划,这会是一个好的习惯。
为此,我提前做了如下规划:

代码管理

  • 代码使用Git进行管理,这样后期多人协作时方便代码更新、代码Merge、冲突解决等。
  • 由于国内访问Github优势会ban,所以我们将代码仓库放在Gitee上。
  • 为代码仓库起了一个响亮的名称后,仓库地址定为https://gitee.com/deadwalk/smart-finance-bot

项目目录

考虑这个项目会涉及到前端、后端、数据集、模型等,所以项目目录规划如下:

smart-finance-bot \
    |- dataset \  # 该目录用于保存PDF以及SQLite数据
    |- doc \  # 该目录用于保存文档类文件,例如:需求文档、说明文档、数据文档
    |- app \   # 该目录用于服务端代码
        |- agent \ # 该目录用于保存agent相关代码
        |- rag \   # 该目录用于保存rag相关代码
        |- test \   # 该目录用于保存测试类驱动相关代码
        |- conf \  # 该目录用于保存配置文件
            |- .qwen # 该文件保存QWen的配置文件(请自行添加对应的API KEY)
            |- .ernie # 该文件保存百度千帆的配置文件(请自行添加对应的API KEY)
    |- chatweb \   # 该目录用于保存前端页面代码
    |- scripts \   # 该目录用于保存脚本文件,例如:启动脚本、导入向量数据库脚本等
    |- test_result \   # 该目录用于保存测试结果
    |- docker \
        |- backend \ # 该目录对应后端python服务的Dockerfile
        |- frontend \ # 该目录对应前端python服务的Dockerfile

上述目录中,dataset 是直接使用git的submodul命令,直接将天池大赛提供的数据集引入到本项目中,方便后续使用。

引入方法:

git submodule add https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.git dataset

命名规范

项目如果能够约束统一的命名规范,这对于后续代码的可读性、可维护性会提供需要便利,在此我沿用了约定俗成的代码命名规范:

  • 类名:使用大驼峰命名法,例如:MyClass
  • 函数名:使用小驼峰命名法,例如:my_function
  • 变量名:使用小驼峰命名法,例如:my_variable
  • 文件夹:使用小驼峰命名法。

整体命名时,要尽量见文知意。

2、实现基本的连接大模型的util库

代码文件及目录:app/utils/util.py


from dotenv import load_dotenv
import os

# 获取当前文件的目录
current_dir = os.path.dirname(__file__)

# 构建到 conf/.qwen 的相对路径
conf_file_path_qwen = os.path.join(current_dir, '..', 'conf', '.qwen')

# 加载千问环境变量
load_dotenv(dotenv_path=conf_file_path_qwen)

def get_qwen_models():
    """
    加载千问系列大模型
    """
    # llm 大模型
    from langchain_community.llms.tongyi import Tongyi

    llm = Tongyi(model="qwen-max", temperature=0.1, top_p=0.7, max_tokens=1024)

    # chat 大模型
    from langchain_community.chat_models import ChatTongyi

    chat = ChatTongyi(model="qwen-max", temperature=0.01, top_p=0.2, max_tokens=1024)
    # embedding 大模型
    from langchain_community.embeddings import DashScopeEmbeddings

    embed = DashScopeEmbeddings(model="text-embedding-v3")

    return llm, chat, embed

在app/conf/.qwen中,添加对应的API KEY,例如:

DASHSCOPE_API_KEY = sk-xxxxxx   

3、实现向量库基础功能

向量库文件考虑使用Chroma实现,所以我们先实现一个向量库的类,用于完成基本的向量库连接、数据入库操作。

代码文件及目录:app/rag/chroma_conn.py

import chromadb
from chromadb import Settings
from langchain_chroma import Chroma

class ChromaDB:
    def __init__(self,
                 chroma_server_type="local",  # 服务器类型:http是http方式连接方式,local是本地读取文件方式
                 host="localhost", port=8000,  # 服务器地址,http方式必须指定
                 persist_path="chroma_db",  # 数据库的路径:如果是本地连接,需要指定数据库的路径
                 collection_name="langchain",  # 数据库的集合名称
                 embed=None  # 数据库的向量化函数
                 ):

        self.host = host
        self.port = port
        self.path = persist_path
        self.embed = embed
        self.store = None

        # 如果是http协议方式连接数据库
        if chroma_server_type == "http":
            client = chromadb.HttpClient(host=host, port=port)

            self.store = Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                client=client)

        if chroma_server_type == "local":
            self.store = Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                persist_directory=persist_path)

        if self.store is None:
            raise ValueError("Chroma store init failed!")

    def add_with_langchain(self, docs):
        """
        将文档添加到数据库
        """
        self.store.add_documents(documents=docs)

    def get_store(self):
        """
        获得向量数据库的对象实例
        """
        return self.store

在实际项目实践过程中,我们发现导入Chroma数据时使用本地化连接方式更快一些,所以对连接方式做了两个参数的扩展,local 代表本地连接,http 代表远程连接。

4、实现入库功能

本着先跑通流程,再优化交互过程的思路,对于PDF文件入向量库的过程,我们先通过一段脚本实现(暂不做前端UI的交互)。

代码文件及目录:app/rag/pdf_processor.py

import os
import logging
import time
from tqdm import tqdm
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from rag.chroma_conn import ChromaDB

class PDFProcessor:
    def __init__(self,
                 directory,  # PDF文件所在目录
                 chroma_server_type,  # ChromaDB服务器类型
                 persist_path,  # ChromaDB持久化路径
                 embed):  # 向量化函数

        self.directory = directory
        self.file_group_num = 80  # 每组处理的文件数
        self.batch_num = 6  # 每次插入的批次数量

        self.chunksize = 500  # 切分文本的大小
        self.overlap = 100  # 切分文本的重叠大小

        self.chroma_db = ChromaDB(chroma_server_type=chroma_server_type,
                                  persist_path=persist_path,
                                  embed=embed)
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

    def load_pdf_files(self):
        """
        加载目录下的所有PDF文件
        """
        pdf_files = []
        for file in os.listdir(self.directory):
            if file.lower().endswith('.pdf'):
                pdf_files.append(os.path.join(self.directory, file))

        logging.info(f"Found {len(pdf_files)} PDF files.")
        return pdf_files

    def load_pdf_content(self, pdf_path):
        """
        读取PDF文件内容
        """
        pdf_loader = PyMuPDFLoader(file_path=pdf_path)
        docs = pdf_loader.load()
        logging.info(f"Loading content from {pdf_path}.")
        return docs

    def split_text(self, documents):
        """
        将文本切分成小段
        """
        # 切分文档
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunksize,
            chunk_overlap=self.overlap,
            length_function=len,
            add_start_index=True,
        )

        docs = text_splitter.split_documents(documents)

        logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")
        return docs

    def insert_docs_chromadb(self, docs, batch_size=6):
        """
        将文档插入到ChromaDB
        """
        # 分批入库
        logging.info(f"Inserting {len(docs)} documents into ChromaDB.")

        # 记录开始时间
        start_time = time.time()
        total_docs_inserted = 0

        # 计算总批次
        total_batches = (len(docs) + batch_size - 1) // batch_size

        with tqdm(total=total_batches, desc="Inserting batches", unit="batch") as pbar:
            for i in range(0, len(docs), batch_size):
                # 获取当前批次的样本
                batch = docs[i:i + batch_size]

                # 将样本入库
                self.chroma_db.add_with_langchain(batch)
                # self.chroma_db.async_add_with_langchain(batch)

                # 更新已插入的文档数量
                total_docs_inserted += len(batch)

                # 计算并显示当前的TPM
                elapsed_time = time.time() - start_time  # 计算已用时间(秒)
                if elapsed_time > 0:  # 防止除以零
                    tpm = (total_docs_inserted / elapsed_time) * 60  # 转换为每分钟插入的文档数
                    pbar.set_postfix({"TPM": f"{tpm:.2f}"})  # 更新进度条的后缀信息

                # 更新进度条
                pbar.update(1)

    def process_pdfs_group(self, pdf_files_group):
        # 读取PDF文件内容
        pdf_contents = []

        for pdf_path in pdf_files_group:
            # 读取PDF文件内容
            documents = self.load_pdf_content(pdf_path)

            # 将documents 逐一添加到pdf_contents
            pdf_contents.extend(documents)

        # 将文本切分成小段
        docs = self.split_text(pdf_contents)

        # 将文档插入到ChromaDB
        self.insert_docs_chromadb(docs, self.batch_num)

    def process_pdfs(self):
        # 获取目录下所有的PDF文件
        pdf_files = self.load_pdf_files()

        group_num = self.file_group_num

        # group_num 个PDF文件为一组,分批处理
        for i in range(0, len(pdf_files), group_num):
            pdf_files_group = pdf_files[i:i + group_num]
            self.process_pdfs_group(pdf_files_group)

        print("PDFs processed successfully!")

5、测试导入功能

因为Python的导入库的原因(一般都是从工作目录查找),所以我们在项目根目录下创建test_framework.py,方便后续统一测试工作。

smart-finance-bot \
    |- app \   # 该目录用于服务端代码
        |- rag \   # 该目录用于保存rag相关代码
            |- pdf_processor.py
            |- chroma_conn.py
        |- test_framework.py

代码文件: app/test_framework.py

# 测试导入PDF到向量库主流程
def test_import():
    from rag.pdf_processor import PDFProcessor
    from utils.util import get_qwen_models

    llm , chat , embed = get_qwen_models()
    # embed = get_huggingface_embeddings()

    directory = "./app/dataset/pdf"
    persist_path = "chroma_db"
    server_type = "local"

    # 创建 PDFProcessor 实例
    pdf_processor = PDFProcessor(directory=directory,
                                 chroma_server_type=server_type,
                                 persist_path=persist_path,
                                 embed=embed)

    # 处理 PDF 文件
    pdf_processor.process_pdfs()

if __name__ == "__main__":
    test_import()

1、通过命令行启动ChromaDB服务端:

chroma run --path chroma_db --port 8000 --host localhost

2、运行test_framework.py

运行效果:

备注:一般测试框架会使用Pytest并且编写相应的单元测试函数,本次项目中由于项目较小且函数返回结果不固定,所以就没有写UnitTest。如果想了解Pytest的使用示例,可以参考我的其他代码仓库,例如:UnitTest的使用

6、实现检索功能

代码文件: app/rag/rag.py

import logging
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.runnables.base import RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from .chroma_conn import ChromaDB

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class RagManager:
    def __init__(self,
                 chroma_server_type="http",
                 host="localhost", port=8000,
                 persist_path="chroma_db",
                 llm=None, embed=None):
        self.llm = llm
        self.embed = embed

        chrom_db = ChromaDB(chroma_server_type=chroma_server_type,
                            host=host, port=port,
                            persist_path=persist_path,
                            embed=embed)
        self.store = chrom_db.get_store()

    def get_chain(self, retriever):
        """获取RAG查询链"""

        # RAG系统经典的 Prompt (A 增强的过程)
        prompt = ChatPromptTemplate.from_messages([
            ("human", """You are an assistant for question-answering tasks. Use the following pieces 
          of retrieved context to answer the question. 
          If you don't know the answer, just say that you don't know. 
          Use three sentences maximum and keep the answer concise.
          Question: {question} 
          Context: {context} 
          Answer:""")
        ])
        # 将 format_docs 方法包装为 Runnable
        format_docs_runnable = RunnableLambda(self.format_docs)
        # RAG 链
        rag_chain = (
                {"context": retriever | format_docs_runnable,
                 "question": RunnablePassthrough()}
                | prompt
                | self.llm
                | StrOutputParser()
        )

        return rag_chain

    def format_docs(self, docs):
        # 返回检索到的资料文件名称
        logging.info(f"检索到资料文件个数:{len(docs)}")
        retrieved_files = "\n".join([doc.metadata["source"] for doc in docs])
        logging.info(f"资料文件分别是:\n{retrieved_files}")

        retrieved_content = "\n\n".join(doc.page_content for doc in docs)
        logging.info(f"检索到的资料为:\n{retrieved_content}")

        return retrieved_content

    def get_retriever(self, k=4, mutuality=0.3):
        retriever = self.store.as_retriever(search_type="similarity_score_threshold",
                                            search_kwargs={"k": k, "score_threshold": mutuality})

        return retriever

    def get_result(self, question, k=4, mutuality=0.3):
        """获取RAG查询结果"""

        retriever = self.get_retriever(k, mutuality)

        rag_chain = self.get_chain(retriever)

        return rag_chain.invoke(input=question)

以上是实现了一个使用基本检索器的RAG,其中:

  • 代码中通过chroma_conn.py模块连接到ChromaDB数据库,并使用ChromaDB的as_retriever方法创建一个检索器。

7、测试检索效果

在test_framework.py中添加RAG的测试调用函数。

代码文件:app/test_framework.py

# 测试RAG主流程
def test_rag():
    from rag.rag import RagManager
    from utils.util import get_qwen_models

    llm, chat, embed = get_qwen_models()
    rag = RagManager(host="localhost", port=8000, llm=llm, embed=embed)

    result = rag.get_result("湖南长远锂科股份有限公司变更设立时作为发起人的法人有哪些?")

    print(result)

if __name__ == "__main__":
    test_rag()              # RAG测试函数
    # test_import()         # 批量导入PDF测试函数

注释掉批量导入函数,开启test_rag()函数,运行效果:

至此,我们完成了RAG模块的基本功能,它包括PDF文件的批量导入以及检索功能。

内容小结

  • 首先,我们创建了一个ChromaDB的类,封装了基础的Chroma连接、插入、检索。
  • 其次,我们实现了PDFProcessor类,该类中会调用ChromaDB类的插入函数,将批量读取的PDF文件进行切分后保存至向量库。
  • 然后,我们实现了RagManager类,该类中封装了RAG的检索链,并定义了检索的参数。
  • 最后,我们实现了一个测试函数,用于测试RAG的检索功能。
  • 除此之外,有两个注意事项:
    • 在项目初期,进行合理的项目文件目录规划,可以有效减少项目维护的难度。
    • 在项目行进中,通过搭建测试框架,可以方便函数验证以及后续重构的回归测试。

附录

欢迎查看该系列的其他文章:

发表评论

您的电子邮箱地址不会被公开。 必填项已用 * 标注

分类文章

personal_logo
Dongming
自由职业者

推荐活动

推荐文章

【项目实战】基于Agent的金融问答系统:RAG的检索增强之上下文重排和压缩
【项目实战】基于Agent的金融问答系统:RAG的检索增强之ElasticSearch
【项目实战】基于Agent的金融问答系统:前后端流程打通
【项目实战】基于Agent的金融问答系统:代码重构
【项目实战】基于Agent的金融问答系统:Agent框架的构建
【项目实战】基于Agent的金融问答系统:RAG检索模块初建成
【项目实战】基于Agent的金融问答系统:项目简介
【课程总结】day29:大模型之深入了解Retrievers解析器
【课程总结】day28:大模型之深入探索RAG流程
【课程总结】day30:大模型之Agent的初步了解
内容目录
滚动至顶部