我们团队最近搭建的 RAG(检索增强生成)原型在内部演示时效果不错,但安全评审直接给我们亮了红灯。问题出在数据访问层:整个 LangChain 应用使用一个高权限的数据库服务账号连接到后端的文档知识库。这意味着,理论上任何用户通过巧妙的提示词工程,都有可能诱导 LLM 查询到其本无权访问的敏感数据。在一个多租户的 SaaS 平台,这种设计是绝对无法接受的。
最初的修补思路是在应用层做权限校验——在执行查询前后,根据用户会话判断其租户ID,然后手动在 SQL 查询中拼接 WHERE tenant_id = '...' 子句。这个方案很快被否决了。它不仅容易出错(任何一个漏掉校验的查询路径都是一个漏洞),而且随着业务逻辑变复杂,权限规则会散落在代码的各个角落,最终变成一场维护噩רוב。
我们需要一个更彻底的方案,一个能将安全策略与数据本身绑定的模型。核心构想是:废除应用层的“上帝视角”,让 LangChain Agent 的每一次数据库交互都以终端用户的真实身份进行。数据访问的权限控制,应该下沉到数据库本身,由数据库来强制执行,而不是应用代码。这正是零信任架构在数据层的体现。
技术选型落在了 CockroachDB 上。除了其本身作为分布式 SQL 数据库的弹性伸缩能力外,两个关键特性使其成为这个架构的核心:
- 行级安全(Row-Level Security, RLS): 允许我们定义复杂的安全策略,这些策略会自动应用于所有对表的查询,确保用户只能看到他们被授权访问的数据行。
- 基于 JWT 的身份验证: CockroachDB 可以直接消费和验证 JSON Web Tokens (JWT),并能从 JWT 的声明(claims)中提取用户信息,如用户ID、租户ID等。
这两者结合,形成了一个完美的闭环:应用层负责验证用户身份并颁发一个包含身份信息的短生命周期 JWT;然后,应用将这个 JWT 作为凭证去连接 CockroachDB;最后,CockroachDB 验证 JWT 并使用其中的声明来执行相应的 RLS 策略。整个过程中,应用代码无需关心具体的过滤逻辑,数据库成为了安全边界的最终执行者。
架构流程设计
在深入代码之前,整个安全访问流程可以用下面的序列图来描述。
sequenceDiagram
participant User as 终端用户
participant AgentApp as LangChain Agent 应用 (Python)
participant AuthService as 模拟身份认证服务
participant CRDB as CockroachDB 集群
User->>AgentApp: 发起查询请求 (携带认证信息)
AgentApp->>AuthService: 请求用户JWT (e.g., user='alice', tenant='tenant-a')
AuthService-->>AgentApp: 返回签名的JWT
Note over AgentApp,CRDB: Agent 使用此JWT构建
一个用户专属的数据库连接
AgentApp->>CRDB: 使用JWT建立连接并执行SQL (由Agent生成)
CRDB->>CRDB: 1. 验证JWT签名
CRDB->>CRDB: 2. 提取JWT Claims (tenant_id)
CRDB->>CRDB: 3. 应用RLS策略, 自动添加 `WHERE tenant_id = 'tenant-a'`
CRDB-->>AgentApp: 返回过滤后的安全数据
AgentApp->>AgentApp: 将安全数据注入LLM Prompt
AgentApp-->>User: 返回LLM生成的最终答案
这个流程的关键在于,数据库连接是动态的、用户专属的,并且是短暂的。
步骤一:配置 CockroachDB 的安全环境
首先,我们需要在 CockroachDB 中配置相应的表、角色和安全策略。假设我们有一个简单的多租户文档知识库。
-- 连接到 CockroachDB 集群
-- cockroach sql --certs-dir "certs" --host "localhost"
-- 创建数据库
CREATE DATABASE IF NOT EXISTS secure_rag;
USE secure_rag;
-- 1. 创建文档表,包含租户ID用于数据隔离
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id STRING NOT NULL,
content STRING NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT now()
);
-- 索引对于查询性能至关重要
CREATE INDEX ON documents (tenant_id);
-- 2. 插入一些测试数据,分属不同租户
INSERT INTO documents (tenant_id, content, metadata) VALUES
('tenant-a', 'Tenant A financial report for Q1 2023.', '{"department": "finance", "year": 2023}'),
('tenant-a', 'Marketing strategy for Tenant A new product launch.', '{"department": "marketing"}'),
('tenant-b', 'Tenant B technical documentation for API v2.', '{"department": "engineering"}'),
('tenant-b', 'Onboarding guide for new hires at Tenant B.', '{"department": "hr"}');
-- 3. 创建一个基础角色,所有通过JWT认证的用户都将属于这个角色
-- 这个角色本身不授予任何表的直接访问权限
CREATE ROLE jwt_user;
-- 4. 启用行级安全策略
-- 这是最关键的一步。我们为 documents 表启用 RLS。
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
-- 5. 创建安全策略
-- 这条策略规定,一个用户只有当其JWT声明中的 'tid' 字段值
-- 与文档行中的 'tenant_id' 字段值相同时,才能访问该行。
-- `crdb_internal.get_database_jwt_claims()` 是一个内置函数,用于解析当前会话JWT的声明。
CREATE POLICY tenant_access_policy ON documents
FOR ALL -- 应用于 SELECT, INSERT, UPDATE, DELETE
USING (tenant_id = (crdb_internal.get_database_jwt_claims() ->> 'tid')::STRING)
WITH CHECK (tenant_id = (crdb_internal.get_database_jwt_claims() ->> 'tid')::STRING);
-- 6. 授予 jwt_user 角色对表的访问权限,但实际访问会受到RLS策略的限制
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE documents TO jwt_user;
这里的核心是 tenant_access_policy。它将数据访问权限与 JWT 的内容动态绑定。我们没有写任何硬编码的 WHERE 子句,而是定义了一个通用的规则。
步骤二:实现一个模拟的 JWT 颁发服务
在生产环境中,这会是 Okta、Keycloak 或自研的身份认证中心。为了演示,我们用 Python 的 fastapi 和 pyjwt 快速搭建一个。
# auth_service.py
import uvicorn
import jwt
import time
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
# 在真实项目中,这必须从安全配置中加载,并且是一个非对称加密密钥对的私钥
SECRET_KEY = "a-very-secret-key-for-demonstration"
ALGORITHM = "HS256"
app = FastAPI()
class UserCredentials(BaseModel):
username: str
tenant_id: str
@app.post("/token")
def generate_token(credentials: UserCredentials):
"""
根据用户名和租户ID颁发一个短生命周期的JWT。
"""
if not credentials.username or not credentials.tenant_id:
raise HTTPException(status_code=400, detail="Username and tenant_id are required")
# 令牌有效期设置为5分钟,强制客户端频繁刷新
expiration_time = int(time.time()) + 300
# 构建JWT的payload,这里的 'tid' 必须与CockroachDB RLS策略中使用的声明键匹配
payload = {
"sub": credentials.username,
"tid": credentials.tenant_id,
"exp": expiration_time,
"iat": int(time.time())
}
try:
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
return {"access_token": token}
except Exception as e:
# 添加日志记录
print(f"Error encoding JWT: {e}")
raise HTTPException(status_code=500, detail="Could not generate token")
if __name__ == "__main__":
# 运行: uvicorn auth_service:app --reload
uvicorn.run(app, host="0.0.0.0", port=8001)
这个服务非常简单,它接收用户名和租户ID,然后返回一个包含 sub (subject) 和 tid (tenant id) 声明的JWT。
步骤三:构建具备安全上下文的 LangChain Agent
现在是整合的核心部分。我们需要修改 LangChain 与数据库交互的方式。默认的 SQLDatabase 类使用一个固定的连接串,这不符合我们的动态JWT认证模型。因此,我们需要创建一个自定义的连接器,它能在每次交互前获取最新的用户JWT。
# secure_agent_app.py
import os
import requests
from typing import Any, Dict
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from langchain.agents import create_sql_agent
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_openai import OpenAI
from langchain_community.utilities import SQLDatabase
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- 配置区 ---
# 确保设置了OpenAI API密钥
# os.environ["OPENAI_API_KEY"] = "sk-..."
COCKROACHDB_USER = "jwt_user" # 使用我们创建的无密码角色
COCKROACHDB_HOST = "localhost"
COCKROACHDB_PORT = 26257
COCKROACHDB_DATABASE = "secure_rag"
# 注意:cert-dir需要指向你的CockroachDB证书目录
COCKROACHDB_CERTS_DIR = "certs"
AUTH_SERVICE_URL = "http://localhost:8001/token"
class PerRequestJWTConnector:
"""
一个自定义数据库连接器,为每个请求动态获取JWT并创建SQLAlchemy引擎。
这是整个安全架构在应用层的核心实现。
"""
def __init__(self, user_context: Dict[str, Any]):
if "username" not in user_context or "tenant_id" not in user_context:
raise ValueError("user_context must contain 'username' and 'tenant_id'")
self.user_context = user_context
self._engine = self._create_engine_with_jwt()
def _get_jwt(self) -> str:
"""从认证服务获取用户的JWT。"""
try:
response = requests.post(AUTH_SERVICE_URL, json=self.user_context)
response.raise_for_status()
return response.json()["access_token"]
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get JWT for {self.user_context}: {e}")
raise ConnectionError("Could not authenticate with the identity service.")
def _create_engine_with_jwt(self) -> Engine:
"""使用获取到的JWT创建SQLAlchemy引擎。"""
token = self._get_jwt()
# CockroachDB的连接字符串格式要求密码字段为JWT
# 用户是'jwt_user',这是一个无密码的角色,认证完全依赖JWT
conn_str = (
f"cockroachdb://{COCKROACHDB_USER}:{token}@{COCKROACHDB_HOST}:{COCKROACHDB_PORT}/{COCKROACHDB_DATABASE}"
f"?sslmode=verify-full&sslrootcert={COCKROACHDB_CERTS_DIR}/ca.crt"
)
logger.info(f"Creating engine for tenant '{self.user_context['tenant_id']}'...")
return create_engine(conn_str)
def get_engine(self) -> Engine:
"""返回已创建的引擎实例。"""
return self._engine
def create_secure_sql_database(user_context: Dict[str, Any]) -> SQLDatabase:
"""
工厂函数,用于创建一个使用动态JWT认证的SQLDatabase实例。
"""
connector = PerRequestJWTConnector(user_context)
engine = connector.get_engine()
# 我们只暴露 'documents' 表给 LLM,防止它查询系统表或其他敏感表
return SQLDatabase(engine=engine, include_tables=['documents'])
def run_agent_query(user_context: Dict[str, Any], query: str):
"""
模拟一个完整的用户请求处理流程。
"""
logger.info(f"\n--- Running query for user: {user_context['username']} in tenant: {user_context['tenant_id']} ---")
logger.info(f"User Query: '{query}'")
try:
# 1. 为当前用户上下文创建安全的数据库连接
db = create_secure_sql_database(user_context)
# 2. 初始化LLM和SQL Toolkit
llm = OpenAI(temperature=0)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)
agent_executor = create_sql_agent(
llm=llm,
toolkit=toolkit,
verbose=True, # 在演示中开启 verbose 以便观察 Agent 的思考过程
handle_parsing_errors=True # 生产中建议开启,增加鲁棒性
)
# 3. 执行查询
result = agent_executor.invoke(query)
logger.info(f"Agent Response: {result['output']}")
except Exception as e:
logger.error(f"An error occurred during agent execution: {e}", exc_info=True)
if __name__ == "__main__":
# 单元测试思路:
# 1. Mock PerRequestJWTConnector._get_jwt 方法,返回一个预设的、不同租户的JWT。
# 2. 验证 agent_executor.invoke(...) 之后,SQLDatabase.run() 被调用时的SQL查询是否*没有*包含手写的WHERE子句。
# 3. 验证端到端测试,确保不同租户上下文返回了正确的数据子集。
# 场景一: Alice (tenant-a) 查询她的财务报告
alice_context = {"username": "alice", "tenant_id": "tenant-a"}
run_agent_query(alice_context, "What is the content of the financial report from 2023?")
# 场景二: Bob (tenant-b) 查询同样的问题,他应该什么也得不到
bob_context = {"username": "bob", "tenant_id": "tenant-b"}
run_agent_query(bob_context, "What is the content of the financial report from 2023?")
# 场景三: Bob (tenant-b) 查询他自己租户的文档
run_agent_query(bob_context, "Find the technical documentation for API v2.")
运行这段代码,你会观察到截然不同的结果:
- 对于 Alice 的查询,Agent 能找到
tenant-a的财务报告并给出答案。在 verbose 日志中,你会看到它生成的 SQL 类似SELECT content FROM documents WHERE metadata->>'year' = 2023 AND content LIKE '%financial report%'。注意,这里没有tenant_id的过滤,因为这是由数据库在后端透明完成的。 - 对于 Bob 的第一个查询,Agent 同样会生成类似的 SQL,但当这个查询在 CockroachDB 中执行时,RLS 策略会因为 Bob 的 JWT(
tid: tenant-b)而自动加上WHERE tenant_id = 'tenant-b'的条件,导致查询结果为空。Agent 会诚实地回答“我没有找到相关信息”。 - 对于 Bob 的第二个查询,他能成功获取到
tenant-b的技术文档。
这证明了我们的架构是成功的。应用层代码是租户无关的,它只负责传递身份。安全策略的实施完全由数据库保证,实现了关注点分离,极大地增强了系统的安全性和可维护性。
当前方案的局限性与未来迭代
尽管该方案解决了核心的安全问题,但在生产环境中落地前仍有一些需要权衡的方面。
首先是性能开销。为每个请求都获取一个新的JWT并可能建立新的数据库连接,会带来显著的延迟。在真实项目中,PerRequestJWTConnector 需要被优化。一种策略是在应用层维护一个基于用户身份的连接池,或者至少缓存JWT直到其过期。但管理动态连接池比管理一个静态连接池要复杂得多,需要仔细处理连接的生命周期和回收问题。
其次,RLS 策略的管理本身也可能成为一个挑战。随着业务规则变得复杂(例如,不仅基于租户,还基于用户角色、文档部门等),SQL 中的策略定义可能会变得冗长且难以调试。这要求我们有配套的自动化测试和版本控制流程来管理这些安全策略,将其作为数据库 schema 的一部分进行演进,即“安全策略即代码”。
最后,Agent 生成的 SQL 仍然存在一定的不可预测性。虽然 RLS 提供了一个强大的安全护栏,但复杂的查询可能会因为性能问题或触发了某些数据库的边界情况而失败。建立一个坚固的监控和审计层,记录 Agent 生成的 SQL 和数据库的响应,对于问题排查和持续优化是必不可少的。未来的一个方向是探索更受限的工具,比如让 Agent 调用预定义的、参数化的查询 API,而不是给予它完整的 SQL 执行能力,但这会在灵活性上做出妥协。