基于 ZooKeeper 实现对分布式 OpenCV 服务的状态感知路由网关


我们面临一个棘手的场景:一个用于实时视频流分析的微服务集群。每个服务实例都内嵌了OpenCV,负责从接收到的视频流中加载特定的机器学习模型,然后进行逐帧分析。这里的核心痛点在于,模型加载是一个极其昂贵的操作,可能需要数十秒并消耗大量内存。一旦模型加载完成,后续的帧处理则非常迅速。如果前端的负载均衡器,比如一个标准的Nginx,采用轮询或随机策略,那么同一个视频流的连续请求很可能会被分发到不同的服务实例上。这会造成灾难性的后果:每个实例都可能尝试去加载同一个模型,导致巨大的资源浪费和无法接受的延迟,并且处理的上下文完全丢失。

我们需要一个更智能的入口,一个能够理解“哪个工作节点正在处理哪个视频流”的网关。它必须确保一旦某个视频流 stream-007 被节点 worker-A 接管,之后所有关于 stream-007 的请求都必须精确地路由到 worker-A。这不仅仅是简单的会话保持(session affinity),因为工作节点是动态的、可能随时宕机,任务分配也必须是动态的。

初步构想是构建一个自定义的、具备状态感知能力的路由网关。这个网关的核心职责是维护一个动态的映射表:{stream_id -> worker_instance_address}。为了实现这个动态映射,并保证在分布式环境下的数据一致性和高可用性,ZooKeeper 成为了我们的技术选型。它提供的服务注册与发现、分布式协调能力,正是解决这个问题的关键。同时,为了快速迭代和部署,整个服务体系(包括网关和OpenCV工作节点)都将使用Jib进行容器化,彻底摆脱编写和维护Dockerfile的繁琐。

架构设计与ZNode规划

整个系统的核心是ZooKeeper中的数据结构。清晰、合理的ZNode设计是成功的关键。我们规划了两个主要的根路径:

  1. /services: 用于服务注册。所有存活的OpenCV工作节点都会在这里注册自己。这是一个持久节点。每个工作节点启动时,会在/services下创建一个临时的、有序的节点,例如 /services/worker-0000000001。节点的数据(data)将包含该实例的网络地址,如 {"host": "10.1.1.2", "port": 8080}。使用临时节点的好处是,一旦某个节点宕机或与ZooKeeper失联,该节点会自动消失,网关可以立刻感知到服务下线。

  2. /tasks: 用于任务分配和状态跟踪。当一个工作节点决定处理某个视频流时,它会尝试在/tasks下创建一个与流ID对应的临时节点。例如,当worker-A开始处理stream-007时,它会创建/tasks/stream-007,节点数据为该工作节点的ID或地址。这个创建操作必须是原子的。网关通过监听/tasks下的子节点变化,来实时更新它的路由表。

整个工作流程如下:

sequenceDiagram
    participant Client
    participant StateAwareGateway as 网关
    participant ZooKeeper
    participant OpenCVWorker as 工作节点

    OpenCVWorker->>ZooKeeper: 启动时注册(创建临时节点 /services/worker-*)
    Client->>StateAwareGateway: 发起请求 process(stream-007)
    StateAwareGateway->>ZooKeeper: 查询 /tasks/stream-007 是否存在?
    alt 任务已分配
        ZooKeeper-->>StateAwareGateway: 返回节点数据 (worker-A 地址)
        StateAwareGateway->>OpenCVWorker: 代理请求至 worker-A
        OpenCVWorker-->>StateAwareGateway: 返回处理结果
        StateAwareGateway-->>Client: 返回处理结果
    else 任务未分配
        ZooKeeper-->>StateAwareGateway: 节点不存在
        StateAwareGateway->>ZooKeeper: 从 /services 获取可用节点列表
        ZooKeeper-->>StateAwareGateway: 返回 [worker-A, worker-B]
        StateAwareGateway->>StateAwareGateway: 选择一个节点 (e.g., worker-A)
        StateAwareGateway->>OpenCVWorker: 代理请求至 worker-A (携带指令: "请接管此任务")
        OpenCVWorker->>ZooKeeper: 尝试创建临时节点 /tasks/stream-007
        alt 创建成功
            OpenCVWorker->>OpenCVWorker: 加载模型并处理请求
            OpenCVWorker-->>StateAwareGateway: 返回处理结果
            StateAwareGateway-->>Client: 返回处理结果
        else 创建失败 (已被其他节点抢占)
            OpenCVWorker-->>StateAwareGateway: 返回错误 (任务已被抢占)
            StateAwareGateway->>StateAwareGateway: 重新执行路由逻辑 (重试或查询新所有者)
        end
    end

OpenCV工作节点的实现

工作节点是一个Spring Boot应用,它集成了OpenCV(通过JavaCV)和ZooKeeper的客户端(Apache Curator)。

1. 依赖配置 (pom.xml)

<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- OpenCV Java Bindings -->
    <dependency>
        <groupId>org.bytedeco</groupId>
        <artifactId>javacv-platform</artifactId>
        <version>1.5.8</version>
    </dependency>

    <!-- ZooKeeper Curator Framework -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.4.0</version>
    </dependency>
    
    <!-- Jackson for JSON serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Jib for containerization -->
        <plugin>
            <groupId>com.google.cloud.tools</groupId>
            <artifactId>jib-maven-plugin</artifactId>
            <version>3.3.1</version>
            <configuration>
                <from>
                    <image>eclipse-temurin:17-jre-focal</image>
                </from>
                <to>
                    <image>my-registry/opencv-worker:${project.version}</image>
                </to>
                <container>
                    <mainClass>com.example.worker.WorkerApplication</mainClass>
                    <ports>
                        <port>8080</port>
                    </ports>
                </container>
            </configuration>
        </plugin>
    </plugins>
</build>

Jib的配置简洁明了。它直接从Maven构建过程中生成镜像,无需Docker守护进程,并且能智能地分层,将不常变的依赖项和经常变动的应用代码分开,极大地提升了构建效率。

2. ZooKeeper注册与任务声明

我们创建一个服务来管理与ZooKeeper的交互。

package com.example.worker.zookeeper;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@Service
public class ZkCoordinator {

    private static final Logger logger = LoggerFactory.getLogger(ZkCoordinator.class);
    private static final String SERVICES_PATH = "/services";
    private static final String TASKS_PATH = "/tasks";

    @Value("${zookeeper.connection.string}")
    private String zkConnectionString;

    @Value("${server.port}")
    private int serverPort;

    @Value("${server.address:localhost}") // Assume localhost if not set
    private String serverAddress;

    private CuratorFramework client;
    private String serviceNodePath;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @PostConstruct
    public void start() throws Exception {
        client = CuratorFrameworkFactory.newClient(zkConnectionString, new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.blockUntilConnected();

        // Ensure base paths exist
        ensurePath(SERVICES_PATH);
        ensurePath(TASKS_PATH);

        registerService();
    }

    private void ensurePath(String path) throws Exception {
        if (client.checkExists().forPath(path) == null) {
            try {
                client.create().creatingParentsIfNeeded().forPath(path);
            } catch (Exception e) {
                // Ignore if path already exists due to race condition
                if (client.checkExists().forPath(path) == null) {
                    throw e;
                }
            }
        }
    }

    private void registerService() throws Exception {
        Map<String, Object> servicePayload = Map.of(
            "host", serverAddress,
            "port", serverPort,
            "timestamp", System.currentTimeMillis()
        );
        byte[] payloadBytes = objectMapper.writeValueAsBytes(servicePayload);

        // Create an ephemeral, sequential node for this worker instance
        this.serviceNodePath = client.create()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath(SERVICES_PATH + "/worker-", payloadBytes);
        
        logger.info("Service registered at Zookeeper: {}", serviceNodePath);
    }

    public boolean claimTask(String streamId) {
        String taskPath = TASKS_PATH + "/" + streamId;
        try {
            // Data stored is the path to the service node that owns the task
            byte[] payload = this.serviceNodePath.getBytes(StandardCharsets.UTF_8);
            client.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(taskPath, payload);
            logger.info("Successfully claimed task for stream '{}' at path {}", streamId, taskPath);
            return true;
        } catch (Exception e) {
            // NodeAlreadyExistsException means another worker claimed it first
            logger.warn("Failed to claim task for stream '{}'. It might be already taken. Error: {}", streamId, e.getMessage());
            return false;
        }
    }

    @PreDestroy
    public void stop() {
        if (client != null) {
            logger.info("Closing Zookeeper client and de-registering service...");
            client.close(); // Closing the client will automatically remove the ephemeral node
        }
    }
}

这里的关键点:

  • 使用ExponentialBackoffRetry策略,这是生产环境中必须的,保证了连接的健壮性。
  • registerService创建的是EPHEMERAL_SEQUENTIAL节点,宕机自动注销,序列号则避免了命名冲突。
  • claimTask尝试创建EPHEMERAL节点。这是一个原子操作,利用了ZooKeeper的特性,第一个成功创建的节点即为任务的持有者。如果节点已存在,会抛出异常,我们捕获它并返回false,表示任务抢占失败。

状态感知网关的实现

网关同样是一个基于Spring Boot的应用,我们使用Spring Cloud Gateway来实现代理功能,因为它提供了灵活的路由和过滤器定制能力。

1. 核心路由逻辑

网关的核心是RouteLocator,但在这里,我们需要一个动态的路由决策者。我们将通过自定义一个GatewayFilter来实现。

package com.example.gateway.filter;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class StatefulRoutingFilter implements GlobalFilter, Ordered {

    private static final Logger logger = LoggerFactory.getLogger(StatefulRoutingFilter.class);
    private static final String TASKS_PATH = "/tasks";
    private static final String SERVICES_PATH = "/services";
    private final Random random = new Random();

    @Autowired
    private CuratorFramework zkClient;

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Local cache: { streamId -> worker URI }
    private final ConcurrentHashMap<String, URI> taskRoutingCache = new ConcurrentHashMap<>();

    // Watch for changes in task assignments in ZK
    @Autowired
    public void initTaskWatcher(CuratorFramework zkClient) throws Exception {
        PathChildrenCache watcher = new PathChildrenCache(zkClient, TASKS_PATH, true);
        watcher.getListenable().addListener((client, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                String streamId = ZkPathUtil.getNodeName(event.getData().getPath());
                URI removedUri = taskRoutingCache.remove(streamId);
                if (removedUri != null) {
                    logger.info("Task mapping removed from cache due to ZK event: {} -> {}", streamId, removedUri);
                }
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
    }
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String path = exchange.getRequest().getURI().getPath();
        // Assuming path format is /process/{streamId}
        String[] parts = path.split("/");
        if (parts.length < 3 || !parts[1].equals("process")) {
            return chain.filter(exchange); // Not a target request, pass through
        }
        String streamId = parts[2];

        return resolveWorkerUri(streamId)
            .flatMap(uri -> {
                // Modify the request to route to the resolved worker
                ServerWebExchange modifiedExchange = exchange.mutate()
                    .request(r -> r.uri(uri))
                    .build();
                
                // Store the final URI in the exchange attributes for logging/observability
                modifiedExchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, uri);
                
                logger.debug("Routing stream '{}' to worker {}", streamId, uri);
                return chain.filter(modifiedExchange);
            })
            .onErrorResume(e -> {
                logger.error("Failed to resolve worker for stream '{}': {}", streamId, e.getMessage());
                exchange.getResponse().setStatusCode(HttpStatus.SERVICE_UNAVAILABLE);
                return exchange.getResponse().setComplete();
            });
    }

    private Mono<URI> resolveWorkerUri(String streamId) {
        // 1. Check local cache first
        if (taskRoutingCache.containsKey(streamId)) {
            return Mono.just(taskRoutingCache.get(streamId));
        }

        // 2. If not in cache, query Zookeeper
        return Mono.fromCallable(() -> {
            String taskPath = TASKS_PATH + "/" + streamId;
            if (zkClient.checkExists().forPath(taskPath) != null) {
                // Task is already assigned
                return getUriFromTaskNode(taskPath);
            } else {
                // Task is new, need to assign it to a worker
                return assignTaskToNewWorker();
            }
        }).flatMap(uri -> {
            taskRoutingCache.put(streamId, uri);
            return Mono.just(uri);
        });
    }

    private URI getUriFromTaskNode(String taskPath) throws Exception {
        byte[] ownerServiceNodePathBytes = zkClient.getData().forPath(taskPath);
        String ownerServiceNodePath = new String(ownerServiceNodePathBytes, StandardCharsets.UTF_8);

        // A potential race condition exists here: the service node might disappear
        // between getting its path and getting its data. Real-world code needs retries.
        if (zkClient.checkExists().forPath(ownerServiceNodePath) == null) {
            throw new IllegalStateException("Task owner service node " + ownerServiceNodePath + " disappeared.");
        }
        
        byte[] servicePayloadBytes = zkClient.getData().forPath(ownerServiceNodePath);
        return constructUriFromPayload(servicePayloadBytes);
    }

    private URI assignTaskToNewWorker() throws Exception {
        List<String> availableWorkers = zkClient.getChildren().forPath(SERVICES_PATH);
        if (availableWorkers.isEmpty()) {
            throw new IllegalStateException("No available OpenCV workers found in Zookeeper.");
        }
        // Simple random selection for load balancing new tasks
        String chosenWorkerNode = availableWorkers.get(random.nextInt(availableWorkers.size()));
        String workerPath = SERVICES_PATH + "/" + chosenWorkerNode;
        
        byte[] servicePayloadBytes = zkClient.getData().forPath(workerPath);
        return constructUriFromPayload(servicePayloadBytes);
    }

    private URI constructUriFromPayload(byte[] payloadBytes) throws java.io.IOException {
        Map<String, Object> serviceInfo = objectMapper.readValue(payloadBytes, new TypeReference<>() {});
        String host = (String) serviceInfo.get("host");
        int port = (int) serviceInfo.get("port");
        return URI.create(String.format("http://%s:%d", host, port));
    }


    @Override
    public int getOrder() {
        return -1; // Execute this filter with high precedence
    }
}

这段代码是网关的大脑:

  • 本地缓存 taskRoutingCache: 这是性能的关键。一旦路由关系确定,后续请求直接从内存中获取目标地址,避免了每次都查询ZooKeeper。
  • PathChildrenCache: 我们用它来监听/tasks路径下的子节点变化。当一个任务完成或一个工作节点宕机,其在/tasks下的临时节点会被删除。CHILD_REMOVED事件会触发,我们从本地缓存中移除对应的条目。这保证了缓存的最终一致性。
  • resolveWorkerUri 方法: 实现了完整的路由决策逻辑。先查缓存,缓存未命中则查ZooKeeper。如果任务已分配,就解析出所有者地址;如果任务未分配,就从/services中选择一个可用的工作节点来分配。
  • 错误处理: 真实项目中,与ZooKeeper的交互充满了各种可能的异常,比如连接超时、节点不存在等。这里的代码给出了基本的异常处理框架,但在生产环境中需要更加细致的重试和降级策略。例如,当发现任务所有者节点消失时,应该立即清除缓存并触发重新分配流程。

方案的局限性与未来展望

这个基于ZooKeeper的方案优雅地解决了状态感知路由的问题,但在复杂场景下仍有其边界。

首先,网关成为了一个有状态的决策点。虽然它本身不存储持久化状态,但它对ZooKeeper有很强的依赖。如果ZooKeeper集群出现性能问题,会直接影响网关的路由决策延迟,特别是对于新任务的分配。本地缓存缓解了这个问题,但无法消除对首次查询的依赖。

其次,任务分配策略目前非常简单(随机选择)。在真实项目中,工作节点的负载(CPU、内存使用率)是异构的,一个更优的分配器应该考虑这些指标。可以由工作节点定期将自己的负载信息写入其在/services下的ZNode数据中,网关在分配新任务时读取这些数据,选择负载最低的节点。

再者,当一个工作节点宕机后,其正在处理的视频流任务的上下文(已加载的模型、中间处理状态)会完全丢失。当前架构下,任务会被重新分配给一个新的节点,该节点必须重新执行昂贵的模型加载操作。一个更具韧性的设计可能会引入分布式缓存(如Redis或Ignite)来存储可恢复的会z话状态,或者设计一种任务交接(handover)机制。

最后,随着系统规模的扩大,对ZooKeeper的读写压力会成为瓶颈。虽然其读性能很好,但写操作需要通过共识协议,性能有限。对于超大规模的系统,可以考虑使用更轻量级的协调服务,或者将任务状态的管理下沉到专门的状态管理服务中,网关只负责查询。

尽管存在这些局限,该架构作为一个起点,清晰地展示了如何创造性地组合网关、代理和分布式协调服务,来解决特定工作负载下的复杂路由挑战,为构建健壮、高效的分布式计算系统提供了坚实的工程实践。


  目录