我们面临的第一个问题不是技术选型,而是组织惯性。一个拥有数百个微服务的庞大系统,技术栈横跨十年,从老旧的Java Monolith到现代的Go服务,底层依赖着MySQL集群、Elasticsearch和一套用于实时分析的ClickHouse。业务要求全链路的可观测性,但推动每个团队去修改代码、引入OpenTelemetry SDK、然后重新部署所有服务,这个过程的沟通成本和风险高到无法接受。我们需要一个能在不触碰任何应用代码的前提下,立刻提供深度数据库层面洞察的方案。
最初的想法是网络流量镜像,通过旁路抓包来分析数据库协议。这个方案很快被否决。在生产环境中,核心节点的网络流量是巨大的,完整的流量复制和深度包检测(DPI)对网络和计算资源的消耗都非常可观。更重要的是,将原始网络包从内核空间复制到用户空间进行分析,这个过程本身就引入了不可忽视的延迟和性能开销,在高峰期甚至可能导致丢包,丢失关键的观测数据。我们需要一个更高效、更底层的解决方案。
这就是eBPF进入视野的原因。它允许我们在内核中运行沙箱化的程序,直接挂钩到系统调用或网络事件上,只把处理过的、高度浓缩的数据发送到用户空间。这正是我们需要的:在数据产生的源头进行预处理,性能开销极低,且对应用完全透明。
技术选型的核心争议点在于用户空间的代理(Agent)实现语言。团队里主流是Go和Rust,但我们最终选择了一个非常规的方案:Haskell。理由如下:
- 协议解析的鲁棒性:我们需要解析多种数据库协议(MySQL原生协议、ClickHouse和Elasticsearch的HTTP协议)。Haskell强大的类型系统和顶级的解析器组合库(Parser Combinator Libraries)如Attoparsec,能让我们构建出类型安全、易于组合和测试的解析器,这在处理复杂的二进制或文本协议时,能有效避免大量边界情况的bug。
- 并发处理的正确性:代理需要同时处理来自eBPF内核模块的数据流,解析数据,关联请求与响应,并批量导出到多个后端。Haskell的并发模型,特别是Software Transactional Memory (STM),为我们处理共享状态(如请求-响应映射表)提供了一种无锁、可组合的优雅方案,极大地降低了并发编程的心智负担。
- 表达力:用Haskell描述数据转换和处理流水线非常直观。从原始字节流到结构化的遥测数据,整个过程可以用一系列纯函数来清晰地表达,可维护性很高。
当然,这也是一次技术赌博。Haskell的生态和社区规模远小于Go或Rust,但对于构建这样一个边界清晰、逻辑复杂的后端工具,我们认为它的优势大于劣势。我们的数据后端选择则相对传统:
- Elasticsearch: 存储和索引原始的、解析后的数据库交互事件。提供全文搜索能力,用于问题排查。
- ClickHouse: 存储聚合后的性能指标,如P99延迟、吞吐量、错误率等。其列式存储和查询性能非常适合做实时分析。
- Jaeger: 作为最终的分布式追踪系统。我们将从数据库交互中合成的Trace和Span数据发送到Jaeger,为开发者提供一个他们已经熟悉的可视化界面。
整个系统的架构如下:
graph TD
subgraph Host Kernel Space
App[Application] -- DB Query --> Sockets
subgraph eBPF Probes
KProbe_Send[kprobe/tcp_sendmsg]
KProbe_Recv[kprobe/tcp_recvmsg]
end
Sockets -- triggers --> KProbe_Send
Sockets -- triggers --> KProbe_Recv
KProbe_Send -- data --> PerfBuffer
KProbe_Recv -- data --> PerfBuffer
end
subgraph Host User Space
PerfBuffer -- raw events --> HaskellAgent[Haskell Observability Agent]
subgraph HaskellAgent
Parser[Protocol Parser]
Correlator[Request/Response Correlator]
Exporter[Data Exporter]
end
HaskellAgent --> Parser --> Correlator --> Exporter
end
subgraph Observability Backend
Jaeger
Elasticsearch
ClickHouse
end
Exporter -- Traces & Spans --> Jaeger
Exporter -- Raw Events --> Elasticsearch
Exporter -- Aggregated Metrics --> ClickHouse
第一步:内核的眼睛 - eBPF探针
我们的eBPF程序需要做几件事:
- 挂载到TCP的发送和接收函数上,比如
tcp_sendmsg和tcp_recvmsg。 - 过滤出我们关心的数据库端口(3306, 9200, 8123)。
- 为每个事件捕获关键元数据:时间戳、进程ID (PID)、线程ID (TID)、套接字信息。
- 将这些元数据和一小部分数据包载荷(payload)发送到用户空间的Perf Buffer。
一个简化的eBPF C代码示例如下。在真实项目中,我们会使用像libbpf-bootstrap这样的脚手架来简化开发。
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#define MAX_PAYLOAD_SIZE 1024
// Data structure sent to user space
struct db_event {
__u64 ts_ns;
__u32 pid;
__u32 tid;
__u16 sport;
__u16 dport;
__u8 comm[TASK_COMM_LEN];
__u32 payload_size;
__u8 payload[MAX_PAYLOAD_SIZE];
__u8 is_request; // 1 for request, 0 for response
};
// Perf buffer for submitting events
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
} events SEC(".maps");
// Helper to filter by target ports
static __always_inline bool is_target_port(__u16 port) {
return port == 3306 || port == 9200 || port == 8123;
}
// Common function to process socket data
static __always_inline int process_data(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg, bool is_req) {
if (msg == NULL || sk == NULL) {
return 0;
}
__u16 dport = BPF_CORE_READ(sk, __sk_common.skc_dport);
dport = bpf_ntohs(dport);
__u16 sport = BPF_CORE_READ(sk, __sk_common.skc_num);
if (!is_target_port(dport) && !is_target_port(sport)) {
return 0;
}
// We only capture traffic *to* the database (requests) or *from* it (responses)
if (is_req && !is_target_port(dport)) return 0;
if (!is_req && !is_target_port(sport)) return 0;
struct db_event *event;
event = bpf_ringbuf_reserve(&events, sizeof(*event), 0);
if (!event) {
return 0;
}
event->ts_ns = bpf_ktime_get_ns();
u64 id = bpf_get_current_pid_tgid();
event->pid = id >> 32;
event->tid = (__u32)id;
bpf_get_current_comm(&event->comm, sizeof(event->comm));
event->sport = sport;
event->dport = dport;
event->is_request = is_req ? 1 : 0;
struct iov_iter *iter = &msg->msg_iter;
if (iter->type != ITER_IOVEC) {
bpf_ringbuf_discard(event, 0);
return 0;
}
const struct iovec *iov = BPF_CORE_READ(iter, iovec);
unsigned long base_addr = (unsigned long)BPF_CORE_READ(iov, iov_base);
__u32 len = BPF_CORE_READ(iov, iov_len);
__u32 payload_len = len < MAX_PAYLOAD_SIZE ? len : MAX_PAYLOAD_SIZE;
event->payload_size = payload_len;
bpf_probe_read_user(&event->payload, payload_len, (void *)base_addr);
bpf_ringbuf_submit(event, 0);
return 0;
}
SEC("kprobe/tcp_sendmsg")
int BPF_KPROBE(kprobe_tcp_sendmsg, struct sock *sk, struct msghdr *msg, size_t size) {
return process_data(ctx, sk, msg, true);
}
SEC("kprobe/tcp_recvmsg")
int BPF_KPROBE(kprobe_tcp_recvmsg, struct sock *sk, struct msghdr *msg, size_t len, int nonblock, int flags, int *addr_len) {
return process_data(ctx, sk, msg, false);
}
char LICENSE[] SEC("license") = "Dual BSD/GPL";
注释: 这个探针足够简单,但在生产环境中需要考虑更多。比如处理跨多个iovec分片的消息,以及更精确地从struct sock中获取连接的四元组信息。我们这里使用ringbuf而不是perf_buffer,因为它在多CPU场景下性能更好,且能保证事件顺序。
第二步:Haskell代理 - 数据处理核心
Haskell代理是整个系统的大脑。它的主要职责是:
- 加载eBPF程序并监听Perf Buffer。
- 将原始字节事件反序列化为Haskell的强类型数据结构。
- 根据端口和协议特征,将流量分发给对应的解析器。
- 关联请求和响应,计算延迟。
- 将结构化数据批量导出。
我们使用bpf-hs库来与eBPF交互。
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Main where
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C8
import Data.Word
import Data.Binary.Get (runGet, getWord64le, getWord32le, getWord16le, getByteString, isEmpty)
import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM
import Control.Monad (forever)
import qualified System.BPF as BPF
import qualified System.BPF.Perf as Perf
-- 与eBPF C结构体对应的Haskell数据类型
data DbEvent = DbEvent
{ tsNs :: Word64
, pid :: Word32
, tid :: Word32
, sport :: Word16
, dport :: Word16
, comm :: BS.ByteString
, isRequest :: Bool
, payload :: BS.ByteString
} deriving (Show)
-- 从ByteString反序列化事件
parseDbEvent :: BS.ByteString -> DbEvent
parseDbEvent bs = runGet getter bs
where
getter = do
tsNs <- getWord64le
pid <- getWord32le
tid <- getWord32le
sport <- getWord16le
dport <- getWord16le
commBs <- getByteString 16
-- 去掉C字符串末尾的\0
let comm = fst $ C8.span (/= '\0') commBs
isReqVal <- getWord32le -- padding in struct might affect this
let isRequest = isReqVal == 1
payloadSize <- getWord32le -- Assuming payload size is sent separately by eBPF
payload <- getByteString (fromIntegral payloadSize)
return DbEvent{..}
main :: IO ()
main = do
-- 1. 加载eBPF程序
-- 在真实代码中,我们会从文件中读取eBPF对象
putStrLn "Loading eBPF object file..."
prog <- BPF.loadObjectFromFile "db_agent.o"
-- 2. 挂载探针
BPF.attachProbe prog "kprobe/tcp_sendmsg" False
BPF.attachProbe prog "kprobe/tcp_recvmsg" False
putStrLn "eBPF probes attached."
-- 3. 设置Perf Buffer回调
-- 我们需要一个TQueue来作为Haskell世界和eBPF世界之间的桥梁
eventQueue <- newTQueueIO :: IO (TQueue DbEvent)
let perfCallback :: Perf.PerfCallback
perfCallback _cpu dataPtr dataSize = do
bs <- BS.packCStringLen (dataPtr, fromIntegral dataSize)
let event = parseDbEvent bs
atomically $ writeTQueue eventQueue event
-- 从eBPF map中获取perf buffer的句柄
eventsMap <- BPF.findMapByName prog "events"
perf <- Perf.setupPerfMap eventsMap 4096 -- 4096是每个CPU buffer的页数
Perf.startPolling perf perfCallback
putStrLn "Listening for eBPF events..."
-- 4. 启动一个或多个工作线程来处理事件
-- 这里为了简化,只启动一个
worker <- async $ processEvents eventQueue
wait worker
-- 事件处理循环
processEvents :: TQueue DbEvent -> IO ()
processEvents queue = forever $ do
event <- atomically $ readTQueue queue
-- 在这里,我们将调用协议解析和数据导出逻辑
--
-- 伪代码:
-- case detectProtocol (dport event) of
-- Just MySQL -> handleMySQLEvent event
-- Just Elasticsearch -> handleHTTPEvent event "es"
-- Just ClickHouse -> handleHTTPEvent event "ch"
-- _ -> return ()
print event -- 简单打印
注释: 这段代码展示了Haskell代理的骨架。parseDbEvent函数体现了Haskell处理二进制数据的精确性。主循环从TQueue中取出事件,这是一个线程安全的队列,非常适合这种生产者-消费者模式。一个常见的坑是eBPF C结构体的内存对齐问题,必须确保Haskell中的解析逻辑与C结构体的内存布局完全一致,否则会解析出错误的数据。
第三步:协议解析与请求-响应关联
这是最复杂的部分。我们需要一个状态机来关联属于同一个逻辑操作的请求和响应。一个连接(由PID和套接字四元组唯一标识)可能并发处理多个请求,因此我们需要一个更精细的标识符。对于MySQL,可以用它的序列号。对于HTTP,情况更复杂,特别是Keep-Alive连接。
一个简化的关联逻辑思路是:
- 创建一个
TVar(STM中的事务变量),其内部是一个Map,键是连接的唯一标识符(例如pid:tid:sport:dport),值是正在进行中的请求信息(请求时间戳、请求内容等)。 - 当收到一个请求事件时,将其存入Map。
- 当收到一个响应事件时,从Map中查找对应的请求,计算出延迟,然后从Map中移除该条目。
- 解析数据包。对于HTTP,我们只需要解析请求行和少量头部即可获取查询信息。对于MySQL,我们需要解析其二进制协议的命令阶段来提取SQL。
下面是一个使用attoparsec解析HTTP请求行的例子,这将被用于解析ClickHouse和Elasticsearch的流量:
{-# LANGUAGE OverloadedStrings #-}
import Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as C8
import Data.Functor (void)
data HttpRequestLine = HttpRequestLine
{ method :: C8.ByteString
, uri :: C8.ByteString
, version :: C8.ByteString
} deriving (Show, Eq)
-- Attoparsec解析器
parseRequestLine :: Parser HttpRequestLine
parseRequestLine = do
m <- takeWhile1 isAlpha_ascii <* char ' '
u <- takeWhile1 (\c -> c /= ' ') <* char ' '
v <- "HTTP/" *> takeWhile1 (\c -> c /= '\r' && c /= '\n')
void (string "\r\n")
return $ HttpRequestLine m u v
-- 使用示例
testParser :: IO ()
testParser = do
let input = "POST /my_index/_search HTTP/1.1\r\nHost: localhost:9200\r\n..."
print $ parseOnly parseRequestLine (C8.pack input)
-- Output: Right (HttpRequestLine {method = "POST", uri = "/my_index/_search", version = "1.1"})
注释: Attoparsec的声明式语法让解析器代码几乎和协议的BNF范式一样清晰。这种代码的可读性和可维护性远高于手动进行字节操作。在真实项目中,我们会为MySQL的COM_QUERY包编写类似的二进制解析器。
第四步:数据导出与可视化
一旦我们有了结构化的请求-响应对(我们称之为DbTransaction),包含协议类型、请求内容、延迟、PID等信息,就可以将其导出到我们的后端。
- 导出到Elasticsearch:将
DbTransaction序列化为JSON,使用http-client或wreq库批量发送到Elasticsearch的bulk API。这为我们提供了原始事件的搜索能力。 - 导出到ClickHouse:提取关键指标(
timestamp,service_name,query_signature,latency_ms,is_error),聚合成行,通过ClickHouse的HTTP接口批量插入。查询签名是对SQL或HTTP请求进行参数化后的结果,用于聚合相似的查询。 - 导出到Jaeger:这是最有趣的部分。我们需要将一个
DbTransaction转换为一个OpenTelemetry Span。-
traceId和spanId可以随机生成。 -
parentSpanId是个挑战。在零侵入模式下,我们没有上游服务的上下文。一个常见的做法是将同一进程(PID)在短时间内发起的所有数据库调用视为同一个Trace下的兄弟Span,或者干脆让每个数据库调用都成为一个新Trace的根Span。 -
startTime是请求时间戳,endTime是响应时间戳。 - Span的Tags可以包含
db.system,db.statement,net.peer.name等语义化标签。 - 然后,我们将这个Span结构序列化为Jaeger的Thrift或Protobuf格式,通过HTTP或UDP发送给Jaeger Agent/Collector。
-
-- 伪代码,展示导出逻辑
data DbTransaction = DbTransaction { ... }
exportTransaction :: DbTransaction -> IO ()
exportTransaction tx = do
-- 使用http-conduit等库
async $ exportToES tx
async $ exportToClickHouse tx
async $ exportToJaeger tx
return ()
-- 将数据库事务转换为Jaeger Span
toJaegerSpan :: DbTransaction -> JaegerSpan
toJaegerSpan tx =
let duration = responseTimestamp tx - requestTimestamp tx
in JaegerSpan {
traceId = generateTraceId tx.pid, -- 基于PID生成一个伪TraceID
spanId = generateSpanId,
operationName = createOperationName tx, -- e.g., "mysql:SELECT", "es:POST /_search"
startTime = requestTimestamp tx,
duration = duration,
tags = [ ("db.statement", sanitizeQuery tx.query), ... ]
}
最终成果与局限性
部署这套系统后,我们无需惊动任何业务团队,就在几小时内获得了整个数据库层的深度可观测性。在Jaeger中,我们能看到某个服务的数据库调用热点和慢查询;在Grafana(对接ClickHouse)仪表盘上,我们能实时监控各个数据库集群的P99延迟和吞吐量;当出现问题时,能直接在Kibana(对接Elasticsearch)中搜索精确的请求和响应内容。
然而,这个方案并非银弹,它的局限性非常明确:
- 加密流量:这套系统完全依赖于解析明文的数据库协议。一旦数据库连接启用了TLS,eBPF探针捕获到的将是加密后的无用数据。要解决这个问题,需要将探针从网络层上移,使用uprobe挂载到应用空间的SSL/TLS库函数(如
SSL_read/SSL_write)上来获取解密前的明文数据,但这会增加配置的复杂性。 - 协议依赖性:我们的解析器是针对特定协议版本编写的。如果数据库协议发生重大变化,解析器就需要同步更新,这带来了维护成本。
- 连接池问题:对于使用长连接池的应用,将网络层的TCP四元组映射到应用层的逻辑请求会变得困难。一个TCP连接可能被多个应用线程复用,导致请求和响应的交错。精确关联需要更复杂的启发式算法或解析协议内部的请求ID。
- 上下文缺失:最大的问题是无法获取上游应用的上下文。我们知道是哪个进程(PID)发起了数据库查询,但不知道这个查询是由哪个API端点、哪个用户请求触发的。这导致我们生成的Trace是断头的,无法形成完整的端到端链路。
未来的迭代方向很清晰:一是通过uprobe技术栈解决TLS加密问题;二是在可能的情况下,与服务网格(Service Mesh)的sidecar结合,从sidecar中获取上游请求的元数据(如x-request-id),并注入到我们合成的Span中,从而将断头的数据库Trace与完整的应用Trace拼接起来,实现真正的零侵入全链路追踪。