构建异构微服务体系下的Solr近实时原子更新管道


项目进入深水区,一个棘手的技术痛点浮出水面。我们的数据科学团队(主力技术栈为Python/FastAPI)需要对用户画像数据进行复杂的、多维度的实时查询,以支撑在线推荐和反欺诈模型。而这些用户画像数据,其源头分散在多个由Java/Spring Boot构建的核心业务微服务中——订单、用户、行为日志等等。最初,我们尝试让各个Java服务直接向Solr写入数据,但这很快演变成了一场灾难。

问题在于,用户画像是一个复杂的、包含多个嵌套子文档的Solr Document。例如,一个用户文档下可能嵌套着收货地址列表、最近订单列表、浏览历史等。不同的源业务服务只关心更新这个大文档中的一小部分。比如,订单服务完成一笔交易后,只想向用户的订单列表子文档中add一条新记录,而不是重索引整个用户。直接并发操作同一个Solr文档,导致了频繁的写冲突和数据覆盖,最终索引里的数据处于一种不可信的“薛定谔”状态。我们需要一个能确保原子性、维护数据一致性的中心化索引管道。

初步构想是,与其让各个业务服务直接“野蛮”地操作Solr,不如引入一个专门的“索引协调服务”。这个服务将作为唯一入口,负责接收来自不同源服务的局部更新请求,然后在内部将它们聚合成对Solr的原子更新操作。这不仅能解决并发写入的冲突,还能统一Solr的Schema管理和查询优化策略,将底层搜索引擎的复杂性对上游业务服务透明化。

技术选型决策上,我们决定让这个新的索引协调服务使用Spring Boot。理由很充分:Java生态在处理高并发、维护系统稳定性方面有成熟的解决方案,Spring Data Solr对Solr的集成非常完善,特别是对复杂的对象到文档的映射。而消费端,FastAPI依旧是数据科学团队的首选,它的高性能异步特性和Pydantic带来的数据校验能力,使其成为暴露数据查询API的最佳选择。Solr本身则因其强大的嵌套文档支持(Block Join)和原子更新(Atomic Updates)能力,成为这个架构的核心基石。整个流程将是:源服务 -> Spring Boot索引服务 -> Solr <- FastAPI数据消费服务

第一步:定义一个健壮的、支持嵌套的Solr Schema

一切的起点是Solr的managed-schema文件。为了支持原子更新和嵌套查询,我们需要精心设计它。这里的关键是定义一个根文档(user_profile)和多个子文档(addresses, orders),并通过_root_字段将它们关联起来。

<!-- managed-schema -->
<schema name="user_profiles" version="1.6">

    <!-- 核心字段 -->
    <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="doc_type" type="string" indexed="true" stored="true" default="user_profile"/>
    <field name="_root_" type="string" indexed="true" stored="true" docValues="false" />
    <field name="_version_" type="plong" indexed="false" stored="false"/>
    <field name="last_modified" type="pdate" indexed="true" stored="true" default="NOW" />

    <!-- 用户主文档字段 -->
    <field name="user_id" type="plong" indexed="true" stored="true" />
    <field name="username" type="string" indexed="true" stored="true" />
    <field name="email" type="string" indexed="true" stored="true" />
    <field name="registration_date" type="pdate" indexed="true" stored="true" />
    <field name="tags" type="string" indexed="true" stored="true" multiValued="true" />

    <!-- 子文档: 地址 -->
    <field name="address_id" type="string" indexed="true" stored="true" />
    <field name="address_type" type="string" indexed="true" stored="true" />
    <field name="city" type="string" indexed="true" stored="true" />
    <field name="street" type="string" indexed="true" stored="true" />
    <field name="is_default_address" type="boolean" indexed="true" stored="true" />

    <!-- 子文档: 订单 -->
    <field name="order_id" type="string" indexed="true" stored="true" />
    <field name="order_amount" type="pdouble" indexed="true" stored="true" />
    <field name="order_date" type="pdate" indexed="true" stored="true" />
    <field name="product_count" type="pint" indexed="true" stored="true" />

    <!-- 动态字段,用于未来扩展 -->
    <dynamicField name="*_s" type="string" indexed="true" stored="true" />
    <dynamicField name="*_l" type="plong" indexed="true" stored="true" />
    <dynamicField name="*_b" type="boolean" indexed="true" stored="true" />

    <uniqueKey>id</uniqueKey>

    <!-- 定义文档类型,便于区分 -->
    <copyField source="user_id" dest="user_id_s"/>
    <copyField source="username" dest="username_s"/>

</schema>

这里的_root_字段是实现嵌套文档的关键。对于根文档,它的值就是自己的id;对于子文档,它的值是其所属根文档的id。这使得Solr能够将它们视为一个逻辑块。doc_type字段则用于在查询时区分根文档和子文档。

第二步:构建Spring Boot索引协调服务

这是整个管道的核心,它负责将业务逻辑上的“部分更新”转化为Solr底层的“原子更新”指令。

2.1 项目配置

pom.xml中引入Spring Data Solr依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-solr</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok for boilerplate code reduction -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

application.yml中配置Solr连接信息:

spring:
  data:
    solr:
      host: http://localhost:8983/solr
      core: user_profiles # 对应Solr Core的名称
logging:
  level:
    org.apache.solr.client: DEBUG # 开启Solr客户端日志,便于调试

2.2 定义文档模型 (POJO)

我们需要创建与Solr Schema对应的Java类。Spring Data Solr的@SolrDocument, @Field, @ChildDocument注解让这个过程非常直观。

// UserProfile.java - 根文档
import lombok.Data;
import org.apache.solr.client.solrj.beans.Field;
import org.springframework.data.annotation.Id;
import org.springframework.data.solr.core.mapping.SolrDocument;
import org.springframework.data.solr.core.mapping.ChildDocument;

import java.util.Date;
import java.util.List;
import java.util.Set;

@Data
@SolrDocument(collection = "user_profiles")
public class UserProfile {

    @Id
    @Field
    private String id; // Solr文档的唯一ID, 通常是 user_id

    @Field("doc_type")
    private String docType = "user_profile"; // 标识为根文档

    @Field("user_id")
    private Long userId;

    @Field
    private String username;

    @Field
    private String email;

    @Field("registration_date")
    private Date registrationDate;

    @Field
    private Set<String> tags;

    @ChildDocument
    private List<Address> addresses; // 嵌套地址文档

    @ChildDocument
    private List<Order> orders; // 嵌套订单文档
}

// Address.java - 子文档
@Data
public class Address {
    @Id
    @Field("id")
    private String id; // 子文档也需要唯一ID,例如 address_id

    @Field("doc_type")
    private String docType = "address"; // 标识为子文档

    @Field("address_id")
    private String addressId;
    
    @Field("address_type")
    private String type;
    
    @Field
    private String city;
    
    @Field
    private String street;
    
    @Field("is_default_address")
    private boolean isDefault;
}

// Order.java - 子文档
@Data
public class Order {
    @Id
    @Field("id")
    private String id; // 子文档ID,例如 order_id
    
    @Field("doc_type")
    private String docType = "order"; // 标识为子文档

    @Field("order_id")
    private String orderId;

    @Field("order_amount")
    private Double amount;
    
    @Field("order_date")
    private Date orderDate;
    
    @Field("product_count")
    private Integer productCount;
}

注意: 每个子文档类也需要一个@Id@Field("id"),这是Solr的要求,确保整个文档块内的所有文档都有唯一标识。

2.3 实现原子更新服务

这是最关键的部分。我们不直接保存UserProfile对象,而是构造PartialUpdate对象,它能生成Solr需要的原子更新JSON。

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.solr.core.SolrTemplate;
import org.springframework.data.solr.core.query.PartialUpdate;
import org.springframework.stereotype.Service;
import org.apache.solr.common.SolrInputDocument;
import java.util.Collections;
import java.util.Date;

@Service
@Slf4j
public class UserProfileIndexingService {

    private final SolrTemplate solrTemplate;

    public UserProfileIndexingService(SolrTemplate solrTemplate) {
        this.solrTemplate = solrTemplate;
    }

    /**
     * 原子性地为指定用户添加一个新的地址子文档。
     * 这里的坑在于,不能直接更新根文档的List字段,
     * 必须使用Solr的"add"指令来操作嵌套文档。
     *
     * @param userId  目标用户的ID
     * @param address 要添加的新地址对象
     */
    public void addAddress(Long userId, Address address) {
        String rootDocId = String.valueOf(userId);
        // 为子文档生成一个唯一的ID
        address.setId(rootDocId + "_" + address.getAddressId()); 

        // 构造一个PartialUpdate对象,目标是根文档
        PartialUpdate update = new PartialUpdate("id", rootDocId);

        // 使用"add"指令添加一个完整的子文档对象
        // Spring Data Solr 会自动将其转换为一个包含 _root_ 字段的SolrInputDocument
        update.add("addresses", address);
        
        // 同时更新主文档的最后修改时间
        update.add("last_modified", new Date());

        try {
            solrTemplate.saveBean("user_profiles", update);
            solrTemplate.commit("user_profiles");
            log.info("Successfully added address {} for user {}", address.getAddressId(), userId);
        } catch (Exception e) {
            log.error("Failed to add address for user {}: {}", userId, e.getMessage());
            solrTemplate.rollback("user_profiles");
            // 在真实项目中,这里应该抛出一个自定义的业务异常
            throw new IndexingFailedException("Failed to add address", e);
        }
    }

    /**
     * 原子性地更新用户标签。
     * 使用 "set" 指令可以完全替换一个字段的值。
     *
     * @param userId 目标用户ID
     * @param newTags 新的标签集合
     */
    public void updateUserTags(Long userId, Set<String> newTags) {
        String rootDocId = String.valueOf(userId);
        PartialUpdate update = new PartialUpdate("id", rootDocId);
        
        // "set"指令会覆盖原有的所有tags
        update.setValueOfField("tags", newTags);
        update.add("last_modified", new Date());

        solrTemplate.saveBean("user_profiles", update);
        solrTemplate.commit("user_profiles");
        log.info("Successfully updated tags for user {}", userId);
    }
    
    /**
     * 创建一个全新的用户画像文档,包含根文档和初始子文档。
     * 这是创建操作,不是更新。
     */
    public void createUserProfile(UserProfile profile) {
        profile.setId(String.valueOf(profile.getUserId()));
        // 确保所有子文档有唯一ID
        if (profile.getAddresses() != null) {
            profile.getAddresses().forEach(addr -> addr.setId(profile.getId() + "_" + addr.getAddressId()));
        }
        if (profile.getOrders() != null) {
            profile.getOrders().forEach(order -> order.setId(profile.getId() + "_" + order.getOrderId()));
        }
        
        solrTemplate.saveBean("user_profiles", profile);
        solrTemplate.commit("user_profiles");
        log.info("Created new profile for user {}", profile.getUserId());
    }
}

// 自定义异常
public class IndexingFailedException extends RuntimeException {
    public IndexingFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

2.4 暴露RESTful API

最后,通过@RestController将这些服务方法暴露为HTTP接口,供其他微服务调用。

@RestController
@RequestMapping("/api/v1/indexing/user-profiles")
public class UserProfileController {

    private final UserProfileIndexingService indexingService;

    public UserProfileController(UserProfileIndexingService indexingService) {
        this.indexingService = indexingService;
    }

    // 假设 AddAddressRequest 是一个包含 userId 和 Address 数据的DTO
    @PostMapping("/{userId}/addresses")
    public ResponseEntity<Void> addAddress(@PathVariable Long userId, @RequestBody Address address) {
        indexingService.addAddress(userId, address);
        return ResponseEntity.ok().build();
    }
    
    // 假设 UpdateTagsRequest 是一个包含 userId 和 tags 数据的DTO
    @PutMapping("/{userId}/tags")
    public ResponseEntity<Void> updateTags(@PathVariable Long userId, @RequestBody Set<String> tags) {
        indexingService.updateUserTags(userId, tags);
        return ResponseEntity.ok().build();
    }
    
    @PostMapping
    public ResponseEntity<Void> createProfile(@RequestBody UserProfile profile) {
        indexingService.createUserProfile(profile);
        return ResponseEntity.status(HttpStatus.CREATED).build();
    }
}

至此,我们的Spring Boot索引服务已经完成。它提供了一组清晰的、面向业务操作的API,内部则将这些操作安全地转换为对Solr的原子更新,彻底解决了数据一致性问题。

sequenceDiagram
    participant OrderService as 源服务 (Java)
    participant IndexingService as 索引协调服务 (Spring Boot)
    participant Solr
    
    OrderService->>+IndexingService: POST /api/v1/.../{userId}/orders (携带新订单数据)
    IndexingService->>IndexingService: 构造PartialUpdate对象
    IndexingService->>IndexingService: update.add("orders", newOrder)
    IndexingService->>+Solr: 发送原子更新请求 (JSON)
    Solr-->>-IndexingService: 更新成功
    IndexingService-->>-OrderService: 200 OK

第三步:构建FastAPI数据消费服务

现在轮到数据消费方。FastAPI服务需要查询Solr,并利用这些结构化的用户画像数据。

3.1 环境准备与连接

安装pysolr库:pip install pysolr.

创建一个Solr连接的单例:

# solr_client.py
import pysolr
import logging

SOLR_URL = "http://localhost:8983/solr/user_profiles"
solr = None

def get_solr_connection():
    global solr
    if solr is None:
        try:
            solr = pysolr.Solr(SOLR_URL, always_commit=True, timeout=10)
            # 发送一个ping请求来验证连接
            solr.ping()
            logging.info("Successfully connected to Solr.")
        except Exception as e:
            logging.error(f"Failed to connect to Solr: {e}")
            solr = None # 连接失败,重置为None
    return solr

3.2 定义数据模型 (Pydantic)

使用Pydantic模型来映射从Solr返回的JSON数据,这能提供类型提示和数据校验,是生产级代码的最佳实践。

# models.py
from pydantic import BaseModel, Field
from typing import List, Optional, Set
from datetime import datetime

class Address(BaseModel):
    id: str
    doc_type: str = "address"
    address_id: str
    type: str = Field(..., alias="address_type")
    city: str
    street: str
    is_default: bool = Field(..., alias="is_default_address")

class Order(BaseModel):
    id: str
    doc_type: str = "order"
    order_id: str
    amount: float = Field(..., alias="order_amount")
    order_date: datetime
    product_count: int

class UserProfile(BaseModel):
    id: str
    doc_type: str = "user_profile"
    user_id: int
    username: str
    email: str
    registration_date: datetime
    last_modified: datetime
    tags: Optional[Set[str]] = None
    addresses: Optional[List[Address]] = []
    orders: Optional[List[Order]] = []

这里的alias用法很关键,它允许Python模型字段名(如is_default)和Solr中的字段名(is_default_address)不一致,保持了代码的整洁。

3.3 创建查询API

FastAPI的核心是API端点。我们将创建一个端点,根据用户ID查询完整的用户画像,包括所有嵌套的子文档。

# main.py
from fastapi import FastAPI, HTTPException, Depends
from typing import Optional
from .solr_client import get_solr_connection
from .models import UserProfile
import logging

app = FastAPI()

logging.basicConfig(level=logging.INFO)

@app.get("/api/v1/user-profiles/{user_id}", response_model=UserProfile)
def get_user_profile(user_id: int, solr=Depends(get_solr_connection)):
    if not solr:
        raise HTTPException(status_code=503, detail="Search service is unavailable.")

    # Block Join 查询是这里的核心技术点
    # 它能在一个查询中同时返回根文档和所有符合条件的子文档
    # {!parent which="doc_type:user_profile"} 表示筛选出所有父文档是 user_profile 的子文档
    # fq (filter query) 进一步限制只捞取特定 user_id 的文档块
    query = "*:*"
    filter_queries = [
        f"_root_:{user_id}", 
        "{!parent which=doc_type:user_profile}"
    ]
    params = {
        "fq": filter_queries,
        "fl": "*,[child limit=100]" # 返回所有字段,并限制子文档最多返回100个
    }

    try:
        results = solr.search(q=query, **params)
    except Exception as e:
        logging.error(f"Solr query failed for user_id {user_id}: {e}")
        raise HTTPException(status_code=500, detail="Error querying search index.")

    if not results.docs:
        raise HTTPException(status_code=404, detail=f"User profile for user_id {user_id} not found.")

    # Solr 返回的是一个扁平的文档列表,我们需要手动重组成嵌套结构
    # 第一个文档通常是根文档
    root_doc = None
    child_docs = {"addresses": [], "orders": []}

    for doc in results.docs:
        if doc.get("doc_type") == "user_profile":
            root_doc = doc
        elif doc.get("doc_type") == "address":
            child_docs["addresses"].append(doc)
        elif doc.get("doc_type") == "order":
            child_docs["orders"].append(doc)
    
    if not root_doc:
         raise HTTPException(status_code=404, detail=f"Root user profile for user_id {user_id} not found in index block.")

    # 将子文档列表合并到根文档中
    root_doc.update(child_docs)
    
    # 使用Pydantic模型进行解析和校验
    return UserProfile.parse_obj(root_doc)

这个查询的精髓在于{!parent which="doc_type:user_profile"}_root_:{user_id}的组合。它告诉Solr:“找到_root_字段为指定user_id的所有文档(包括根和子),然后返回那些父文档是user_profile的文档块”。查询结果是一个扁平列表,我们需要在代码中将其重组为Pydantic模型期望的嵌套结构。这是一个在消费端必须处理的常见模式。

架构的局限性与未来迭代方向

我们构建的这个管道有效地解决了异构服务间的数据一致性问题,并提供了一个高性能的查询接口。然而,这个架构并非银弹。

当前的索引协调服务是一个单点,尽管可以水平扩展,但它仍然是整个写入链路上的一个瓶颈。在高并发写入场景下,频繁的commit操作也会对Solr性能造成压力。未来的一个优化方向是引入消息队列(如Kafka)。源服务将更新事件发布到Kafka Topic中,索引协调服务作为消费者,可以按批次、异步地处理这些更新,并通过更柔和的softCommit策略来平衡近实时性和系统吞吐量。

另一个考量是,FastAPI端的查询后重组逻辑。虽然不复杂,但在极高QPS下也存在计算开销。如果性能成为瓶颈,可以考虑在Solr端使用childparent转换器(Document Transformers)来尝试在Solr内部完成文档的嵌套重组,从而简化客户端逻辑。

最后,对于单元测试,Spring Boot侧需要使用@DataSolrTestTestcontainers来模拟Solr环境,验证PartialUpdate逻辑的正确性。FastAPI侧则可以通过pytesthttpx模拟对Solr的请求和响应,确保数据重组逻辑无误。这些都是确保这套复杂管道在生产环境中稳定运行不可或缺的环节。


  目录