项目进入深水区,一个棘手的技术痛点浮出水面。我们的数据科学团队(主力技术栈为Python/FastAPI)需要对用户画像数据进行复杂的、多维度的实时查询,以支撑在线推荐和反欺诈模型。而这些用户画像数据,其源头分散在多个由Java/Spring Boot构建的核心业务微服务中——订单、用户、行为日志等等。最初,我们尝试让各个Java服务直接向Solr写入数据,但这很快演变成了一场灾难。
问题在于,用户画像是一个复杂的、包含多个嵌套子文档的Solr Document。例如,一个用户文档下可能嵌套着收货地址列表、最近订单列表、浏览历史等。不同的源业务服务只关心更新这个大文档中的一小部分。比如,订单服务完成一笔交易后,只想向用户的订单列表子文档中add
一条新记录,而不是重索引整个用户。直接并发操作同一个Solr文档,导致了频繁的写冲突和数据覆盖,最终索引里的数据处于一种不可信的“薛定谔”状态。我们需要一个能确保原子性、维护数据一致性的中心化索引管道。
初步构想是,与其让各个业务服务直接“野蛮”地操作Solr,不如引入一个专门的“索引协调服务”。这个服务将作为唯一入口,负责接收来自不同源服务的局部更新请求,然后在内部将它们聚合成对Solr的原子更新操作。这不仅能解决并发写入的冲突,还能统一Solr的Schema管理和查询优化策略,将底层搜索引擎的复杂性对上游业务服务透明化。
技术选型决策上,我们决定让这个新的索引协调服务使用Spring Boot。理由很充分:Java生态在处理高并发、维护系统稳定性方面有成熟的解决方案,Spring Data Solr对Solr的集成非常完善,特别是对复杂的对象到文档的映射。而消费端,FastAPI依旧是数据科学团队的首选,它的高性能异步特性和Pydantic带来的数据校验能力,使其成为暴露数据查询API的最佳选择。Solr本身则因其强大的嵌套文档支持(Block Join)和原子更新(Atomic Updates)能力,成为这个架构的核心基石。整个流程将是:源服务 -> Spring Boot索引服务 -> Solr <- FastAPI数据消费服务
。
第一步:定义一个健壮的、支持嵌套的Solr Schema
一切的起点是Solr的managed-schema
文件。为了支持原子更新和嵌套查询,我们需要精心设计它。这里的关键是定义一个根文档(user_profile
)和多个子文档(addresses
, orders
),并通过_root_
字段将它们关联起来。
<!-- managed-schema -->
<schema name="user_profiles" version="1.6">
<!-- 核心字段 -->
<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="doc_type" type="string" indexed="true" stored="true" default="user_profile"/>
<field name="_root_" type="string" indexed="true" stored="true" docValues="false" />
<field name="_version_" type="plong" indexed="false" stored="false"/>
<field name="last_modified" type="pdate" indexed="true" stored="true" default="NOW" />
<!-- 用户主文档字段 -->
<field name="user_id" type="plong" indexed="true" stored="true" />
<field name="username" type="string" indexed="true" stored="true" />
<field name="email" type="string" indexed="true" stored="true" />
<field name="registration_date" type="pdate" indexed="true" stored="true" />
<field name="tags" type="string" indexed="true" stored="true" multiValued="true" />
<!-- 子文档: 地址 -->
<field name="address_id" type="string" indexed="true" stored="true" />
<field name="address_type" type="string" indexed="true" stored="true" />
<field name="city" type="string" indexed="true" stored="true" />
<field name="street" type="string" indexed="true" stored="true" />
<field name="is_default_address" type="boolean" indexed="true" stored="true" />
<!-- 子文档: 订单 -->
<field name="order_id" type="string" indexed="true" stored="true" />
<field name="order_amount" type="pdouble" indexed="true" stored="true" />
<field name="order_date" type="pdate" indexed="true" stored="true" />
<field name="product_count" type="pint" indexed="true" stored="true" />
<!-- 动态字段,用于未来扩展 -->
<dynamicField name="*_s" type="string" indexed="true" stored="true" />
<dynamicField name="*_l" type="plong" indexed="true" stored="true" />
<dynamicField name="*_b" type="boolean" indexed="true" stored="true" />
<uniqueKey>id</uniqueKey>
<!-- 定义文档类型,便于区分 -->
<copyField source="user_id" dest="user_id_s"/>
<copyField source="username" dest="username_s"/>
</schema>
这里的_root_
字段是实现嵌套文档的关键。对于根文档,它的值就是自己的id
;对于子文档,它的值是其所属根文档的id
。这使得Solr能够将它们视为一个逻辑块。doc_type
字段则用于在查询时区分根文档和子文档。
第二步:构建Spring Boot索引协调服务
这是整个管道的核心,它负责将业务逻辑上的“部分更新”转化为Solr底层的“原子更新”指令。
2.1 项目配置
pom.xml
中引入Spring Data Solr依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-solr</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok for boilerplate code reduction -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
application.yml
中配置Solr连接信息:
spring:
data:
solr:
host: http://localhost:8983/solr
core: user_profiles # 对应Solr Core的名称
logging:
level:
org.apache.solr.client: DEBUG # 开启Solr客户端日志,便于调试
2.2 定义文档模型 (POJO)
我们需要创建与Solr Schema对应的Java类。Spring Data Solr的@SolrDocument
, @Field
, @ChildDocument
注解让这个过程非常直观。
// UserProfile.java - 根文档
import lombok.Data;
import org.apache.solr.client.solrj.beans.Field;
import org.springframework.data.annotation.Id;
import org.springframework.data.solr.core.mapping.SolrDocument;
import org.springframework.data.solr.core.mapping.ChildDocument;
import java.util.Date;
import java.util.List;
import java.util.Set;
@Data
@SolrDocument(collection = "user_profiles")
public class UserProfile {
@Id
@Field
private String id; // Solr文档的唯一ID, 通常是 user_id
@Field("doc_type")
private String docType = "user_profile"; // 标识为根文档
@Field("user_id")
private Long userId;
@Field
private String username;
@Field
private String email;
@Field("registration_date")
private Date registrationDate;
@Field
private Set<String> tags;
@ChildDocument
private List<Address> addresses; // 嵌套地址文档
@ChildDocument
private List<Order> orders; // 嵌套订单文档
}
// Address.java - 子文档
@Data
public class Address {
@Id
@Field("id")
private String id; // 子文档也需要唯一ID,例如 address_id
@Field("doc_type")
private String docType = "address"; // 标识为子文档
@Field("address_id")
private String addressId;
@Field("address_type")
private String type;
@Field
private String city;
@Field
private String street;
@Field("is_default_address")
private boolean isDefault;
}
// Order.java - 子文档
@Data
public class Order {
@Id
@Field("id")
private String id; // 子文档ID,例如 order_id
@Field("doc_type")
private String docType = "order"; // 标识为子文档
@Field("order_id")
private String orderId;
@Field("order_amount")
private Double amount;
@Field("order_date")
private Date orderDate;
@Field("product_count")
private Integer productCount;
}
注意: 每个子文档类也需要一个@Id
和@Field("id")
,这是Solr的要求,确保整个文档块内的所有文档都有唯一标识。
2.3 实现原子更新服务
这是最关键的部分。我们不直接保存UserProfile
对象,而是构造PartialUpdate
对象,它能生成Solr需要的原子更新JSON。
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.solr.core.SolrTemplate;
import org.springframework.data.solr.core.query.PartialUpdate;
import org.springframework.stereotype.Service;
import org.apache.solr.common.SolrInputDocument;
import java.util.Collections;
import java.util.Date;
@Service
@Slf4j
public class UserProfileIndexingService {
private final SolrTemplate solrTemplate;
public UserProfileIndexingService(SolrTemplate solrTemplate) {
this.solrTemplate = solrTemplate;
}
/**
* 原子性地为指定用户添加一个新的地址子文档。
* 这里的坑在于,不能直接更新根文档的List字段,
* 必须使用Solr的"add"指令来操作嵌套文档。
*
* @param userId 目标用户的ID
* @param address 要添加的新地址对象
*/
public void addAddress(Long userId, Address address) {
String rootDocId = String.valueOf(userId);
// 为子文档生成一个唯一的ID
address.setId(rootDocId + "_" + address.getAddressId());
// 构造一个PartialUpdate对象,目标是根文档
PartialUpdate update = new PartialUpdate("id", rootDocId);
// 使用"add"指令添加一个完整的子文档对象
// Spring Data Solr 会自动将其转换为一个包含 _root_ 字段的SolrInputDocument
update.add("addresses", address);
// 同时更新主文档的最后修改时间
update.add("last_modified", new Date());
try {
solrTemplate.saveBean("user_profiles", update);
solrTemplate.commit("user_profiles");
log.info("Successfully added address {} for user {}", address.getAddressId(), userId);
} catch (Exception e) {
log.error("Failed to add address for user {}: {}", userId, e.getMessage());
solrTemplate.rollback("user_profiles");
// 在真实项目中,这里应该抛出一个自定义的业务异常
throw new IndexingFailedException("Failed to add address", e);
}
}
/**
* 原子性地更新用户标签。
* 使用 "set" 指令可以完全替换一个字段的值。
*
* @param userId 目标用户ID
* @param newTags 新的标签集合
*/
public void updateUserTags(Long userId, Set<String> newTags) {
String rootDocId = String.valueOf(userId);
PartialUpdate update = new PartialUpdate("id", rootDocId);
// "set"指令会覆盖原有的所有tags
update.setValueOfField("tags", newTags);
update.add("last_modified", new Date());
solrTemplate.saveBean("user_profiles", update);
solrTemplate.commit("user_profiles");
log.info("Successfully updated tags for user {}", userId);
}
/**
* 创建一个全新的用户画像文档,包含根文档和初始子文档。
* 这是创建操作,不是更新。
*/
public void createUserProfile(UserProfile profile) {
profile.setId(String.valueOf(profile.getUserId()));
// 确保所有子文档有唯一ID
if (profile.getAddresses() != null) {
profile.getAddresses().forEach(addr -> addr.setId(profile.getId() + "_" + addr.getAddressId()));
}
if (profile.getOrders() != null) {
profile.getOrders().forEach(order -> order.setId(profile.getId() + "_" + order.getOrderId()));
}
solrTemplate.saveBean("user_profiles", profile);
solrTemplate.commit("user_profiles");
log.info("Created new profile for user {}", profile.getUserId());
}
}
// 自定义异常
public class IndexingFailedException extends RuntimeException {
public IndexingFailedException(String message, Throwable cause) {
super(message, cause);
}
}
2.4 暴露RESTful API
最后,通过@RestController
将这些服务方法暴露为HTTP接口,供其他微服务调用。
@RestController
@RequestMapping("/api/v1/indexing/user-profiles")
public class UserProfileController {
private final UserProfileIndexingService indexingService;
public UserProfileController(UserProfileIndexingService indexingService) {
this.indexingService = indexingService;
}
// 假设 AddAddressRequest 是一个包含 userId 和 Address 数据的DTO
@PostMapping("/{userId}/addresses")
public ResponseEntity<Void> addAddress(@PathVariable Long userId, @RequestBody Address address) {
indexingService.addAddress(userId, address);
return ResponseEntity.ok().build();
}
// 假设 UpdateTagsRequest 是一个包含 userId 和 tags 数据的DTO
@PutMapping("/{userId}/tags")
public ResponseEntity<Void> updateTags(@PathVariable Long userId, @RequestBody Set<String> tags) {
indexingService.updateUserTags(userId, tags);
return ResponseEntity.ok().build();
}
@PostMapping
public ResponseEntity<Void> createProfile(@RequestBody UserProfile profile) {
indexingService.createUserProfile(profile);
return ResponseEntity.status(HttpStatus.CREATED).build();
}
}
至此,我们的Spring Boot索引服务已经完成。它提供了一组清晰的、面向业务操作的API,内部则将这些操作安全地转换为对Solr的原子更新,彻底解决了数据一致性问题。
sequenceDiagram participant OrderService as 源服务 (Java) participant IndexingService as 索引协调服务 (Spring Boot) participant Solr OrderService->>+IndexingService: POST /api/v1/.../{userId}/orders (携带新订单数据) IndexingService->>IndexingService: 构造PartialUpdate对象 IndexingService->>IndexingService: update.add("orders", newOrder) IndexingService->>+Solr: 发送原子更新请求 (JSON) Solr-->>-IndexingService: 更新成功 IndexingService-->>-OrderService: 200 OK
第三步:构建FastAPI数据消费服务
现在轮到数据消费方。FastAPI服务需要查询Solr,并利用这些结构化的用户画像数据。
3.1 环境准备与连接
安装pysolr
库:pip install pysolr
.
创建一个Solr连接的单例:
# solr_client.py
import pysolr
import logging
SOLR_URL = "http://localhost:8983/solr/user_profiles"
solr = None
def get_solr_connection():
global solr
if solr is None:
try:
solr = pysolr.Solr(SOLR_URL, always_commit=True, timeout=10)
# 发送一个ping请求来验证连接
solr.ping()
logging.info("Successfully connected to Solr.")
except Exception as e:
logging.error(f"Failed to connect to Solr: {e}")
solr = None # 连接失败,重置为None
return solr
3.2 定义数据模型 (Pydantic)
使用Pydantic模型来映射从Solr返回的JSON数据,这能提供类型提示和数据校验,是生产级代码的最佳实践。
# models.py
from pydantic import BaseModel, Field
from typing import List, Optional, Set
from datetime import datetime
class Address(BaseModel):
id: str
doc_type: str = "address"
address_id: str
type: str = Field(..., alias="address_type")
city: str
street: str
is_default: bool = Field(..., alias="is_default_address")
class Order(BaseModel):
id: str
doc_type: str = "order"
order_id: str
amount: float = Field(..., alias="order_amount")
order_date: datetime
product_count: int
class UserProfile(BaseModel):
id: str
doc_type: str = "user_profile"
user_id: int
username: str
email: str
registration_date: datetime
last_modified: datetime
tags: Optional[Set[str]] = None
addresses: Optional[List[Address]] = []
orders: Optional[List[Order]] = []
这里的alias
用法很关键,它允许Python模型字段名(如is_default
)和Solr中的字段名(is_default_address
)不一致,保持了代码的整洁。
3.3 创建查询API
FastAPI的核心是API端点。我们将创建一个端点,根据用户ID查询完整的用户画像,包括所有嵌套的子文档。
# main.py
from fastapi import FastAPI, HTTPException, Depends
from typing import Optional
from .solr_client import get_solr_connection
from .models import UserProfile
import logging
app = FastAPI()
logging.basicConfig(level=logging.INFO)
@app.get("/api/v1/user-profiles/{user_id}", response_model=UserProfile)
def get_user_profile(user_id: int, solr=Depends(get_solr_connection)):
if not solr:
raise HTTPException(status_code=503, detail="Search service is unavailable.")
# Block Join 查询是这里的核心技术点
# 它能在一个查询中同时返回根文档和所有符合条件的子文档
# {!parent which="doc_type:user_profile"} 表示筛选出所有父文档是 user_profile 的子文档
# fq (filter query) 进一步限制只捞取特定 user_id 的文档块
query = "*:*"
filter_queries = [
f"_root_:{user_id}",
"{!parent which=doc_type:user_profile}"
]
params = {
"fq": filter_queries,
"fl": "*,[child limit=100]" # 返回所有字段,并限制子文档最多返回100个
}
try:
results = solr.search(q=query, **params)
except Exception as e:
logging.error(f"Solr query failed for user_id {user_id}: {e}")
raise HTTPException(status_code=500, detail="Error querying search index.")
if not results.docs:
raise HTTPException(status_code=404, detail=f"User profile for user_id {user_id} not found.")
# Solr 返回的是一个扁平的文档列表,我们需要手动重组成嵌套结构
# 第一个文档通常是根文档
root_doc = None
child_docs = {"addresses": [], "orders": []}
for doc in results.docs:
if doc.get("doc_type") == "user_profile":
root_doc = doc
elif doc.get("doc_type") == "address":
child_docs["addresses"].append(doc)
elif doc.get("doc_type") == "order":
child_docs["orders"].append(doc)
if not root_doc:
raise HTTPException(status_code=404, detail=f"Root user profile for user_id {user_id} not found in index block.")
# 将子文档列表合并到根文档中
root_doc.update(child_docs)
# 使用Pydantic模型进行解析和校验
return UserProfile.parse_obj(root_doc)
这个查询的精髓在于{!parent which="doc_type:user_profile"}
和_root_:{user_id}
的组合。它告诉Solr:“找到_root_
字段为指定user_id
的所有文档(包括根和子),然后返回那些父文档是user_profile
的文档块”。查询结果是一个扁平列表,我们需要在代码中将其重组为Pydantic模型期望的嵌套结构。这是一个在消费端必须处理的常见模式。
架构的局限性与未来迭代方向
我们构建的这个管道有效地解决了异构服务间的数据一致性问题,并提供了一个高性能的查询接口。然而,这个架构并非银弹。
当前的索引协调服务是一个单点,尽管可以水平扩展,但它仍然是整个写入链路上的一个瓶颈。在高并发写入场景下,频繁的commit
操作也会对Solr性能造成压力。未来的一个优化方向是引入消息队列(如Kafka)。源服务将更新事件发布到Kafka Topic中,索引协调服务作为消费者,可以按批次、异步地处理这些更新,并通过更柔和的softCommit
策略来平衡近实时性和系统吞吐量。
另一个考量是,FastAPI端的查询后重组逻辑。虽然不复杂,但在极高QPS下也存在计算开销。如果性能成为瓶颈,可以考虑在Solr端使用child
或parent
转换器(Document Transformers)来尝试在Solr内部完成文档的嵌套重组,从而简化客户端逻辑。
最后,对于单元测试,Spring Boot侧需要使用@DataSolrTest
或Testcontainers
来模拟Solr环境,验证PartialUpdate
逻辑的正确性。FastAPI侧则可以通过pytest
和httpx
模拟对Solr的请求和响应,确保数据重组逻辑无误。这些都是确保这套复杂管道在生产环境中稳定运行不可或缺的环节。