构建一套基于Kubernetes的C#与Actix-web异构微服务实时同步架构


一个棘手的现实摆在面前:我们有一个用C#编写的庞大、稳定的核心库存管理服务,它承载了所有关键的业务逻辑。与此同时,一个新的Web前端需要对库存变动做出近乎实时的响应。直接轮询数据库的方案在性能和可伸缩性上是不可接受的,而改造整个C#服务以支持WebSocket则意味着巨大的回归测试风险和开发成本。系统间的强耦合也让我们夜不能寐。

初步构想是引入一个轻量级的、高性能的中间层,专门负责实时通信。这个中间层将订阅核心服务的状态变更,并通过WebSocket将更新推送给前端。这种事件驱动的架构不仅能实现前后端解耦,还能为未来更多的实时订阅场景提供基础。

技术选型决策过程充满了权衡。核心服务维持C#和.NET生态不变,这是业务连续性的基本要求。对于新的实时通信层,性能和资源占用是首要考量,因为它可能需要维护成千上万的并发长连接。Rust及其生态中的Actix-web框架因其出色的性能、内存安全和高并发处理能力进入了我们的视野。前端状态管理则选择了MobX,其响应式原理与这种事件驱动的数据流完美契合。

这种架构天然地选择了BASE理论(Basically Available, Soft state, Eventually consistent)而非ACID。当C#服务完成一笔库存扣减后,它会发布一个事件。这个事件被实时通信层消费并推送至前端。在这个过程中,存在一个短暂的时间窗口,后端数据已经更新,但前端尚未收到通知。我们接受这种最终一致性,以换取系统的整体高可用性、性能和松耦合。

整个系统将部署在Kubernetes (K8s)上,利用其服务发现、自动伸缩和容错能力来管理这两个异构的微服务。

架构概览

整体数据流如下:

graph TD
    subgraph Kubernetes Cluster
        subgraph C# Pod
            A[C# Inventory Service]
        end
        subgraph Rust Pod
            C[Actix-web WebSocket Service]
        end

        B[Internal Event Endpoint]

        A -- 1. HTTP POST Event --> B
        B -- "k8s service discovery" --> C
        C -- 3. Consume Event & Broadcast --> D
    end

    U[User Browser] -- 2. WebSocket Connection --> Ingress --> C
    D((WebSocket Clients))
    U -- "React with MobX" --> D
  1. C# 库存服务: 核心业务逻辑处理单元。当库存发生变化(如出库、入库),它会构造一个事件并通过HTTP POST请求发送到Kubernetes集群内部的一个特定服务地址。
  2. Actix-web WebSocket服务:
    • 暴露一个内部HTTP端点,用于接收来自C#服务的事件。
    • 维护一个WebSocket服务器,管理所有活跃的前端客户端连接。
    • 接收到事件后,将数据广播给所有相关的WebSocket客户端。
  3. 前端 (React + MobX):
    • 与Actix-web服务建立持久的WebSocket连接。
    • MobX Store监听WebSocket消息,并自动更新应用状态,触发UI的响应式渲染。

核心实现:C# 事件生产者

在C#核心库存服务中,我们需要在一个业务操作完成后,触发一个事件通知。这里我们不引入复杂的消息队列客户端,而是使用最简单的HttpClient向K8s内部服务发送一个POST请求。这种方式在服务网格(Service Mesh)环境下尤其简单可靠。

假设我们有一个InventoryService类:

// File: Services/InventoryService.cs
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

public class InventoryService
{
    private readonly ILogger<InventoryService> _logger;
    private readonly HttpClient _httpClient;
    private readonly string _eventEndpointUrl;

    public InventoryService(ILogger<InventoryService> logger, IHttpClientFactory httpClientFactory)
    {
        _logger = logger;
        _httpClient = httpClientFactory.CreateClient("EventNotifier");
        // 在生产环境中,这个URL应该通过配置注入,例如 "http://actix-ws-service.default.svc.cluster.local/event"
        _eventEndpointUrl = Environment.GetEnvironmentVariable("EVENT_ENDPOINT_URL") ?? "http://localhost:8080/event";
    }

    // 核心业务方法:更新库存
    public async Task<bool> UpdateStockAsync(string productId, int newQuantity)
    {
        _logger.LogInformation("Updating stock for product {ProductId} to {NewQuantity}", productId, newQuantity);
        
        // ...
        // 在这里执行数据库操作
        // ...

        // 模拟数据库操作成功
        var success = true; 
        if (!success)
        {
            _logger.LogError("Failed to update stock in database for product {ProductId}", productId);
            return false;
        }

        // 数据库操作成功后,发布事件
        await NotifyStockUpdateAsync(productId, newQuantity);
        
        return true;
    }

    // 负责发送事件通知的方法
    private async Task NotifyStockUpdateAsync(string productId, int quantity)
    {
        var stockEvent = new StockUpdateEvent
        {
            ProductId = productId,
            NewQuantity = quantity,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        var jsonPayload = JsonSerializer.Serialize(stockEvent);
        var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");

        try
        {
            _logger.LogInformation("Sending stock update event to {Endpoint}", _eventEndpointUrl);
            HttpResponseMessage response = await _httpClient.PostAsync(_eventEndpointUrl, content);

            if (response.IsSuccessStatusCode)
            {
                _logger.LogInformation("Successfully sent stock update event for product {ProductId}", productId);
            }
            else
            {
                // 这里的错误处理很关键。生产环境中可能需要重试机制或将失败的事件存入死信队列。
                var responseBody = await response.Content.ReadAsStringAsync();
                _logger.LogError(
                    "Failed to send stock update event. Status: {StatusCode}, Response: {ResponseBody}", 
                    response.StatusCode, 
                    responseBody);
            }
        }
        catch (HttpRequestException ex)
        {
            _logger.LogError(ex, "HTTP request exception while sending event for product {ProductId}", productId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An unexpected error occurred while sending event for {ProductId}", productId);
        }
    }
}

// 事件的数据结构
public class StockUpdateEvent
{
    public string ProductId { get; set; }
    public int NewQuantity { get; set; }
    public long Timestamp { get; set; }
}

Startup.cs中进行配置:

// File: Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    // ...
    services.AddHttpClient("EventNotifier", client =>
    {
        // 设置默认的请求头或超时策略
        client.Timeout = TimeSpan.FromSeconds(5);
    });
    services.AddScoped<InventoryService>();
    // ...
}

这段C#代码的核心在于NotifyStockUpdateAsync方法。它将库存变更封装成一个JSON对象,并通过HTTP POST发送。注意,URL是从环境变量获取的,这使得它在Kubernetes环境中可以轻松配置为指向Actix-web服务的内部DNS名称。错误处理和日志记录是生产级代码必不可少的部分。

核心实现:Actix-web 实时通信服务

这是系统的枢纽。它需要同时处理来自内部的HTTP事件和来自外部的WebSocket连接。我们将使用Actix的Actor模型来优雅地管理WebSocket会话。

main.rs 文件结构如下:

// File: src/main.rs
use actix::prelude::*;
use actix_web::{get, post, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

mod ws;
mod hub;

// 用于接收C#服务事件的结构体
#[derive(Deserialize, Debug)]
struct StockUpdateEvent {
    #[serde(rename = "ProductId")]
    product_id: String,
    #[serde(rename = "NewQuantity")]
    new_quantity: i32,
    #[serde(rename = "Timestamp")]
    timestamp: i64,
}

// 启动WebSocket连接的路由
#[get("/ws")]
async fn websocket_route(
    req: HttpRequest,
    stream: web::Payload,
    hub_addr: web::Data<Addr<hub::Hub>>,
) -> Result<HttpResponse, Error> {
    // 将HTTP请求升级为WebSocket连接,并创建一个新的WebSocket Actor
    ws::ws_start(req, stream, hub_addr.get_ref().clone())
}

// 接收内部事件的路由
#[post("/event")]
async fn event_receiver(
    event: web::Json<StockUpdateEvent>,
    hub_addr: web::Data<Addr<hub::Hub>>,
) -> impl Responder {
    log::info!("Received event: {:?}", event);
    
    // 将事件发送给Hub Actor进行广播
    hub_addr.do_send(hub::BroadcastMessage {
        product_id: event.product_id.clone(),
        message: serde_json::to_string(&event.into_inner()).unwrap_or_default(),
    });

    HttpResponse::Ok().body("Event received")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

    // 创建Hub Actor的地址,它将在所有worker线程之间共享
    let hub = hub::Hub::new().start();

    log::info!("Starting server at http://0.0.0.0:8080");

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(hub.clone())) // 将Hub地址注入到应用状态中
            .service(websocket_route)
            .service(event_receiver)
    })
    .bind(("0.0.0.0", 8080))?
    .run()
    .await
}

hub.rs 文件负责管理所有连接:

// File: src/hub.rs
use actix::prelude::*;
use std::collections::{HashMap, HashSet};

// --- Actor消息定义 ---

// 广播消息给所有订阅了特定产品的客户端
#[derive(Message)]
#[rtype(result = "()")]
pub struct BroadcastMessage {
    pub product_id: String,
    pub message: String,
}

// 客户端连接时,向Hub注册
#[derive(Message)]
#[rtype(result = "()")]
pub struct Connect {
    pub addr: Addr<super::ws::WsConn>,
}

// 客户端断开连接时,从Hub注销
#[derive(Message)]
#[rtype(result = "()")]
pub struct Disconnect {
    pub addr: Addr<super::ws::WsConn>,
}

// 客户端订阅一个产品
#[derive(Message)]
#[rtype(result = "()")]
pub struct Subscribe {
    pub addr: Addr<super::ws::WsConn>,
    pub product_id: String,
}


// --- Hub Actor 实现 ---

pub struct Hub {
    // addr -> Set<product_id>
    sessions: HashMap<Addr<super::ws::WsConn>, HashSet<String>>,
    // product_id -> Set<addr>
    subscriptions: HashMap<String, HashSet<Addr<super::ws::WsConn>>>,
}

impl Hub {
    pub fn new() -> Hub {
        Hub {
            sessions: HashMap::new(),
            subscriptions: HashMap::new(),
        }
    }
}

impl Actor for Hub {
    type Context = Context<Self>;
}

impl Handler<Connect> for Hub {
    type Result = ();

    fn handle(&mut self, msg: Connect, _: &mut Context<Self>) {
        log::info!("Hub: New connection");
        self.sessions.insert(msg.addr, HashSet::new());
    }
}

impl Handler<Disconnect> for Hub {
    type Result = ();

    fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
        log::info!("Hub: Connection disconnected");
        if let Some(products) = self.sessions.remove(&msg.addr) {
            for product_id in products {
                if let Some(subscribers) = self.subscriptions.get_mut(&product_id) {
                    subscribers.remove(&msg.addr);
                }
            }
        }
    }
}

impl Handler<Subscribe> for Hub {
    type Result = ();

    fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
        log::info!("Hub: Client subscribing to product {}", msg.product_id);
        // 更新会话的订阅列表
        if let Some(session_subs) = self.sessions.get_mut(&msg.addr) {
            session_subs.insert(msg.product_id.clone());
        }
        // 更新产品的订阅者列表
        self.subscriptions
            .entry(msg.product_id)
            .or_default()
            .insert(msg.addr);
    }
}

impl Handler<BroadcastMessage> for Hub {
    type Result = ();

    fn handle(&mut self, msg: BroadcastMessage, _: &mut Context<Self>) {
        log::info!("Hub: Broadcasting update for product {}", msg.product_id);
        if let Some(subscribers) = self.subscriptions.get(&msg.product_id) {
            for addr in subscribers {
                addr.do_send(super::ws::WsMessage(msg.message.clone()));
            }
        }
    }
}

ws.rs 文件定义了单个WebSocket连接的行为:

// File: src/ws.rs
use actix::prelude::*;
use actix_web::{web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use std::time::{Duration, Instant};

use super::hub;

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Message)]
#[rtype(result = "()")]
pub struct WsMessage(pub String);

pub struct WsConn {
    hb: Instant,
    hub_addr: Addr<hub::Hub>,
}

impl WsConn {
    pub fn new(hub_addr: Addr<hub::Hub>) -> Self {
        Self {
            hb: Instant::now(),
            hub_addr,
        }
    }

    fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
            if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
                log::warn!("WebSocket client heartbeat failed, disconnecting.");
                act.hub_addr.do_send(hub::Disconnect { addr: ctx.address() });
                ctx.stop();
                return;
            }
            ctx.ping(b"");
        });
    }
}

impl Actor for WsConn {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.hb(ctx);
        let addr = ctx.address();
        self.hub_addr
            .send(hub::Connect { addr })
            .into_actor(self)
            .then(|res, _, ctx| {
                if res.is_err() {
                    ctx.stop();
                }
                fut::ready(())
            })
            .wait(ctx);
    }

    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        self.hub_addr.do_send(hub::Disconnect { addr: ctx.address() });
        Running::Stop
    }
}

// 接收来自Hub的消息
impl Handler<WsMessage> for WsConn {
    type Result = ();
    fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
        ctx.text(msg.0);
    }
}

// 处理客户端发来的消息
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsConn {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Ping(msg)) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            Ok(ws::Message::Pong(_)) => {
                self.hb = Instant::now();
            }
            Ok(ws::Message::Text(text)) => {
                // 假设客户端通过文本消息订阅产品 e.g., "subscribe:PRODUCT_123"
                let parts: Vec<&str> = text.splitn(2, ':').collect();
                if parts.len() == 2 && parts[0] == "subscribe" {
                    let product_id = parts[1].trim().to_string();
                    self.hub_addr.do_send(hub::Subscribe {
                        addr: ctx.address(),
                        product_id,
                    });
                }
            }
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            _ => (),
        }
    }
}

pub fn ws_start(
    req: HttpRequest,
    stream: web::Payload,
    hub_addr: Addr<hub::Hub>,
) -> Result<HttpResponse, Error> {
    let ws = WsConn::new(hub_addr);
    ws::start(ws, &req, stream)
}

这个Rust服务设计精良:

  • Actor模型: Hub actor作为中心协调者,WsConn actor代表每个客户端连接。它们通过消息传递进行通信,避免了锁和复杂的并发问题。
  • 订阅模式: Hub维护了产品ID到订阅者列表的映射,确保事件只被发送给感兴趣的客户端,而不是盲目广播。
  • 心跳机制: WsConn中实现了心跳检测,可以及时清理掉僵尸连接,这对维护一个稳定的长连接服务至关重要。

核心实现:前端 MobX Store

前端使用MobX来管理状态。当WebSocket收到消息时,我们只需更新MobX Store中的可观察属性,UI便会自动更新。

// file: src/stores/InventoryStore.js
import { makeAutoObservable, observable, action, runInAction } from 'mobx';

class InventoryStore {
    // 使用Map来存储产品库存,key是productId
    products = observable.map();
    connectionStatus = 'disconnected';
    ws = null;

    constructor() {
        makeAutoObservable(this, {
            products: observable,
            connectionStatus: observable,
            connect: action,
            subscribeToProduct: action,
            handleMessage: action,
        });
        this.connect();
    }

    connect() {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            console.log('WebSocket already connected.');
            return;
        }

        // URL应该来自配置文件
        const WS_URL = 'ws://your-k8s-ingress/ws';
        this.ws = new WebSocket(WS_URL);
        this.connectionStatus = 'connecting';

        this.ws.onopen = () => {
            console.log('WebSocket connection established.');
            runInAction(() => {
                this.connectionStatus = 'connected';
            });
            // 连接成功后可以自动订阅一些初始产品
            this.subscribeToProduct('PRODUCT_123');
        };

        this.ws.onmessage = (event) => {
            this.handleMessage(event.data);
        };

        this.ws.onclose = () => {
            console.log('WebSocket connection closed. Reconnecting in 5s...');
            runInAction(() => {
                this.connectionStatus = 'disconnected';
            });
            setTimeout(() => this.connect(), 5000);
        };

        this.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
            runInAction(() => {
                this.connectionStatus = 'error';
            });
            this.ws.close(); // 会触发 onclose 中的重连逻辑
        };
    }

    subscribeToProduct(productId) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(`subscribe:${productId}`);
            console.log(`Sent subscription request for ${productId}`);
        } else {
            console.warn('Cannot subscribe, WebSocket is not open.');
        }
    }

    handleMessage(data) {
        try {
            const update = JSON.parse(data);
            // 这里假设收到的数据结构与C#发送的事件一致
            if (update.ProductId && typeof update.NewQuantity !== 'undefined') {
                console.log(`Received update for ${update.ProductId}: ${update.NewQuantity}`);
                runInAction(() => {
                    this.products.set(update.ProductId, update.NewQuantity);
                });
            }
        } catch (error) {
            console.error('Failed to parse WebSocket message:', error);
        }
    }
}

export const inventoryStore = new InventoryStore();

React组件的使用将非常简单:

import React, { useEffect } from 'react';
import { observer } from 'mobx-react-lite';
import { inventoryStore } from './stores/InventoryStore';

const ProductStock = observer(({ productId }) => {
    useEffect(() => {
        // 组件挂载时,确保已订阅
        inventoryStore.subscribeToProduct(productId);
    }, [productId]);

    const stock = inventoryStore.products.get(productId);

    return (
        <div>
            <h2>Product: {productId}</h2>
            <p>Connection: {inventoryStore.connectionStatus}</p>
            <p>Current Stock: {stock === undefined ? 'Loading...' : stock}</p>
        </div>
    );
});

export default ProductStock;

MobX的魔力在于,inventoryStore.products.set被调用后,任何观察了inventoryStore.products.get(productId)的组件(如ProductStock)都会自动重新渲染。代码简洁且意图明确。

Kubernetes 部署清单

最后,我们将这两个服务部署到K8s中。

C#服务的deployment.yamlservice.yaml:

# csharp-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: csharp-inventory-service
spec:
  replicas: 2
  selector:
    matchLabels:
      app: csharp-inventory
  template:
    metadata:
      labels:
        app: csharp-inventory
    spec:
      containers:
      - name: inventory-app
        image: your-repo/csharp-inventory-service:v1.0
        ports:
        - containerPort: 80
        env:
        - name: EVENT_ENDPOINT_URL
          value: "http://actix-ws-service/event" # 指向Rust服务的内部Service
---
apiVersion: v1
kind: Service
metadata:
  name: csharp-inventory-service
spec:
  selector:
    app: csharp-inventory
  ports:
  - protocol: TCP
    port: 80
    targetPort: 80

Actix-web服务的deployment.yamlservice.yaml:

# actix-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: actix-ws-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: actix-ws
  template:
    metadata:
      labels:
        app: actix-ws
    spec:
      containers:
      - name: ws-app
        image: your-repo/actix-ws-service:v1.0
        ports:
        - containerPort: 8080
        resources: # Rust应用通常资源占用更低,可以设置更严格的限制
          requests:
            cpu: "100m"
            memory: "128Mi"
          limits:
            cpu: "500m"
            memory: "256Mi"
---
apiVersion: v1
kind: Service
metadata:
  name: actix-ws-service
spec:
  selector:
    app: actix-ws
  ports:
  - name: http-event # 内部事件端口
    protocol: TCP
    port: 80
    targetPort: 8080
  - name: websocket # 对外暴露的WebSocket端口
    protocol: TCP
    port: 8080
    targetPort: 8080
# 生产环境通常会使用Ingress来暴露WebSocket端口

通过K8s的Service DNS,C#服务可以稳定地通过actix-ws-service这个主机名找到并向Rust服务发送事件,而无需关心其Pod的IP地址。

局限性与未来迭代路径

当前这套架构虽然解决了核心问题,但它并非完美。首先,我们的事件传递机制是基于简单的HTTP调用,它缺乏持久性和重试保证。如果Actix-web服务在C#服务发出请求的瞬间恰好重启,这个事件就会丢失。一个生产级的系统应该在C#服务和Actix-web服务之间引入一个真正的消息队列,如NATS或RabbitMQ,以确保事件的可靠传递。

其次,当前的WebSocket Hub Actor是单点的。当Actix-web服务水平扩展到多个Pod时,连接到Pod A的客户端无法收到由Pod B处理的事件。要解决这个问题,需要引入一个共享的后端,例如Redis Pub/Sub。Hub Actor在收到事件后,不再直接广播给其管理的连接,而是发布到Redis的一个频道。所有Pod中的Hub Actor都订阅这个频道,收到消息后再广播给各自的本地连接,从而实现跨Pod的通信。

最后,可观测性是一个需要加强的领域。为了追踪一个库存变更从C#数据库事务到前端UI更新的完整生命周期,并诊断其中的延迟,我们需要引入分布式追踪系统,如OpenTelemetry,并在每个服务中植入Trace Context。


  目录