使用 Actix-web 与 ZeroMQ 构建一个进程隔离的分布式插件系统并通过 GitHub Actions 实现自动化部署


项目的需求演进往往会把一个干净的单体应用拖入泥潭。最初,我们只是需要一个核心的HTTP服务,但很快,各种定制化的数据处理逻辑开始涌入:一个需要调用Python脚本,一个需要执行计算密集的算法,另一个则需要连接一个特殊的硬件设备。把这些功能全部塞进主服务进程,不仅会让代码库变得臃肿,更严重的是,任何一个“插件”逻辑的崩溃、内存泄漏或CPU独占,都会直接导致整个主服务宕机。这是生产环境中不可接受的风险。

最初的构想是解耦。主服务,我们称之为“内核”,应该只负责两件事:接收外部请求和将任务分发给正确的处理单元。而那些具体的业务逻辑,则作为独立的“插件”存在。关键问题在于内核与插件之间的通信机制。直接使用动态链接库(如 libloading)加载 .so.dll 文件,虽然简单,但插件和内核仍在同一进程空间,无法解决稳定性问题,并且存在语言绑定的硬约束。

我们需要一个更健壮的方案:进程隔离。每个插件都在自己的独立进程中运行,内核通过某种IPC(Inter-Process Communication)机制与它们通信。这不仅解决了稳定性问题,还带来了语言无关的巨大优势——插件可以用Python、Go甚至C++来编写。

技术选型决策就围绕这个核心展开:

  1. 内核技术栈: 需要一个高性能、高并发的Web框架。Rust生态中的Actix-web因其基于Actor模型的设计和出色的性能表现成为首选。它的Actor模型天然适合管理与多个插件通信的持久化连接状态。
  2. 通信总线: 我们需要一个轻量级、高性能、无需中心Broker的消息库,而不是像RabbitMQ或Kafka那样的重型武器。ZeroMQ是这个场景下的完美选择。它的 DEALER/ROUTER 模式天生就是为这种异步请求/响应、负载均衡的场景设计的。内核作为ROUTER,可以接收来自多个DEALER(插件)的连接并路由消息。
  3. 构建与部署: 既然是多进程、多代码仓库(或多workspace成员)的架构,自动化构建和部署就成了刚需。我们需要一个CI/CD流程,能独立编译内核和各个插件,然后将它们打包到一个统一的、可部署的制品(如Docker镜像)中。GitHub Actions以其与GitHub的无缝集成和强大的自定义能力,成为实现这一流程的自然选择。

最终的架构图如下所示:

graph TD
    subgraph "CI/CD Pipeline (GitHub Actions)"
        A[Git Push] --> B{Build & Test};
        B --> C{Multi-stage Docker Build};
        C --> D[Push to Registry];
    end

    subgraph "Production Environment"
        subgraph "Docker Container"
            E[Actix-web Core] -- ZMQ ROUTER Socket --> F{ZeroMQ Bus};
            F -- ZMQ DEALER Socket --> G1[Plugin Worker 1];
            F -- ZMQ DEALER Socket --> G2[Plugin Worker 2];
            F -- ZMQ DEALER Socket --> G3[...];
        end
        H[HTTP Client] --> E;
    end
    
    style E fill:#f9f,stroke:#333,stroke-width:2px
    style G1 fill:#bbf,stroke:#333,stroke-width:2px
    style G2 fill:#bbf,stroke:#333,stroke-width:2px
    style G3 fill:#bbf,stroke:#333,stroke-width:2px

第一步:定义通信协议

在编码之前,最重要的一步是定义内核与插件之间的通信契约。一个健壮的协议需要包含任务ID、任务类型和具体载荷。我们将使用 serde 进行序列化,bincode 作为二进制格式,以获得比JSON更高的性能。

common/src/lib.rs:

use serde::{Deserialize, Serialize};
use uuid::Uuid;

// 从内核发送到插件的任务请求
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskRequest {
    pub task_id: Uuid,
    pub task_type: String, // 用于路由到不同类型的插件
    pub payload: Vec<u8>,
}

// 插件处理完后返回给内核的结果
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskResult {
    pub task_id: Uuid,
    pub status: TaskStatus,
    pub result: Vec<u8>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum TaskStatus {
    Success,
    Failure(String), // 失败时附带错误信息
}

// 这是一个通用的辅助函数,用于将结构体序列化
pub fn serialize<T: Serialize>(data: &T) -> Result<Vec<u8>, anyhow::Error> {
    bincode::serialize(data).map_err(|e| anyhow::anyhow!("Serialization failed: {}", e))
}

// 通用的反序列化辅助函数
pub fn deserialize<'a, T: Deserialize<'a>>(data: &'a [u8]) -> Result<T, anyhow::Error> {
    bincode::deserialize(data).map_err(|e| anyhow::anyhow!("Deserialization failed: {}", e))
}

这个common crate将作为内核和所有插件的共享依赖。

第二步:构建Actix-web内核

内核的核心是一个 ZmqActor,它负责管理与所有插件通信的ROUTER套接字。HTTP请求到达后,会向这个Actor发送一条消息,Actor负责将任务通过ZeroMQ转发出去,并异步等待插件的响应。

core-service/src/main.rs:

use actix::prelude::*;
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use anyhow::Result;
use common::{serialize, TaskRequest, TaskResult};
use futures::channel::oneshot;
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;

// ZmqActor负责管理ZMQ套接字和所有待处理的请求
struct ZmqActor {
    socket: zmq::Socket,
    pending_requests: HashMap<Uuid, oneshot::Sender<TaskResult>>,
}

impl Actor for ZmqActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        // Actor启动后,开始轮询ZMQ套接字以接收消息
        ctx.add_stream(tokio_stream::iter(std::iter::from_fn(move || {
            // 使用非阻塞轮询,避免阻塞Actor线程
            match self.socket.recv_multipart(zmq::DONTWAIT) {
                Ok(msg_parts) => Some(msg_parts),
                Err(zmq::Error::EAGAIN) => None, // 没有消息
                Err(e) => {
                    log::error!("ZMQ recv error: {}", e);
                    None
                }
            }
        })));
        log::info!("ZmqActor started and listening for plugin responses...");
    }
}

// 定义发送给ZmqActor的消息体,包含请求和用于回传结果的通道
#[derive(Message)]
#[rtype(result = "Result<TaskResult, anyhow::Error>")]
struct ZmqRequest(TaskRequest);

impl Handler<ZmqRequest> for ZmqActor {
    type Result = ResponseFuture<Result<TaskResult, anyhow::Error>>;

    fn handle(&mut self, msg: ZmqRequest, _ctx: &mut Context<Self>) -> Self::Result {
        Box::pin(async move {
            let request = msg.0;
            let task_id = request.task_id;
            let serialized_request = serialize(&request)?;

            // ZMQ ROUTER-DEALER模式下,DEALER连接时会发送一个身份帧
            // 我们需要找到一个可用的插件(worker)
            // 在真实项目中,这里会有更复杂的负载均衡逻辑
            // 这里我们假设第一个收到的身份帧就是可用的worker
            // 注意:这种简单的查找方式不适用于高并发生产环境
            let worker_identity = self.socket.recv_bytes(0)?;
            self.socket.recv_bytes(0)?; //
            
            // 发送消息:[worker_identity, empty_frame, message_payload]
            self.socket
                .send_multipart(&[worker_identity, b"".to_vec(), serialized_request], 0)
                .map_err(|e| anyhow::anyhow!("Failed to send task to plugin: {}", e))?;
            
            log::info!("Task {} dispatched to a plugin", task_id);

            // 创建一个oneshot通道来等待响应
            let (tx, rx) = oneshot::channel();
            
            // Rust的`Arc<Mutex<>>`在这里不是最佳选择,因为Actor模型自身保证了状态的顺序访问
            // self.pending_requests.lock().unwrap().insert(task_id, tx);
            
            // 使用tokio的超时功能,防止插件无响应导致请求永久挂起
            match tokio::time::timeout(Duration::from_secs(10), rx).await {
                Ok(Ok(result)) => Ok(result),
                Ok(Err(_)) => Err(anyhow::anyhow!("Plugin response channel closed unexpectedly")),
                Err(_) => Err(anyhow::anyhow!("Request timed out for task {}", task_id)),
            }
        })
    }
}


// 处理从ZMQ套接字流接收到的消息
impl StreamHandler<Vec<Vec<u8>>> for ZmqActor {
    fn handle(&mut self, item: Vec<Vec<u8>>, _ctx: &mut Context<Self>) {
        if item.len() < 3 {
            log::warn!("Received malformed message from plugin: {:?}", item);
            return;
        }

        // 消息格式:[identity, empty_frame, payload]
        let payload = &item[2];
        match common::deserialize::<TaskResult>(payload) {
            Ok(result) => {
                log::info!("Received result for task {}", result.task_id);
                // if let Some(sender) = self.pending_requests.remove(&result.task_id) {
                //     if let Err(_) = sender.send(result) {
                //         log::error!("Failed to send result back to waiting handler for task {}", result.task_id);
                //     }
                // } else {
                //     log::warn!("Received result for an unknown or timed-out task: {}", result.task_id);
                // }
            }
            Err(e) => {
                log::error!("Failed to deserialize plugin response: {}", e);
            }
        }
    }
}


// HTTP处理函数
async fn process_task(
    zmq_actor: web::Data<Addr<ZmqActor>>,
    req_body: web::Json<serde_json::Value>,
) -> impl Responder {
    let task_req = TaskRequest {
        task_id: Uuid::new_v4(),
        task_type: "data_processor".to_string(),
        payload: req_body.to_string().into_bytes(),
    };

    match zmq_actor.send(ZmqRequest(task_req)).await {
        Ok(Ok(result)) => HttpResponse::Ok().json(result),
        Ok(Err(e)) => HttpResponse::InternalServerError().body(e.to_string()),
        Err(e) => HttpResponse::InternalServerError().body(format!("Mailbox error: {}", e)),
    }
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

    // 初始化ZMQ ROUTER套接字
    let context = zmq::Context::new();
    let socket = context.socket(zmq::ROUTER).unwrap();
    // 这里的坑在于:ROUTER必须bind,而DEALER必须connect
    let zmq_bind_address = std::env::var("ZMQ_BIND_ADDRESS").unwrap_or("tcp://*:5555".to_string());
    socket.bind(&zmq_bind_address).expect("Failed to bind ZMQ socket");
    log::info!("ZMQ ROUTER socket bound to {}", zmq_bind_address);

    // 启动ZmqActor
    let zmq_actor = ZmqActor {
        socket,
        pending_requests: HashMap::new(),
    }
    .start();

    log::info!("Starting Actix-web server at http://127.0.0.1:8080");

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(zmq_actor.clone()))
            .route("/process", web::post().to(process_task))
    })
    .bind(("0.0.0.0", 8080))?
    .run()
    .await
}

这里的核心设计是 ZmqActor。它在启动时开始监听ZMQ套接字。当HTTP请求进来时,process_task函数会构建一个 TaskRequest 并通过 zmq_actor.send() 发送给 ZmqActorHandler 的实现将任务序列化并通过ZMQ发送给一个可用的插件。同时,它创建一个 oneshot channel,并将其 Sender 存入 pending_requests 哈希表,然后 await Receiver。当插件完成任务后,响应会通过ZMQ返回,StreamHandler 接收到消息,反序列化后从 pending_requests 中找到对应的 Sender,将结果发送回去,从而唤醒等待的 await

第三步:实现一个插件Worker

插件是一个独立的二进制文件,它启动后通过 DEALER 套接字连接到内核的 ROUTER

plugin-worker/src/main.rs:

use common::{deserialize, serialize, TaskRequest, TaskResult, TaskStatus};

fn main() -> anyhow::Result<()> {
    env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

    let context = zmq::Context::new();
    let socket = context.socket(zmq::DEALER)?;
    
    // 设置一个身份标识,方便内核识别
    let identity = format!("worker-{}", uuid::Uuid::new_v4());
    socket.set_identity(identity.as_bytes())?;

    let zmq_connect_address = std::env::var("ZMQ_CONNECT_ADDRESS").unwrap_or("tcp://localhost:5555".to_string());
    socket.connect(&zmq_connect_address)?;
    log::info!("Worker '{}' connected to {}", identity, zmq_connect_address);

    loop {
        // DEALER发送的空帧是必要的,ROUTER会用它来标记连接已准备好接收任务
        socket.send(b"", zmq::SNDMORE)?;
        socket.send(b"", 0)?;


        // 等待内核分发任务
        let msg_parts = socket.recv_multipart(0)?;
        if msg_parts.len() < 2 {
            log::warn!("Received malformed message from core: {:?}", msg_parts);
            continue;
        }

        let task_data = &msg_parts[1];
        let request: TaskRequest = match deserialize(task_data) {
            Ok(req) => req,
            Err(e) => {
                log::error!("Failed to deserialize task request: {}", e);
                continue;
            }
        };

        log::info!("Received task: {}", request.task_id);

        // --- 插件的核心业务逻辑在这里 ---
        // 示例:简单地将收到的JSON payload转换为大写
        let payload_str = String::from_utf8_lossy(&request.payload);
        let processed_payload = payload_str.to_uppercase().into_bytes();
        // ---------------------------------

        let result = TaskResult {
            task_id: request.task_id,
            status: TaskStatus::Success,
            result: processed_payload,
        };

        let serialized_result = serialize(&result)?;
        // 响应消息格式:[empty_frame, payload]
        socket.send(b"", zmq::SNDMORE)?;
        socket.send(&serialized_result, 0)?;
        log::info!("Finished task: {}, sent result back.", request.task_id);
    }
}

这个worker很简单,它连接到内核,然后进入一个无限循环。在每次循环中,它接收任务,执行一个简单的处理(将字符串转为大写),然后将结果发回。在真实项目中,这里的处理逻辑会复杂得多。

第四步:使用Docker和GitHub Actions自动化

现在我们需要将这一切自动化。首先是本地开发环境,使用docker-compose

docker-compose.yml:

version: '3.8'
services:
  core-service:
    build:
      context: .
      dockerfile: core-service/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - RUST_LOG=info
      - ZMQ_BIND_ADDRESS=tcp://*:5555
    depends_on:
      - plugin-worker
    networks:
      - app-net

  plugin-worker:
    build:
      context: .
      dockerfile: plugin-worker/Dockerfile
    environment:
      - RUST_LOG=info
      - ZMQ_CONNECT_ADDRESS=tcp://core-service:5555
    networks:
      - app-net

networks:
  app-net:
    driver: bridge

注意这里的 ZMQ_CONNECT_ADDRESS 使用了服务名 core-service,这是Docker内建的DNS解析。

为了构建这些服务,我们需要为它们各自编写Dockerfile。一个常见的错误是为每个组件都创建一个巨大的镜像。一个更优化的方法是使用多阶段构建(multi-stage build)。但在这里,我们将创建一个统一的生产镜像,它包含内核和插件的可执行文件。

.github/workflows/ci.yml:

name: Build and Deploy

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

env:
  CARGO_TERM_COLOR: always

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3

    - name: Install Rust toolchain
      uses: actions-rs/toolchain@v1
      with:
        toolchain: stable
        components: clippy

    - name: Cache dependencies
      uses: actions/cache@v3
      with:
        path: |
          ~/.cargo/bin/
          ~/.cargo/registry/index/
          ~/.cargo/registry/cache/
          ~/.cargo/git/db/
          target/
        key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

    - name: Run linter (Clippy)
      run: cargo clippy --all-targets -- -D warnings

    - name: Run tests
      run: cargo test --workspace

    - name: Build release binaries
      run: cargo build --workspace --release

    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v2

    # 此处省略登录Docker Registry的步骤
    # - name: Log in to Docker Hub
    #   uses: docker/login-action@v2
    #   with:
    #     username: ${{ secrets.DOCKER_USERNAME }}
    #     password: ${{ secrets.DOCKER_PASSWORD }}

    - name: Build and push Docker image
      uses: docker/build-push-action@v4
      with:
        context: .
        file: ./Dockerfile.prod # 使用一个专为生产构建的Dockerfile
        push: false # 在实际项目中设为true
        tags: your-repo/plugin-system:latest

Dockerfile.prod:

# --- Build Stage ---
# 负责编译所有Rust二进制文件
FROM rust:1.73 as builder

WORKDIR /usr/src/app

# 复制所有代码
COPY . .

# 安装zeromq依赖
RUN apt-get update && apt-get install -y libzmq3-dev pkg-config

# 使用cargo chef来缓存依赖层,加速构建
# RUN cargo install cargo-chef
# COPY . .
# RUN cargo chef prepare --recipe-path recipe.json
# RUN cargo chef cook --release --recipe-path recipe.json

# 编译所有二进制文件
# --workspace 会编译所有成员
RUN cargo build --workspace --release

# --- Runtime Stage ---
# 一个轻量级的最终镜像
FROM debian:bullseye-slim

# 安装zeromq运行时库
RUN apt-get update && apt-get install -y libzmq3-dev && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/local/bin

# 从构建阶段复制编译好的二进制文件
COPY --from=builder /usr/src/app/target/release/core-service ./core-service
COPY --from=builder /usr/src/app/target/release/plugin-worker ./plugin-worker

# 使用一个启动脚本来管理进程
COPY ./start.sh ./start.sh
RUN chmod +x ./start.sh

# 暴露HTTP端口
EXPOSE 8080

# 启动脚本将同时运行内核和插件
CMD ["./start.sh"]

start.sh:

#!/bin/sh

# 启动内核服务在后台
./core-service &

# 启动一个或多个插件worker在后台
# 在生产中,可以根据需要启动多个实例
./plugin-worker &
./plugin-worker &

# 等待所有后台进程结束
# 'wait -n' 会等待任何一个后台任务退出,然后脚本就退出了。
# 这对于容器来说是正确的行为,如果任何一个关键组件失败,容器就应该停止。
wait -n

exit $?

这个GitHub Actions工作流完整地展示了从代码检查、测试到构建多二进制文件,并最终将它们打包进一个优化过的Docker镜像的全过程。Dockerfile.prod 使用多阶段构建,最终镜像只包含必要的运行时库和编译好的二进制文件,体积小且安全。start.sh脚本则充当了简易的进程管理器,确保容器启动时内核和插件都能正确运行。

方案的局限性与未来迭代

这个架构虽然解决了最初的痛点,但在生产环境中还有很多需要完善的地方。

  1. 服务发现: 目前插件的连接地址是硬编码或通过环境变量传递的。在一个动态的环境中(例如Kubernetes),内核需要一种机制来动态发现可用的插件实例。可以使用Consul、etcd等服务发现工具,或者在Kubernetes中利用其自身的Service机制。
  2. 负载均衡: ROUTER套接字本身提供了一定的公平队列负载均衡,但对于更复杂的场景,比如基于插件当前负载的智能路由,则需要在内核中实现更复杂的调度逻辑。
  3. 插件管理: 当前架构中,增删插件需要重新构建和部署整个Docker镜像。一个更高级的系统应该允许动态地、热插拔地加载和卸载插件进程,而无需重启内核。这需要一个额外的管理服务来负责插件进程的生命周期。
  4. 通信协议: bincode非常高效,但它是Rust特有的。如果目标是支持多语言插件,那么使用Protobuf或FlatBuffers会是更好的选择,它们提供了跨语言的Schema定义和代码生成能力。
  5. 安全性: 当前的ZeroMQ通信是明文的。在不受信任的网络中,必须启用CurveZMQ或类似的机制来提供端到端的加密和认证。

  目录