一个棘手的现实摆在面前:我们有一个用C#编写的庞大、稳定的核心库存管理服务,它承载了所有关键的业务逻辑。与此同时,一个新的Web前端需要对库存变动做出近乎实时的响应。直接轮询数据库的方案在性能和可伸缩性上是不可接受的,而改造整个C#服务以支持WebSocket则意味着巨大的回归测试风险和开发成本。系统间的强耦合也让我们夜不能寐。
初步构想是引入一个轻量级的、高性能的中间层,专门负责实时通信。这个中间层将订阅核心服务的状态变更,并通过WebSocket将更新推送给前端。这种事件驱动的架构不仅能实现前后端解耦,还能为未来更多的实时订阅场景提供基础。
技术选型决策过程充满了权衡。核心服务维持C#和.NET生态不变,这是业务连续性的基本要求。对于新的实时通信层,性能和资源占用是首要考量,因为它可能需要维护成千上万的并发长连接。Rust及其生态中的Actix-web框架因其出色的性能、内存安全和高并发处理能力进入了我们的视野。前端状态管理则选择了MobX,其响应式原理与这种事件驱动的数据流完美契合。
这种架构天然地选择了BASE理论(Basically Available, Soft state, Eventually consistent)而非ACID。当C#服务完成一笔库存扣减后,它会发布一个事件。这个事件被实时通信层消费并推送至前端。在这个过程中,存在一个短暂的时间窗口,后端数据已经更新,但前端尚未收到通知。我们接受这种最终一致性,以换取系统的整体高可用性、性能和松耦合。
整个系统将部署在Kubernetes (K8s)上,利用其服务发现、自动伸缩和容错能力来管理这两个异构的微服务。
架构概览
整体数据流如下:
graph TD
subgraph Kubernetes Cluster
subgraph C# Pod
A[C# Inventory Service]
end
subgraph Rust Pod
C[Actix-web WebSocket Service]
end
B[Internal Event Endpoint]
A -- 1. HTTP POST Event --> B
B -- "k8s service discovery" --> C
C -- 3. Consume Event & Broadcast --> D
end
U[User Browser] -- 2. WebSocket Connection --> Ingress --> C
D((WebSocket Clients))
U -- "React with MobX" --> D
- C# 库存服务: 核心业务逻辑处理单元。当库存发生变化(如出库、入库),它会构造一个事件并通过HTTP POST请求发送到Kubernetes集群内部的一个特定服务地址。
- Actix-web WebSocket服务:
- 暴露一个内部HTTP端点,用于接收来自C#服务的事件。
- 维护一个WebSocket服务器,管理所有活跃的前端客户端连接。
- 接收到事件后,将数据广播给所有相关的WebSocket客户端。
- 前端 (React + MobX):
- 与Actix-web服务建立持久的WebSocket连接。
- MobX Store监听WebSocket消息,并自动更新应用状态,触发UI的响应式渲染。
核心实现:C# 事件生产者
在C#核心库存服务中,我们需要在一个业务操作完成后,触发一个事件通知。这里我们不引入复杂的消息队列客户端,而是使用最简单的HttpClient向K8s内部服务发送一个POST请求。这种方式在服务网格(Service Mesh)环境下尤其简单可靠。
假设我们有一个InventoryService类:
// File: Services/InventoryService.cs
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
public class InventoryService
{
private readonly ILogger<InventoryService> _logger;
private readonly HttpClient _httpClient;
private readonly string _eventEndpointUrl;
public InventoryService(ILogger<InventoryService> logger, IHttpClientFactory httpClientFactory)
{
_logger = logger;
_httpClient = httpClientFactory.CreateClient("EventNotifier");
// 在生产环境中,这个URL应该通过配置注入,例如 "http://actix-ws-service.default.svc.cluster.local/event"
_eventEndpointUrl = Environment.GetEnvironmentVariable("EVENT_ENDPOINT_URL") ?? "http://localhost:8080/event";
}
// 核心业务方法:更新库存
public async Task<bool> UpdateStockAsync(string productId, int newQuantity)
{
_logger.LogInformation("Updating stock for product {ProductId} to {NewQuantity}", productId, newQuantity);
// ...
// 在这里执行数据库操作
// ...
// 模拟数据库操作成功
var success = true;
if (!success)
{
_logger.LogError("Failed to update stock in database for product {ProductId}", productId);
return false;
}
// 数据库操作成功后,发布事件
await NotifyStockUpdateAsync(productId, newQuantity);
return true;
}
// 负责发送事件通知的方法
private async Task NotifyStockUpdateAsync(string productId, int quantity)
{
var stockEvent = new StockUpdateEvent
{
ProductId = productId,
NewQuantity = quantity,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
var jsonPayload = JsonSerializer.Serialize(stockEvent);
var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
try
{
_logger.LogInformation("Sending stock update event to {Endpoint}", _eventEndpointUrl);
HttpResponseMessage response = await _httpClient.PostAsync(_eventEndpointUrl, content);
if (response.IsSuccessStatusCode)
{
_logger.LogInformation("Successfully sent stock update event for product {ProductId}", productId);
}
else
{
// 这里的错误处理很关键。生产环境中可能需要重试机制或将失败的事件存入死信队列。
var responseBody = await response.Content.ReadAsStringAsync();
_logger.LogError(
"Failed to send stock update event. Status: {StatusCode}, Response: {ResponseBody}",
response.StatusCode,
responseBody);
}
}
catch (HttpRequestException ex)
{
_logger.LogError(ex, "HTTP request exception while sending event for product {ProductId}", productId);
}
catch (Exception ex)
{
_logger.LogError(ex, "An unexpected error occurred while sending event for {ProductId}", productId);
}
}
}
// 事件的数据结构
public class StockUpdateEvent
{
public string ProductId { get; set; }
public int NewQuantity { get; set; }
public long Timestamp { get; set; }
}
在Startup.cs中进行配置:
// File: Startup.cs
public void ConfigureServices(IServiceCollection services)
{
// ...
services.AddHttpClient("EventNotifier", client =>
{
// 设置默认的请求头或超时策略
client.Timeout = TimeSpan.FromSeconds(5);
});
services.AddScoped<InventoryService>();
// ...
}
这段C#代码的核心在于NotifyStockUpdateAsync方法。它将库存变更封装成一个JSON对象,并通过HTTP POST发送。注意,URL是从环境变量获取的,这使得它在Kubernetes环境中可以轻松配置为指向Actix-web服务的内部DNS名称。错误处理和日志记录是生产级代码必不可少的部分。
核心实现:Actix-web 实时通信服务
这是系统的枢纽。它需要同时处理来自内部的HTTP事件和来自外部的WebSocket连接。我们将使用Actix的Actor模型来优雅地管理WebSocket会话。
main.rs 文件结构如下:
// File: src/main.rs
use actix::prelude::*;
use actix_web::{get, post, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
mod ws;
mod hub;
// 用于接收C#服务事件的结构体
#[derive(Deserialize, Debug)]
struct StockUpdateEvent {
#[serde(rename = "ProductId")]
product_id: String,
#[serde(rename = "NewQuantity")]
new_quantity: i32,
#[serde(rename = "Timestamp")]
timestamp: i64,
}
// 启动WebSocket连接的路由
#[get("/ws")]
async fn websocket_route(
req: HttpRequest,
stream: web::Payload,
hub_addr: web::Data<Addr<hub::Hub>>,
) -> Result<HttpResponse, Error> {
// 将HTTP请求升级为WebSocket连接,并创建一个新的WebSocket Actor
ws::ws_start(req, stream, hub_addr.get_ref().clone())
}
// 接收内部事件的路由
#[post("/event")]
async fn event_receiver(
event: web::Json<StockUpdateEvent>,
hub_addr: web::Data<Addr<hub::Hub>>,
) -> impl Responder {
log::info!("Received event: {:?}", event);
// 将事件发送给Hub Actor进行广播
hub_addr.do_send(hub::BroadcastMessage {
product_id: event.product_id.clone(),
message: serde_json::to_string(&event.into_inner()).unwrap_or_default(),
});
HttpResponse::Ok().body("Event received")
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
// 创建Hub Actor的地址,它将在所有worker线程之间共享
let hub = hub::Hub::new().start();
log::info!("Starting server at http://0.0.0.0:8080");
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(hub.clone())) // 将Hub地址注入到应用状态中
.service(websocket_route)
.service(event_receiver)
})
.bind(("0.0.0.0", 8080))?
.run()
.await
}
hub.rs 文件负责管理所有连接:
// File: src/hub.rs
use actix::prelude::*;
use std::collections::{HashMap, HashSet};
// --- Actor消息定义 ---
// 广播消息给所有订阅了特定产品的客户端
#[derive(Message)]
#[rtype(result = "()")]
pub struct BroadcastMessage {
pub product_id: String,
pub message: String,
}
// 客户端连接时,向Hub注册
#[derive(Message)]
#[rtype(result = "()")]
pub struct Connect {
pub addr: Addr<super::ws::WsConn>,
}
// 客户端断开连接时,从Hub注销
#[derive(Message)]
#[rtype(result = "()")]
pub struct Disconnect {
pub addr: Addr<super::ws::WsConn>,
}
// 客户端订阅一个产品
#[derive(Message)]
#[rtype(result = "()")]
pub struct Subscribe {
pub addr: Addr<super::ws::WsConn>,
pub product_id: String,
}
// --- Hub Actor 实现 ---
pub struct Hub {
// addr -> Set<product_id>
sessions: HashMap<Addr<super::ws::WsConn>, HashSet<String>>,
// product_id -> Set<addr>
subscriptions: HashMap<String, HashSet<Addr<super::ws::WsConn>>>,
}
impl Hub {
pub fn new() -> Hub {
Hub {
sessions: HashMap::new(),
subscriptions: HashMap::new(),
}
}
}
impl Actor for Hub {
type Context = Context<Self>;
}
impl Handler<Connect> for Hub {
type Result = ();
fn handle(&mut self, msg: Connect, _: &mut Context<Self>) {
log::info!("Hub: New connection");
self.sessions.insert(msg.addr, HashSet::new());
}
}
impl Handler<Disconnect> for Hub {
type Result = ();
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
log::info!("Hub: Connection disconnected");
if let Some(products) = self.sessions.remove(&msg.addr) {
for product_id in products {
if let Some(subscribers) = self.subscriptions.get_mut(&product_id) {
subscribers.remove(&msg.addr);
}
}
}
}
}
impl Handler<Subscribe> for Hub {
type Result = ();
fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
log::info!("Hub: Client subscribing to product {}", msg.product_id);
// 更新会话的订阅列表
if let Some(session_subs) = self.sessions.get_mut(&msg.addr) {
session_subs.insert(msg.product_id.clone());
}
// 更新产品的订阅者列表
self.subscriptions
.entry(msg.product_id)
.or_default()
.insert(msg.addr);
}
}
impl Handler<BroadcastMessage> for Hub {
type Result = ();
fn handle(&mut self, msg: BroadcastMessage, _: &mut Context<Self>) {
log::info!("Hub: Broadcasting update for product {}", msg.product_id);
if let Some(subscribers) = self.subscriptions.get(&msg.product_id) {
for addr in subscribers {
addr.do_send(super::ws::WsMessage(msg.message.clone()));
}
}
}
}
ws.rs 文件定义了单个WebSocket连接的行为:
// File: src/ws.rs
use actix::prelude::*;
use actix_web::{web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use std::time::{Duration, Instant};
use super::hub;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Message)]
#[rtype(result = "()")]
pub struct WsMessage(pub String);
pub struct WsConn {
hb: Instant,
hub_addr: Addr<hub::Hub>,
}
impl WsConn {
pub fn new(hub_addr: Addr<hub::Hub>) -> Self {
Self {
hb: Instant::now(),
hub_addr,
}
}
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
log::warn!("WebSocket client heartbeat failed, disconnecting.");
act.hub_addr.do_send(hub::Disconnect { addr: ctx.address() });
ctx.stop();
return;
}
ctx.ping(b"");
});
}
}
impl Actor for WsConn {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
let addr = ctx.address();
self.hub_addr
.send(hub::Connect { addr })
.into_actor(self)
.then(|res, _, ctx| {
if res.is_err() {
ctx.stop();
}
fut::ready(())
})
.wait(ctx);
}
fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
self.hub_addr.do_send(hub::Disconnect { addr: ctx.address() });
Running::Stop
}
}
// 接收来自Hub的消息
impl Handler<WsMessage> for WsConn {
type Result = ();
fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
ctx.text(msg.0);
}
}
// 处理客户端发来的消息
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsConn {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => {
// 假设客户端通过文本消息订阅产品 e.g., "subscribe:PRODUCT_123"
let parts: Vec<&str> = text.splitn(2, ':').collect();
if parts.len() == 2 && parts[0] == "subscribe" {
let product_id = parts[1].trim().to_string();
self.hub_addr.do_send(hub::Subscribe {
addr: ctx.address(),
product_id,
});
}
}
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();
}
_ => (),
}
}
}
pub fn ws_start(
req: HttpRequest,
stream: web::Payload,
hub_addr: Addr<hub::Hub>,
) -> Result<HttpResponse, Error> {
let ws = WsConn::new(hub_addr);
ws::start(ws, &req, stream)
}
这个Rust服务设计精良:
- Actor模型:
Hubactor作为中心协调者,WsConnactor代表每个客户端连接。它们通过消息传递进行通信,避免了锁和复杂的并发问题。 - 订阅模式:
Hub维护了产品ID到订阅者列表的映射,确保事件只被发送给感兴趣的客户端,而不是盲目广播。 - 心跳机制:
WsConn中实现了心跳检测,可以及时清理掉僵尸连接,这对维护一个稳定的长连接服务至关重要。
核心实现:前端 MobX Store
前端使用MobX来管理状态。当WebSocket收到消息时,我们只需更新MobX Store中的可观察属性,UI便会自动更新。
// file: src/stores/InventoryStore.js
import { makeAutoObservable, observable, action, runInAction } from 'mobx';
class InventoryStore {
// 使用Map来存储产品库存,key是productId
products = observable.map();
connectionStatus = 'disconnected';
ws = null;
constructor() {
makeAutoObservable(this, {
products: observable,
connectionStatus: observable,
connect: action,
subscribeToProduct: action,
handleMessage: action,
});
this.connect();
}
connect() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
console.log('WebSocket already connected.');
return;
}
// URL应该来自配置文件
const WS_URL = 'ws://your-k8s-ingress/ws';
this.ws = new WebSocket(WS_URL);
this.connectionStatus = 'connecting';
this.ws.onopen = () => {
console.log('WebSocket connection established.');
runInAction(() => {
this.connectionStatus = 'connected';
});
// 连接成功后可以自动订阅一些初始产品
this.subscribeToProduct('PRODUCT_123');
};
this.ws.onmessage = (event) => {
this.handleMessage(event.data);
};
this.ws.onclose = () => {
console.log('WebSocket connection closed. Reconnecting in 5s...');
runInAction(() => {
this.connectionStatus = 'disconnected';
});
setTimeout(() => this.connect(), 5000);
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
runInAction(() => {
this.connectionStatus = 'error';
});
this.ws.close(); // 会触发 onclose 中的重连逻辑
};
}
subscribeToProduct(productId) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(`subscribe:${productId}`);
console.log(`Sent subscription request for ${productId}`);
} else {
console.warn('Cannot subscribe, WebSocket is not open.');
}
}
handleMessage(data) {
try {
const update = JSON.parse(data);
// 这里假设收到的数据结构与C#发送的事件一致
if (update.ProductId && typeof update.NewQuantity !== 'undefined') {
console.log(`Received update for ${update.ProductId}: ${update.NewQuantity}`);
runInAction(() => {
this.products.set(update.ProductId, update.NewQuantity);
});
}
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
}
}
export const inventoryStore = new InventoryStore();
React组件的使用将非常简单:
import React, { useEffect } from 'react';
import { observer } from 'mobx-react-lite';
import { inventoryStore } from './stores/InventoryStore';
const ProductStock = observer(({ productId }) => {
useEffect(() => {
// 组件挂载时,确保已订阅
inventoryStore.subscribeToProduct(productId);
}, [productId]);
const stock = inventoryStore.products.get(productId);
return (
<div>
<h2>Product: {productId}</h2>
<p>Connection: {inventoryStore.connectionStatus}</p>
<p>Current Stock: {stock === undefined ? 'Loading...' : stock}</p>
</div>
);
});
export default ProductStock;
MobX的魔力在于,inventoryStore.products.set被调用后,任何观察了inventoryStore.products.get(productId)的组件(如ProductStock)都会自动重新渲染。代码简洁且意图明确。
Kubernetes 部署清单
最后,我们将这两个服务部署到K8s中。
C#服务的deployment.yaml和service.yaml:
# csharp-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: csharp-inventory-service
spec:
replicas: 2
selector:
matchLabels:
app: csharp-inventory
template:
metadata:
labels:
app: csharp-inventory
spec:
containers:
- name: inventory-app
image: your-repo/csharp-inventory-service:v1.0
ports:
- containerPort: 80
env:
- name: EVENT_ENDPOINT_URL
value: "http://actix-ws-service/event" # 指向Rust服务的内部Service
---
apiVersion: v1
kind: Service
metadata:
name: csharp-inventory-service
spec:
selector:
app: csharp-inventory
ports:
- protocol: TCP
port: 80
targetPort: 80
Actix-web服务的deployment.yaml和service.yaml:
# actix-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: actix-ws-service
spec:
replicas: 3
selector:
matchLabels:
app: actix-ws
template:
metadata:
labels:
app: actix-ws
spec:
containers:
- name: ws-app
image: your-repo/actix-ws-service:v1.0
ports:
- containerPort: 8080
resources: # Rust应用通常资源占用更低,可以设置更严格的限制
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "500m"
memory: "256Mi"
---
apiVersion: v1
kind: Service
metadata:
name: actix-ws-service
spec:
selector:
app: actix-ws
ports:
- name: http-event # 内部事件端口
protocol: TCP
port: 80
targetPort: 8080
- name: websocket # 对外暴露的WebSocket端口
protocol: TCP
port: 8080
targetPort: 8080
# 生产环境通常会使用Ingress来暴露WebSocket端口
通过K8s的Service DNS,C#服务可以稳定地通过actix-ws-service这个主机名找到并向Rust服务发送事件,而无需关心其Pod的IP地址。
局限性与未来迭代路径
当前这套架构虽然解决了核心问题,但它并非完美。首先,我们的事件传递机制是基于简单的HTTP调用,它缺乏持久性和重试保证。如果Actix-web服务在C#服务发出请求的瞬间恰好重启,这个事件就会丢失。一个生产级的系统应该在C#服务和Actix-web服务之间引入一个真正的消息队列,如NATS或RabbitMQ,以确保事件的可靠传递。
其次,当前的WebSocket Hub Actor是单点的。当Actix-web服务水平扩展到多个Pod时,连接到Pod A的客户端无法收到由Pod B处理的事件。要解决这个问题,需要引入一个共享的后端,例如Redis Pub/Sub。Hub Actor在收到事件后,不再直接广播给其管理的连接,而是发布到Redis的一个频道。所有Pod中的Hub Actor都订阅这个频道,收到消息后再广播给各自的本地连接,从而实现跨Pod的通信。
最后,可观测性是一个需要加强的领域。为了追踪一个库存变更从C#数据库事务到前端UI更新的完整生命周期,并诊断其中的延迟,我们需要引入分布式追踪系统,如OpenTelemetry,并在每个服务中植入Trace Context。