我们的WebRTC业务在过去两个季度增长了500%,但我们的故障排查能力却倒退回了石器时代。当用户投诉“视频卡顿”时,我们能做的只有检查GCP上的服务器CPU和内存,这些指标几乎永远是绿色的。问题显而易见:服务端的基础设施监控,对于诊断发生在客户端网络环境中的、高度动态的WebRTC会话质量问题,几乎毫无用处。我们迫切需要一个能够下钻到每个用户、每个会话、每个媒体流的监控系统,以量化“卡顿”到底意味着什么——是丢包率飙升?是抖动过大?还是往返时间(RTT)激增?
WebRTC标准通过 getStats()
API提供了一个数据金矿,它能暴露几百个关于连接和媒体流的实时指标。最初的想法是简单地将这些数据批量发送到我们的日志系统,但很快就发现这是个死胡同。日志系统对于聚合和趋势分析非常低效,无法进行实时的告警和查询。我们需要的是一个真正的时序数据库(TSDB)。
Prometheus是显而易见的选择,它在云原生领域是事实标准。但一个核心的架构矛盾摆在面前:Prometheus采用拉(Pull)模型,它主动抓取目标的 /metrics
端点。而我们的WebRTC客户端运行在全球用户的浏览器中,它们隐藏在NAT和防火墙之后,不可能被Prometheus直接抓取。因此,架构的核心是设计一个中间件,一个“指标收集器”,它能接收来自客户端的主动推送(Push),然后将这些数据转换为Prometheus能够抓取的格式。
整个系统将部署在Google Cloud Platform(GCP)上,具体来说是Google Kubernetes Engine (GKE),因为它为我们提供了运行有状态应用(Prometheus)和无状态应用(指标收集器)所需的一切:弹性、服务发现和持久化存储。
架构设计:从客户端到TSDB的数据流
整个可观测性管道的生命周期如下:
- 客户端(浏览器): 在WebRTC会话期间,一个JavaScript定时器会周期性(例如每10秒)调用
RTCPeerConnection.getStats()
。 - 数据筛选与推送: 客户端脚本会解析
getStats()
返回的庞大报告,从中提取出我们最关心的QoE(Quality of Experience)指标,如packetsLost
,jitter
,roundTripTime
,bytesReceived
等,并将它们连同会话元数据(如userId
,sessionId
,connectionId
)打包成JSON,通过HTTPS POST请求发送到指标收集器。 - 指标收集器(Go Service on GKE): 这是一个我们自己开发的无状态Go服务。它接收客户端的POST请求,将JSON数据转换为Prometheus的文本展览格式。这些转换后的指标会缓存在内存中,并通过一个
/metrics
端点暴露出来。 - Prometheus(on GKE): Prometheus服务器配置了Kubernetes服务发现,它会自动找到所有指标收集器的Pod,并定期从它们的
/metrics
端点拉取数据。 - 存储与查询: Prometheus将抓取到的时序数据持久化到其内置的TSDB中。运维和开发人员现在可以通过PromQL查询具体用户的会话质量,并配置告警规则。
sequenceDiagram participant Client as WebRTC Client (Browser) participant Collector as Go Metrics Collector (GKE) participant Prometheus as Prometheus Server (GKE) loop Every 10s Client->>Client: getStats() Client->>Client: Filter key metrics (jitter, RTT, etc.) Client->>+Collector: POST /v1/metrics (JSON payload) end Collector-->>-Client: 202 Accepted Note right of Collector: Transform JSON to
Prometheus metrics format
and cache in memory. Prometheus->>+Collector: GET /metrics (Scrape) Collector-->>-Prometheus: 200 OK (Exposition Format) Note right of Prometheus: Ingest metrics into TSDB
这个架构的关键在于指标收集器,它巧妙地将客户端的Push模型转换为了Prometheus生态友好的Pull模型。
第一步:客户端的数据采集与上报
在生产环境中,你不能简单地将 getStats()
的所有内容都上报,这会产生巨大的、大部分无用的数据量。核心任务是精确地挑选出反映音视频质量和网络状况的关键指标。
这里的代码是一个精简但生产可用的TypeScript实现,它专注于采集出站(outbound-rtp
)和入站(inbound-rtp
)流的关键指标。
// src/webrtc-stats-collector.ts
interface RtcMetrics {
timestamp: number;
sessionId: string;
userId: string;
connectionId: string;
metrics: Record<string, number | string>;
}
export class WebRTCStatsCollector {
private peerConnection: RTCPeerConnection;
private intervalId?: number;
private readonly endpoint: string;
private readonly sessionId: string;
private readonly userId: string;
constructor(
pc: RTCPeerConnection,
endpoint: string,
sessionId: string,
userId: string
) {
if (!pc) {
throw new Error("RTCPeerConnection instance is required.");
}
this.peerConnection = pc;
this.endpoint = endpoint;
this.sessionId = sessionId;
this.userId = userId;
}
public start(intervalMs: number = 10000): void {
if (this.intervalId) {
console.warn("Stats collector is already running.");
return;
}
this.intervalId = window.setInterval(async () => {
try {
await this.collectAndSend();
} catch (error) {
console.error("Failed to collect and send WebRTC stats:", error);
}
}, intervalMs);
}
public stop(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = undefined;
}
}
private async collectAndSend(): Promise<void> {
const statsReport = await this.peerConnection.getStats();
const metricsPayloads: RtcMetrics[] = [];
statsReport.forEach(report => {
// 在真实项目中,我们会根据 track identifier 关联更多的元数据
// 这里为了简化,我们为每个有意义的 report 单独上报
let extractedMetrics: Record<string, number | string> | null = null;
if (report.type === 'inbound-rtp' && report.kind === 'video') {
extractedMetrics = this.extractInboundRtpMetrics(report);
} else if (report.type === 'outbound-rtp' && report.kind === 'video') {
extractedMetrics = this.extractOutboundRtpMetrics(report);
} else if (report.type === 'remote-inbound-rtp' && report.kind === 'video') {
// 这是对方接收我们发送的流的情况,包含了 RTT
extractedMetrics = this.extractRemoteInboundRtpMetrics(report);
}
if (extractedMetrics) {
metricsPayloads.push({
timestamp: Date.now(),
sessionId: this.sessionId,
userId: this.userId,
connectionId: this.peerConnection.id || 'pc_default', // 在真实应用中,为每个 PC 生成唯一ID
metrics: extractedMetrics,
});
}
});
if (metricsPayloads.length > 0) {
// 使用 navigator.sendBeacon 可以在页面关闭时也尝试发送,更可靠
// 但 sendBeacon 不支持复杂的 content-type 和 header,这里为了演示使用 fetch
fetch(this.endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(metricsPayloads),
keepalive: true, // 确保页面卸载时请求也能继续
}).catch(err => console.error("Beacon failed:", err));
}
}
private extractInboundRtpMetrics(report: any): Record<string, number | string> {
return {
type: 'inbound-video',
ssrc: report.ssrc,
packetsReceived: report.packetsReceived,
packetsLost: report.packetsLost, // 关键指标:接收丢包数
jitter: report.jitter, // 关键指标:网络抖动
framesDropped: report.framesDropped,
bytesReceived: report.bytesReceived,
framesDecoded: report.framesDecoded,
keyFramesDecoded: report.keyFramesDecoded,
firCount: report.firCount,
pliCount: report.pliCount,
nackCount: report.nackCount,
};
}
private extractOutboundRtpMetrics(report: any): Record<string, number | string> {
return {
type: 'outbound-video',
ssrc: report.ssrc,
packetsSent: report.packetsSent,
bytesSent: report.bytesSent,
retransmittedPacketsSent: report.retransmittedPacketsSent,
targetBitrate: report.targetBitrate,
qualityLimitationReason: report.qualityLimitationReason || 'none', // 关键指标:质量受限原因
};
}
private extractRemoteInboundRtpMetrics(report: any): Record<string, number | string> {
return {
type: 'remote-inbound-video',
ssrc: report.ssrc,
packetsLost: report.packetsLost, // 这是对方报告的丢包数
roundTripTime: report.roundTripTime, // 关键指标:端到端往返时间
};
}
}
一个常见的错误是过于频繁地调用 getStats
。10秒一次对于趋势监控是合理的平衡点。过于频繁(例如1秒)会给客户端带来不必要的CPU负担,并产生海量数据,增加后端成本和处理压力。
第二步:构建高并发指标收集器
这个Go服务是整个管道的中枢。它的设计必须满足几个条件:
- 高性能: 能够处理大量并发的HTTP POST请求。
- 高容错: 即使收到格式错误的JSON,也不能崩溃。
- 无状态: 易于在Kubernetes中水平扩展。
- Prometheus兼容: 正确地将指标转换为展览格式,并处理高基数标签。
高基数
是我们面临的主要挑战。每个用户的每个会-话都会生成唯一的sessionId
和userId
。如果我们将这些直接作为Prometheus的标签,标签组合的数量会爆炸式增长,导致Prometheus内存消耗和查询性能下降。
我们的策略是:
- 对会话级的元数据(
sessionId
,userId
)保留为标签,因为这是我们查询的核心维度。 - 对更细粒度的ID(如
ssrc
,transportId
)也保留,因为排查问题时必须下钻到流级别。 - 实现一个内部的指标注册表,对不再活跃的指标(例如会话结束)进行垃圾回收,防止内存无限增长。
以下是指标收集器的核心实现:
// main.go
package main
import (
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// MetricPayload 对应客户端发送的单个指标上报
type MetricPayload struct {
SessionID string `json:"sessionId"`
UserID string `json:"userId"`
ConnectionID string `json:"connectionId"`
Metrics map[string]interface{} `json:"metrics"`
}
// metricRegistry 负责动态管理 Prometheus 指标,并处理垃圾回收
type metricRegistry struct {
sync.RWMutex
gauges map[string]*prometheus.GaugeVec
lastSeen map[string]time.Time // 用于GC
labelKeys []string
metricsToGC map[string]bool // 指标是否参与GC
}
var registry *metricRegistry
// init 初始化指标注册表,定义所有可能上报的指标
func init() {
registry = &metricRegistry{
gauges: make(map[string]*prometheus.GaugeVec),
lastSeen: make(map[string]time.Time),
labelKeys: []string{"session_id", "user_id", "connection_id", "ssrc", "type"},
metricsToGC: map[string]bool{
// 只有这些指标需要基于 ssrc 等动态标签,需要GC
"webrtc_packets_lost_total": true,
"webrtc_jitter_seconds": true,
// ...其他需要GC的指标
},
}
// 注册一些核心指标
registry.addGauge("webrtc_packets_lost_total", "Total number of packets lost.")
registry.addGauge("webrtc_jitter_seconds", "Packet jitter in seconds.")
registry.addGauge("webrtc_round_trip_time_seconds", "Round trip time in seconds.")
registry.addGauge("webrtc_bytes_received_total", "Total bytes received.")
registry.addGauge("webrtc_bytes_sent_total", "Total bytes sent.")
registry.addGauge("webrtc_frames_dropped_total", "Total frames dropped.")
// ... 更多指标
}
func (r *metricRegistry) addGauge(name, help string) {
r.gauges[name] = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Help: help,
}, r.labelKeys)
prometheus.MustRegister(r.gauges[name])
}
// metricsHandler 是接收客户端POST请求的核心处理函数
func metricsHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
var payloads []MetricPayload
if err := json.NewDecoder(r.Body).Decode(&payloads); err != nil {
log.Printf("Error decoding JSON: %v", err)
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
return
}
registry.Lock()
defer registry.Unlock()
for _, p := range payloads {
ssrcVal, _ := p.Metrics["ssrc"].(float64)
typeVal, _ := p.Metrics["type"].(string)
labels := prometheus.Labels{
"session_id": p.SessionID,
"user_id": p.UserID,
"connection_id": p.ConnectionID,
"ssrc": fmt.Sprintf("%.0f", ssrcVal),
"type": typeVal,
}
// 为这组标签更新 lastSeen 时间戳
// key 由所有标签值构成,确保唯一性
labelKey := fmt.Sprintf("%s-%s-%s-%.0f-%s", p.SessionID, p.UserID, p.ConnectionID, ssrcVal, typeVal)
registry.lastSeen[labelKey] = time.Now()
for key, val := range p.Metrics {
floatVal, ok := val.(float64)
if !ok {
// 对于 'qualityLimitationReason' 等字符串值,可以创建另一个专门的 Gauge
// 这里为了简化,我们只处理数值型
continue
}
promKey := "webrtc_" + toSnakeCase(key) // 转换为 webrtc_packets_lost_total
if gaugeVec, exists := registry.gauges[promKey]; exists {
gaugeVec.With(labels).Set(floatVal)
}
}
}
w.WriteHeader(http.StatusAccepted)
}
// startGC 启动一个goroutine定期清理过期的指标
func startGC(interval time.Duration, expiration time.Duration) {
ticker := time.NewTicker(interval)
go func() {
for range ticker.C {
registry.Lock()
now := time.Now()
for labelKey, ts := range registry.lastSeen {
if now.Sub(ts) > expiration {
// 实际的删除逻辑比较复杂,需要从 lastSeen 和所有相关的 GaugeVec 中清理
// Prometheus client-go 没有直接的 Detach/Delete 方法,
// 一个常见实践是让Prometheus的 `scrape_interval` 和这里的 `expiration` 配合
// Prometheus 在一段时间内没抓取到某个series后会自动标记为stale。
// 更彻底的清理需要重启服务或使用更复杂的自定义注册表。
// 这里我们只从lastSeen中删除,防止其无限增长。
delete(registry.lastSeen, labelKey)
}
}
registry.Unlock()
log.Println("GC cycle finished.")
}
}()
}
func main() {
// 启动指标清理,每5分钟检查一次,清理超过15分钟未上报的指标
startGC(5*time.Minute, 15*time.Minute)
http.HandleFunc("/v1/metrics", metricsHandler)
http.Handle("/metrics", promhttp.Handler())
log.Println("Starting WebRTC metrics collector on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
// toSnakeCase 是一个辅助函数,将驼峰命名转换为蛇形命名
// 例如: packetsLost -> packets_lost_total, jitter -> jitter_seconds
// ... (具体实现省略)
这个Go服务的设计考虑了生产环境的细节。例如,GC机制对于一个长期运行的服务至关重要,它可以防止因旧会话的标签残留而导致的内存泄漏。在真实项目中,GC的实现会更复杂,可能需要自定义 prometheus.Collector
接口来完全控制指标的生命周期。
第三步:在GKE上部署和暴露服务
现在我们将Go收集器容器化,并通过Kubernetes部署到GKE。
Dockerfile
# Stage 1: Build the Go binary
FROM golang:1.20-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o webrtc-collector .
# Stage 2: Create a minimal final image
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY /app/webrtc-collector .
EXPOSE 8080
CMD ["./webrtc-collector"]
deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: webrtc-collector
labels:
app: webrtc-collector
spec:
replicas: 3 # 从3个副本开始,根据负载自动扩展
selector:
matchLabels:
app: webrtc-collector
template:
metadata:
labels:
app: webrtc-collector
spec:
containers:
- name: collector
image: gcr.io/your-gcp-project/webrtc-collector:v1.0.0
ports:
- containerPort: 8080
resources:
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "500m"
memory: "256Mi"
---
apiVersion: v1
kind: Service
metadata:
name: webrtc-collector
annotations:
# Prometheus Operator 的 ServiceMonitor 会识别这个注解
prometheus.io/scrape: 'true'
prometheus.io/port: '8080'
labels:
app: webrtc-collector
spec:
selector:
app: webrtc-collector
ports:
- name: http
port: 80
targetPort: 8080
type: ClusterIP # 服务只在集群内部可访问
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: webrtc-collector-ingress
spec:
rules:
- http:
paths:
- path: /v1/metrics
pathType: Prefix
backend:
service:
name: webrtc-collector
port:
number: 80
部署这套YAML后,我们就有了一个可水平扩展的、能通过Ingress从公网接收客户端指标的收集器集群。Service
上的prometheus.io/scrape: 'true'
注解是关键,它使得运行在GKE上的Prometheus能够自动发现并开始抓取这些Pod。
第四步:配置Prometheus抓取
如果你使用Prometheus Operator,上述注解就足够了。如果手动配置Prometheus,你需要添加一个基于Kubernetes服务发现的抓取配置。
prometheus.yml
(部分)
scrape_configs:
- job_name: 'kubernetes-pods'
kubernetes_sd_configs:
- role: pod
relabel_configs:
# 只抓取带有 prometheus.io/scrape=true 注解的 Pod
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
# ... 其他 relabeling 规则
至此,整个管道已经打通。客户端的WebRTC统计信息会源源不断地流入Prometheus TSDB。我们可以使用如下的PromQL来诊断问题:
# 查询某个特定会话的视频流往返时间(RTT)
webrtc_round_trip_time_seconds{session_id="session-abc-123", type="remote-inbound-video"}
# 计算过去5分钟内,所有视频接收流的平均网络抖动
avg(webrtc_jitter_seconds{type="inbound-video"})
# 找到过去15分钟内丢包率最高的10个用户
topk(10, rate(webrtc_packets_lost_total{type="inbound-video"}[5m]) / (rate(webrtc_packets_lost_total{type="inbound-video"}[5m]) + rate(webrtc_packets_received_total{type="inbound-video"}[5m])))
我们终于获得了之前梦寐以求的洞察力。
当前方案的局限性与未来展望
这个方案有效地解决了WebRTC会话质量的实时监控问题,但它并非银弹。在真实项目中,我们必须正视其局限性:
- Prometheus的长期存储: 单机Prometheus不适合存储超过几个月的海量高基数数据。随着业务增长,必须引入如Thanos或Cortex这样的方案,利用GCS(Google Cloud Storage)作为廉价的长期存储后端,并提供全局查询视图。
- 基数问题仍是达摩克利斯之剑: 尽管我们设计了GC,但如果用户量和会话频率极高,标签基数依然可能压垮中等规模的Prometheus实例。一种演进方向是在收集器层面进行预聚合,例如,不再为每个会话上报原始指标,而是计算区域、网络类型(WiFi/4G)等维度的P95、P99分位数,从而大幅降低基数。
- 数据孤岛: 当前系统只包含了WebRTC客户端指标。一个完整的可观测性方案需要将这些数据与服务器端日志、应用APM追踪以及基础设施指标关联起来。例如,当发现某个区域用户普遍RTT升高时,能够快速关联到该区域的SFU(Selective Forwarding Unit)服务器是否出现CPU瓶颈。这需要一个统一的可观测性平台,并将
sessionId
和userId
作为贯穿所有系统的关联ID。
下一步的迭代方向是明确的:集成Thanos进行扩展,并探索在收集器端使用流处理引擎(如Flink)进行实时聚合,将原始事件流送入BigQuery进行更复杂的离线分析,同时将聚合后的指标送入Prometheus用于实时告警。这套架构为我们从被动响应用户投诉,转向主动发现并解决潜在质量问题打下了坚实的基础。