Building a High-Availability WebSocket Message Bridge in C# on DigitalOcean, Connecting GCP Pub/Sub to React


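For cost and historical reasons, our core compute services run on DigitalOcean Droplets. For event-driven and asynchronous task processing, however, we chose Google Cloud Pub/Sub for its scalability and the convenience of a managed service. That created an immediate technical problem: how do we push real-time events produced in GCP to a React monitoring dashboard running in the browser, with low latency and high reliability?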

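The first idea was to have the frontend poll a backend API endpoint, but that clearly could not meet the real-time requirement and would generate a large volume of wasted HTTP requests. We needed a persistent connection, and the choice landed on WebSocket. That meant building a middleware service on DigitalOcean, which I call the "message bridge". Its job is to subscribe to GCP Pub/Sub messages and broadcast them in real time over WebSocket to every connected React client.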

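The bridge has to be designed for the messiness of production. It cannot be a naive receive-and-forward proxy: it must cope with network interruptions, failed GCP connections, clients that disconnect abruptly, and similar problems. In a real project, stability always comes first.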

Architecture and Core Components

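Data flows in one direction: backend services publish messages to GCP Pub/Sub, the message bridge subscribes to them, and the bridge pushes them to the React frontend over WebSocket.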

graph TD
    A[Backend service on DO] -- publishes --> B(GCP Pub/Sub Topic);
    B -- streaming pull --> C{C# Message Bridge on DO};
    C -- subscribes --> B;
    D[React client 1] -- WebSocket --> C;
    E[React client 2] -- WebSocket --> C;
    F[React client N] -- WebSocket --> C;
    C -- broadcasts --> D;
    C -- broadcasts --> E;
    C -- broadcasts --> F;

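The C# message bridge has three core parts:

  1. Pub/Sub listener (PubSubListenerService): a background hosted service (IHostedService) that connects to GCP Pub/Sub and continuously pulls messages. This is the system's entry point.
  2. WebSocket connection manager (WebSocketMiddleware): ASP.NET Core middleware that handles WebSocket handshake requests and tracks all active client connections.
  3. In-memory message channel (System.Threading.Channels.Channel): a high-performance, thread-safe in-memory queue. The Pub/Sub listener writes incoming messages into the channel, and a broadcast service reads them and hands them to the WebSocket manager for delivery. The channel decouples the producer (the Pub/Sub listener) from the consumer (the WebSocket broadcaster) and, when created as a bounded channel, provides backpressure.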
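The backpressure part is easy to miss: it exists only when the channel is bounded (Channel.CreateBounded with BoundedChannelFullMode.Wait). An unbounded channel accepts every write immediately and simply keeps growing. The JavaScript sketch below (runnable with Node.js) models a bounded channel to show the mechanism: when the queue is full, `write` returns a promise that stays pending until the reader frees a slot. `BoundedChannel` and its methods are illustrative names that only loosely follow the .NET TryWrite/WriteAsync/TryRead/ReadAsync API.

```javascript
// Illustrative model of a bounded channel with "wait when full" semantics.
class BoundedChannel {
  constructor(capacity) {
    this.capacity = capacity;
    this.items = [];
    this.waitingWriters = []; // { item, resolve } for writers parked on a full queue
    this.waitingReaders = []; // resolve callbacks for readers parked on an empty queue
  }

  // Non-blocking write: returns false instead of waiting when the queue is full.
  tryWrite(item) {
    if (this.waitingReaders.length > 0) {
      this.waitingReaders.shift()(item); // hand the item straight to a waiting reader
      return true;
    }
    if (this.items.length >= this.capacity) return false;
    this.items.push(item);
    return true;
  }

  // Blocking write: resolves only once there is room, so a fast producer
  // is slowed down to the consumer's pace (backpressure).
  write(item) {
    if (this.tryWrite(item)) return Promise.resolve();
    return new Promise((resolve) => this.waitingWriters.push({ item, resolve }));
  }

  // Non-blocking read: returns undefined when nothing is queued.
  tryRead() {
    if (this.items.length === 0) return undefined;
    const item = this.items.shift();
    // A slot just freed up: admit the oldest parked writer, if any.
    const writer = this.waitingWriters.shift();
    if (writer) {
      this.items.push(writer.item);
      writer.resolve();
    }
    return item;
  }

  // Blocking read: resolves with the next item, waiting if the queue is empty.
  read() {
    const item = this.tryRead();
    if (item !== undefined) return Promise.resolve(item);
    return new Promise((resolve) => this.waitingReaders.push(resolve));
  }
}
```

Applied to the bridge, a slow broadcaster makes the Pub/Sub handler wait instead of letting messages pile up in memory; the subscriber's flow-control limits should then stop it from pulling more until the backlog drains.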

Key Implementation: The C# Message Bridge

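First, configure a GCP service account. In the GCP console, create a service account, grant it the "Pub/Sub Subscriber" role, and download its JSON key file. The C# application running on DigitalOcean needs this file to authenticate to GCP.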

Example appsettings.Production.json:

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "Gcp": {
    "ProjectId": "your-gcp-project-id",
    "SubscriptionId": "your-pubsub-subscription-id",
    "ServiceAccountKeyPath": "/etc/secrets/gcp-credentials.json"
  },
  "AllowedHosts": "*"
}

ServiceAccountKeyPath points to a location where the key file is stored securely on the server; the key is never hardcoded or committed with the code.
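The path itself does not have to live in the JSON file either. WebApplication.CreateBuilder already adds environment variables as a configuration source that overrides appsettings, and a variable named Gcp__ServiceAccountKeyPath maps onto the Gcp:ServiceAccountKeyPath key (the double underscore stands in for the ':' section separator, and keys are case-insensitive). The JavaScript sketch below imitates that mapping to make the rule concrete; `applyEnvOverrides` is a hypothetical helper, not the provider's actual implementation, and it only handles hierarchical names containing `__`.

```javascript
// Illustrative only: apply "Section__Key" environment variables on top of a config object,
// in the spirit of ASP.NET Core's environment-variable configuration provider.
function applyEnvOverrides(config, env) {
  const result = JSON.parse(JSON.stringify(config)); // deep copy; the input stays untouched
  for (const [name, value] of Object.entries(env)) {
    if (!name.includes("__")) continue; // this sketch ignores flat (non-hierarchical) names
    const path = name.split("__");
    let node = result;
    for (let i = 0; i < path.length; i++) {
      // Keys are case-insensitive, so reuse an existing key if one matches.
      const existing = Object.keys(node).find((k) => k.toLowerCase() === path[i].toLowerCase());
      const key = existing ?? path[i];
      if (i === path.length - 1) {
        node[key] = value; // leaf: the environment value wins
      } else {
        if (typeof node[key] !== "object" || node[key] === null) node[key] = {};
        node = node[key];
      }
    }
  }
  return result;
}
```

On a Droplet this means the same image can be pointed at a different key location (for example a mounted secret) purely through the container's environment.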

1. The Pub/Sub listener and resilience

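The listener must be robust enough to survive the network problems that can occur when talking to GCP. A common mistake is to pull messages in a plain loop, so that a single exception crashes the whole background service (since .NET 6, an unhandled exception in a BackgroundService stops the entire host by default). In a real project we use a library such as Polly to add retries and, where needed, circuit breaking.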
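The pattern the listener needs is "retry forever with backoff, but stop cleanly on shutdown". A policy built with Handle<Exception>() also matches the OperationCanceledException raised during shutdown, so it is clearer to exclude cancellation explicitly. The JavaScript sketch below shows the bare pattern without Polly; `retryForever` and its options are hypothetical names, and an AbortSignal plays the role of the CancellationToken.

```javascript
// Retry `operation` until it succeeds, waiting delayMs(attempt) between attempts.
// Cancellation is never retried: once `signal` is aborted the loop rethrows and exits.
async function retryForever(operation, {
  signal,
  delayMs,
  onRetry = () => {},
  // Default sleep; a production version would also clear the timer when `signal` aborts.
  sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
}) {
  for (let attempt = 1; ; attempt++) {
    if (signal?.aborted) throw new Error("cancelled");
    try {
      return await operation(signal);
    } catch (err) {
      if (signal?.aborted) throw err; // shutdown, not a transient fault
      const wait = delayMs(attempt);
      onRetry(err, attempt, wait);
      await sleep(wait, signal);
    }
  }
}
```

The operation only "fails" when it throws, so the wrapped subscription must surface a broken connection as an exception (or rejected promise) for the loop to reconnect.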

PubSubListenerService.cs:

using Google.Cloud.PubSub.V1;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Polly;
using Polly.Retry;

public class PubSubListenerService : BackgroundService
{
    private readonly ILogger<PubSubListenerService> _logger;
    private readonly ChannelWriter<string> _channelWriter;
    private readonly GcpOptions _gcpOptions;
    private readonly AsyncRetryPolicy _retryPolicy;

    public PubSubListenerService(
        ILogger<PubSubListenerService> logger,
        Channel<string> channel,
        IOptions<GcpOptions> gcpOptions)
    {
        _logger = logger;
        _channelWriter = channel.Writer;
        _gcpOptions = gcpOptions.Value;

        // Exponential backoff, retried forever, for transient network or GCP failures.
        // Cancellation (service shutdown) is excluded so stopping the host is not treated as a fault.
        _retryPolicy = Policy
            .Handle<Exception>(ex => ex is not OperationCanceledException)
            .WaitAndRetryForeverAsync(
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, Math.Min(retryAttempt, 5))), // 2, 4, 8, 16, then 32 s
                (exception, timespan, context) =>
                {
                    _logger.LogWarning(exception, "Pub/Sub subscriber failed. Retrying in {Seconds}s...", timespan.TotalSeconds);
                });
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Pub/Sub Listener Service is starting.");

        try
        {
            // Wrap the whole subscription lifecycle in the retry policy.
            await _retryPolicy.ExecuteAsync(token => SubscribeToMessagesAsync(token), stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown.
        }

        _logger.LogInformation("Pub/Sub Listener Service is stopping.");
    }

    private async Task SubscribeToMessagesAsync(CancellationToken stoppingToken)
    {
        var subscriptionName = SubscriptionName.FromProjectSubscription(_gcpOptions.ProjectId, _gcpOptions.SubscriptionId);

        // Authenticate with the service-account key file configured in appsettings.
        var subscriberClient = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            CredentialsPath = _gcpOptions.ServiceAccountKeyPath
        }.BuildAsync(stoppingToken);

        // StartAsync returns a task that completes only when the subscriber stops
        // (after StopAsync, or faulted on an unrecoverable error), so it is not awaited yet.
        Task subscriberTask = subscriberClient.StartAsync(async (PubsubMessage message, CancellationToken ct) =>
        {
            var text = message.Data.ToStringUtf8();
            _logger.LogInformation("Message received from Pub/Sub: {MessageId}", message.MessageId);

            // With a bounded channel this waits while the channel is full (backpressure).
            await _channelWriter.WriteAsync(text, ct);

            // Ack so Pub/Sub does not redeliver the message. Note that a message acked here
            // but still queued in memory is lost if the process crashes.
            return SubscriberClient.Reply.Ack;
        });

        _logger.LogInformation("Subscriber started for subscription: {Subscription}", subscriptionName);

        // On shutdown, stop pulling new messages; the StartAsync task then completes normally.
        using var registration = stoppingToken.Register(() => subscriberClient.StopAsync(CancellationToken.None));

        // Completes on shutdown; throws if the subscriber fails, handing control back to the retry policy.
        await subscriberTask;
    }
}

// Options class bound to the "Gcp" section of appsettings.json
public class GcpOptions
{
    public string ProjectId { get; set; } = string.Empty;
    public string SubscriptionId { get; set; } = string.Empty;
    public string ServiceAccountKeyPath { get; set; } = string.Empty;
}

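The key piece is _retryPolicy. Instead of retrying once, it retries indefinitely with exponential backoff: 2, 4, 8 and 16 seconds, then every 32 seconds. As a result, when the DigitalOcean instance or its network recovers from even a long outage, the service reconnects to GCP Pub/Sub automatically, with no manual intervention.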
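To see what that schedule looks like in practice, here is the same delay formula in JavaScript (runnable with Node.js). It is only an illustration: Polly computes the delay itself from the sleepDurationProvider lambda, and it numbers retries starting from 1.

```javascript
// Seconds to wait before retry number `attempt` (1-based), mirroring
// TimeSpan.FromSeconds(Math.Pow(2, Math.Min(attempt, 5))): doubling, then flat at 2^5 = 32 s.
function backoffSeconds(attempt, maxExponent = 5) {
  return 2 ** Math.min(attempt, maxExponent);
}

// Total time spent waiting across the first `retries` retries.
function totalWaitSeconds(retries) {
  let total = 0;
  for (let attempt = 1; attempt <= retries; attempt++) total += backoffSeconds(attempt);
  return total;
}

const schedule = [1, 2, 3, 4, 5, 6, 7].map((n) => backoffSeconds(n));
console.log(schedule.join(", ")); // 2, 4, 8, 16, 32, 32, 32
console.log(totalWaitSeconds(7)); // 126
```

After the fifth failure the bridge settles into one reconnection attempt every 32 seconds, so once connectivity returns it should be back on Pub/Sub within roughly half a minute.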

2. WebSocket middleware and connection management

ASP.NET Core provides low-level WebSocket support, but connection tracking and broadcasting are up to us.
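The bookkeeping involved can be seen in isolation in the JavaScript sketch below (runnable with Node.js). For simplicity it models a socket as an object with a `readyState` and a synchronous `send` that throws on failure, and it shows two rules that matter for any broadcaster: skip connections that are no longer open, and isolate a failing client so it cannot abort delivery to everyone else. `ConnectionRegistry` is a hypothetical name; in the C# version the same roles are played by the ConcurrentDictionary and BroadcastMessageAsync, where SendAsync is asynchronous and each WebSocket allows only one outstanding send at a time.

```javascript
const OPEN = 1; // same numeric value as WebSocket.OPEN in the browser API

class ConnectionRegistry {
  constructor() {
    this.sockets = new Map(); // id -> socket
    this.nextId = 1;
  }

  add(socket) {
    const id = String(this.nextId++);
    this.sockets.set(id, socket);
    return id;
  }

  remove(id) {
    this.sockets.delete(id);
  }

  // Sends `message` to every open socket and returns how many deliveries succeeded.
  // Closed or failing sockets are dropped from the registry instead of aborting the loop.
  broadcast(message) {
    let delivered = 0;
    for (const [id, socket] of this.sockets) {
      if (socket.readyState !== OPEN) {
        this.sockets.delete(id); // deleting during Map iteration is safe in JavaScript
        continue;
      }
      try {
        socket.send(message);
        delivered++;
      } catch {
        this.sockets.delete(id);
      }
    }
    return delivered;
  }
}
```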

WebSocketMiddleware.cs:

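using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class WebSocketMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ILogger<WebSocketMiddleware> _logger;
    // Thread-safe registry of active connections, keyed by a per-connection id.
    // Static so the broadcast service can reach it; adequate for a single-process bridge.
    private static readonly ConcurrentDictionary<string, WebSocket> _sockets = new();

    public WebSocketMiddleware(RequestDelegate next, ILogger<WebSocketMiddleware> logger)
    {
        _next = next;
        _logger = logger;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        // Only accept WebSocket upgrades on /ws, the path the React client connects to.
        if (context.Request.Path != "/ws" || !context.WebSockets.IsWebSocketRequest)
        {
            await _next(context);
            return;
        }

        using var socket = await context.WebSockets.AcceptWebSocketAsync();
        var socketId = Guid.NewGuid().ToString();
        _sockets.TryAdd(socketId, socket);
        _logger.LogInformation("WebSocket connected: {SocketId}", socketId);

        try
        {
            // Keep the connection open by reading until the client sends a close frame.
            // RequestAborted is signalled when the client drops or the server shuts down.
            var buffer = new byte[1024 * 4];
            WebSocketReceiveResult result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), context.RequestAborted);
            while (!result.CloseStatus.HasValue)
            {
                // Messages from the client are ignored: this bridge only pushes.
                result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), context.RequestAborted);
            }
            await socket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
        }
        catch (WebSocketException ex)
        {
            // e.g. ConnectionClosedPrematurely when a tab closes without a close handshake
            _logger.LogWarning("WebSocket {SocketId} closed with error: {Error}", socketId, ex.WebSocketErrorCode);
        }
        catch (OperationCanceledException)
        {
            // The request was aborted or the server is shutting down.
        }
        finally
        {
            _sockets.TryRemove(socketId, out _);
            _logger.LogInformation("WebSocket disconnected: {SocketId}", socketId);
        }
    }

    // Called by WebSocketBroadcastService to push one text message to every open connection.
    public static async Task BroadcastMessageAsync(string message, CancellationToken cancellationToken = default)
    {
        var segment = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));

        // Send to all clients in parallel, isolating each one: a failing client is skipped,
        // and a client that cannot accept data within 5 s (an arbitrary limit) has its send
        // cancelled, which aborts that connection instead of stalling every broadcast.
        var tasks = _sockets.Values
            .Where(socket => socket.State == WebSocketState.Open)
            .Select(async socket =>
            {
                using var timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                timeout.CancelAfter(TimeSpan.FromSeconds(5));
                try
                {
                    await socket.SendAsync(segment, WebSocketMessageType.Text, true, timeout.Token);
                }
                catch (Exception ex) when (ex is WebSocketException or OperationCanceledException or ObjectDisposedException)
                {
                    // The receive loop in InvokeAsync observes the broken connection and removes it.
                }
            });

        await Task.WhenAll(tasks);
    }
}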

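3. The glue: Program.cs and the broadcast service

We need one more service that reads from the in-memory channel and calls WebSocketMiddleware.BroadcastMessageAsync. Again, a hosted service (a BackgroundService, which implements IHostedService) is the natural fit.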
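The service has a simple loop shape: keep reading until the channel completes, and treat a failure on one message as local to that message. A JavaScript rendition of that shape is sketched below, with an async iterable standing in for ChannelReader.ReadAllAsync; `pump` and its parameters are hypothetical names.

```javascript
// Forward every message from `source` to `broadcast`, one at a time, until the source ends.
// A failure on one message is reported through onError and does not stop the loop.
async function pump(source, broadcast, onError = () => {}) {
  let forwarded = 0;
  for await (const message of source) {
    try {
      await broadcast(message);
      forwarded++;
    } catch (err) {
      onError(err, message); // log and move on to the next message
    }
  }
  return forwarded; // number of messages broadcast successfully
}
```

Awaiting each broadcast before reading the next message keeps sends to any single socket sequential, which is what the C# WebSocket API requires.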

WebSocketBroadcastService.cs:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class WebSocketBroadcastService : BackgroundService
{
    private readonly ILogger<WebSocketBroadcastService> _logger;
    private readonly ChannelReader<string> _channelReader;

    public WebSocketBroadcastService(ILogger<WebSocketBroadcastService> logger, Channel<string> channel)
    {
        _logger = logger;
        _channelReader = channel.Reader;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("WebSocket Broadcast Service is starting.");
        
        // Keep reading until the channel is completed and drained, or shutdown is requested
        await foreach (var message in _channelReader.ReadAllAsync(stoppingToken))
        {
            try
            {
                await WebSocketMiddleware.BroadcastMessageAsync(message);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error broadcasting message to WebSockets.");
            }
        }

        _logger.LogInformation("WebSocket Broadcast Service is stopping.");
    }
}

Finally, wire everything together in Program.cs:

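using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);

// 1. Bind the "Gcp" configuration section to GcpOptions
builder.Services.Configure<GcpOptions>(builder.Configuration.GetSection("Gcp"));

// 2. Register one Channel<string> as a singleton shared by both background services.
//    Bounded with FullMode = Wait so the Pub/Sub handler waits when the broadcaster falls
//    behind (the capacity of 1000 is an example value).
builder.Services.AddSingleton(Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true
}));

// 3. Register the background services
builder.Services.AddHostedService<PubSubListenerService>();
builder.Services.AddHostedService<WebSocketBroadcastService>();

var app = builder.Build();

// 4. Enable WebSockets. Keep-alive frames every 30 s (default: 2 minutes) stop proxies with
//    an idle read timeout, such as Nginx's 60 s proxy_read_timeout, from dropping quiet connections.
app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromSeconds(30) });
app.UseMiddleware<WebSocketMiddleware>();

// Plain HTTP GET endpoint for health checks
app.MapGet("/", () => "Message Bridge is running.");

app.Run();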

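The React Frontend

The frontend needs a robust WebSocket client that handles connecting, disconnecting, and reconnecting. A custom React hook is a clean way to encapsulate this logic.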

useRealtimeLogs.js:

import { useState, useEffect } from 'react';

const WEBSOCKET_URL = 'wss://your-do-domain.com/ws';
const MAX_LOGS = 100;                 // keep only the newest 100 entries, for rendering performance
const MAX_RECONNECT_DELAY_MS = 30000; // cap for the reconnect backoff

export const useRealtimeLogs = () => {
    const [logs, setLogs] = useState([]);
    const [isConnected, setIsConnected] = useState(false);

    useEffect(() => {
        let ws = null;
        let reconnectTimer = null;
        let reconnectAttempt = 0;
        let disposed = false; // set on unmount so a deliberate close does not trigger a reconnect

        const connect = () => {
            ws = new WebSocket(WEBSOCKET_URL);

            ws.onopen = () => {
                console.log('WebSocket connected');
                setIsConnected(true);
                reconnectAttempt = 0; // reset the backoff after a successful connection
            };

            ws.onmessage = (event) => {
                // Guard the parse: one malformed message must not break the handler
                try {
                    const newLog = JSON.parse(event.data);
                    setLogs(prevLogs => [newLog, ...prevLogs.slice(0, MAX_LOGS - 1)]);
                } catch (error) {
                    console.error('Failed to parse incoming log message:', event.data, error);
                }
            };

            ws.onclose = () => {
                if (disposed) return; // component unmounted: no state updates, no reconnect
                setIsConnected(false);

                // Exponential backoff: 1 s, 2 s, 4 s, ... capped at 30 s
                const delay = Math.min(1000 * 2 ** reconnectAttempt, MAX_RECONNECT_DELAY_MS);
                reconnectAttempt++;
                console.log(`WebSocket disconnected, reconnecting in ${delay / 1000}s...`);
                reconnectTimer = setTimeout(connect, delay);
            };

            ws.onerror = (error) => {
                // A failed connection also fires a close event, which handles reconnection
                console.error('WebSocket error:', error);
            };
        };

        connect();

        return () => {
            disposed = true;
            clearTimeout(reconnectTimer);
            if (ws) ws.close();
        };
    }, []);

    return { logs, isConnected };
};
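The message handler's two rules (never let a malformed frame throw, and keep only the newest 100 entries) can be pulled out of the hook into pure functions that are easy to unit-test. The sketch below is one way to do that; `parseLogMessage` and `prependCapped` are hypothetical helpers, and rejecting JSON values that are not objects is an assumption about what a log entry looks like.

```javascript
// Parses one WebSocket frame into a log entry; returns null instead of throwing.
function parseLogMessage(raw) {
  if (typeof raw !== "string") return null; // binary frames are not expected here
  try {
    const value = JSON.parse(raw);
    // JSON.parse also accepts "42" or "null"; only plain objects count as log entries here.
    return value !== null && typeof value === "object" && !Array.isArray(value) ? value : null;
  } catch {
    return null;
  }
}

// Returns a new newest-first list capped at `max` entries (the same rule as the setLogs updater).
function prependCapped(list, item, max = 100) {
  return [item, ...list.slice(0, max - 1)];
}
```

Inside onmessage, the body would then reduce to: `const entry = parseLogMessage(event.data); if (entry) setLogs(prev => prependCapped(prev, entry));`.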

Using the hook in a React component is then straightforward:

import React from 'react';
import { useRealtimeLogs } from './useRealtimeLogs';

const LogDashboard = () => {
    const { logs, isConnected } = useRealtimeLogs();

    return (
        <div>
            <h1>Real-time Log Stream</h1>
            <p>Connection Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
            <div style={{ fontFamily: 'monospace', background: '#f0f0f0', padding: '10px', height: '500px', overflowY: 'auto' }}>
                {logs.map((log, index) => (
                    <div key={index}>{JSON.stringify(log)}</div>
                ))}
            </div>
        </div>
    );
};

export default LogDashboard;

Deploying to DigitalOcean

We containerize the C# application to simplify deployment.

Dockerfile:

FROM mcr.microsoft.com/dotnet/aspnet:7.0 AS base
WORKDIR /app
EXPOSE 80

FROM mcr.microsoft.com/dotnet/sdk:7.0 AS build
WORKDIR /src
COPY ["MessageBridge.csproj", "./"]
RUN dotnet restore "./MessageBridge.csproj"
COPY . .
WORKDIR "/src/."
RUN dotnet build "MessageBridge.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "MessageBridge.csproj" -c Release -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "MessageBridge.dll"]

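On the DigitalOcean Droplet, Nginx runs in front of the app as a reverse proxy. Besides serving static files if needed, its main jobs are terminating HTTPS and correctly proxying WebSocket traffic to the Kestrel server behind it. The .NET 7 aspnet image makes Kestrel listen on port 80 inside the container (hence EXPOSE 80), so the container has to be published on the port Nginx targets, and the GCP key file mounted at the path set in ServiceAccountKeyPath, for example: docker run -p 127.0.0.1:5000:80 -v /etc/secrets:/etc/secrets:ro message-bridge (the image name is illustrative).

Key nginx.conf settings: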

server {
    listen 443 ssl http2;
    server_name your-do-domain.com;

    ssl_certificate /etc/letsencrypt/live/your-do-domain.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/your-do-domain.com/privkey.pem;

    location / {
        proxy_pass http://localhost:5000;
        proxy_http_version 1.1;
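        # These two headers are essential for proxying WebSocket connections
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # By default Nginx closes a proxied connection after 60s without data from the
        # upstream (proxy_read_timeout); raise it for long-lived, mostly idle WebSockets
        proxy_read_timeout 300s;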
    }
}

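proxy_set_header Upgrade $http_upgrade; and proxy_set_header Connection "upgrade"; are what make Nginx proxy WebSocket requests. Upgrade and Connection are hop-by-hop headers, so Nginx does not forward them to the upstream by default; without these two lines Kestrel receives an ordinary HTTP request and the handshake fails. proxy_http_version 1.1 is required as well, because Nginx talks HTTP/1.0 to upstreams by default and the Upgrade mechanism does not exist in HTTP/1.0.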
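Why the explicit headers matter can be shown with a simplified model of a proxy. The JavaScript below is not Nginx's actual code, and `forwardHeaders` is a hypothetical function: it drops the classic hop-by-hop headers plus any header named in the client's Connection header, then applies explicitly configured headers the way proxy_set_header does.

```javascript
// Classic hop-by-hop headers: they describe one connection and are not forwarded by proxies.
const HOP_BY_HOP = [
  "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
  "te", "trailer", "transfer-encoding", "upgrade",
];

// Returns the headers a proxy would send upstream (lower-cased names).
function forwardHeaders(clientHeaders, explicitlySet = {}) {
  const lower = Object.fromEntries(
    Object.entries(clientHeaders).map(([name, value]) => [name.toLowerCase(), value])
  );
  // Headers listed in Connection are also connection-specific.
  const listed = (lower["connection"] ?? "")
    .split(",")
    .map((s) => s.trim().toLowerCase())
    .filter(Boolean);
  const forwarded = {};
  for (const [name, value] of Object.entries(lower)) {
    if (HOP_BY_HOP.includes(name) || listed.includes(name)) continue;
    forwarded[name] = value;
  }
  // Like proxy_set_header: explicitly configured headers are always sent.
  for (const [name, value] of Object.entries(explicitlySet)) forwarded[name.toLowerCase()] = value;
  return forwarded;
}
```

Without the explicit settings, the upstream still sees Sec-WebSocket-Key but no Upgrade header, which is exactly the "handshake fails" case described above.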

Limitations and Next Steps

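This single-instance bridge is an obvious single point of failure: if the Droplet running it goes down, all real-time pushes stop. For higher availability, the next step is to run several bridge instances behind a load balancer, which introduces a new problem. A WebSocket connection is stateful and stays pinned to the instance that accepted it, so a message can only reach a client through that particular instance. Meanwhile, instances that share one Pub/Sub subscription compete for messages: each message is delivered to only one of them, so clients connected to the other instances never see it. The usual fix is a shared backplane such as Redis Pub/Sub: whichever instance pulls a message from GCP republishes it to Redis, and because every instance subscribes to Redis, each one can broadcast the message to its own clients.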
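The difference between sharing one subscription and adding a backplane can be shown with a small in-memory simulation. In the JavaScript sketch below, `InMemoryBackplane` is a hypothetical stand-in for Redis Pub/Sub and `BridgeInstance` for one bridge process; a real deployment would use a Redis client instead.

```javascript
// One bridge process and the inboxes of the WebSocket clients connected to it.
class BridgeInstance {
  constructor(name) {
    this.name = name;
    this.clientInboxes = [];
  }
  connectClient() {
    const inbox = [];
    this.clientInboxes.push(inbox);
    return inbox;
  }
  // Broadcast to this instance's own clients only, as WebSocketMiddleware does.
  deliverLocally(message) {
    for (const inbox of this.clientInboxes) inbox.push(message);
  }
}

// Hypothetical in-memory stand-in for Redis Pub/Sub: every subscriber sees every message.
class InMemoryBackplane {
  constructor() { this.handlers = []; }
  subscribe(handler) { this.handlers.push(handler); }
  publish(message) { for (const handler of this.handlers) handler(message); }
}

// Without a backplane: a shared Pub/Sub subscription hands each message to ONE instance.
function deliverWithoutBackplane(receiver, message) {
  receiver.deliverLocally(message);
}

// With a backplane: every instance subscribes and broadcasts whatever is published.
function connectToBackplane(instances, backplane) {
  for (const instance of instances) backplane.subscribe((m) => instance.deliverLocally(m));
}
```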

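In addition, the current design has no authentication or authorization: anyone who knows the WebSocket endpoint can connect. In production, authentication must happen during the WebSocket handshake, for example with a JWT, so that only authorized users receive real-time data.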
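One common way to authenticate at the handshake is sketched below under stated assumptions. The browser WebSocket API cannot attach an Authorization header, so the client appends the token to the URL (here a hypothetical `access_token` query parameter), and the server verifies it before accepting the upgrade. The JavaScript (Node.js, using only `node:crypto`) checks an HS256 signature and the `exp` claim; `signHs256` exists only to produce test tokens. This is an illustration, not a hardened implementation: a production service should use a maintained JWT library (on ASP.NET Core, the JwtBearer handler can read the token from the query string in its OnMessageReceived event). Also note that query strings often end up in access logs, including Nginx's default log format.

```javascript
const crypto = require("node:crypto");

const b64url = (data) => Buffer.from(data).toString("base64url");

// Test helper: issues an HS256 token (real tokens would come from your identity provider).
function signHs256(payload, secret) {
  const header = b64url(JSON.stringify({ alg: "HS256", typ: "JWT" }));
  const body = b64url(JSON.stringify(payload));
  const sig = crypto.createHmac("sha256", secret).update(`${header}.${body}`).digest("base64url");
  return `${header}.${body}.${sig}`;
}

// Extracts the token from the upgrade request URL (hypothetical `access_token` parameter).
function tokenFromUpgradeUrl(requestUrl) {
  return new URL(requestUrl, "http://placeholder").searchParams.get("access_token");
}

// Returns the claims if the signature and `exp` check out, otherwise null.
function verifyHs256(token, secret, nowSeconds = Math.floor(Date.now() / 1000)) {
  if (typeof token !== "string") return null;
  const parts = token.split(".");
  if (parts.length !== 3) return null;
  const [header, body, sig] = parts;
  let decodedHeader;
  let claims;
  try {
    decodedHeader = JSON.parse(Buffer.from(header, "base64url").toString("utf8"));
    claims = JSON.parse(Buffer.from(body, "base64url").toString("utf8"));
  } catch {
    return null;
  }
  if (!decodedHeader || decodedHeader.alg !== "HS256") return null; // never accept "none" or other algorithms
  const expected = crypto.createHmac("sha256", secret).update(`${header}.${body}`).digest();
  const actual = Buffer.from(sig, "base64url");
  // Constant-time comparison; lengths must match before timingSafeEqual.
  if (actual.length !== expected.length || !crypto.timingSafeEqual(actual, expected)) return null;
  if (!claims || typeof claims.exp !== "number" || claims.exp <= nowSeconds) return null;
  return claims;
}
```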

