使用Prometheus与GCP构建高基数WebRTC会话质量的可观测性管道


我们的WebRTC业务在过去两个季度增长了500%,但我们的故障排查能力却倒退回了石器时代。当用户投诉“视频卡顿”时,我们能做的只有检查GCP上的服务器CPU和内存,这些指标几乎永远是绿色的。问题显而易见:服务端的基础设施监控,对于诊断发生在客户端网络环境中的、高度动态的WebRTC会话质量问题,几乎毫无用处。我们迫切需要一个能够下钻到每个用户、每个会话、每个媒体流的监控系统,以量化“卡顿”到底意味着什么——是丢包率飙升?是抖动过大?还是往返时间(RTT)激增?

WebRTC标准通过 getStats() API提供了一个数据金矿,它能暴露几百个关于连接和媒体流的实时指标。最初的想法是简单地将这些数据批量发送到我们的日志系统,但很快就发现这是个死胡同。日志系统对于聚合和趋势分析非常低效,无法进行实时的告警和查询。我们需要的是一个真正的时序数据库(TSDB)。

Prometheus是显而易见的选择,它在云原生领域是事实标准。但一个核心的架构矛盾摆在面前:Prometheus采用拉(Pull)模型,它主动抓取目标的 /metrics 端点。而我们的WebRTC客户端运行在全球用户的浏览器中,它们隐藏在NAT和防火墙之后,不可能被Prometheus直接抓取。因此,架构的核心是设计一个中间件,一个“指标收集器”,它能接收来自客户端的主动推送(Push),然后将这些数据转换为Prometheus能够抓取的格式。

整个系统将部署在Google Cloud Platform(GCP)上,具体来说是Google Kubernetes Engine (GKE),因为它为我们提供了运行有状态应用(Prometheus)和无状态应用(指标收集器)所需的一切:弹性、服务发现和持久化存储。

架构设计:从客户端到TSDB的数据流

整个可观测性管道的生命周期如下:

  1. 客户端(浏览器): 在WebRTC会话期间,一个JavaScript定时器会周期性(例如每10秒)调用 RTCPeerConnection.getStats()
  2. 数据筛选与推送: 客户端脚本会解析 getStats() 返回的庞大报告,从中提取出我们最关心的QoE(Quality of Experience)指标,如 packetsLost, jitter, roundTripTime, bytesReceived 等,并将它们连同会话元数据(如 userId, sessionId, connectionId)打包成JSON,通过HTTPS POST请求发送到指标收集器。
  3. 指标收集器(Go Service on GKE): 这是一个我们自己开发的无状态Go服务。它接收客户端的POST请求,将JSON数据转换为Prometheus的文本展览格式。这些转换后的指标会缓存在内存中,并通过一个 /metrics 端点暴露出来。
  4. Prometheus(on GKE): Prometheus服务器配置了Kubernetes服务发现,它会自动找到所有指标收集器的Pod,并定期从它们的 /metrics 端点拉取数据。
  5. 存储与查询: Prometheus将抓取到的时序数据持久化到其内置的TSDB中。运维和开发人员现在可以通过PromQL查询具体用户的会话质量,并配置告警规则。
sequenceDiagram
    participant Client as WebRTC Client (Browser)
    participant Collector as Go Metrics Collector (GKE)
    participant Prometheus as Prometheus Server (GKE)

    loop Every 10s
        Client->>Client: getStats()
        Client->>Client: Filter key metrics (jitter, RTT, etc.)
        Client->>+Collector: POST /v1/metrics (JSON payload)
    end

    Collector-->>-Client: 202 Accepted
    Note right of Collector: Transform JSON to
Prometheus metrics format
and cache in memory. Prometheus->>+Collector: GET /metrics (Scrape) Collector-->>-Prometheus: 200 OK (Exposition Format) Note right of Prometheus: Ingest metrics into TSDB

这个架构的关键在于指标收集器,它巧妙地将客户端的Push模型转换为了Prometheus生态友好的Pull模型。

第一步:客户端的数据采集与上报

在生产环境中,你不能简单地将 getStats() 的所有内容都上报,这会产生巨大的、大部分无用的数据量。核心任务是精确地挑选出反映音视频质量和网络状况的关键指标。

这里的代码是一个精简但生产可用的TypeScript实现,它专注于采集出站(outbound-rtp)和入站(inbound-rtp)流的关键指标。

// src/webrtc-stats-collector.ts

interface RtcMetrics {
  timestamp: number;
  sessionId: string;
  userId: string;
  connectionId: string;
  metrics: Record<string, number | string>;
}

export class WebRTCStatsCollector {
  private peerConnection: RTCPeerConnection;
  private intervalId?: number;
  private readonly endpoint: string;
  private readonly sessionId: string;
  private readonly userId: string;

  constructor(
    pc: RTCPeerConnection,
    endpoint: string,
    sessionId: string,
    userId: string
  ) {
    if (!pc) {
      throw new Error("RTCPeerConnection instance is required.");
    }
    this.peerConnection = pc;
    this.endpoint = endpoint;
    this.sessionId = sessionId;
    this.userId = userId;
  }

  public start(intervalMs: number = 10000): void {
    if (this.intervalId) {
      console.warn("Stats collector is already running.");
      return;
    }
    this.intervalId = window.setInterval(async () => {
      try {
        await this.collectAndSend();
      } catch (error) {
        console.error("Failed to collect and send WebRTC stats:", error);
      }
    }, intervalMs);
  }

  public stop(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = undefined;
    }
  }

  private async collectAndSend(): Promise<void> {
    const statsReport = await this.peerConnection.getStats();
    const metricsPayloads: RtcMetrics[] = [];

    statsReport.forEach(report => {
      // 在真实项目中,我们会根据 track identifier 关联更多的元数据
      // 这里为了简化,我们为每个有意义的 report 单独上报
      let extractedMetrics: Record<string, number | string> | null = null;
      
      if (report.type === 'inbound-rtp' && report.kind === 'video') {
        extractedMetrics = this.extractInboundRtpMetrics(report);
      } else if (report.type === 'outbound-rtp' && report.kind === 'video') {
        extractedMetrics = this.extractOutboundRtpMetrics(report);
      } else if (report.type === 'remote-inbound-rtp' && report.kind === 'video') {
        // 这是对方接收我们发送的流的情况,包含了 RTT
        extractedMetrics = this.extractRemoteInboundRtpMetrics(report);
      }

      if (extractedMetrics) {
        metricsPayloads.push({
          timestamp: Date.now(),
          sessionId: this.sessionId,
          userId: this.userId,
          connectionId: this.peerConnection.id || 'pc_default', // 在真实应用中,为每个 PC 生成唯一ID
          metrics: extractedMetrics,
        });
      }
    });

    if (metricsPayloads.length > 0) {
      // 使用 navigator.sendBeacon 可以在页面关闭时也尝试发送,更可靠
      // 但 sendBeacon 不支持复杂的 content-type 和 header,这里为了演示使用 fetch
      fetch(this.endpoint, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(metricsPayloads),
        keepalive: true, // 确保页面卸载时请求也能继续
      }).catch(err => console.error("Beacon failed:", err));
    }
  }

  private extractInboundRtpMetrics(report: any): Record<string, number | string> {
    return {
      type: 'inbound-video',
      ssrc: report.ssrc,
      packetsReceived: report.packetsReceived,
      packetsLost: report.packetsLost, // 关键指标:接收丢包数
      jitter: report.jitter,           // 关键指标:网络抖动
      framesDropped: report.framesDropped,
      bytesReceived: report.bytesReceived,
      framesDecoded: report.framesDecoded,
      keyFramesDecoded: report.keyFramesDecoded,
      firCount: report.firCount,
      pliCount: report.pliCount,
      nackCount: report.nackCount,
    };
  }

  private extractOutboundRtpMetrics(report: any): Record<string, number | string> {
    return {
      type: 'outbound-video',
      ssrc: report.ssrc,
      packetsSent: report.packetsSent,
      bytesSent: report.bytesSent,
      retransmittedPacketsSent: report.retransmittedPacketsSent,
      targetBitrate: report.targetBitrate,
      qualityLimitationReason: report.qualityLimitationReason || 'none', // 关键指标:质量受限原因
    };
  }
  
  private extractRemoteInboundRtpMetrics(report: any): Record<string, number | string> {
    return {
      type: 'remote-inbound-video',
      ssrc: report.ssrc,
      packetsLost: report.packetsLost, // 这是对方报告的丢包数
      roundTripTime: report.roundTripTime, // 关键指标:端到端往返时间
    };
  }
}

一个常见的错误是过于频繁地调用 getStats。10秒一次对于趋势监控是合理的平衡点。过于频繁(例如1秒)会给客户端带来不必要的CPU负担,并产生海量数据,增加后端成本和处理压力。

第二步:构建高并发指标收集器

这个Go服务是整个管道的中枢。它的设计必须满足几个条件:

  1. 高性能: 能够处理大量并发的HTTP POST请求。
  2. 高容错: 即使收到格式错误的JSON,也不能崩溃。
  3. 无状态: 易于在Kubernetes中水平扩展。
  4. Prometheus兼容: 正确地将指标转换为展览格式,并处理高基数标签。

高基数是我们面临的主要挑战。每个用户的每个会-话都会生成唯一的sessionIduserId。如果我们将这些直接作为Prometheus的标签,标签组合的数量会爆炸式增长,导致Prometheus内存消耗和查询性能下降。

我们的策略是:

  • 对会话级的元数据(sessionId, userId)保留为标签,因为这是我们查询的核心维度。
  • 对更细粒度的ID(如 ssrc, transportId)也保留,因为排查问题时必须下钻到流级别。
  • 实现一个内部的指标注册表,对不再活跃的指标(例如会话结束)进行垃圾回收,防止内存无限增长。

以下是指标收集器的核心实现:

// main.go
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// MetricPayload 对应客户端发送的单个指标上报
type MetricPayload struct {
	SessionID    string                 `json:"sessionId"`
	UserID       string                 `json:"userId"`
	ConnectionID string                 `json:"connectionId"`
	Metrics      map[string]interface{} `json:"metrics"`
}

// metricRegistry 负责动态管理 Prometheus 指标,并处理垃圾回收
type metricRegistry struct {
	sync.RWMutex
	gauges       map[string]*prometheus.GaugeVec
	lastSeen     map[string]time.Time // 用于GC
	labelKeys    []string
	metricsToGC  map[string]bool // 指标是否参与GC
}

var registry *metricRegistry

// init 初始化指标注册表,定义所有可能上报的指标
func init() {
	registry = &metricRegistry{
		gauges:    make(map[string]*prometheus.GaugeVec),
		lastSeen:  make(map[string]time.Time),
		labelKeys: []string{"session_id", "user_id", "connection_id", "ssrc", "type"},
		metricsToGC: map[string]bool{
			// 只有这些指标需要基于 ssrc 等动态标签,需要GC
			"webrtc_packets_lost_total": true,
			"webrtc_jitter_seconds": true,
			// ...其他需要GC的指标
		},
	}

	// 注册一些核心指标
	registry.addGauge("webrtc_packets_lost_total", "Total number of packets lost.")
	registry.addGauge("webrtc_jitter_seconds", "Packet jitter in seconds.")
	registry.addGauge("webrtc_round_trip_time_seconds", "Round trip time in seconds.")
	registry.addGauge("webrtc_bytes_received_total", "Total bytes received.")
	registry.addGauge("webrtc_bytes_sent_total", "Total bytes sent.")
	registry.addGauge("webrtc_frames_dropped_total", "Total frames dropped.")
	// ... 更多指标
}

func (r *metricRegistry) addGauge(name, help string) {
	r.gauges[name] = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: name,
		Help: help,
	}, r.labelKeys)
	prometheus.MustRegister(r.gauges[name])
}

// metricsHandler 是接收客户端POST请求的核心处理函数
func metricsHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}

	var payloads []MetricPayload
	if err := json.NewDecoder(r.Body).Decode(&payloads); err != nil {
		log.Printf("Error decoding JSON: %v", err)
		http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
		return
	}

	registry.Lock()
	defer registry.Unlock()

	for _, p := range payloads {
		ssrcVal, _ := p.Metrics["ssrc"].(float64)
		typeVal, _ := p.Metrics["type"].(string)

		labels := prometheus.Labels{
			"session_id":    p.SessionID,
			"user_id":       p.UserID,
			"connection_id": p.ConnectionID,
			"ssrc":          fmt.Sprintf("%.0f", ssrcVal),
			"type":          typeVal,
		}

		// 为这组标签更新 lastSeen 时间戳
		// key 由所有标签值构成,确保唯一性
		labelKey := fmt.Sprintf("%s-%s-%s-%.0f-%s", p.SessionID, p.UserID, p.ConnectionID, ssrcVal, typeVal)
		registry.lastSeen[labelKey] = time.Now()

		for key, val := range p.Metrics {
			floatVal, ok := val.(float64)
			if !ok {
				// 对于 'qualityLimitationReason' 等字符串值,可以创建另一个专门的 Gauge
				// 这里为了简化,我们只处理数值型
				continue
			}

			promKey := "webrtc_" + toSnakeCase(key) // 转换为 webrtc_packets_lost_total
			if gaugeVec, exists := registry.gauges[promKey]; exists {
				gaugeVec.With(labels).Set(floatVal)
			}
		}
	}

	w.WriteHeader(http.StatusAccepted)
}

// startGC 启动一个goroutine定期清理过期的指标
func startGC(interval time.Duration, expiration time.Duration) {
	ticker := time.NewTicker(interval)
	go func() {
		for range ticker.C {
			registry.Lock()
			now := time.Now()
			for labelKey, ts := range registry.lastSeen {
				if now.Sub(ts) > expiration {
					// 实际的删除逻辑比较复杂,需要从 lastSeen 和所有相关的 GaugeVec 中清理
					// Prometheus client-go 没有直接的 Detach/Delete 方法,
					// 一个常见实践是让Prometheus的 `scrape_interval` 和这里的 `expiration` 配合
					// Prometheus 在一段时间内没抓取到某个series后会自动标记为stale。
					// 更彻底的清理需要重启服务或使用更复杂的自定义注册表。
					// 这里我们只从lastSeen中删除,防止其无限增长。
					delete(registry.lastSeen, labelKey)
				}
			}
			registry.Unlock()
			log.Println("GC cycle finished.")
		}
	}()
}


func main() {
	// 启动指标清理,每5分钟检查一次,清理超过15分钟未上报的指标
	startGC(5*time.Minute, 15*time.Minute)

	http.HandleFunc("/v1/metrics", metricsHandler)
	http.Handle("/metrics", promhttp.Handler())

	log.Println("Starting WebRTC metrics collector on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

// toSnakeCase 是一个辅助函数,将驼峰命名转换为蛇形命名
// 例如: packetsLost -> packets_lost_total, jitter -> jitter_seconds
// ... (具体实现省略)

这个Go服务的设计考虑了生产环境的细节。例如,GC机制对于一个长期运行的服务至关重要,它可以防止因旧会话的标签残留而导致的内存泄漏。在真实项目中,GC的实现会更复杂,可能需要自定义 prometheus.Collector 接口来完全控制指标的生命周期。

第三步:在GKE上部署和暴露服务

现在我们将Go收集器容器化,并通过Kubernetes部署到GKE。

Dockerfile

# Stage 1: Build the Go binary
FROM golang:1.20-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o webrtc-collector .

# Stage 2: Create a minimal final image
FROM alpine:latest
RUN apk --no-cache add ca-certificates

WORKDIR /root/
COPY --from=builder /app/webrtc-collector .

EXPOSE 8080
CMD ["./webrtc-collector"]

deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: webrtc-collector
  labels:
    app: webrtc-collector
spec:
  replicas: 3 # 从3个副本开始,根据负载自动扩展
  selector:
    matchLabels:
      app: webrtc-collector
  template:
    metadata:
      labels:
        app: webrtc-collector
    spec:
      containers:
      - name: collector
        image: gcr.io/your-gcp-project/webrtc-collector:v1.0.0
        ports:
        - containerPort: 8080
        resources:
          requests:
            cpu: "100m"
            memory: "128Mi"
          limits:
            cpu: "500m"
            memory: "256Mi"

---
apiVersion: v1
kind: Service
metadata:
  name: webrtc-collector
  annotations:
    # Prometheus Operator 的 ServiceMonitor 会识别这个注解
    prometheus.io/scrape: 'true'
    prometheus.io/port:   '8080'
  labels:
    app: webrtc-collector
spec:
  selector:
    app: webrtc-collector
  ports:
  - name: http
    port: 80
    targetPort: 8080
  type: ClusterIP # 服务只在集群内部可访问

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: webrtc-collector-ingress
spec:
  rules:
  - http:
      paths:
      - path: /v1/metrics
        pathType: Prefix
        backend:
          service:
            name: webrtc-collector
            port:
              number: 80

部署这套YAML后,我们就有了一个可水平扩展的、能通过Ingress从公网接收客户端指标的收集器集群。Service上的prometheus.io/scrape: 'true'注解是关键,它使得运行在GKE上的Prometheus能够自动发现并开始抓取这些Pod。

第四步:配置Prometheus抓取

如果你使用Prometheus Operator,上述注解就足够了。如果手动配置Prometheus,你需要添加一个基于Kubernetes服务发现的抓取配置。

prometheus.yml (部分)

scrape_configs:
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      # 只抓取带有 prometheus.io/scrape=true 注解的 Pod
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      # ... 其他 relabeling 规则

至此,整个管道已经打通。客户端的WebRTC统计信息会源源不断地流入Prometheus TSDB。我们可以使用如下的PromQL来诊断问题:

# 查询某个特定会话的视频流往返时间(RTT)
webrtc_round_trip_time_seconds{session_id="session-abc-123", type="remote-inbound-video"}

# 计算过去5分钟内,所有视频接收流的平均网络抖动
avg(webrtc_jitter_seconds{type="inbound-video"})

# 找到过去15分钟内丢包率最高的10个用户
topk(10, rate(webrtc_packets_lost_total{type="inbound-video"}[5m]) / (rate(webrtc_packets_lost_total{type="inbound-video"}[5m]) + rate(webrtc_packets_received_total{type="inbound-video"}[5m])))

我们终于获得了之前梦寐以求的洞察力。

当前方案的局限性与未来展望

这个方案有效地解决了WebRTC会话质量的实时监控问题,但它并非银弹。在真实项目中,我们必须正视其局限性:

  1. Prometheus的长期存储: 单机Prometheus不适合存储超过几个月的海量高基数数据。随着业务增长,必须引入如Thanos或Cortex这样的方案,利用GCS(Google Cloud Storage)作为廉价的长期存储后端,并提供全局查询视图。
  2. 基数问题仍是达摩克利斯之剑: 尽管我们设计了GC,但如果用户量和会话频率极高,标签基数依然可能压垮中等规模的Prometheus实例。一种演进方向是在收集器层面进行预聚合,例如,不再为每个会话上报原始指标,而是计算区域、网络类型(WiFi/4G)等维度的P95、P99分位数,从而大幅降低基数。
  3. 数据孤岛: 当前系统只包含了WebRTC客户端指标。一个完整的可观测性方案需要将这些数据与服务器端日志、应用APM追踪以及基础设施指标关联起来。例如,当发现某个区域用户普遍RTT升高时,能够快速关联到该区域的SFU(Selective Forwarding Unit)服务器是否出现CPU瓶颈。这需要一个统一的可观测性平台,并将sessionIduserId作为贯穿所有系统的关联ID。

下一步的迭代方向是明确的:集成Thanos进行扩展,并探索在收集器端使用流处理引擎(如Flink)进行实时聚合,将原始事件流送入BigQuery进行更复杂的离线分析,同时将聚合后的指标送入Prometheus用于实时告警。这套架构为我们从被动响应用户投诉,转向主动发现并解决潜在质量问题打下了坚实的基础。


  目录