基于 Pulumi 和 etcd 构建动态可配置的实时推荐系统前端交互架构


定义一个复杂的业务问题是架构设计的起点。传统的推荐系统通常面临一个核心矛盾:模型和策略的更新周期与用户行为的瞬息万变不匹配。多数系统依赖于离线的批处理任务(例如每天一次的 Spark 作业)来更新推荐模型,并通过 API 接口暴露结果。这种架构的更新延迟通常在小时甚至天级别。当市场活动、突发热点或用户兴趣快速漂移时,系统无法做出实时响应,导致推荐内容陈旧,点击率下降。

我们要解决的,正是这个“实时性”与“动态性”的难题。具体目标是构建一个系统,它允许运营或算法团队在不重新部署任何服务的情况下,实时变更线上的推荐策略,例如动态调整不同推荐源的权重、切换A/B测试组的算法模型、甚至针对特定用户群体推送指定的物料。这种级别的动态性要求配置管理必须是分布式的、高可用的,并且能够被后端服务近乎实时地感知。

方案A:基于数据库的传统配置管理

一个常见的思路是使用一个中心化的数据库(如 MySQL 或 PostgreSQL)来存储推荐策略。后端服务会启动一个定时器,每隔一个较短的时间(例如 30 秒)轮询配置表,检查是否有变更。

优势分析:

  • 技术成熟度高: 几乎所有工程师都熟悉关系型数据库,维护和开发成本相对较低。
  • 实现简单: 逻辑清晰,一个定时任务加一个数据库查询即可完成。
  • 事务支持: 对于复杂的配置变更,可以利用数据库的事务特性保证原子性。

劣势分析:

  • 延迟与轮询开销: “实时”是一个伪命题。轮询间隔决定了配置生效的延迟。为了降低延迟而缩短间隔,又会急剧增加数据库的无效查询负载,尤其是在服务实例数量庞大的情况下,可能形成“查询风暴”。
  • 可用性耦合: 推荐服务的核心链路强依赖于配置数据库的可用性。如果数据库发生抖动,所有服务实例的配置拉取都会失败。
  • 状态通知复杂: 数据库本身不具备主动通知机制。如果需要实现变更后立即通知,需要引入额外的消息队列或触发器机制,增加了系统复杂度。

在真实项目中,这种方案对于非核心、允许分钟级延迟的配置是可行的。但对于要求秒级甚至亚秒级响应的推荐策略调整,其固有的延迟和轮询开销是不可接受的。

方案B:基于 etcd Watch 机制的事件驱动架构

此方案引入了一个专门的分布式协调服务 etcd 作为配置中心。推荐策略以键值对的形式存储在 etcd 中。后端推荐服务不再轮询,而是与 etcd 建立一个长连接,使用其 Watch 机制来订阅特定配置前缀的变化。

优势分析:

  • 近乎实时的通知: etcdWatch 是一个基于 HTTP/2 的长连接推送模型。一旦某个 key 的值发生变化,所有监听该 key 的客户端会立即收到通知。配置生效的延迟可以降低到毫秒级。
  • 低负载: 在没有配置变更时,服务与 etcd 之间几乎没有数据传输,避免了轮询带来的资源浪费。
  • 高可用: etcd 通过 Raft 协议保证了集群自身的高可用性。即使部分节点宕机,只要多数节点存活,服务依然可以正常读写和接收通知。
  • 原子操作: etcd 提供了 Compare-and-Swap (CAS) 等原子操作,可以安全地进行并发配置修改。

劣势分析:

  • 运维复杂性增加: 引入 etcd 集群需要额外的运维和监控成本。
  • 数据模型限制: etcd 本质上是一个键值存储,不适合存储复杂的关系型数据。配置信息需要被序列化为字符串(如 JSON 或 Protobuf)进行存储。
  • 连接管理: 服务需要妥善处理与 etcd 的长连接,包括连接中断后的自动重连和 Watch 的重建。

最终选择与理由:

对于我们追求极致动态性的目标而言,方案 B 是显而易见的更优选择。它从根本上解决了轮询架构的延迟问题,将配置变更从一个“拉”模型转换为了一个高效的“推”模型。虽然增加了运维复杂性,但通过基础设施即代码(IaC)工具如 Pulumi,可以极大地简化 etcd 集群及相关服务的部署和管理,将这部分成本降至可控范围。前端的实时交互则通过 WebSocket 与后端服务连接,后端在接收到 etcd 的配置变更后,可以主动将新的推荐结果推送给前端,形成一个完整的实时闭环。

核心实现概览

整个系统分为三大部分:基础设施定义(Pulumi)、后端服务(Go)和前端应用(Vite + React + Zustand)。

graph TD
    subgraph "IaC Management (Pulumi)"
        A[Pulumi Code] -->|provisions| B(EC2 for etcd)
        A -->|provisions| C(Fargate Service for Backend)
        A -->|provisions| D(ALB with WebSocket Support)
        A -->|provisions| E(S3 Bucket for Frontend)
        A -->|provisions| F(CloudFront Distribution)
    end

    subgraph "Operator Workflow"
        G[Operator] -- etcdctl put --> H[etcd Cluster]
    end

    subgraph "Real-time Data Flow"
        I[User Browser] -- WebSocket conn --> D
        D -- forwards --> C
        H -- Watch event --> C
        C -- Recommendation Push --> D
        D -- pushes back --> I
    end

    subgraph "Frontend State"
        I -- updates --> J[Zustand Store]
        J -- re-renders --> K[React UI]
    end

    C -.->|Fetches user profile, item features| L[Data Sources]

1. 基础设施即代码:使用 Pulumi 定义整个栈

使用 Pulumi 和 TypeScript,我们可以将所有云资源声明式地定义在一个项目中。这保证了环境的一致性和可重复性。

// pulumi/index.ts
import * as aws from "@pulumi/aws";
import * as awsx from "@pulumi/awsx";
import * as pulumi from "@pulumi/pulumi";

// --- VPC & Networking Setup ---
// In a real project, this would be more complex, possibly referencing an existing VPC.
const vpc = new awsx.ec2.Vpc("recsys-vpc", {
    numberOfAvailabilityZones: 2,
});

const cluster = new awsx.ecs.Cluster("recsys-cluster", { vpc });

// --- Application Load Balancer ---
const alb = new awsx.lb.ApplicationLoadBalancer("recsys-alb", {
    vpc,
    external: true,
    securityGroups: cluster.securityGroups,
});

const wsTargetGroup = alb.createTargetGroup("ws-tg", {
    port: 8080,
    protocol: "HTTP",
    healthCheck: {
        path: "/health",
        protocol: "HTTP",
    }
});

const listener = wsTargetGroup.createListener("ws-listener", {
    port: 80,
    protocol: "HTTP", // Termination happens at ALB if using HTTPS
});

// --- Backend Service (Go) ---
const backendImage = awsx.ecs.Image.fromPath("backend-image", "../backend");

const backendService = new awsx.ecs.FargateService("backend-service", {
    cluster,
    taskDefinitionArgs: {
        container: {
            image: backendImage,
            cpu: 512,
            memory: 1024,
            portMappings: [wsTargetGroup],
            environment: [
                // Etcd endpoint would be discovered via service discovery or passed here.
                { name: "ETCD_ENDPOINTS", value: "etcd-service.local:2379" },
            ],
        },
    },
    desiredCount: 2,
});

// --- Frontend Deployment (Vite) ---
const siteBucket = new aws.s3.Bucket("site-bucket", {
    website: {
        indexDocument: "index.html",
    },
});

// This assumes the Vite project is in `../frontend` and has been built to the `dist` dir.
const siteDir = "../frontend/dist";
for (const item of require("fs").readdirSync(siteDir)) {
    new aws.s3.BucketObject(item, {
        bucket: siteBucket,
        source: new pulumi.asset.FileAsset(`${siteDir}/${item}`),
        contentType: require("mime").getType(item) || undefined,
        acl: "public-read",
    });
}

const distribution = new aws.cloudfront.Distribution("site-cdn", {
    // ... CDN configuration pointing to S3 and forwarding /ws path to ALB
    // This part is complex and involves custom origins and behaviors.
});

export const backendUrl = listener.endpoint.hostname;
export const frontendUrl = distribution.domainName;

代码剖析:

  • 模块化: awsx 库简化了 VPC、Cluster、ALB 和 Fargate 服务的创建。
  • 解耦: 后端服务和前端资源是独立定义的,但部署在同一个 Pulumi 项目中,便于统一管理。
  • 配置注入: 后端容器通过环境变量 ETCD_ENDPOINTS 获取 etcd 的地址。在生产环境中,这通常通过 AWS Cloud Map 或 Consul 等服务发现机制实现,而不是硬编码。
  • 遗漏的部分: 为简化篇幅,此代码省略了 etcd 集群本身的 Pulumi 配置(这可能涉及 EC2 实例、Auto Scaling Group 和复杂的启动脚本)、详细的 IAM 角色权限以及 CloudFront 的完整行为配置。在真实项目中,这些都是至关重要的。

2. 后端服务:Go 实现的策略监听与 WebSocket 推送

后端服务是整个系统的核心,它需要完成三件事:

  1. etcd 建立连接并持续监听策略变化。
  2. 在内存中维护当前生效的推荐策略。
  3. 管理 WebSocket 连接,根据当前策略为客户端生成并推送推荐。
// backend/main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// RecommendationStrategy defines the structure for our dynamic configuration.
type RecommendationStrategy struct {
	Name    string            `json:"name"`
	Weights map[string]float64 `json:"weights"` // e.g., "popular": 0.7, "new": 0.3
	Filter  []string          `json:"filter"`
}

var (
	// In-memory cache for the current strategy.
	currentStrategy     RecommendationStrategy
	strategyMutex       sync.RWMutex
	defaultStrategy     = RecommendationStrategy{Name: "default", Weights: map[string]float64{"popular": 1.0}}
	strategyKey         = "/recommendation/strategy/homepage"
)

// upgrader is used to upgrade HTTP connections to WebSocket connections.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		// In production, you must validate the origin.
		return true
	},
}

func main() {
	// --- Initialize Etcd Client and Watcher ---
	endpoints := []string{"etcd:2379"} // Assumes Docker Compose or similar networking
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed to connect to etcd: %v", err)
	}
	defer cli.Close()

	// Start a goroutine to watch for strategy changes.
	go watchStrategy(cli)

	// --- Setup WebSocket Handler ---
	http.HandleFunc("/ws", handleConnections)
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("OK"))
    })

	log.Println("HTTP server started on :8080")
	err = http.ListenAndServe(":8080", nil)
	if err != nil {
		log.Fatalf("ListenAndServe failed: %v", err)
	}
}

// watchStrategy fetches the initial strategy and then watches for any future changes.
func watchStrategy(cli *clientv3.Client) {
	// 1. Initial fetch
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	resp, err := cli.Get(ctx, strategyKey)
	cancel()
	if err != nil {
		log.Printf("Failed to initially fetch strategy, using default: %v", err)
		setStrategy(defaultStrategy)
	} else if len(resp.Kvs) > 0 {
		var strategy RecommendationStrategy
		if err := json.Unmarshal(resp.Kvs[0].Value, &strategy); err == nil {
			log.Printf("Initial strategy loaded: %s", strategy.Name)
			setStrategy(strategy)
		} else {
            log.Printf("Failed to unmarshal initial strategy, using default: %v", err)
			setStrategy(defaultStrategy)
        }
	} else {
        log.Printf("No initial strategy found in etcd, using default.")
		setStrategy(defaultStrategy)
    }

	// 2. Watch for changes
	rch := cli.Watch(context.Background(), strategyKey)
	log.Println("Watching for strategy changes on key:", strategyKey)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			if ev.Type == clientv3.EventTypePut {
				var newStrategy RecommendationStrategy
				if err := json.Unmarshal(ev.Kv.Value, &newStrategy); err != nil {
					log.Printf("Error unmarshalling updated strategy: %v", err)
					continue
				}
				log.Printf("Strategy updated via etcd: %s -> %s", currentStrategy.Name, newStrategy.Name)
				setStrategy(newStrategy)
				// Here you could trigger a push to all connected clients if needed.
			}
		}
	}
}

func setStrategy(s RecommendationStrategy) {
	strategyMutex.Lock()
	defer strategyMutex.Unlock()
	currentStrategy = s
}

func getStrategy() RecommendationStrategy {
	strategyMutex.RLock()
	defer strategyMutex.RUnlock()
	return currentStrategy
}


func handleConnections(w http.ResponseWriter, r *http.Request) {
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("Upgrade error: %v", err)
		return
	}
	defer ws.Close()

	log.Println("Client connected")

	// This is a simplified ticker to simulate real-time recommendation pushes.
	// A real implementation would be triggered by user actions or other events.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			strategy := getStrategy()
			// In a real system, this function would be very complex,
			// involving calls to feature stores, model servers, etc.
			recommendations := generateRecommendations(strategy)

			err := ws.WriteJSON(recommendations)
			if err != nil {
				log.Printf("Write error: %v", err)
				return // End connection on write error
			}
		}
	}
}

// generateRecommendations is a placeholder for the actual recommendation logic.
func generateRecommendations(strategy RecommendationStrategy) map[string]interface{} {
	// The logic here would use strategy.Weights, strategy.Filter etc.
	log.Printf("Generating recommendations with strategy: %s", strategy.Name)
	return map[string]interface{}{
		"strategy": strategy.Name,
		"items": []map[string]interface{}{
			{"id": "item-123", "score": 0.98 * strategy.Weights["popular"]},
			{"id": "item-456", "score": 0.95 * strategy.Weights["new"]},
		},
		"timestamp": time.Now().Unix(),
	}
}

代码剖析:

  • 并发安全: strategyMutex 确保了对全局变量 currentStrategy 的读写是线程安全的。这是一个常见的错误来源,尤其是在 Go 中,很容易忘记并发访问共享内存的问题。
  • 容错与默认值: watchStrategy 函数在启动时会尝试从 etcd 获取初始配置。如果失败(网络问题、etcd 未就绪),它会优雅地降级到 defaultStrategy,保证了服务的健壮性。
  • Watch 逻辑: cli.Watch 返回一个 channel。Go 的 for range 语法可以持续地从这个 channel 中消费事件,这是一种非常优雅的事件驱动编程模型。
  • 责任分离: generateRecommendations 函数是业务逻辑的核心,但它与配置管理和网络通信的逻辑是解耦的。它只接收一个 RecommendationStrategy 对象作为输入。

3. 前端应用:Vite + Zustand 驱动的响应式 UI

前端需要维护一个到后端的 WebSocket 长连接,并使用一个轻量级的状态管理器(Zustand)来存储和响应推荐数据的变化。

// frontend/src/store.js
import { create } from 'zustand';

// Zustand store for managing recommendation state.
export const useRecStore = create((set) => ({
    isConnected: false,
    strategy: 'N/A',
    items: [],
    lastUpdated: null,
    
    // Actions to update the store
    setConnectionStatus: (status) => set({ isConnected: status }),
    updateRecommendations: (data) => set({
        strategy: data.strategy,
        items: data.items,
        lastUpdated: data.timestamp,
    }),
}));
// frontend/src/App.jsx
import React, { useEffect, useRef } from 'react';
import { useRecStore } from './store';
import './App.css';

const WS_URL = 'ws://<your-alb-dns>/ws'; // This should be configured via env vars

function App() {
    const { isConnected, strategy, items, lastUpdated, setConnectionStatus, updateRecommendations } = useRecStore();
    const ws = useRef(null);

    useEffect(() => {
        if (!ws.current) {
            ws.current = new WebSocket(WS_URL);

            ws.current.onopen = () => {
                console.log('WebSocket Connected');
                setConnectionStatus(true);
            };

            ws.current.onclose = () => {
                console.log('WebSocket Disconnected');
                setConnectionStatus(false);
                // Production-grade code needs a robust reconnection strategy with backoff.
            };

            ws.current.onerror = (error) => {
                console.error('WebSocket Error:', error);
            };

            ws.current.onmessage = (event) => {
                try {
                    const data = JSON.parse(event.data);
                    console.log('Received data:', data);
                    updateRecommendations(data);
                } catch (e) {
                    console.error('Failed to parse message:', e);
                }
            };
        }

        // Cleanup on component unmount
        return () => {
            if (ws.current) {
                ws.current.close();
            }
        };
    }, [setConnectionStatus, updateRecommendations]);

    return (
        <div className="container">
            <header>
                <h1>Real-time Recommendation Feed</h1>
                <div className={`status ${isConnected ? 'connected' : 'disconnected'}`}>
                    {isConnected ? '● Connected' : '● Disconnected'}
                </div>
            </header>
            <main>
                <div className="info">
                    Current Strategy: <strong>{strategy}</strong> | Last Update: {lastUpdated ? new Date(lastUpdated * 1000).toLocaleTimeString() : 'N/A'}
                </div>
                <div className="item-list">
                    {items.length === 0 ? (
                        <p>Waiting for recommendations...</p>
                    ) : (
                        items.map(item => (
                            <div key={item.id} className="item-card">
                                <h3>Item ID: {item.id}</h3>
                                <p>Score: {item.score.toFixed(4)}</p>
                            </div>
                        ))
                    )}
                </div>
            </main>
        </div>
    );
}

export default App;

代码剖析:

  • Zustand 的简洁性: 相较于 Redux,Zustand 的 API 非常简单。create 函数直接定义了 state 和 actions,没有样板代码。useRecStore hook 让组件可以轻松地订阅状态变化。
  • useEffect 与 useRef: useRef 用于在组件的多次渲染之间持久化 WebSocket 实例,而 useEffect 则负责在组件挂载时建立连接,并在卸载时清理连接。这是 React 中处理副作用的标准模式。
  • 生产环境的考量: 注释中明确指出,一个生产级的 WebSocket 客户端需要实现带指数退避的自动重连逻辑。单纯的 onclose 事件处理是不够的。

架构的扩展性与局限性

这个架构的优势在于其核心的动态配置能力。我们可以轻松地扩展它,例如,etcd 中的 key 可以变得更复杂,如 /recommendation/strategy/homepage/user_group_A,允许对不同用户分层应用不同策略。后端服务可以监听更广泛的前缀 /recommendation/strategy/ 来一次性获取所有策略,实现更复杂的路由逻辑。

然而,该架构也存在局限性。etcd 被设计为低容量、高可靠的元数据存储,不应被用作高吞吐量的消息总线。所有策略配置的总大小应该保持在合理范围内(通常是 MB 级别)。其次,系统的状态完全依赖于后端服务的内存和 etcd。如果后端服务重启,它需要重新从 etcd 加载配置,这期间可能会有短暂的配置不一致窗口。最后,当 WebSocket 连接数达到数十万甚至更高时,对后端服务的负载均衡、连接管理和资源消耗都提出了巨大挑战,可能需要引入专门的 WebSocket 网关层。


  目录