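Our core compute services run on DigitalOcean Droplets for cost and historical reasons. For event-driven and asynchronous task processing, however, we chose Google Cloud Pub/Sub for its scalability and the convenience of a managed service. That creates an immediate technical problem: how do we push real-time events produced in GCP, with low latency and high reliability, to a React monitoring dashboard running in the browser?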
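Our first idea was to have the frontend poll a backend API endpoint. That clearly cannot meet the real-time requirement, and it generates a flood of wasted HTTP requests. We needed a persistent connection, and the choice fell on WebSocket. That meant building a middleware service on DigitalOcean, which I call the "message bridge". Its job is to subscribe to GCP Pub/Sub and broadcast each message over WebSocket to all connected React clients as soon as it arrives.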
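The bridge has to be designed for the messiness of production. It cannot be a naive "receive and forward" proxy. It has to cope with network interruptions, failed GCP connections, clients that disconnect abruptly, and similar problems. In a real project, stability always comes first.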
Architecture and core components
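The data flow is one-way. Backend services publish messages to GCP Pub/Sub, the message bridge subscribes to them, and the bridge then pushes them over WebSocket to the React frontend.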
graph TD
    A[Backend service on DO] -- publishes --> B(GCP Pub/Sub Topic);
    B -- push/pull --> C{C# message bridge on DO};
    C -- subscribes --> B;
    D[React client 1] -- WebSocket --> C;
    E[React client 2] -- WebSocket --> C;
    F[React client N] -- WebSocket --> C;
    C -- broadcasts --> D;
    C -- broadcasts --> E;
    C -- broadcasts --> F;
The C# message bridge has three core parts:
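- Pub/Sub listener (PubSubListenerService): a background hosted service (IHostedService) that connects to GCP Pub/Sub and continuously pulls messages. This is the entry point of the system.
- WebSocket connection manager (WebSocketMiddleware): an ASP.NET Core middleware that handles WebSocket handshake requests and keeps track of all active client connections.
- In-memory message channel (System.Threading.Channels.Channel): a high-performance, thread-safe in-memory queue. The Pub/Sub listener writes incoming messages into the channel. A broadcast service (WebSocketBroadcastService) reads them and pushes them to every connected client. The channel decouples the producer (the Pub/Sub listener) from the consumer (the WebSocket broadcaster). When it is created as a bounded channel, it also provides backpressure, because writers have to wait while the channel is full.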
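The backpressure property only holds for a bounded channel, for example one created with Channel.CreateBounded and BoundedChannelFullMode.Wait. An unbounded channel never makes the writer wait. The JavaScript sketch below reproduces the bounded-and-wait semantics in a few lines so the behavior can be observed outside .NET. BoundedQueue is a hypothetical illustration, not the System.Threading.Channels API.

```javascript
// Hypothetical bounded async queue mimicking a bounded channel with FullMode = Wait.
class BoundedQueue {
  constructor(capacity) {
    this.capacity = capacity;
    this.items = [];
    this.waitingWriters = []; // resolvers for writers blocked on a full queue
    this.waitingReaders = []; // resolvers for readers blocked on an empty queue
  }

  // Resolves once the item is accepted; stays pending while the queue is full.
  async write(item) {
    while (this.items.length >= this.capacity) {
      await new Promise((resolve) => this.waitingWriters.push(resolve));
    }
    const reader = this.waitingReaders.shift();
    if (reader) {
      reader(item); // hand the item straight to a waiting reader
    } else {
      this.items.push(item);
    }
  }

  // Resolves with the next item, waiting if the queue is empty.
  async read() {
    if (this.items.length > 0) {
      const item = this.items.shift();
      const writer = this.waitingWriters.shift();
      if (writer) writer(); // a slot opened up: wake one blocked writer
      return item;
    }
    return new Promise((resolve) => this.waitingReaders.push(resolve));
  }
}
```

In the bridge, the Pub/Sub message handler plays the writer and WebSocketBroadcastService plays the reader. While broadcasting lags, the handler's await stays pending, so that message is not acked yet. The Pub/Sub client's flow-control limits on outstanding messages then slow further delivery.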
Key implementation: the C# message bridge
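First, set up a GCP service account. Create one in the GCP console, grant it the "Pub/Sub Subscriber" role (roles/pubsub.subscriber), and download its JSON key file. This file is essential for the C# application deployed on DigitalOcean. The application runs outside Google Cloud, so it cannot rely on the platform's built-in credentials.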
Example appsettings.Production.json configuration:
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "Gcp": {
    "ProjectId": "your-gcp-project-id",
    "SubscriptionId": "your-pubsub-subscription-id",
    "ServiceAccountKeyPath": "/etc/secrets/gcp-credentials.json"
  },
  "AllowedHosts": "*"
}
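ServiceAccountKeyPath points to a securely stored file on the server. The key is never hard-coded or committed with the code. When the bridge runs in a container, mount the key file at that path as a read-only volume instead of baking it into the image.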
1. Pub/Sub listener and resilience design
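The listener must be robust enough to survive the network problems that can occur when connecting to GCP. A common mistake is to pull messages in a simple loop, so that a single exception crashes the whole background service. In real projects we use a library such as Polly to add retry and, where needed, circuit-breaker policies. The listener below uses a retry policy with exponential backoff.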
PubSubListenerService.cs:
using Google.Cloud.PubSub.V1;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.Retry;
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class PubSubListenerService : BackgroundService
{
    private readonly ILogger<PubSubListenerService> _logger;
    private readonly ChannelWriter<string> _channelWriter;
    private readonly GcpOptions _gcpOptions;
    private readonly AsyncRetryPolicy _retryPolicy;

    public PubSubListenerService(
        ILogger<PubSubListenerService> logger,
        Channel<string> channel,
        IOptions<GcpOptions> gcpOptions)
    {
        _logger = logger;
        _channelWriter = channel.Writer;
        _gcpOptions = gcpOptions.Value;

        // Retry forever with exponential backoff (2, 4, 8, 16, 32, 32, ... seconds)
        // to ride out transient network or GCP hiccups. Shutdown (cancellation) is not retried.
        _retryPolicy = Policy
            .Handle<Exception>(ex => ex is not OperationCanceledException)
            .WaitAndRetryForeverAsync(
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, Math.Min(retryAttempt, 5))), // capped at 32 s
                (exception, timespan) =>
                {
                    _logger.LogWarning(exception, "Failed to connect to GCP Pub/Sub. Retrying in {Delay}s...", timespan.TotalSeconds);
                });
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Pub/Sub Listener Service is starting.");

        try
        {
            // Wrap the whole subscription lifecycle in the retry policy
            await _retryPolicy.ExecuteAsync(token => SubscribeToMessagesAsync(token), stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown
        }

        _logger.LogInformation("Pub/Sub Listener Service is stopping.");
    }

    private async Task SubscribeToMessagesAsync(CancellationToken stoppingToken)
    {
        var subscriptionName = SubscriptionName.FromProjectSubscription(_gcpOptions.ProjectId, _gcpOptions.SubscriptionId);

        // Authenticate explicitly with the service-account key file
        var subscriberClient = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            CredentialsPath = _gcpOptions.ServiceAccountKeyPath
        }.BuildAsync(stoppingToken);

        // StartAsync returns a task that runs until StopAsync is called (or faults on a
        // non-recoverable error, which the retry policy then handles)
        var subscriberTask = subscriberClient.StartAsync(async (PubsubMessage message, CancellationToken ct) =>
        {
            var text = message.Data.ToStringUtf8();
            _logger.LogInformation("Message received from Pub/Sub: {MessageId}", message.MessageId);

            // Wait here if the (bounded) channel is full: this is the backpressure point
            await _channelWriter.WriteAsync(text, ct);

            // Ack only after the message has been handed to the channel
            return SubscriberClient.Reply.Ack;
        });

        // Stop the subscriber when the host shuts down so that subscriberTask completes
        using var registration = stoppingToken.Register(() => _ = subscriberClient.StopAsync(TimeSpan.FromSeconds(15)));

        _logger.LogInformation("Subscriber started for GCP Pub/Sub subscription: {Subscription}", subscriptionName);

        await subscriberTask;
    }
}

// Bound from the "Gcp" section of appsettings.json
public class GcpOptions
{
    public string ProjectId { get; set; } = string.Empty;
    public string SubscriptionId { get; set; } = string.Empty;
    // If null, the client library falls back to Application Default Credentials
    public string? ServiceAccountKeyPath { get; set; }
}
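The key piece is _retryPolicy. It does not just retry once: it retries indefinitely with exponential backoff. So when the DigitalOcean instance or its network connection recovers from even a long outage, the service reconnects to GCP Pub/Sub on its own, without manual intervention.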
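To make the schedule concrete: Polly passes a 1-based retry count to the sleep-duration function. The policy therefore waits 2, 4, 8 and 16 seconds, then 32 seconds, and stays at 32 seconds from then on. The JavaScript sketch below reproduces the same formula. retryDelaySeconds is a hypothetical helper, not part of Polly.

```javascript
// Same formula as the Polly policy: 2^min(retryAttempt, 5) seconds.
// Polly numbers retries from 1, so the first wait is 2 seconds.
function retryDelaySeconds(retryAttempt) {
  return Math.pow(2, Math.min(retryAttempt, 5));
}

// Collect the waits before the first eight retries
const schedule = [];
for (let attempt = 1; attempt <= 8; attempt++) {
  schedule.push(retryDelaySeconds(attempt));
}
console.log(schedule.join(", ")); // prints "2, 4, 8, 16, 32, 32, 32, 32"
```

The first five retries add up to 62 seconds of waiting. After that, the bridge keeps trying roughly every 32 seconds for as long as the outage lasts.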
2. WebSocket middleware and connection management
ASP.NET Core provides low-level WebSocket support, but connection management and broadcasting are up to us.
WebSocketMiddleware.cs:
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class WebSocketMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ILogger<WebSocketMiddleware> _logger;

    // Thread-safe registry of active connections, shared across requests
    private static readonly ConcurrentDictionary<string, WebSocket> _sockets = new();

    public WebSocketMiddleware(RequestDelegate next, ILogger<WebSocketMiddleware> logger)
    {
        _next = next;
        _logger = logger;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        // Only handle WebSocket requests on /ws, the endpoint the React client connects to
        if (context.Request.Path != "/ws" || !context.WebSockets.IsWebSocketRequest)
        {
            await _next(context);
            return;
        }

        using var socket = await context.WebSockets.AcceptWebSocketAsync();
        var socketId = Guid.NewGuid().ToString();
        _sockets.TryAdd(socketId, socket);
        _logger.LogInformation("WebSocket connected: {SocketId}", socketId);

        try
        {
            // Keep the connection open until the client closes it. This is a server-push
            // scenario, so anything the client sends is read and discarded.
            var buffer = new byte[1024 * 4];
            var result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), context.RequestAborted);
            while (!result.CloseStatus.HasValue)
            {
                result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), context.RequestAborted);
            }
            await socket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
        }
        catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
        {
            _logger.LogWarning("WebSocket connection closed prematurely: {SocketId}", socketId);
        }
        catch (OperationCanceledException)
        {
            // The request was aborted (client went away)
        }
        finally
        {
            _sockets.TryRemove(socketId, out _);
            _logger.LogInformation("WebSocket disconnected: {SocketId}", socketId);
        }
    }

    // Broadcasts to every open connection; called by WebSocketBroadcastService.
    // A failed send to one client is isolated so it cannot affect the others.
    public static async Task BroadcastMessageAsync(string message)
    {
        var segment = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));

        var tasks = _sockets.Select(async pair =>
        {
            if (pair.Value.State != WebSocketState.Open)
            {
                return;
            }
            try
            {
                await pair.Value.SendAsync(segment, WebSocketMessageType.Text, true, CancellationToken.None);
            }
            catch (Exception)
            {
                // The client is gone; drop it from the registry
                _sockets.TryRemove(pair.Key, out _);
            }
        });

        await Task.WhenAll(tasks);
    }
}
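Broadcasting is where one dead client can disturb the others. With a plain Task.WhenAll over all sends, a single failed SendAsync surfaces as an exception for the whole broadcast. The JavaScript sketch below shows the registry-and-broadcast logic with per-connection failure isolation. ConnectionRegistry and the { readyState, send } socket shape are hypothetical stand-ins. OPEN = 1 matches the numeric value of the browser's WebSocket.OPEN.

```javascript
// Hypothetical in-memory registry mirroring the middleware's ConcurrentDictionary.
// A "socket" is any object with a readyState and a send(text) method (sync or async).
const OPEN = 1; // same numeric value as WebSocket.OPEN

class ConnectionRegistry {
  constructor() {
    this.sockets = new Map(); // socketId -> socket
  }

  add(id, socket) {
    this.sockets.set(id, socket);
  }

  remove(id) {
    this.sockets.delete(id);
  }

  // Sends to every open socket; one failing client does not affect the others.
  // Returns the ids whose send failed; those connections are dropped.
  async broadcast(text) {
    const entries = [...this.sockets].filter(([, s]) => s.readyState === OPEN);
    // Promise.resolve().then(...) turns synchronous throws into rejections too
    const results = await Promise.allSettled(
      entries.map(([, s]) => Promise.resolve().then(() => s.send(text)))
    );
    const failed = [];
    results.forEach((result, i) => {
      if (result.status === "rejected") {
        const [id] = entries[i];
        failed.push(id);
        this.remove(id);
      }
    });
    return failed;
  }
}
```

Sockets that are merely not open are skipped but kept. Their own receive loop is responsible for removing them when the close completes.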
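3. The glue: Program.cs and the broadcast service
We need a service that reads from the in-memory Channel and calls WebSocketMiddleware.BroadcastMessageAsync. Again, an IHostedService (here a BackgroundService) is the natural fit.
WebSocketBroadcastService.cs: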
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class WebSocketBroadcastService : BackgroundService
{
    private readonly ILogger<WebSocketBroadcastService> _logger;
    private readonly ChannelReader<string> _channelReader;

    public WebSocketBroadcastService(ILogger<WebSocketBroadcastService> logger, Channel<string> channel)
    {
        _logger = logger;
        _channelReader = channel.Reader;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("WebSocket Broadcast Service is starting.");

        try
        {
            // Read until the channel is completed and drained, or until the host stops
            await foreach (var message in _channelReader.ReadAllAsync(stoppingToken))
            {
                try
                {
                    await WebSocketMiddleware.BroadcastMessageAsync(message);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error broadcasting message to WebSockets.");
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown
        }

        _logger.LogInformation("WebSocket Broadcast Service is stopping.");
    }
}
Finally, wire everything together in Program.cs:
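using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);

// 1. Bind configuration
builder.Services.Configure<GcpOptions>(builder.Configuration.GetSection("Gcp"));

// 2. Register the channel as a singleton so both services share it.
//    Bounded + Wait gives backpressure: the Pub/Sub handler waits when the broadcaster
//    falls behind. The capacity of 1000 is an illustrative value.
builder.Services.AddSingleton(Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true // only WebSocketBroadcastService reads
}));

// 3. Register the background services
builder.Services.AddHostedService<PubSubListenerService>();
builder.Services.AddHostedService<WebSocketBroadcastService>();

var app = builder.Build();

// 4. Enable WebSockets and the bridge middleware
app.UseWebSockets();
app.UseMiddleware<WebSocketMiddleware>();

// Simple HTTP GET endpoint for health checks
app.MapGet("/", () => "Message Bridge is running.");

app.Run();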
React frontend implementation
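The frontend needs a robust WebSocket client that handles connecting, disconnecting and reconnecting. A custom React hook is a clean way to encapsulate this logic.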
useRealtimeLogs.js:
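import { useState, useEffect, useRef } from 'react';

const WEBSOCKET_URL = 'wss://your-do-domain.com/ws';

export const useRealtimeLogs = () => {
  const [logs, setLogs] = useState([]);
  const [isConnected, setIsConnected] = useState(false);
  const ws = useRef(null);
  const reconnectAttempt = useRef(0);
  const reconnectTimer = useRef(null);
  const shouldReconnect = useRef(true); // set to false on unmount

  const connect = () => {
    // Don't open a second socket while one is already open or connecting
    const state = ws.current?.readyState;
    if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) {
      return;
    }

    const socket = new WebSocket(WEBSOCKET_URL);
    ws.current = socket;

    socket.onopen = () => {
      console.log('WebSocket connected');
      setIsConnected(true);
      reconnectAttempt.current = 0; // reset the retry counter
    };

    socket.onmessage = (event) => {
      // Guard the parse: a malformed message must not crash the handler
      try {
        const newLog = JSON.parse(event.data);
        // Keep only the newest 100 entries to bound memory and render cost
        setLogs(prevLogs => [newLog, ...prevLogs.slice(0, 99)]);
      } catch (error) {
        console.error('Failed to parse incoming log message:', event.data, error);
      }
    };

    socket.onclose = () => {
      if (ws.current !== socket) {
        return; // a stale socket from an earlier connection; ignore it
      }
      console.log('WebSocket disconnected');
      setIsConnected(false);
      if (!shouldReconnect.current) {
        return; // closed by the cleanup below: never reconnect after unmount
      }
      // Exponential backoff reconnect, capped at 30 seconds
      const timeout = Math.min(Math.pow(2, reconnectAttempt.current) * 1000, 30000);
      console.log(`Attempting to reconnect in ${timeout / 1000}s...`);
      reconnectTimer.current = setTimeout(connect, timeout);
      reconnectAttempt.current++;
    };

    socket.onerror = (error) => {
      console.error('WebSocket error:', error);
      socket.close(); // onclose runs next and schedules the reconnect
    };
  };

  useEffect(() => {
    shouldReconnect.current = true;
    connect();
    return () => {
      shouldReconnect.current = false;
      clearTimeout(reconnectTimer.current);
      if (ws.current) {
        ws.current.close();
      }
    };
  }, []);

  return { logs, isConnected };
};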
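The reconnect logic is easier to reason about, and to test, without React. The framework-free sketch below captures the two rules the hook depends on:

- Delays grow as 1 s, 2 s, 4 s and so on up to 30 s, and reset after a successful open.
- An intentional close (unmount) must never schedule another attempt.

createReconnectingClient and its options are hypothetical. createSocket stands in for () => new WebSocket(WEBSOCKET_URL). schedule and cancel default to setTimeout and clearTimeout.

```javascript
// Framework-free sketch of the hook's reconnect rules (all names are hypothetical).
// createSocket() returns an object with a close() method; this sketch assigns its
// onopen/onclose handlers itself, so createSocket should only attach onmessage.
function createReconnectingClient({
  createSocket,
  schedule = setTimeout,
  cancel = clearTimeout,
  baseDelayMs = 1000,
  maxDelayMs = 30000,
}) {
  let socket = null;
  let attempt = 0;
  let timer = null;
  let stopped = false; // set by stop(); suppresses reconnects after unmount

  function connect() {
    if (stopped) return;
    socket = createSocket();
    socket.onopen = () => {
      attempt = 0; // a successful connection resets the backoff
    };
    socket.onclose = () => {
      if (stopped) return; // closed on purpose: do not reconnect
      const delay = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
      attempt += 1;
      timer = schedule(connect, delay);
    };
  }

  function stop() {
    stopped = true;
    if (timer !== null) cancel(timer);
    if (socket) socket.close();
  }

  connect();
  return { stop };
}
```

Inside a component, the client would be created in useEffect and its stop() returned as the cleanup function. Keeping the stopped flag outside the socket is what prevents the cleanup's own close() from triggering a reconnect.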
Using the hook in a React component is then straightforward:
import React from 'react';
import { useRealtimeLogs } from './useRealtimeLogs';

const LogDashboard = () => {
  const { logs, isConnected } = useRealtimeLogs();

  return (
    <div>
      <h1>Real-time Log Stream</h1>
      <p>Connection Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
      <div style={{ fontFamily: 'monospace', background: '#f0f0f0', padding: '10px', height: '500px', overflowY: 'auto' }}>
        {logs.map((log, index) => (
          <div key={index}>{JSON.stringify(log)}</div>
        ))}
      </div>
    </div>
  );
};

export default LogDashboard;
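The component uses the array index as the React key. Because new entries are prepended, every index shifts on each message, so React cannot match rows to their previous render. One option is to tag each entry with a client-side sequence number when it arrives. The sketch below assumes the log payloads carry no unique id of their own. makeLogEntry and prependBounded are hypothetical helpers.

```javascript
// Hypothetical helpers: give every incoming log a unique, stable id for use as a
// React key, and keep only the newest `max` entries (newest first).
let nextLogId = 0;

function makeLogEntry(payload) {
  return { id: nextLogId++, payload };
}

function prependBounded(prevLogs, entry, max = 100) {
  return [entry, ...prevLogs.slice(0, max - 1)];
}
```

In the hook, onmessage would build the entry first and then update state: const entry = makeLogEntry(JSON.parse(event.data)); followed by setLogs(prev => prependBounded(prev, entry));. Each row would then render with key={log.id} and JSON.stringify(log.payload). Creating the entry outside the state updater keeps the updater pure, as React expects.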
Deploying to DigitalOcean
We containerize the C# application to simplify deployment.
Dockerfile:
FROM mcr.microsoft.com/dotnet/aspnet:7.0 AS base
WORKDIR /app
# The .NET 7 aspnet image listens on port 80 by default (ASPNETCORE_URLS=http://+:80)
EXPOSE 80
FROM mcr.microsoft.com/dotnet/sdk:7.0 AS build
WORKDIR /src
COPY ["MessageBridge.csproj", "./"]
RUN dotnet restore "./MessageBridge.csproj"
COPY . .
WORKDIR "/src/."
RUN dotnet build "MessageBridge.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "MessageBridge.csproj" -c Release -o /app/publish /p:UseAppHost=false
FROM base AS final
WORKDIR /app
# Copy only the published output from the publish stage
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "MessageBridge.dll"]
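On the Droplet, Nginx runs as a reverse proxy. It can serve static files if needed, but its main jobs are terminating HTTPS and correctly proxying WebSocket traffic to the Kestrel server behind it. Publish the container's port 80 on the loopback interface (docker run -p 127.0.0.1:5000:80). That matches proxy_pass http://localhost:5000 in the configuration and keeps Kestrel unreachable from the public internet.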
Key parts of nginx.conf:
server {
    listen 443 ssl http2;
    server_name your-do-domain.com;

    ssl_certificate /etc/letsencrypt/live/your-do-domain.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/your-do-domain.com/privkey.pem;

    location / {
        proxy_pass http://localhost:5000;
        proxy_http_version 1.1;

        # These headers are essential for proxying WebSocket connections
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # By default Nginx closes a proxied connection after 60s without data;
        # raise the limits so quiet WebSocket connections are not dropped
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }
}
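The directives proxy_set_header Upgrade $http_upgrade; and proxy_set_header Connection "upgrade"; are what let Nginx proxy WebSocket requests. Upgrade and Connection are hop-by-hop headers, so Nginx does not pass them from the client to the backend by default. Without these two lines, the handshake fails.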
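To see why, consider what the backend checks during the handshake. RFC 6455 (section 4.2.1) requires a GET request with Upgrade: websocket, a Connection header containing the upgrade token, a Sec-WebSocket-Key, and Sec-WebSocket-Version: 13. The sketch below is a hypothetical helper, isWebSocketUpgrade, written in JavaScript. It takes a plain object of lowercase header names, in the style of Node's request headers, and shows that a request stripped of its hop-by-hop headers no longer qualifies.

```javascript
// Hypothetical check of the handshake headers a WebSocket server such as Kestrel
// needs to see (RFC 6455, section 4.2.1). `headers` uses lowercase keys.
function isWebSocketUpgrade(method, headers) {
  // Connection is a comma-separated token list, e.g. "keep-alive, Upgrade"
  const tokens = (headers["connection"] || "")
    .split(",")
    .map((t) => t.trim().toLowerCase());
  return (
    method === "GET" &&
    (headers["upgrade"] || "").toLowerCase() === "websocket" &&
    tokens.includes("upgrade") &&
    Boolean(headers["sec-websocket-key"]) &&
    headers["sec-websocket-version"] === "13"
  );
}
```

A browser request carrying Connection: keep-alive, Upgrade passes this check. The same request after a proxy drops Upgrade and Connection does not, which is exactly the failure the two proxy_set_header lines prevent.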
Limitations and next steps
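The current single-instance bridge is an obvious single point of failure: if its Droplet goes down, all real-time pushes stop. For higher availability, the next step is to run several bridge instances behind a load balancer. That introduces new complexity.

WebSocket connections are stateful. Once a client is connected to an instance, every later message for that client has to be pushed by that same instance, so every instance must see every message. If all instances pull from the same Pub/Sub subscription, however, Pub/Sub delivers each message to only one of them, and clients connected to the other instances miss it.

The usual solution is a shared backend such as Redis Pub/Sub that all bridge instances subscribe to. Each instance can then broadcast every message to its own clients. Giving each instance its own Pub/Sub subscription to the topic achieves a similar fan-out, because every subscription receives its own copy of each message.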
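The fan-out pattern can be sketched with an in-memory stand-in for the shared broker. Whichever instance receives a message from GCP republishes it on the shared bus, and every instance, including that one, pushes it to its own clients. SharedBus and BridgeInstance are hypothetical classes. No Redis client API is used or implied.

```javascript
// In-memory stand-in for a shared broker such as Redis Pub/Sub (illustrative only).
class SharedBus {
  constructor() {
    this.subscribers = [];
  }
  subscribe(handler) {
    this.subscribers.push(handler);
  }
  // Every subscriber receives every message: this is the fan-out property
  publish(message) {
    for (const handler of this.subscribers) handler(message);
  }
}

// A hypothetical bridge instance: it only pushes to clients connected to itself.
class BridgeInstance {
  constructor(name, bus) {
    this.name = name;
    this.bus = bus;
    this.clients = []; // each client is a function that receives the message
    bus.subscribe((message) => this.clients.forEach((push) => push(message)));
  }
  connect(client) {
    this.clients.push(client);
  }
  // Called when this instance happens to receive a message from GCP Pub/Sub
  handleUpstreamMessage(message) {
    this.bus.publish(message);
  }
}
```

Only one instance pulls a given message from the shared subscription, yet clients on every instance receive it, because delivery to clients goes through the bus.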
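The current design also has no authentication or authorization: anyone who knows the WebSocket endpoint can connect. In production, authentication must be enforced during the WebSocket handshake, for example with a JWT, so that only authorized users receive real-time data.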
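One practical constraint shapes this: the browser's WebSocket constructor cannot attach custom headers such as Authorization. The token is therefore commonly passed as a query parameter (for example ?access_token=...) and validated before the connection is accepted. Keep in mind that query strings can end up in proxy access logs. In ASP.NET Core, the production route is the JwtBearer authentication handler (Microsoft.AspNetCore.Authentication.JwtBearer).

The JavaScript sketch below only illustrates what verification involves, assuming HS256-signed tokens:

- pin the algorithm,
- compare the signature in constant time,
- reject expired tokens.

verifyHs256Jwt is a hypothetical helper, not a replacement for a vetted JWT library.

```javascript
const crypto = require("node:crypto");

// Minimal HS256 JWT check for illustration; use a vetted JWT library in production.
// Returns the decoded payload, or null if the token is malformed, forged or expired.
function verifyHs256Jwt(token, secret, nowSeconds = Math.floor(Date.now() / 1000)) {
  const parts = token.split(".");
  if (parts.length !== 3) return null;
  const [headerB64, payloadB64, signatureB64] = parts;

  let header;
  let payload;
  try {
    header = JSON.parse(Buffer.from(headerB64, "base64url").toString("utf8"));
    payload = JSON.parse(Buffer.from(payloadB64, "base64url").toString("utf8"));
  } catch {
    return null; // malformed base64url or JSON
  }
  if (header.alg !== "HS256") return null; // refuse "none" and any other algorithm

  const expected = crypto
    .createHmac("sha256", secret)
    .update(`${headerB64}.${payloadB64}`)
    .digest();
  const actual = Buffer.from(signatureB64, "base64url");
  // timingSafeEqual requires equal lengths, so check that first
  if (actual.length !== expected.length || !crypto.timingSafeEqual(actual, expected)) {
    return null;
  }
  // A token must not be accepted on or after its exp time
  if (typeof payload.exp === "number" && payload.exp <= nowSeconds) return null;
  return payload;
}
```

The handshake handler would call this with the token taken from the query string and refuse the upgrade, for example with HTTP 401, when it returns null.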