构建从 GKE 上的 Spark 到浏览器 WASM 的大规模数据交互式分析架构


我们需要为业务团队构建一个工具,允许他们对一个规模达数百GB的金融时间序列数据集进行高频次的交互式探索分析。用户的操作模式是:加载一个特定市场在某个时间窗口的完整数据集,然后通过调整多个复杂参数(例如波动率阈值、移动平均窗口、相关性因子),实时观察一个复杂衍生品模型的计算结果。

定义复杂技术问题

问题的核心在于“实时观察”。每次参数微调都意味着一次完整的重算。数据集在内存中解压后可能达到数GB,涉及的计算是CPU密集型的。这直接引出了架构选型上的第一个分水岭:计算应该在哪里发生?在服务端还是客户端?

方案 A: 纯服务端实时计算架构

这是最符合直觉的方案。前端作为一个纯粹的UI层,捕捉用户输入,然后通过API请求后端执行计算。

graph LR
    subgraph 浏览器
        A[用户调整参数] --> B{React UI};
    end
    subgraph GCP
        C[API Gateway] --> D[Spring Boot 服务];
        D -- gRPC/REST --> E[实时计算集群/服务];
        E -- 读取 --> F[GCS/BigQuery: 预处理数据];
    end
    B -- API 请求 --> C;
    E -- 计算结果 --> D;
    D -- 响应 --> B;
    B -- 更新图表 --> G[数据可视化];

架构分析:

  1. 数据流: Apache Spark 作业在 GKE (Google Kubernetes Engine) 上定期运行,将原始数据清洗、聚合,并存入 Google Cloud Storage (GCS) 或 BigQuery 中,形成适合查询的宽表或 Parquet 文件。
  2. 计算流: 用户的每次参数调整都会触发一个 HTTP 请求到 Spring Boot 服务。该服务根据参数,向一个专门的计算服务(可能是一个独立的、可自动伸缩的 Spark Streaming 作业或者一个内存计算网格)发起请求。计算服务从 GCS 拉取数据,执行计算,然后返回结果。
  3. 优势:
    • 逻辑集中: 所有的核心算法和业务逻辑都封装在后端,易于维护、更新和保护知识产权。
    • 客户端轻量: 浏览器只需负责渲染,对客户端硬件要求低。
    • 数据安全: 原始数据不离开服务端,安全性高。
  4. 劣势:
    • 交互延迟: 整个链路 用户输入 -> HTTP请求 -> 服务调度 -> 数据加载 -> 计算 -> 结果返回 -> 前端渲染,即使在最优情况下,也难以将端到端延迟控制在100ms以内。对于需要“实时”反馈的探索性分析,这种延迟是致命的。
    • 服务端成本与压力: 每次微调都是一次完整的后端计算。如果有100个分析师同时使用,服务端的计算和网络IO压力将是巨大的。为了应对峰值,我们需要在GKE上配置非常激进的HPA(Horizontal Pod Autoscaler)策略,这直接导致了高昂的云资源成本。
    • 状态管理复杂: 如果计算是无状态的,每次都需要加载数据。如果要实现有状态的计算以加速,那么服务端需要维护每个用户的会话和上下文数据,这会极大地增加架构复杂性。

在真实项目中,方案 A 对于低频次、高延迟容忍度的报表生成是可行的。但对于我们要求的高频交互场景,其固有的网络延迟和服务器成本使其成为一个无法接受的选项。

方案 B: 客户端 WASM 混合计算架构

这个方案的核心思想是将计算的重心从服务端转移到客户端。服务端只负责一次性的数据“空投”,后续所有的计算都在用户的浏览器中,由一个高性能的 WebAssembly (WASM) 模块完成。

graph TD
    subgraph "Phase 1: 初始数据加载"
        A[Spark on GKE 预处理] --> B[GCS Parquet 数据];
        C[用户打开页面] --> D[React App];
        D -- 请求初始数据 --> E[Spring Boot API];
        E -- 流式读取 --> B;
        E -- 二进制流响应 --> D;
    end
    subgraph "Phase 2: 客户端实时交互"
        F[用户调整参数] --> G{React UI};
        G -- 调用WASM函数(携带参数) --> H[WebAssembly 计算模块];
        H -- 直接在内存中计算 --> I[浏览器内存中的二进制数据];
        H -- 返回结果 --> G;
        G -- 更新图表 --> J[数据可视化];
    end
    D -- 加载数据到内存 --> I;

架构分析:

  1. 数据流: 与方案A类似,Spark on GKE 仍然负责对海量原始数据进行预处理。但其产出是经过高度优化的、可以直接被客户端计算引擎消费的二进制格式(例如 Apache Arrow 或自定义的 FlatBuffers 格式),存储在 GCS。
  2. 计算流:
    • 初始加载: 用户选择数据集后,前端向 Spring Boot API 发起请求。API 不会解析数据,而是直接从 GCS 以流式方式(Streaming)将这个可能数百MB的二进制文件作为 application/octet-stream 高效地传输给客户端。
    • 本地计算: 浏览器接收到二进制数据后,将其存入内存。一个用 Rust 或 C++ 编写并编译为 WASM 的计算模块被加载。用户的每一次参数调整,都只是在本地调用这个 WASM 模块的函数。WASM 直接在浏览器内存中对二进制数据进行高速计算,并立即返回结果。
  3. 优势:
    • 极致交互体验: 一旦初始数据加载完成,后续所有计算都没有网络延迟,响应速度仅取决于客户端的CPU性能,通常在几毫秒到几十毫秒之间,实现了真正的“实时”交互。
    • 服务器成本极低: 服务端的主要职责退化为静态文件服务器。GKE 集群的资源消耗从持续的高CPU计算转变为短时间的高网络IO,总体成本显著下降。
    • 架构简化: 服务端无需管理复杂的计算状态和用户会话,变成了无状态的、易于水平扩展的数据分发服务。
  4. 劣势:
    • 初始加载时间: 首次加载数据块可能耗时较长,需要设计良好的加载动画和用户预期管理。
    • 客户端资源消耗: 对用户的电脑配置有一定要求,特别是内存。需要仔细管理内存,避免浏览器崩溃。
    • 逻辑暴露风险: 核心计算逻辑被编译到 WASM 中分发给了客户端。虽然 WASM 比 JavaScript 更难逆向工程,但这依然是一个需要评估的安全风险。

最终选择与理由

对于我们的业务场景,交互性是压倒一切的核心指标。用户愿意接受几秒甚至几十秒的初始加载时间,以换取后续流畅无阻的探索体验。因此,我们最终决定采用 方案 B:客户端 WASM 混合计算架构。它将云端(GKE上的Spark)的大规模批处理能力与边缘端(浏览器中的WASM)的低延迟计算能力完美结合,是解决此类问题的理想模型。

核心实现概览

1. Apache Spark on GKE: 数据预处理与格式化

我们的目标是生成一种对 WASM 计算引擎极其友好的二进制格式。JSON 的解析开销太大,我们选择了 Parquet 作为存储格式,因为它列式存储的特性非常适合后续的分析查询。

Spark 作业的核心逻辑并不复杂,关键在于 schema 的设计和分区的策略。

SparkApplication GKE Manifest (spark-preprocessor.yaml):

#
# This manifest defines a Spark batch application to be run on GKE.
# It uses the official Spark on Kubernetes operator.
#
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: fin-data-preprocessor
  namespace: data-processing
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/my-project/spark-app:1.0.0"
  imagePullPolicy: Always
  mainClass: com.mycompany.spark.FinancialDataPreprocessor
  mainApplicationFile: "local:///opt/spark/jars/my-spark-app.jar"
  arguments:
    - "--inputPath"
    - "gs://raw-financial-data/timeseries/"
    - "--outputPath"
    - "gs://precomputed-financial-data/interactive-parquet/"
  sparkVersion: "3.3.1"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "4g"
    labels:
      version: 3.3.1
    serviceAccount: spark-gke-sa # Service account with GCS access
  executor:
    cores: 4
    instances: 10
    memory: "16g"
    labels:
      version: 3.3.1

Spark 作业核心代码 (FinancialDataPreprocessor.scala):

package com.mycompany.spark

import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions._
import org.slf4j.LoggerFactory

object FinancialDataPreprocessor {
  private val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    // A proper argument parsing library should be used in production.
    val inputPath = args(1)
    val outputPath = args(3)

    val spark = SparkSession.builder
      .appName("FinancialDataPreprocessor")
      .getOrCreate()

    logger.info(s"Starting preprocessing from $inputPath to $outputPath")

    try {
      val rawDF = spark.read.json(inputPath)

      // In a real scenario, this involves complex business logic:
      // cleaning, joining with other datasets, feature engineering etc.
      val processedDF = rawDF
        .withColumn("timestamp_ms", (col("time").cast("double") * 1000).cast("long"))
        .withColumn("trade_date", to_date(from_unixtime(col("timestamp_ms") / 1000)))
        .select(
          "market_id",
          "trade_date",
          "timestamp_ms",
          "price",
          "volume",
          "volatility_index"
        )
        // Coalesce is important to control the output file size.
        // We want a single file per partition for efficient client download.
        .coalesce(1)

      // Partitioning is key for the API to locate data efficiently.
      processedDF.write
        .mode(SaveMode.Overwrite)
        .partitionBy("market_id", "trade_date")
        .parquet(outputPath)

      logger.info("Preprocessing completed successfully.")
    } catch {
      case e: Exception =>
        logger.error("An error occurred during Spark job execution.", e)
        throw e // Fail the job explicitly
    } finally {
      spark.stop()
    }
  }
}

2. Spring Framework: 高吞吐二进制数据流接口

这个 Spring Boot 服务是数据从云端到客户端的桥梁。它的性能至关重要。我们严禁在服务内部进行任何形式的数据解析或转换。它就是一个高效的管道。

application.yml 配置:

# Spring Boot configuration for GCP integration
spring:
  cloud:
    gcp:
      storage:
        project-id: "my-gcp-project"
        credentials:
          # In GKE, it's best to use Workload Identity
          # This is for local testing
          location: "file:/path/to/gcp-credentials.json"
  # Increase server limits for large file transfers
  servlet:
    multipart:
      max-file-size: 512MB
      max-request-size: 512MB

# Custom application properties
data:
  gcs:
    bucket-name: "precomputed-financial-data"
    base-path: "interactive-parquet"

DataStreamController.java:

package com.mycompany.api;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;

@RestController
@RequestMapping("/api/v1/data")
public class DataStreamController {

    private static final Logger logger = LoggerFactory.getLogger(DataStreamController.class);

    private final Storage gcsClient;
    private final String bucketName;
    private final String basePath;

    public DataStreamController(Storage gcsClient,
                                @Value("${data.gcs.bucket-name}") String bucketName,
                                @Value("${data.gcs.base-path}") String basePath) {
        this.gcsClient = gcsClient;
        this.bucketName = bucketName;
        this.basePath = basePath;
    }

    @GetMapping(path = "/{marketId}/{tradeDate}", produces = "application/octet-stream")
    public ResponseEntity<StreamingResponseBody> streamData(
            @PathVariable String marketId,
            @PathVariable String tradeDate) {

        // Construct the path based on Spark's partitioning scheme.
        // A single Parquet file is expected in the directory.
        String gcsPath = String.format("%s/market_id=%s/trade_date=%s/", basePath, marketId, tradeDate);
        logger.info("Attempting to stream data from GCS path: {}", gcsPath);

        // Find the actual blob (file) within the partition directory
        Blob dataBlob = findParquetFile(gcsPath);

        if (dataBlob == null) {
            logger.warn("No data found for market {} on date {}", marketId, tradeDate);
            return ResponseEntity.notFound().build();
        }

        StreamingResponseBody responseBody = outputStream -> {
            try (OutputStream os = outputStream) {
                dataBlob.downloadTo(os);
                logger.debug("Successfully streamed {} bytes.", dataBlob.getSize());
            } catch (IOException e) {
                // This exception might be thrown if the client closes the connection.
                logger.error("IO error during streaming data from GCS.", e);
                // Can't send an error response here as headers are already sent.
            }
        };

        HttpHeaders headers = new HttpHeaders();
        headers.setContentLength(dataBlob.getSize());
        // Advise the browser to treat this as a downloadable file, helps with memory management.
        headers.set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"data.parquet\"");

        return new ResponseEntity<>(responseBody, headers, HttpStatus.OK);
    }

    private Blob findParquetFile(String directoryPath) {
        // GCS Java client logic to iterate through blobs in a directory
        // and find the first file ending with .parquet
        // ... implementation omitted for brevity ...
        // In production, this should handle cases with no files or multiple files.
        var page = gcsClient.list(bucketName, Storage.BlobListOption.prefix(directoryPath));
        for (Blob blob : page.iterateAll()) {
            if (blob.getName().endsWith(".parquet")) {
                return blob;
            }
        }
        return null;
    }
}

3. WebAssembly (Rust) & Frontend Integration

这是方案的核心。我们使用 Rust 和 wasm-pack 来创建一个高性能的计算模块。前端使用 TypeScript 和 React,并通过 CSS Modules 来保证组件样式的隔离性,这在构建复杂的数据可视化面板时至关重要。

Rust 计算引擎 (lib.rs):

// In Cargo.toml, we need dependencies like wasm-bindgen, serde, serde-wasm-bindgen,
// and crucially, a Parquet reader library for Rust like `parquet`.
use wasm_bindgen::prelude::*;
use serde::{Serialize, Deserialize};
use std::io::Cursor;
use parquet::file::reader::{SerializedFileReader, FileReader};
use parquet::record::RecordReader;

// This struct defines the output format for JavaScript.
#[wasm_bindgen]
#[derive(Serialize, Deserialize, Debug)]
pub struct CalculationResult {
    pub total_points: usize,
    pub filtered_points: usize,
    pub computed_value: f64,
    // In a real app, this might contain an array of values for charting.
}

// Our core data record structure, matching the Parquet file.
#[derive(Debug)]
struct TimeSeriesRecord {
    timestamp_ms: i64,
    price: f64,
    volume: f64,
    volatility_index: f64,
}

// The main function exposed to JavaScript.
// It takes raw Parquet file bytes and a filter parameter.
#[wasm_bindgen]
pub fn run_model(parquet_data: &[u8], volatility_threshold: f64) -> Result<JsValue, JsValue> {
    console_error_panic_hook::set_once(); // For better error messages in JS console

    let cursor = Cursor::new(parquet_data);
    let file_reader = SerializedFileReader::new(cursor)
        .map_err(|e| JsValue::from_str(&format!("Failed to read Parquet header: {}", e)))?;

    let mut total_points = 0;
    let mut filtered_points = 0;
    let mut weighted_price_sum = 0.0;
    let mut total_volume = 0.0;

    // Iterate over row groups in the Parquet file
    for row_group_reader in file_reader.get_row_iter(None).into_iter() {
        let row_group_reader = row_group_reader.unwrap();
        total_points += row_group_reader.num_rows();

        let record_reader = RecordReader::new(
            row_group_reader,
            None, // projection: None means read all columns
            None,
        );

        for record in record_reader {
             let record = record.unwrap(); // Simplified error handling
             // Here you'd map Parquet record fields to your struct. This is complex.
             // A more robust solution would use a macro or a library to do this mapping.
             let price = record.get_field(2).unwrap().as_double().unwrap();
             let volume = record.get_field(3).unwrap().as_double().unwrap();
             let volatility = record.get_field(4).unwrap().as_double().unwrap();

             if volatility > volatility_threshold {
                 filtered_points += 1;
                 weighted_price_sum += price * volume;
                 total_volume += volume;
             }
        }
    }
    
    let computed_value = if total_volume > 0.0 { weighted_price_sum / total_volume } else { 0.0 };

    let result = CalculationResult {
        total_points,
        filtered_points,
        computed_value,
    };
    
    // Serialize the Rust struct into a JavaScript object.
    serde_wasm_bindgen::to_value(&result).map_err(|e| e.into())
}

React Component (AnalysisPanel.tsx):

import React, { useState, useEffect, useCallback } from 'react';
// WASM package is built with `wasm-pack build --target web`
import init, { run_model, CalculationResult } from 'my-wasm-analysis-engine';
import styles from './AnalysisPanel.module.css'; // Using CSS Modules

interface PanelProps {
  marketId: string;
  tradeDate: string;
}

const AnalysisPanel: React.FC<PanelProps> = ({ marketId, tradeDate }) => {
  const [isReady, setIsReady] = useState(false);
  const [dataBuffer, setDataBuffer] = useState<ArrayBuffer | null>(null);
  const [isLoading, setIsLoading] = useState(true);
  const [error, setError] = useState<string | null>(null);
  const [volatility, setVolatility] = useState(0.5);
  const [result, setResult] = useState<CalculationResult | null>(null);

  // 1. Initialize WASM and fetch data
  useEffect(() => {
    const load = async () => {
      try {
        setIsLoading(true);
        setError(null);
        
        // Initialize WASM module
        await init(); 
        setIsReady(true);
        
        const response = await fetch(`/api/v1/data/${marketId}/${tradeDate}`);
        if (!response.ok) {
          throw new Error(`Failed to fetch data: ${response.statusText}`);
        }
        const buffer = await response.arrayBuffer();
        setDataBuffer(buffer);
      } catch (e: any) {
        console.error("Initialization failed:", e);
        setError(e.message);
      } finally {
        setIsLoading(false);
      }
    };
    load();
  }, [marketId, tradeDate]);

  // 2. Run calculation when parameters change
  const runCalculation = useCallback(() => {
    if (!isReady || !dataBuffer) return;

    try {
      console.time('WASM Calculation');
      // This is the magic: a direct, synchronous call to high-speed code.
      const res = run_model(new Uint8Array(dataBuffer), volatility);
      setResult(res as CalculationResult);
      console.timeEnd('WASM Calculation');
    } catch (e) {
      console.error("WASM execution error:", e);
      setError("Calculation failed. Check console for details.");
    }
  }, [isReady, dataBuffer, volatility]);

  // Re-run calculation whenever volatility changes
  useEffect(() => {
    runCalculation();
  }, [runCalculation]);

  if (isLoading) {
    // The CSS class is scoped to this component, preventing style leaks.
    return <div className={styles.loadingSpinner}>Loading data and engine...</div>;
  }

  if (error) {
    return <div className={styles.errorBox}>Error: {error}</div>;
  }

  return (
    <div className={styles.panelContainer}>
      <div className={styles.controls}>
        <label>Volatility Threshold: {volatility.toFixed(2)}</label>
        <input
          type="range"
          min="0"
          max="1"
          step="0.01"
          value={volatility}
          onChange={(e) => setVolatility(parseFloat(e.target.value))}
        />
      </div>
      <div className={styles.results}>
        {result && (
          <>
            <p>Total Data Points: {result.total_points.toLocaleString()}</p>
            <p>Filtered Points: {result.filtered_points.toLocaleString()}</p>
            <h3>Computed Value: {result.computed_value.toFixed(4)}</h3>
          </>
        )}
      </div>
    </div>
  );
};

export default AnalysisPanel;

CSS Module (AnalysisPanel.module.css):

/* These class names are locally scoped to the AnalysisPanel component */
.panelContainer {
  font-family: sans-serif;
  border: 1px solid #ccc;
  border-radius: 8px;
  padding: 20px;
  max-width: 600px;
  margin: 20px auto;
  box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}

.controls {
  margin-bottom: 20px;
}

.controls label {
  display: block;
  margin-bottom: 10px;
}

.controls input[type="range"] {
  width: 100%;
}

.results {
  background-color: #f9f9f9;
  padding: 15px;
  border-radius: 4px;
}

.errorBox {
  color: #d8000c;
  background-color: #ffbaba;
  padding: 10px;
  border-radius: 4px;
}

架构的扩展性与局限性

此架构并非万能药。它的适用边界非常清晰。

扩展性:

  • 新算法: 我们可以向 Rust 库中添加更多的 #[wasm_bindgen] 函数,编译成新的 WASM 模块,让前端可以按需加载和执行不同的模型,而无需更改后端。
  • 数据源: Spark 作业可以接入更多的数据源。只要最终产出的 Parquet schema 保持一致或向前兼容,前端和计算模块就无需变动。

局限性:

  • 数据大小上限: 该模式的“阿喀琉斯之踵”是客户端内存。浏览器对单个 Tab 的内存使用有限制(通常在2-4GB左右)。这意味着我们能一次性加载的数据量有物理上限。对于需要对TB级数据进行交互式分析的场景,此架构不适用,必须回归到服务端计算模型,例如使用 ClickHouse 或 Apache Doris。
  • 初始加载性能: 虽然后续交互快,但首次加载的体验是需要持续优化的。可能需要采用更激进的数据压缩、分块加载、或者在用户登录时预加载等策略。
  • 安全性: 核心计算逻辑分发到了客户端。对于金融、风控等领域的顶级机密算法,这可能是不可接受的。在这种情况下,必须牺牲部分交互性,将最敏感的部分保留在服务端执行。
  • 开发复杂性: 这套技术栈要求团队同时具备后端大数据(Spark/Scala)、云原生运维(GKE/K8s)、后端API(Java/Spring)、底层语言(Rust)和现代前端(TS/React)的综合能力,对团队技能要求较高。

  目录