基于领域事件与AWS SQS构建Django到C#的最终一致性数据管道


我们面临一个典型的异构系统集成挑战:一个核心业务系统由 Django 构建,负责商品数据的全生命周期管理;而一个新的、高性能的搜索与推荐服务则决定采用 C# 和 .NET 平台,以利用其类型安全和计算性能优势。问题在于,如何以一种可靠、解耦且可扩展的方式,将 Django 中商品数据的变更实时同步到 C# 服务,并最终索引到 Meilisearch 中。

方案A:基于API的同步拉取

最初的构想是暴露一个 Django REST Framework (DRF) 端点,提供一个带有时间戳或版本号的商品变更列表。C# 服务则通过一个定时任务(如 Hangfire 或 Quartz.NET)定期轮询此端点,拉取增量数据并更新自身的数据库和 Meilisearch 索引。

优点:

  • 实现简单直观,开发周期短。
  • 技术栈成熟,易于理解和维护。

致命缺陷:

  1. 紧耦合: C# 服务强依赖于 Django API 的可用性和契约。Django 服务的任何网络抖动或部署都会直接影响数据同步。
  2. 轮询效率低下: 无论有无数据变更,轮询都在消耗网络和计算资源。为了降低延迟而缩短轮询间隔,会加剧资源浪费;反之,则会牺牲数据同步的实时性。
  3. 状态管理复杂: C# 服务需要精确记录上次同步的timestampversion。如果处理失败,重试逻辑可能会导致数据重复或遗漏,状态管理的复杂度很高。
  4. 可扩展性差: 如果未来出现第三个需要商品数据的下游服务(例如数据分析、风控系统),它也必须实现一套独立的轮询逻辑,对 Django 核心服务的压力将成倍增加。

这种方案在真实生产环境中是脆弱的。一次网络分区或 API 错误就可能导致数据长时间不一致,排查和修复的成本极高。

方案B:基于领域事件的异步消息驱动

该方案的核心思想是引入消息队列作为解耦的中间层,并采用领域驱动设计(DDD)的理念来定义跨系统边界的通信契约。

架构流程:

  1. 事件发布 (Django): 当 Django 中的Product聚合根发生状态变更(创建或更新)时,它不直接与外部系统通信,而是发布一个领域事件(Domain Event),如ProductUpdated。一个应用服务或领域服务负责捕获此事件,将其序列化为一个标准格式的消息(如 JSON),并发送到 AWS SQS 队列。
  2. 事件总线 (AWS SQS): SQS 作为可靠的事件总线,负责持久化存储这些事件消息,直到它们被消费者成功处理。我们选用 FIFO 队列以保证消息的顺序性,并配置死信队列 (DLQ) 来处理无法被消费的毒消息。
  3. 事件消费 (C#): C# 服务中运行一个后台工作者 (Background Worker / IHostedService),它持续地从 SQS 队列中拉取消息。
  4. 投影更新 (C#): 消费者接收到消息后,反序列化为本地的事件 DTO。然后,它执行相应的业务逻辑,比如更新本地的读模型缓存,并调用 Meilisearch 客户端库将数据变更写入搜索引擎。
sequenceDiagram
    participant User
    participant Django App
    participant Product Aggregate
    participant SQS (FIFO Queue)
    participant CSharp Service
    participant Meilisearch

    User->>+Django App: 发起商品更新请求 (e.g., /products/123)
    Django App->>+Product Aggregate: 调用方法更新状态 (e.g., change_price)
    Product Aggregate-->>-Django App: 状态变更完成, 发布 ProductUpdated 领域事件
    Django App->>+SQS (FIFO Queue): 将 ProductUpdated 事件序列化后发送
    SQS (FIFO Queue)-->>-Django App: 确认消息接收
    
    Note right of SQS (FIFO Queue): 消息在队列中持久化

    loop 持续轮询
        CSharp Service->>+SQS (FIFO Queue): 长轮询拉取消息
        SQS (FIFO Queue)-->>-CSharp Service: 返回 ProductUpdated 事件消息
    end

    CSharp Service->>CSharp Service: 反序列化消息, 执行业务逻辑
    CSharp Service->>+Meilisearch: 更新或创建商品索引文档
    Meilisearch-->>-CSharp Service: 确认索引更新
    CSharp Service->>SQS (FIFO Queue): 确认消息处理完成, 删除消息

最终选择与理由:
我们坚决选择方案 B。尽管初始实现成本略高,但它带来的长期收益是巨大的:

  • 高度解耦: Django 发布者和 C# 消费者之间没有直接的网络依赖。任何一方的部署、扩展或故障都不会立即影响另一方。
  • 韧性与可靠性: SQS 提供了至少一次的消息交付保证和持久化。即使 C# 服务宕机数小时,待其恢复后仍能从中断的地方继续处理积压的事件,不会丢失数据。DLQ 机制为处理异常提供了兜底。
  • 可扩展性: 新的下游服务只需订阅同一个 SQS 队列即可获取商品数据变更,对上游 Django 系统零影响。这符合“开放-封闭原则”。
  • 异步与性能: 核心交易流程(Django端)可以快速完成,因为它只需将事件推送到 SQS,这是一个低延迟操作。耗时的索引和数据处理任务被异步地转移到了消费者端。
  • DDD契约: 使用领域事件作为通信契”约,使得系统间的交互基于稳定的业务概念,而不是脆弱的数据库表结构或API字段。

核心实现概览

1. Django 端:领域事件的发布

在 Django 项目中,我们不直接在 models.pysave() 方法中发送消息,这会违反单一职责原则。我们采用 Django Signals 或更明确的服务层来处理。

a. 定义领域事件 DTO (Data Transfer Object)

这是一个简单的 Pydantic 模型,用于定义事件的结构。使用 Pydantic 能确保数据类型和结构的正确性。

product_events/dtos.py:

import uuid
from decimal import Decimal
from datetime import datetime
from pydantic import BaseModel, Field

class ProductAttributeDTO(BaseModel):
    name: str
    value: str

class ProductUpdatedEventDTO(BaseModel):
    """
    商品更新领域事件的契约。
    这是跨语言通信的核心,必须保持稳定。
    """
    event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    event_timestamp: datetime = Field(default_factory=datetime.utcnow)
    event_type: str = "ProductUpdated"
    
    product_id: str
    sku: str
    name: str
    description: str | None
    price: Decimal
    is_active: bool
    attributes: list[ProductAttributeDTO]

b. 事件发布服务

这个服务封装了与 AWS SQS 的所有交互细节。

product_events/publisher.py:

import boto3
import json
import logging
from decimal import Decimal
from django.conf import settings
from pydantic import BaseModel

logger = logging.getLogger(__name__)

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            # 将 Decimal 转换为字符串以保持精度
            return str(obj)
        return super(DecimalEncoder, self).default(obj)

class SqsEventPublisher:
    def __init__(self):
        # 实践中, access_key 和 secret_key 应通过 IAM Role 自动获取
        self.sqs_client = boto3.client(
            'sqs',
            region_name=settings.AWS_SQS_REGION,
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY
        )
        self.queue_url = settings.AWS_SQS_QUEUE_URL
        self.message_group_id = "product-events" # 用于 FIFO 队列

    def publish(self, event_dto: BaseModel):
        """
        将 Pydantic DTO 序列化并发布到 SQS FIFO 队列。
        """
        try:
            message_body = event_dto.model_dump_json(encoder=DecimalEncoder)
            
            # FIFO 队列需要 MessageGroupId 和 MessageDeduplicationId
            # MessageDeduplicationId 使用事件ID确保幂等性
            message_deduplication_id = str(event_dto.event_id)

            response = self.sqs_client.send_message(
                QueueUrl=self.queue_url,
                MessageBody=message_body,
                MessageGroupId=self.message_group_id,
                MessageDeduplicationId=message_deduplication_id
            )
            logger.info(f"Event {event_dto.event_id} published to SQS. Message ID: {response.get('MessageId')}")
            return True
        except Exception as e:
            # 生产级的错误处理:应包含重试逻辑和告警
            logger.error(
                f"Failed to publish event {getattr(event_dto, 'event_id', 'N/A')} to SQS.",
                exc_info=True
            )
            # 可以在这里抛出自定义异常,让上层服务决定如何处理,例如放入本地失败队列
            return False

c. 连接 Django Signals

我们使用 post_save 信号来触发事件发布。

products/signals.py:

from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import Product
from product_events.dtos import ProductUpdatedEventDTO, ProductAttributeDTO
from product_events.publisher import SqsEventPublisher

@receiver(post_save, sender=Product)
def on_product_save(sender, instance: Product, created, **kwargs):
    """
    当 Product 模型实例被保存后,构建并发布领域事件。
    """
    # 单元测试时,我们可能不希望真实地发送消息
    if kwargs.get('raw', False):
        return

    publisher = SqsEventPublisher()
    
    # 将 Django model 实例转换为 DTO,这是解耦的关键一步
    # 它将内部数据模型与外部事件契约分离开来
    event_dto = ProductUpdatedEventDTO(
        product_id=str(instance.uuid),
        sku=instance.sku,
        name=instance.name,
        description=instance.description,
        price=instance.price,
        is_active=instance.is_active,
        attributes=[
            ProductAttributeDTO(name=attr.name, value=attr.value)
            for attr in instance.attributes.all()
        ]
    )
    
    publisher.publish(event_dto)

2. C# 端:后台消费者与 Meilisearch 投影

在 C# 服务中,我们创建一个 IHostedService 来在后台长轮询 SQS。

a. 项目依赖配置

.csproj 文件中添加必要的 NuGet 包:

<Project Sdk="Microsoft.NET.Sdk.Worker">

  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="AWSSDK.SQS" Version="3.7.300.27" />
    <PackageReference Include="Meilisearch" Version="1.2.0" />
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
  </ItemGroup>

</Project>

b. 定义事件契约 DTO

这个 C# record 必须与 Python 的 ProductUpdatedEventDTO 结构完全匹配。

Events/ProductUpdatedEvent.cs:

namespace WorkerService.Events;

public record ProductAttributeDto(string Name, string Value);

public record ProductUpdatedEvent
{
    // 使用 System.Text.Json 的属性名注解来匹配 Python 的 snake_case
    [System.Text.Json.Serialization.JsonPropertyName("event_id")]
    public Guid EventId { get; init; }

    [System.Text.Json.Serialization.JsonPropertyName("event_timestamp")]
    public DateTime EventTimestamp { get; init; }

    [System.Text.Json.Serialization.JsonPropertyName("event_type")]
    public string EventType { get; init; } = string.Empty;

    [System.Text.Json.Serialization.JsonPropertyName("product_id")]
    public string ProductId { get; init; } = string.Empty;
    
    [System.Text.Json.Serialization.JsonPropertyName("sku")]
    public string Sku { get; init; } = string.Empty;

    [System.Text.Json.Serialization.JsonPropertyName("name")]
    public string Name { get; init; } = string.Empty;

    [System.Text.Json.Serialization.JsonPropertyName("description")]
    public string? Description { get; init; }

    [System.Text.Json.Serialization.JsonPropertyName("price")]
    public decimal Price { get; init; }

    [System.Text.Json.Serialization.JsonPropertyName("is_active")]
    public bool IsActive { get; init; }

    [System.Text.Json.Serialization.JsonPropertyName("attributes")]
    public List<ProductAttributeDto> Attributes { get; init; } = new();
}

c. Meilisearch 文档模型

Models/ProductDocument.cs:

namespace WorkerService.Models;

public class ProductDocument
{
    // Meilisearch 要求一个名为 'id' 的主键
    public required string Id { get; set; }
    public required string Sku { get; set; }
    public required string Name { get; set; }
    public string? Description { get; set; }
    public decimal Price { get; set; }
    public bool IsActive { get; set; }
    public IEnumerable<string> Attributes { get; set; } = Enumerable.Empty<string>();
}

d. SQS 消费者后台服务

这是整个 C# 服务的核心。

SqsConsumerService.cs:

using Amazon.SQS;
using Amazon.SQS.Model;
using System.Text.Json;
using WorkerService.Events;
using WorkerService.Services;

public class SqsConsumerService : BackgroundService
{
    private readonly IAmazonSQS _sqsClient;
    private readonly ILogger<SqsConsumerService> _logger;
    private readonly string _queueUrl;
    private readonly MeilisearchIndexer _indexer;

    public SqsConsumerService(
        IAmazonSQS sqsClient,
        IConfiguration configuration,
        ILogger<SqsConsumerService> logger,
        MeilisearchIndexer indexer)
    {
        _sqsClient = sqsClient;
        _logger = logger;
        _queueUrl = configuration["Aws:SqsQueueUrl"] ?? throw new InvalidOperationException("SQS Queue URL not configured.");
        _indexer = indexer;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("SQS Consumer Service is starting.");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var receiveRequest = new ReceiveMessageRequest
                {
                    QueueUrl = _queueUrl,
                    MaxNumberOfMessages = 5, // 一次最多拉取5条消息
                    WaitTimeSeconds = 20 // 启用长轮询,减少空轮询次数
                };

                var response = await _sqsClient.ReceiveMessageAsync(receiveRequest, stoppingToken);

                if (response.Messages.Any())
                {
                    _logger.LogInformation($"{response.Messages.Count} messages received.");
                    foreach (var message in response.Messages)
                    {
                        await ProcessMessageAsync(message, stoppingToken);
                    }
                }
                else
                {
                    _logger.LogTrace("No messages received in this poll.");
                }
            }
            catch (OperationCanceledException)
            {
                // 正常关闭
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An error occurred while polling SQS.");
                // 发生未知错误时,等待一段时间再重试,防止快速失败循环耗尽资源
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
        
        _logger.LogInformation("SQS Consumer Service is stopping.");
    }

    private async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogInformation("Processing message ID: {MessageId}", message.MessageId);
            var eventData = JsonSerializer.Deserialize<ProductUpdatedEvent>(message.Body);

            if (eventData is null)
            {
                _logger.LogWarning("Failed to deserialize message body for message ID: {MessageId}. Moving to DLQ.", message.MessageId);
                // 这里不删除消息,让它在 Visibility Timeout 后重试,
                // 多次失败后 SQS 会自动将其移入死信队列 (DLQ)。
                return;
            }
            
            // 真实项目中,这里应有一个事件路由器,根据 eventData.EventType 分发给不同的处理器
            // 这里我们简化处理,直接调用索引服务
            await _indexer.IndexProductAsync(eventData);

            // 只有在成功处理后,才从队列中删除消息
            await _sqsClient.DeleteMessageAsync(_queueUrl, message.ReceiptHandle, cancellationToken);
            _logger.LogInformation("Successfully processed and deleted message for ProductId: {ProductId}", eventData.ProductId);
        }
        catch (JsonException jsonEx)
        {
             _logger.LogError(jsonEx, "JSON Deserialization failed for message ID: {MessageId}. This is a poison pill.", message.MessageId);
             // 这种无法反序列化的消息是“毒丸”,不应该重试。
             // 删除它以防阻塞队列,同时必须有告警通知开发人员。
             await _sqsClient.DeleteMessageAsync(_queueUrl, message.ReceiptHandle, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message ID: {MessageId}. It will be retried after visibility timeout.", message.MessageId);
            // 任何其他异常,我们不删除消息,让 SQS 的 Visibility Timeout 机制来控制重试。
        }
    }
}

e. 依赖注入配置

Program.cs:

using Amazon.SQS;
using WorkerService.Services;

var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        // 从 appsettings.json 读取 AWS 配置
        services.AddDefaultAWSOptions(hostContext.Configuration.GetAWSOptions());
        services.AddAWSService<IAmazonSQS>();

        // 注册 Meilisearch 客户端和我们的索引服务
        services.AddSingleton(sp => {
            var config = sp.GetRequiredService<IConfiguration>();
            var client = new Meilisearch.MeilisearchClient(
                config["Meilisearch:Url"],
                config["Meilisearch:ApiKey"]
            );
            return client;
        });
        services.AddSingleton<MeilisearchIndexer>();

        // 注册后台服务
        services.AddHostedService<SqsConsumerService>();
    })
    .Build();

await host.RunAsync();

架构的扩展性与局限性

此架构的扩展性体现在,我们可以轻松地添加更多的 C# 消费者实例来提高处理吞吐量,SQS 会自动进行负载均衡。我们也可以添加全新的、使用不同语言(如 Go、Java)编写的消费者服务,只要它们能理解 ProductUpdated 事件契约,就可以接入系统。

然而,该架构也存在固有的局限性:

  1. 最终一致性延迟: 从 Django 端更新到 Meilisearch 索引生效,存在一个可感知的延迟,包括消息入队、网络传输、消费者轮询间隔和处理时间。对于需要强一致性的场景,此架构不适用。
  2. 事件契约演进: 跨语言的事件契约一旦确定,修改起来就非常复杂。任何对事件 DTO 的不兼容变更(如删除字段、修改类型)都需要协调发布者和所有消费者同步更新部署,否则会导致反序列化失败。使用 Schema Registry (如 Avro Schema Registry) 是管理这个问题的工业级方案。
  3. 消息顺序性: 虽然我们使用了 SQS FIFO 队列来保证单个商品(以 MessageGroupId 区分)的事件顺序,但这会牺牲一部分吞吐量。对于不需要严格顺序的场景,使用标准 SQS 队列性能更佳,但消费者端必须能处理乱序和重复消息,例如通过版本号或时间戳进行乐观并发控制。
  4. 可观测性成本: 维护一个分布式系统需要更完善的监控。我们必须监控 SQS 队列深度、消息年龄、DLQ 中的消息数,以及消费者服务的健康状况、处理延迟和错误率,这增加了运维的复杂性。

  目录