利用 Linkerd 和嵌入式 Lua 为 Azure Service Bus 构建动态消息路由网关


系统中有超过三十个微服务在持续不断地产生各类业务事件。最初的设计是在每个服务内部硬编码逻辑,将事件推送到特定的 Azure Service Bus 主题。随着业务规则的迭代,比如需要根据用户等级、事件类型、甚至请求体中的某个特定字段,将同一事件路由到不同的主题或队列时,这种模式的弊端暴露无遗:每次路由逻辑变更都意味着要修改、测试、发布多个微服务。这不仅是重复劳动,更是潜在的生产事故源头。

我们迫切需要一个中心化的消息网关,它必须满足几个苛刻的条件:

  1. 动态性: 路由规则的变更不应触发网关服务的重新部署。业务人员或数据分析师应该能以配置化的方式快速调整分发策略。
  2. 高性能: 作为所有事件的入口,其延迟必须极低,不能成为系统瓶颈。
  3. 可观测性: 必须无缝融入我们现有的基于 Linkerd 的服务网格,以便利用其 mTLS、遥测、重试等能力。
  4. 轻量级: 避免引入像 Apache APISIX 或 Kong 这样功能强大但相对重型的 API 网关,因为我们90%的场景只是内部消息路由,不需要它们复杂的插件生态。

初步构想是开发一个专用的、轻量级的 Go 服务。Go 的并发性能和静态编译特性非常适合网络代理这类场景。而实现动态性的关键,我们把目光投向了 Lua。将 Lua 解释器嵌入到 Go 应用中,路由逻辑就可以用 Lua 脚本来编写。这些脚本可以作为 Kubernetes ConfigMap 挂载到服务中,更新脚本只需 kubectl apply,无需重启 Pod。这套方案理论上完美契合我们的需求。

第一步:构建 Go 代理与 Lua 执行引擎

我们的代理本质上是一个 HTTP 服务器,它接收 POST 请求,将请求体作为消息内容,然后根据嵌入的 Lua 脚本执行结果,将消息转发到 Azure Service Bus 的指定目的地。

核心依赖是 gopher-lua,一个纯 Go 实现的 Lua 5.1 虚拟机。

// main.go
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	"github.com/yuin/gopher-lua"
)

const (
	// 从环境变量读取配置
	listenAddr        = ":8080"
	luaScriptPath     = "/config/router.lua" // Lua脚本将通过ConfigMap挂载
	serviceBusConnStr = "AZURE_SERVICE_BUS_CONNECTION_STRING"
)

// Router is our main application struct
type Router struct {
	luaStatePool *sync.Pool
	sbClient     *azservicebus.Client
	script       string
}

// NewRouter initializes the router with a Lua state pool and Azure Service Bus client
func NewRouter(connStr string) (*Router, error) {
	scriptBytes, err := ioutil.ReadFile(luaScriptPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read lua script at %s: %w", luaScriptPath, err)
	}
	log.Printf("Successfully loaded Lua script from %s", luaScriptPath)

	client, err := azservicebus.NewClientFromConnectionString(connStr, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create service bus client: %w", err)
	}
	log.Println("Successfully connected to Azure Service Bus")

	return &Router{
		sbClient: client,
		script:   string(scriptBytes),
		luaStatePool: &sync.Pool{
			New: func() interface{} {
				// 为每个 goroutine 创建一个独立的 Lua state,避免并发问题
				L := lua.NewState()
				return L
			},
		},
	}, nil
}

// ServeHTTP is the handler for incoming HTTP requests
func (r *Router) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	body, err := ioutil.ReadAll(req.Body)
	if err != nil {
		log.Printf("Error reading request body: %v", err)
		http.Error(w, "Failed to read request body", http.StatusInternalServerError)
		return
	}
	defer req.Body.Close()

	// 从池中获取一个 Lua state
	L := r.luaStatePool.Get().(*lua.LState)
	defer r.luaStatePool.Put(L) // 处理完请求后归还

	// 加载并执行脚本
	if err := L.DoString(r.script); err != nil {
		log.Printf("Error executing lua script: %v", err)
		http.Error(w, "Internal script execution error", http.StatusInternalServerError)
		return
	}
	
	// 调用 Lua 中的 `route` 函数
	if err := r.callLuaRouter(L, req, body); err != nil {
		log.Printf("Error in lua routing logic: %v", err)
		// 根据错误类型返回不同的状态码
		if luaErr, ok := err.(*lua.ApiError); ok {
			http.Error(w, fmt.Sprintf("Lua routing error: %s", luaErr.Object.String()), http.StatusBadRequest)
		} else {
			http.Error(w, "Internal routing error", http.StatusInternalServerError)
		}
		return
	}
	
	w.WriteHeader(http.StatusAccepted)
	w.Write([]byte("Message accepted for routing"))
}

func main() {
	connStr := os.Getenv(serviceBusConnStr)
	if connStr == "" {
		log.Fatalf("Environment variable %s is not set", serviceBusConnStr)
	}

	router, err := NewRouter(connStr)
	if err != nil {
		log.Fatalf("Failed to initialize router: %v", err)
	}

	log.Printf("Starting message router on %s", listenAddr)
	if err := http.ListenAndServe(listenAddr, router); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

这段代码搭建了基础框架。关键点在于 luaStatePoolgopher-luaLState 不是并发安全的,因此我们使用 sync.Pool 为每个请求的 goroutine 提供一个独立的 Lua 虚拟机实例,这在生产环境中至关重要。

第二步:设计 Go 与 Lua 之间的 API 契约

为了让 Lua 脚本能对请求进行判断,Go 代码必须向 Lua 环境暴露一些必要的函数和数据。我们不希望把整个 http.Request 对象都传进去,这会暴露过多细节且难以控制。我们定义了一套最小化的 API,让 Lua 脚本可以安全地获取所需信息。

sequenceDiagram
    participant Go as Go Proxy
    participant Lua as Lua VM
    participant ASB as Azure Service Bus

    Go->>Lua: call function `route(request)`
    Lua->>Go: get_header("X-User-Tier")
    Go-->>Lua: "premium"
    Lua->>Go: get_json_field("eventType")
    Go-->>Lua: "OrderCreated"
    Lua->>Go: set_destination("premium-orders")
    Go-->>Lua: (ack)
    Lua->>Go: set_property("traceId", "xyz-123")
    Go-->>Lua: (ack)
    Go->>ASB: Send Message to "premium-orders" topic
    ASB-->>Go: (ack)

我们需要在 callLuaRouter 函数中实现这个契约。

// router_logic.go (扩展)
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	"github.com/tidwall/gjson"
	"github.com/yuin/gopher-lua"
)

func (r *Router) callLuaRouter(L *lua.LState, req *http.Request, body []byte) error {
	// 准备传递给 Lua 的 request table
	requestTable := L.NewTable()
	
	// 1. 暴露 get_header 函数
	requestTable.RawSetString("get_header", L.NewFunction(func(L *lua.LState) int {
		headerName := L.ToString(1)
		headerValue := req.Header.Get(headerName)
		L.Push(lua.LString(headerValue))
		return 1 // 返回值的数量
	}))

	// 2. 暴露 get_json_field 函数,使用gjson提高性能
	requestTable.RawSetString("get_json_field", L.NewFunction(func(L *lua.LState) int {
		jsonPath := L.ToString(1)
		value := gjson.GetBytes(body, jsonPath)
		if !value.Exists() {
			L.Push(lua.LNil)
		} else {
			L.Push(lua.LString(value.String()))
		}
		return 1
	}))
	
	// 3. 暴露 get_body 函数
	requestTable.RawSetString("get_body", L.NewFunction(func(L *lua.LState) int {
		L.Push(lua.LString(string(body)))
		return 1
	}))

	// 用于接收 Lua 返回结果的变量
	var destinationTopic string
	messageProperties := make(map[string]interface{})
	var transformedBody []byte = body // 默认使用原始body

	// 4. 暴露 set_destination 函数
	L.SetGlobal("set_destination", L.NewFunction(func(L *lua.LState) int {
		dest := L.ToString(1)
		if dest == "" {
			L.ArgError(1, "destination cannot be empty")
			return 0
		}
		destinationTopic = dest
		return 0
	}))
	
	// 5. 暴露 set_property 函数
	L.SetGlobal("set_property", L.NewFunction(func(L *lua.LState) int {
		key := L.ToString(1)
		// 在真实项目中,这里需要更复杂的类型判断
		value := L.ToString(2) 
		messageProperties[key] = value
		return 0
	}))
	
	// 6. 暴露 set_transformed_body 函数
	L.SetGlobal("set_transformed_body", L.NewFunction(func(L *lua.LState) int {
		newBody := L.ToString(1)
		transformedBody = []byte(newBody)
		return 0
	}))


	// 调用 Lua 的全局函数 `route`
	err := L.CallByParam(lua.P{
		Fn:      L.GetGlobal("route"),
		NRet:    0,
		Protect: true,
	}, requestTable)

	if err != nil {
		return fmt.Errorf("lua execution failed: %w", err)
	}

	// 检查 Lua 脚本是否设置了目的地
	if destinationTopic == "" {
		return fmt.Errorf("lua script did not set a destination")
	}

	// 发送消息到 Service Bus
	return r.sendToServiceBus(destinationTopic, transformedBody, messageProperties)
}

func (r *Router) sendToServiceBus(topic string, body []byte, props map[string]interface{}) error {
	// 在生产环境中,sender 应该被缓存和复用
	sender, err := r.sbClient.NewSender(topic, nil)
	if err != nil {
		return fmt.Errorf("failed to create sender for topic %s: %w", topic, err)
	}
	defer sender.Close(context.Background())

	message := &azservicebus.Message{
		Body:        body,
		ApplicationProperties: props,
	}

	// 设置一个合理的超时
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err = sender.SendMessage(ctx, message, nil)
	if err != nil {
		log.Printf("Failed to send message to topic %s. Error: %v", topic, err)
		return fmt.Errorf("failed to send message to topic %s: %w", topic, err)
	}
	
	log.Printf("Message successfully routed to topic: %s", topic)
	return nil
}

这里的设计体现了几个务实的考量:

  • 性能: 使用 gjson 来解析 JSON,它比标准的 encoding/json 在仅需读取少数几个字段时要快得多。
  • 安全: Lua 脚本在一个受限的环境中运行,它只能调用我们明确暴露的函数,无法执行文件 I/O 或网络调用等危险操作。
  • 灵活性: Lua 脚本不仅能决定目的地,还能修改消息体 (set_transformed_body) 和添加元数据 (set_property),这为未来更复杂的处理流程预留了空间。

第三步:编写路由逻辑 Lua 脚本

现在,业务逻辑可以完全用 Lua 来表达。这个脚本将作为 ConfigMap 部署。

-- /config/router.lua

-- json 是一个常用的 Lua JSON 库,我们需要在 Docker 镜像中包含它
-- 但为了简化,假设我们只处理字符串和header

-- 全局函数 route,接收一个 request table 作为参数
function route(request)
    -- 从 header 获取用户等级
    local user_tier = request:get_header("X-User-Tier")
    
    -- 从 body 中获取事件类型
    local event_type = request:get_json_field("type")
    
    -- 获取一个嵌套字段
    local user_id = request:get_json_field("data.user.id")
    
    -- 默认目标
    local destination = "generic-events"

    -- 规则 1: 所有高级用户的订单创建事件进入专属队列
    if user_tier == "premium" and event_type == "OrderCreated" then
        destination = "premium-orders"
    -- 规则 2: 所有用户登录事件进入审计队列
    elseif event_type == "UserLoggedIn" then
        destination = "audit-logs"
    -- 规则 3: 来自特定测试用户的所有事件进入测试队列
    elseif user_id and string.match(user_id, "^test-user-") then
        destination = "test-events"
    end
    
    -- 设置最终目的地
    set_destination(destination)

    -- 为所有消息添加追踪ID
    local trace_id = request:get_header("X-Trace-ID")
    if trace_id ~= "" then
        set_property("traceId", trace_id)
    end
    
    -- 示例:对审计日志进行内容转换
    if destination == "audit-logs" then
        local original_body = request:get_body()
        -- 实际场景中会用 json 库来操作
        local new_body = '{"audit_event": true, "original": ' .. original_body .. '}'
        set_transformed_body(new_body)
    end
end

这个脚本清晰地展示了如何组合使用 get_headerget_json_field 来实现复杂的条件路由。业务逻辑的变更现在只需要修改这个文件,然后执行 kubectl apply -f configmap.yaml

第四步:容器化与 Kubernetes 部署

最后一步是将其打包为 Docker 镜像,并通过 Kubernetes 部署,同时注入 Linkerd sidecar。

Dockerfile:

# 使用多阶段构建减小镜像体积
FROM golang:1.20-alpine AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .
# 编译为静态链接的二进制文件
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app .

# 最终镜像
FROM alpine:3.18

RUN apk --no-cache add ca-certificates

WORKDIR /app

COPY --from=builder /app/app .

# ConfigMap 将挂载到 /config 目录
VOLUME /config

EXPOSE 8080

CMD ["./app"]

Kubernetes 部署清单 (deployment.yaml, service.yaml, configmap.yaml):

# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: message-router-script
data:
  router.lua: |
    -- [[ 将上面的 Lua 脚本内容粘贴到这里 ]]
    function route(request)
        local user_tier = request:get_header("X-User-Tier")
        local event_type = request:get_json_field("type")
        local user_id = request:get_json_field("data.user.id")
        local destination = "generic-events"
        if user_tier == "premium" and event_type == "OrderCreated" then
            destination = "premium-orders"
        elseif event_type == "UserLoggedIn" then
            destination = "audit-logs"
        elseif user_id and string.match(user_id, "^test-user-") then
            destination = "test-events"
        end
        set_destination(destination)
        local trace_id = request:get_header("X-Trace-ID")
        if trace_id ~= "" then
            set_property("traceId", trace_id)
        end
    end
---
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: message-router
  labels:
    app: message-router
spec:
  replicas: 3
  selector:
    matchLabels:
      app: message-router
  template:
    metadata:
      labels:
        app: message-router
      annotations:
        # 关键: 让 Linkerd 自动注入 sidecar
        linkerd.io/inject: enabled
    spec:
      containers:
      - name: router
        image: your-registry/message-router:1.0.0
        ports:
        - containerPort: 8080
        env:
        - name: AZURE_SERVICE_BUS_CONNECTION_STRING
          valueFrom:
            secretKeyRef:
              name: azure-service-bus-secrets
              key: connectionString
        volumeMounts:
        - name: script-volume
          mountPath: /config
        resources:
          requests:
            cpu: "100m"
            memory: "128Mi"
          limits:
            cpu: "500m"
            memory: "256Mi"
      volumes:
      - name: script-volume
        configMap:
          name: message-router-script
---
# service.yaml
apiVersion: v1
kind: Service
metadata:
  name: message-router-svc
spec:
  selector:
    app: message-router
  ports:
  - name: http
    port: 80
    targetPort: 8080

部署后,message-router Pod 会被 Linkerd 控制平面识别,并自动注入 linkerd-proxy sidecar。现在,所有进出 message-router 服务的流量都受到了 Linkerd 的管理。我们可以用 linkerd viz stat deploy/message-router 来查看它的实时流量、成功率和延迟,就像对待集群中任何其他服务一样。当上游服务调用 http://message-router-svc 时,请求会先经过 Linkerd 代理,实现服务间的 mTLS 加密,然后才到达我们的 Go 应用。

方案的局限性与未来展望

这个方案在我们的生产环境中运行稳定,极大地提升了业务规则迭代的效率。但它并非银弹。当前实现有几个可以改进的地方:

  1. Lua 脚本的热加载: 目前更新脚本需要通过滚动更新 Deployment 来让 Pod 重新加载 ConfigMap。虽然比重新构建镜像快得多,但并非实时。更优的方案是让 Go 应用监视挂载文件的变化,或者建立一个控制平面来动态推送脚本,实现真正的热更新。
  2. 沙箱的健壮性: 虽然 gopher-lua 本身是安全的,但一个写得不好的 Lua 脚本(例如死循环)仍然可能耗尽单个 Pod 的 CPU 资源。我们需要在 Go 代码中为 Lua 虚拟机的执行设置超时和指令数限制(gopher-lua 本身不直接支持,但可以通过 context 和 goroutine 来模拟实现),防止恶意或有缺陷的脚本影响整个服务的稳定性。
  3. 复杂的脚本依赖: 当前方案不支持 require 外部 Lua 模块。如果路由逻辑变得极其复杂,需要拆分为多个文件或依赖公共库,就需要设计一套更完善的模块加载机制。

尽管存在这些局限,但通过将 Linkerd 的网络能力、Go 的高性能和 Lua 的动态性结合,我们为 Azure Service Bus 构建了一个高度灵活且可观测的消息路由层,这是一个在特定问题域下权衡成本与效率的典型工程决策。


  目录