基于 eBPF 与 Flask 实现两阶段提交协议的无侵入式可观测性架构


排查一次生产环境的分布式事务失败,根因可能散落在多个微服务的日志、不同的网络链路,甚至操作系统层面。当一个使用两阶段提交(2PC)协议的事务在 COMMIT 阶段卡住时,确定是哪个参与者未能正确响应 PREPARE 请求,或者网络分区导致了消息丢失,往往需要耗费数小时的联调与日志筛查。传统的应用层日志记录(Logging)和分布式追踪(Tracing)虽然有效,但侵入性强,需要修改业务代码,并且在高吞吐量场景下可能引入显著的性能开销。

我们的痛点很明确:需要一种方法,能在不触碰任何应用代码的前提下,像X光一样“看穿”服务间的2PC协议交互,精准捕捉到每一次 PREPAREVOTECOMMITABORT 消息。这让我们将目光投向了eBPF。

初步构想与技术选型

基本思路是构建一个轻量级的2PC实现作为实验床,然后开发一个eBPF工具,挂载到内核的网络处理路径上,监听特定端口的TCP流量,解析出我们的2PC协议内容,并将其作为结构化事件输出。

  1. 应用层协议实现 (Flask): 为了快速验证,我们不需要一个全功能的事务管理器。使用Flask构建一个RESTful风格的2PC协调者(Coordinator)和若干参与者(Participant)是最高效的选择。协议本身可以用简单的JSON通过HTTP传递,虽然在生产环境中可能会选用gRPC,但JSON的优势在于调试和eBPF解析的便捷性。
  2. 无侵入观测 (eBPF): eBPF是实现这一目标的核心。我们不使用libpcap这种传统的网络包捕获技术,因为它在用户态和内核态之间复制数据的开销较大。eBPF程序直接在内核中运行,可以在数据包进入网络协议栈时进行处理,只将我们关心的、解析后的结构化数据发送到用户态。我们将使用BCC(BPF Compiler Collection)工具链,它允许我们用Python编写用户态控制逻辑,用C编写内核态的eBPF程序。
  3. 自动化构建与测试 (CircleCI): 这个项目的依赖比较特殊,既有Python应用,又有需要内核头文件才能编译的eBPF程序。在CI/CD流水线中正确配置构建环境是确保项目可维护性的关键。CircleCI的Docker执行环境和灵活的config.yml可以很好地应对这种混合编译需求。

步骤化实现:从协议到观测

阶段一:构建2PC协议的Flask应用

我们先定义一个极简的2PC协议。所有通信都通过HTTP POST请求完成,请求体是JSON。

  • 消息类型: PREPARE, VOTE_COMMIT, VOTE_ABORT, GLOBAL_COMMIT, GLOBAL_ABORT
  • 核心字段: tx_id (事务ID), type (消息类型), sender (发送方标识)

1. 协调者 (Coordinator) 实现: coordinator.py

协调者负责发起事务、记录事务状态,并根据参与者的投票结果做出最终决定。在真实项目中,事务状态必须持久化(如写入WAL日志),但这里为了简化,我们仅使用内存字典。

# coordinator.py
import uuid
import time
import requests
import threading
from flask import Flask, request, jsonify

app = Flask(__name__)

# !!! 警告: 生产环境中绝不能用内存字典存储事务状态 !!!
# 这里仅为演示目的。生产级实现需要基于WAL的持久化存储。
transactions = {}
tx_lock = threading.Lock()

PARTICIPANTS = ["http://127.0.0.1:5001", "http://127.0.0.1:5002"]

@app.route('/start_transaction', methods=['POST'])
def start_transaction():
    """外部客户端发起一个新事务的入口点"""
    tx_id = str(uuid.uuid4())
    payload = request.json
    
    with tx_lock:
        transactions[tx_id] = {
            "state": "INIT",
            "payload": payload,
            "votes": {},
            "start_time": time.time()
        }

    # 启动一个新的线程来处理2PC流程,避免阻塞HTTP请求
    threading.Thread(target=execute_2pc, args=(tx_id,)).start()

    return jsonify({"status": "Transaction started", "tx_id": tx_id}), 202

def execute_2pc(tx_id):
    """核心的2PC逻辑执行"""
    
    # --- 阶段一: PREPARE ---
    print(f"[TX:{tx_id}] Coordinator: Starting PREPARE phase.")
    with tx_lock:
        transactions[tx_id]["state"] = "PREPARING"

    votes_received = 0
    total_participants = len(PARTICIPANTS)
    
    for p_url in PARTICIPANTS:
        try:
            # 发送PREPARE请求
            resp = requests.post(
                f"{p_url}/prepare",
                json={"tx_id": tx_id, "type": "PREPARE", "sender": "coordinator"},
                timeout=2.0
            )
            if resp.status_code == 200 and resp.json().get("vote") == "VOTE_COMMIT":
                with tx_lock:
                    transactions[tx_id]["votes"][p_url] = "VOTE_COMMIT"
                print(f"[TX:{tx_id}] Coordinator: Received VOTE_COMMIT from {p_url}")
            else:
                raise Exception("Received VOTE_ABORT or bad response")
        except Exception as e:
            with tx_lock:
                transactions[tx_id]["votes"][p_url] = "VOTE_ABORT"
            print(f"[TX:{tx_id}] Coordinator: Received VOTE_ABORT from {p_url} due to: {e}")

    # --- 阶段二: COMMIT / ABORT ---
    decision = "GLOBAL_COMMIT"
    with tx_lock:
        tx_data = transactions[tx_id]
        # 只要有一个参与者投了ABORT,整个事务就必须ABORT
        if any(v == "VOTE_ABORT" for v in tx_data["votes"].values()):
            decision = "GLOBAL_ABORT"
        
        # 确保收到了所有参与者的投票
        if len(tx_data["votes"]) != total_participants:
             decision = "GLOBAL_ABORT" # 超时或网络问题也视为ABORT

        tx_data["state"] = "COMMITTING" if decision == "GLOBAL_COMMIT" else "ABORTING"

    print(f"[TX:{tx_id}] Coordinator: Decision is {decision}. Broadcasting...")

    for p_url in PARTICIPANTS:
        try:
            requests.post(
                f"{p_url}/decision",
                json={"tx_id": tx_id, "type": decision, "sender": "coordinator"},
                timeout=2.0
            )
        except Exception as e:
            # 这里的错误处理是2PC的难点之一。
            # 如果COMMIT消息发送失败,协调者需要不断重试,直到参与者确认为止。
            print(f"[TX:{tx_id}] Coordinator: Failed to send {decision} to {p_url}: {e}. Needs retry logic.")

    with tx_lock:
        final_state = "COMMITTED" if decision == "GLOBAL_COMMIT" else "ABORTED"
        transactions[tx_id]["state"] = final_state
    
    print(f"[TX:{tx_id}] Coordinator: Transaction finished with state: {final_state}")


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

2. 参与者 (Participant) 实现: participant.py

参与者模拟一个数据库或资源管理器。它接收PREPARE请求,并根据自身状态(我们用随机数模拟成功或失败)投票。一旦投了VOTE_COMMIT,它就必须准备好提交,不能单方面反悔。

# participant.py
import os
import random
import time
from flask import Flask, request, jsonify

app = Flask(__name__)
PORT = int(os.environ.get("PORT", 5001))

# 模拟参与者的内部状态
# 生产环境中,这里应该是写入本地事务日志等操作
prepared_transactions = {}

@app.route('/prepare', methods=['POST'])
def prepare():
    """响应协调者的PREPARE请求"""
    data = request.json
    tx_id = data.get("tx_id")
    
    # 模拟业务逻辑检查,随机决定是否可以提交
    # 在真实项目中,这里会检查库存、余额等,并锁定资源
    can_commit = random.random() > 0.2  # 80%的成功率
    
    if can_commit:
        prepared_transactions[tx_id] = "PREPARED"
        print(f"[Participant:{PORT}] [TX:{tx_id}] Voted VOTE_COMMIT.")
        return jsonify({"tx_id": tx_id, "vote": "VOTE_COMMIT"}), 200
    else:
        prepared_transactions[tx_id] = "ABORTED"
        print(f"[Participant:{PORT}] [TX:{tx_id}] Voted VOTE_ABORT.")
        return jsonify({"tx_id": tx_id, "vote": "VOTE_ABORT"}), 400

@app.route('/decision', methods=['POST'])
def decision():
    """接收最终决定并执行"""
    data = request.json
    tx_id = data.get("tx_id")
    decision_type = data.get("type")

    if tx_id not in prepared_transactions:
        # 如果没收到过prepare请求,这是一个异常情况
        return jsonify({"status": "error", "message": "Transaction not found"}), 404

    if decision_type == "GLOBAL_COMMIT":
        # 只有在PREPARED状态下才能提交
        if prepared_transactions.get(tx_id) == "PREPARED":
            prepared_transactions[tx_id] = "COMMITTED"
            print(f"[Participant:{PORT}] [TX:{tx_id}] Transaction COMMITTED.")
            return jsonify({"status": "committed"}), 200
        else:
            # 这是一种协议错误,一个已经决定ABORT的参与者不应该收到COMMIT
            print(f"[Participant:{PORT}] [TX:{tx_id}] ERROR: Received COMMIT for an un-prepared transaction.")
            return jsonify({"status": "error", "message": "Protocol violation"}), 500
            
    elif decision_type == "GLOBAL_ABORT":
        prepared_transactions[tx_id] = "ABORTED"
        print(f"[Participant:{PORT}] [TX:{tx_id}] Transaction ABORTED.")
        return jsonify({"status": "aborted"}), 200

    return jsonify({"status": "unknown decision"}), 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=PORT, debug=True)

现在,我们可以启动一个协调者和两个参与者:
python coordinator.py
PORT=5001 python participant.py
PORT=5002 python participant.py

然后通过 curl -X POST -H "Content-Type: application/json" -d '{"data": "some_payload"}' http://127.0.0.1:5000/start_transaction 来触发一个事务。观察终端输出,可以看到整个2PC流程。

阶段二:eBPF 无侵入式协议追踪器

这是整个方案的核心。我们将编写一个eBPF程序来捕获和解析上述Flask应用之间的HTTP流量。

1. 内核态 C 代码: 2pc_tracer.c

这段C代码将被编译成BPF字节码并加载到内核。我们使用kprobe挂载到tcp_sendmsg函数上,这是一个发送TCP数据的通用内核函数。

#include <uapi/linux/ptrace.h>
#include <net/sock.h>
#include <linux/bpf.h>

#define MAX_MSG_SIZE 256 // 定义我们关心的最大消息长度

// 定义发送给用户空间的数据结构
struct event_t {
    u64 ts;
    u32 saddr;
    u32 daddr;
    u16 sport;
    u16 dport;
    char payload[MAX_MSG_SIZE];
};

// BPF_PERF_OUTPUT宏创建了一个perf事件输出通道,名为'events'
BPF_PERF_OUTPUT(events);

// kprobe: 挂载到tcp_sendmsg内核函数入口
int trace_tcp_sendmsg(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg) {
    // 过滤条件1: 只关心IPv4
    if (sk->__sk_common.skc_family != AF_INET) {
        return 0;
    }

    // 过滤条件2: 只关心我们的目标端口 (5000, 5001, 5002)
    u16 dport = sk->__sk_common.skc_dport;
    dport = ntohs(dport);
    if (dport != 5000 && dport != 5001 && dport != 5002) {
        return 0;
    }
    
    // 过滤条件3: 只关心POST请求,并且包含'tx_id'
    // 这是一个非常tricky的部分。我们需要遍历msghdr来找到HTTP payload。
    // 在真实项目中,解析HTTP协议会更复杂。这里我们做一个简化,
    // 假设payload在第一个iovec中。
    if (msg->msg_iter.type != ITER_IOVEC) {
        return 0;
    }

    struct iovec *iov = msg->msg_iter.iov;
    if (iov->iov_len < 10) { // 长度太小,不可能是我们的请求
        return 0;
    }
    
    char *payload_ptr = (char *)iov->iov_base;
    char comm_check[5];
    // bpf_probe_read_user_str 安全地从用户空间读取数据到内核
    bpf_probe_read_user_str(&comm_check, sizeof(comm_check), payload_ptr);

    // 粗略检查是否是POST请求
    if (comm_check[0] != 'P' || comm_check[1] != 'O' || comm_check[2] != 'S' || comm_check[3] != 'T') {
        return 0;
    }
    
    // 创建事件并填充数据
    struct event_t event = {};
    event.ts = bpf_ktime_get_ns();
    event.saddr = sk->__sk_common.skc_rcv_saddr;
    event.daddr = sk->__sk_common.skc_daddr;
    event.sport = ntohs(sk->__sk_common.skc_num);
    event.dport = dport;
    
    // 再次安全读取payload内容
    bpf_probe_read_user_str(&event.payload, MAX_MSG_SIZE, payload_ptr);

    // 通过perf buffer将事件发送到用户空间
    events.perf_submit(ctx, &event, sizeof(event));

    return 0;
}

这里的坑在于,直接从struct msghdr中解析出完整的HTTP请求体是复杂的,因为数据可能分散在多个iovec中。我们做了简化,只读取第一个iovec并查找关键字。对于我们这个简单的JSON over HTTP协议是够用的,但对于复杂的HTTP实现(如分块传输),则需要更复杂的解析逻辑,甚至考虑使用uprobe挂载到SSL/TLS库函数(在加密场景下)。

2. 用户态 Python 控制脚本: run_tracer.py

这个Python脚本使用BCC库来加载、附加eBPF程序,并从perf buffer中读取事件进行处理和展示。

#!/usr/bin/python3
# run_tracer.py
from bcc import BPF
import ctypes as ct
import socket
import struct
import json
import re

# 正则表达式用于从HTTP Payload中提取我们的JSON
# 这是一个脆弱但有效的演示方法
JSON_REGEX = re.compile(r'(\{.*\})')

class Event(ct.Structure):
    _fields_ = [
        ("ts", ct.c_ulonglong),
        ("saddr", ct.c_uint),
        ("daddr", ct.c_uint),
        ("sport", ct.c_ushort),
        ("dport", ct.c_ushort),
        ("payload", ct.c_char * 256)
    ]

def ip_to_str(addr):
    return socket.inet_ntoa(struct.pack("I", addr))

def print_event(cpu, data, size):
    event = ct.cast(data, ct.POINTER(Event)).contents
    
    try:
        payload_str = event.payload.decode('utf-8', 'ignore')
        
        # 提取JSON部分
        match = JSON_REGEX.search(payload_str)
        if not match:
            return

        json_data = json.loads(match.group(1))
        tx_id = json_data.get("tx_id", "N/A")
        msg_type = json_data.get("type", "UNKNOWN")

        saddr_str = ip_to_str(event.saddr)
        daddr_str = ip_to_str(event.daddr)

        print(f"[eBPF Tracer] TX_ID: {tx_id[:8]}... | {msg_type:<15} | {saddr_str}:{event.sport} -> {daddr_str}:{event.dport}")

    except (json.JSONDecodeError, UnicodeDecodeError):
        # 忽略无法解析的payload
        pass
    except Exception as e:
        print(f"Error processing event: {e}")

# 加载eBPF C代码
b = BPF(src_file="2pc_tracer.c")

# 将eBPF程序挂载到tcp_sendmsg函数
b.attach_kprobe(event="tcp_sendmsg", fn_name="trace_tcp_sendmsg")

print("Attaching eBPF tracer to tcp_sendmsg... Press Ctrl+C to exit.")

# 打开perf buffer,并设置回调函数
b["events"].open_perf_buffer(print_event)

# 循环读取perf buffer中的事件
while True:
    try:
        b.perf_buffer_poll()
    except KeyboardInterrupt:
        exit()

现在,保持Flask应用运行,在另一个终端以sudo权限运行追踪器:sudo python3 run_tracer.py
再次用curl发起事务,你会看到追踪器输出了清晰的、格式化的协议交互过程,而我们没有修改一行Flask代码。

当某个参与者投票VOTE_ABORT时,输出会是这样的:

Attaching eBPF tracer to tcp_sendmsg... Press Ctrl+C to exit.
[eBPF Tracer] TX_ID: f3a1c2d4... | PREPARE         | 127.0.0.1:45876 -> 127.0.0.1:5001
[eBPF Tracer] TX_ID: f3a1c2d4... | PREPARE         | 127.0.0.1:45878 -> 127.0.0.1:5002
# 此时,我们没有捕获响应,因为我们只hook了sendmsg。一个完整的追踪器也应hook recvmsg。
# 但即使只看发送,我们也能看到接下来的ABORT广播。
[eBPF Tracer] TX_ID: f3a1c2d4... | GLOBAL_ABORT    | 127.0.0.1:45880 -> 127.0.0.1:5001
[eBPF Tracer] TX_ID: f3a1c2d4... | GLOBAL_ABORT    | 127.0.0.1:45882 -> 127.0.0.1:5002

阶段三:CI/CD 流水线自动化

在CircleCI中构建这个项目需要处理Python环境和eBPF编译环境。

.circleci/config.yml

version: 2.1

orbs:
  python: circleci/[email protected]

jobs:
  build-and-test:
    docker:
      # 使用一个包含完整构建工具和内核头文件的镜像
      # 真实项目中,会构建一个专门的CI镜像来加速
      - image: ubuntu:20.04
    steps:
      - checkout
      
      - run:
          name: Install System Dependencies (Python & BPF)
          command: |
            apt-get update
            # DEBIAN_FRONTEND=noninteractive 避免安装过程中的交互提示
            DEBIAN_FRONTEND=noninteractive apt-get install -y \
              python3 python3-pip python3-flask \
              bpfcc-tools linux-headers-$(uname -r) \
              clang llvm libelf-dev build-essential
              
      - run:
          name: Install Python Application Dependencies
          command: |
            pip3 install requests
            
      - run:
          name: Run Application Linter/Tests (Placeholder)
          # 在真实项目中,这里会运行pytest等
          command: |
            echo "Running application tests..."
            # python3 -m pytest tests/
            
      - run:
          name: Verify eBPF Tracer Compiles
          # 这是一个关键的测试:确保eBPF C代码能够被BCC成功加载和编译。
          # 我们使用一个简单的python命令来尝试加载BPF代码,如果失败则CI失败。
          # 这可以捕获C代码中的语法错误或与内核头文件不兼容的问题。
          command: |
            echo "Attempting to load eBPF program..."
            # -c '...' 允许我们执行一小段Python代码
            # BCC()的构造函数会执行编译,如果失败会抛出异常
            python3 -c "from bcc import BPF; bpf = BPF(src_file='2pc_tracer.c')"
            echo "eBPF program loaded successfully."

workflows:
  main:
    jobs:
      - build-and-test

这个CI配置的关键在于:

  1. 环境选择: 使用了一个基础的Ubuntu镜像,并手动安装所有依赖,包括bpfcc-tools和对应版本的linux-headers。这是一个常见的坑,内核头文件必须和CI运行器(runner)的内核版本匹配。
  2. eBPF编译验证: 我们没有在CI中真正地运行整个应用并追踪,因为这需要复杂的服务编排。但我们执行了一个核心的集成测试:python3 -c "from bcc import BPF; bpf = BPF(src_file='2pc_tracer.c')"。这一行代码会触发BCC在后台调用clang/llvm来编译C代码。如果C代码有任何问题,或者缺少头文件,这个步骤就会失败,从而阻止有问题的代码被合并。

最终架构与权衡

我们最终得到的是一个三层架构:

graph TD
    subgraph User Space
        A[Coordinator: Flask] -- HTTP/JSON --> B1[Participant 1: Flask]
        A -- HTTP/JSON --> B2[Participant 2: Flask]
        C[Tracer Control: Python/BCC]
    end

    subgraph Kernel Space
        D[eBPF Program]
    end
    
    subgraph CI/CD
        E[CircleCI Pipeline]
    end
    
    C -- Loads --> D
    D -- Attaches to tcp_sendmsg --> K[Kernel TCP/IP Stack]
    K -- Perf Events --> C
    
    B1 -- Network Traffic --> K
    B2 -- Network Traffic --> K
    A -- Network Traffic --> K
    
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B1 fill:#f9f,stroke:#333,stroke-width:2px
    style B2 fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px
    style D fill:#9cf,stroke:#333,stroke-width:4px

这个方案的优势是显而易见的:零应用侵入。业务开发者可以完全不感知可观测性工具的存在,专注于业务逻辑。性能开销极低,因为大部分过滤和解析工作都在内核中高效完成。

但它的局限性也必须正视。首先,这种方法强依赖于底层协议的稳定性。如果Flask应用从HTTP/1.1切换到HTTP/2,或者JSON的格式发生重大变化,eBPF程序中的解析逻辑就必须同步更新,否则追踪器就会失效。其次,它无法看到应用内部的逻辑,例如一个参与者投票VOTE_ABORT是因为数据库连接失败还是库存不足,eBPF是不知道的。最后,部署eBPF程序需要root权限,这对安全策略严格的环境是一个挑战。

未来的迭代方向可能包括:将eBPF捕获的数据导出到Prometheus,形成量化的监控指标(如事务成功率、各阶段耗时);或者结合uprobe来追踪应用层函数的调用,将内核态的网络观测与用户态的应用逻辑关联起来,获得更丰富的上下文信息。


  目录