在 Fastify 事件驱动架构中实现端到端处理时延的 Prometheus 监控


我们团队的事件驱动系统上线后,基础的 CPU 和内存监控图表一片绿色,标准的 HTTP 网关入口请求时延 P99 也稳定在 50ms 以内。然而,一种不安感始终萦绕不去。当产品经理询问“一个订单创建事件,从触发到最终通知用户,整个流程需要多长时间?”时,我们竟无法给出一个确切的数据。我们的监控仪表盘只能看到各个孤立服务的健康状况,却对贯穿整个系统的核心业务流程的真实性能一无所知。这正是许多事件驱动架构(EDA)在可观测性上面临的典型困境:服务间的解耦带来了灵活性,也带来了端到端性能度量的黑洞。

问题很明确:必须建立一种能够衡量跨多个异步服务的、端到端业务流程性能的机制。我们的目标不是另一个通用的 APM 工具,而是针对核心业务流程,建立明确的服务等级指标(SLI),并用它来驱动我们的 SLO(服务等级目标)。

初步构想与技术选型

这个系统的核心参与者是 Fastify、一个消息队列(在本文中我们用一个简化的内存总线模拟,但在生产中它是 Kafka)以及 Prometheus。

  1. Fastify: 我们选择它作为 API 网关和各个事件消费服务的底层框架。它的高性能和轻量级特性非常适合构建这类微服务。其强大的插件体系也为集成自定义监控提供了便利。
  2. Prometheus: 事实上的监控标准。它的拉模型(pull-based)和强大的查询语言(PromQL)是关键。我们将用它来存储和查询我们自定义的 SLI 指标。
  3. 事件驱动架构 (EDA): 系统的骨架。一个事件由上游服务发布,被一个或多个下游服务异步消费。

我们的核心构想是创建一个自定义的 Prometheus Histogram 指标,名为 event_processing_end_to_end_latency_seconds。这个指标将记录从事件发布那一刻起,到最后一个相关服务完成处理那一刻止的总时长。通过这个直方图,我们不仅能计算平均时延,更重要的是,能够计算出 P95、P99 等关键的分位数值,这对于定义一个有意义的 SLO 至关重要。

为了实现这一点,事件本身必须携带一个初始时间戳。当最终消费者处理完事件后,它将用当前时间减去事件的初始时间戳,得到端到端延迟,并将其报告给 Prometheus。

架构与流程的可视化

在深入代码之前,先用图表清晰地展示我们的系统架构和监控数据流。

sequenceDiagram
    participant Client
    participant API Gateway (Fastify)
    participant EventBus as Message Queue
    participant Order Service (Fastify)
    participant Notification Service (Fastify)
    participant Prometheus

    Client->>+API Gateway (Fastify): POST /orders (创建订单)
    API Gateway (Fastify)->>API Gateway (Fastify): 1. 生成 OrderCreated 事件
    API Gateway (Fastify)->>EventBus as Message Queue: 2. 发布事件 (含 publishedAt 时间戳)
    API Gateway (Fastify)-->>-Client: HTTP 202 Accepted
    
    Note right of EventBus as Message Queue: 事件被异步消费
    
    EventBus as Message Queue->>+Order Service (Fastify): 3. 投递 OrderCreated 事件
    Order Service (Fastify)->>Order Service (Fastify): 4. 处理订单逻辑 (例如: 扣减库存)
    Order Service (Fastify)->>EventBus as Message Queue: 5. 发布 OrderProcessed 事件
    deactivate Order Service (Fastify)

    EventBus as Message Queue->>+Notification Service (Fastify): 6. 投递 OrderProcessed 事件
    Notification Service (Fastify)->>Notification Service (Fastify): 7. 处理通知逻辑 (例如: 发送邮件)
    Notification Service (Fastify)->>Prometheus: 8. 计算时延并上报 SLI 指标
    deactivate Notification Service (Fastify)

    Prometheus->>Prometheus: 定期从各服务拉取指标

步骤化实现:从零构建可观测的事件流

我们将构建三个核心服务:

  1. api-gateway: 接收外部请求,发布初始事件。
  2. order-service: 消费初始事件,进行处理,并发布后续事件。
  3. notification-service: 消费最终事件,完成业务流程,并负责上报端到端时延指标。

1. 项目结构与共享模块

一个稳健的项目结构是可维护性的基础。

.
├── services
│   ├── api-gateway
│   │   ├── Dockerfile
│   │   ├── package.json
│   │   └── src
│   │       └── index.js
│   ├── notification-service
│   │   ├── ...
│   └── order-service
│       ├── ...
├── shared
│   ├── event-bus.js      # 模拟的消息总线
│   └── metrics.js        # Prometheus 指标定义与初始化
└── docker-compose.yml

共享模块 metrics.js

这是整个监控体系的核心。我们在这里统一定义和初始化 Prometheus 客户端及自定义指标。在真实项目中,这会是一个私有的 npm 包。

// shared/metrics.js

const promClient = require('prom-client');
const fastify = require('fastify')(); // 引入一个临时的 fastify 实例来注册插件

// 启用默认的 Node.js 指标 (CPU, Memory, GC 等)
promClient.collectDefaultMetrics();

// 定义我们的核心 SLI 指标: 事件端到端处理时延直方图
// 这里的 buckets 需要根据业务的实际情况和 SLO 目标来精心设计
// 例如,如果我们的 SLO 是 500ms,那么 buckets 应该在 0.5 附近有更高的精度
const eventProcessingLatency = new promClient.Histogram({
    name: 'event_processing_end_to_end_latency_seconds',
    help: 'End-to-end latency for processing an event from publish to final consumption.',
    labelNames: ['event_type', 'status'], // status可以是 'success' 或 'failure'
    buckets: [0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10] 
});

// 为 Fastify 实例注册 metrics 插件
// 我们将其封装成一个函数,供各个服务调用
function setupMetrics(serviceName) {
    const registry = promClient.register;
    registry.setDefaultLabels({ service: serviceName });
    
    fastify.register(require('fastify-metrics'), {
        endpoint: '/metrics',
        register: registry,
    });
}

module.exports = {
    promClient,
    eventProcessingLatency,
    setupMetrics,
    getMetrics: async () => {
        return promClient.register.metrics();
    }
};

这里的坑在于 buckets 的选择。如果 buckets 设置得太宽泛,我们将丢失计算高精度百分位数值的能力。如果太密集,会增加 Prometheus 的存储开销。一个常见的错误是直接使用默认 buckets,这通常不符合具体业务的 SLO 要求。

共享模块 event-bus.js

为了让示例可独立运行,我们用 Node.js 的 EventEmitter 模拟一个消息队列。

// shared/event-bus.js
const EventEmitter = require('events');

class SimpleEventBus extends EventEmitter {}

// 使用单例模式确保所有服务共享同一个 bus 实例
const eventBus = new SimpleEventBus();

module.exports = eventBus;

2. API Gateway: 事件的起点

api-gateway 负责接收 HTTP 请求,并将其转换为系统内部的事件。

// services/api-gateway/src/index.js

const fastify = require('fastify')({ logger: { level: 'info' } });
const { v4: uuidv4 } = require('uuid');
const eventBus = require('../../../shared/event-bus');
const { setupMetrics } = require('../../../shared/metrics');

// 初始化并暴露 /metrics 端点
setupMetrics('api-gateway');

fastify.post('/orders', async (request, reply) => {
    const orderId = uuidv4();
    const event = {
        id: uuidv4(),
        type: 'OrderCreated',
        payload: {
            orderId,
            items: request.body.items,
            userId: request.body.userId,
        },
        metadata: {
            // 这是关键!记录事件发布的时间戳
            publishedAt: Date.now(),
            source: 'api-gateway'
        }
    };
    
    // 在真实项目中,这里会是一个健壮的发布逻辑,包含错误处理和重试
    eventBus.emit('event', event);

    fastify.log.info(`Published event ${event.type} for order ${orderId}`);
    
    // 返回 202 Accepted,表示请求已被接受,但处理是异步的
    return reply.code(202).send({ orderId });
});


const start = async () => {
    try {
        await fastify.listen({ port: 3000, host: '0.0.0.0' });
    } catch (err) {
        fastify.log.error(err);
        process.exit(1);
    }
};

start();

一个常见的错误是在事件发布后立即返回 200 OK。在异步架构中,202 Accepted 更能准确反映系统的行为:请求已被接受,将在后台处理。

3. Order Service: 中间处理环节

order-service 监听 OrderCreated 事件,执行自己的业务逻辑,然后发布一个新的事件 OrderProcessed

// services/order-service/src/index.js

const fastify = require('fastify')({ logger: { level: 'info' } });
const { v4: uuidv4 } = require('uuid');
const eventBus = require('../../../shared/event-bus');
const { setupMetrics } = require('../../../shared/metrics');

setupMetrics('order-service');

// 模拟业务处理
const processOrder = async (order) => {
    fastify.log.info(`Processing order ${order.orderId}...`);
    // 模拟I/O密集型操作
    await new Promise(resolve => setTimeout(resolve, Math.random() * 200 + 50)); 
    fastify.log.info(`Order ${order.orderId} processed.`);
};

eventBus.on('event', async (event) => {
    if (event.type !== 'OrderCreated') {
        return;
    }

    try {
        await processOrder(event.payload);

        // 创建下一个事件,注意要传递原始的 publishedAt 时间戳
        const nextEvent = {
            id: uuidv4(),
            type: 'OrderProcessed',
            payload: event.payload,
            metadata: {
                // 关键点:将原始时间戳透传下去
                publishedAt: event.metadata.publishedAt,
                source: 'order-service'
            }
        };

        eventBus.emit('event', nextEvent);
        fastify.log.info(`Published event ${nextEvent.type} for order ${event.payload.orderId}`);

    } catch (error) {
        fastify.log.error(error, `Failed to process event ${event.id}`);
        // 在此应有错误处理逻辑,例如发布到死信队列
    }
});


const start = async () => {
    try {
        // 这个服务不需要监听 HTTP 端口,除非它有自己的 API
        // 但我们需要启动 Fastify 实例来暴露 /metrics 端点
        await fastify.listen({ port: 3001, host: '0.0.0.0' });
        fastify.log.info(`Order Service listening for events.`);
    } catch (err) {
        fastify.log.error(err);
        process.exit(1);
    }
};

start();

这里的核心在于 metadata.publishedAt 的透传。如果在这个环节丢失了初始时间戳,整个端到端时延的计算就失效了。在复杂的、跨越多步的事件链中,确保这个上下文信息的可靠传递至关重要。

4. Notification Service: 终点与度量

notification-service 是流程的终点。它消费 OrderProcessed 事件,并执行最终的度量计算和上报。

// services/notification-service/src/index.js

const fastify = require('fastify')({ logger: { level: 'info' } });
const eventBus = require('../../../shared/event-bus');
const { setupMetrics, eventProcessingLatency } = require('../../../shared/metrics');

setupMetrics('notification-service');

// 模拟发送通知
const sendNotification = async (order) => {
    fastify.log.info(`Sending notification for order ${order.orderId}...`);
    // 模拟I/O操作
    await new Promise(resolve => setTimeout(resolve, Math.random() * 100 + 20));
    fastify.log.info(`Notification sent for order ${order.orderId}.`);
};

eventBus.on('event', async (event) => {
    if (event.type !== 'OrderProcessed') {
        return;
    }

    const startTime = event.metadata.publishedAt;
    const eventType = 'OrderCreationFlow'; // 我们给整个流程起一个名字

    try {
        await sendNotification(event.payload);

        // 计算端到端时延 (毫秒 -> 秒)
        const durationSeconds = (Date.now() - startTime) / 1000;
        
        // 上报指标!
        eventProcessingLatency.observe({ event_type: eventType, status: 'success' }, durationSeconds);
        
        fastify.log.info({
            latency: durationSeconds,
            eventType: eventType,
            orderId: event.payload.orderId
        }, `Successfully processed event and recorded latency.`);

    } catch (error) {
        fastify.log.error(error, `Failed to send notification for event ${event.id}`);

        // 即使失败,也要上报,但状态为 'failure'
        // 注意:这里的时延可能意义不大,但记录失败次数非常重要
        const durationSeconds = (Date.now() - startTime) / 1000;
        eventProcessingLatency.observe({ event_type: eventType, status: 'failure' }, durationSeconds);
    }
});


const start = async () => {
    try {
        await fastify.listen({ port: 3002, host: '0.0.0.0' });
        fastify.log.info(`Notification Service listening for events.`);
    } catch (err) {
        fastify.log.error(err);
        process.exit(1);
    }
};

start();

这段代码是整个方案的闭环。在 catch 块中上报 status: 'failure' 的指标同样重要。这使我们能够计算错误率,这是可用性 SLI 的另一个关键组成部分。

整合与验证

使用 docker-compose.yml 来编排我们的服务和 Prometheus。

version: '3.8'

services:
  api-gateway:
    build: ./services/api-gateway
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production

  order-service:
    build: ./services/order-service
    ports:
      - "3001:3001"
    environment:
      - NODE_ENV=production

  notification-service:
    build: ./services/notification-service
    ports:
      - "3002:3002"
    environment:
      - NODE_ENV=production

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'

prometheus.yml 配置:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'fastify-eda-services'
    static_configs:
      - targets: ['api-gateway:3000', 'order-service:3001', 'notification-service:3002']

启动所有服务 docker-compose up --build,然后用 curl 产生一些流量:

while true; do
  curl -X POST -H "Content-Type: application/json" \
    -d '{"items": [{"id": "item-1", "quantity": 2}], "userId": "user-123"}' \
    http://localhost:3000/orders;
  sleep 0.1;
done

访问 Prometheus UI (http://localhost:9090),我们现在可以执行强大的 PromQL 查询了:

  1. 查询 P99 处理时延 (我们的核心SLI):

    histogram_quantile(0.99, sum(rate(event_processing_end_to_end_latency_seconds_bucket{event_type="OrderCreationFlow"}[5m])) by (le))
  2. 查询流程的错误率:

    sum(rate(event_processing_end_to_end_latency_seconds_count{event_type="OrderCreationFlow", status="failure"}[5m]))
    /
    sum(rate(event_processing_end_to_end_latency_seconds_count{event_type="OrderCreationFlow"}[5m]))
  3. 查询总处理速率 (RPS):

    sum(rate(event_processing_end_to_end_latency_seconds_count{event_type="OrderCreationFlow"}[5m]))

现在,当产品经理再问起那个问题时,我们不仅能给他一个精确的 P99 时延数字,还能提供错误率和服务吞吐量,并告诉他我们正在基于这些数据设定和监控我们的 SLO。我们终于照亮了那个监控黑洞。

方案的局限性与未来迭代

当前这套方案虽然解决了核心问题,但在生产环境中仍有几个需要考量的点。

首先,它严重依赖于节点间的时钟同步。在跨数据中心或大规模集群中,时钟漂移可能会导致时延数据不准确。一个更严谨的方案是引入分布式追踪系统(如 OpenTelemetry),通过在事件元数据中传递和延续 Trace Context 来获得更精确的、不受时钟漂移影响的跨服务耗时。

其次,对于包含重试逻辑的事件处理,这个简单的时延计算会失真。一次失败后的重试成功,其时延会非常长。需要更复杂的逻辑来区分首次处理时延和最终成功时延,例如在事件元数据中增加 attempt 计数,并只在 attempt=1 时记录首次处理指标。

最后,模拟的事件总线不具备持久化、顺序保证和消费者组等生产级特性。替换为 Kafka 或 RabbitMQ 后,需要考虑消息队列本身引入的延迟(broker 延迟、网络延迟),这部分延迟也被包含在了我们的端到端指标中,这通常是符合预期的,因为它反映了用户感受到的真实延迟。


  目录