在一个典型的多租户AI服务中,我们使用Milvus作为核心的向量检索引擎。为了隔离租户数据并优化资源利用,我们采用了动态加载和卸载Partition的策略。应用服务是无状态、可水平扩展的,这意味着任何一个服务实例都可能接收到来自某个租户的请求,并需要确保其对应的Partition已被加载到Milvus中。这个场景看似简单,却隐藏着一个典型的分布式协调问题。
如果不对Partition的加载操作进行协调,多个服务实例可能在同一时刻,针对同一个未加载的Partition,并发地向Milvus发起load_partition
请求。这不仅会造成Milvus侧不必要的重复加载开销,更严重的是,它可能导致应用服务内部状态不一致,甚至在加载和卸载操作交织时引发竞态条件,导致服务异常。
graph TD subgraph "应用服务集群 (Stateless)" App1 -->|"load('partition_A')"| Milvus App2 -->|"load('partition_A')"| Milvus App3 -->|"load('partition_A')"| Milvus end subgraph "负载均衡" LB(Nginx/ALB) --> App1 LB --> App2 LB --> App3 end UserRequest("用户请求 (tenant_A)") --> LB Milvus(Milvus集群) style Milvus fill:#f9f,stroke:#333,stroke-width:2px linkStyle 0,1,2 stroke-width:2px,stroke:red,stroke-dasharray: 5 5;
上图清晰地展示了问题的根源:多个实例在缺乏协调机制的情况下,对共享资源(Milvus中的Partition状态)进行了并发写操作。要解决这个问题,我们需要引入一个分布式锁,确保在任意时刻,只有一个服务实例能够执行针对特定Partition的管理操作。
方案A: 基于Redis的轻量级锁实现
在分布式锁的选型中,Redis因其高性能和简单性,通常是第一个被考虑的方案。最基础的实现依赖于SETNX
(SET if Not eXists) 命令。
初步实现与陷阱
一个天真的实现可能如下:
# WARNING: This is a naive implementation with critical flaws.
import redis
import time
def acquire_lock(conn: redis.Redis, lock_name, acquire_timeout=10):
end_time = time.time() + acquire_timeout
while time.time() < end_time:
if conn.setnx(lock_name, "locked"):
# 成功获取锁
return True
time.sleep(0.01)
return False
def release_lock(conn: redis.Redis, lock_name):
conn.delete(lock_name)
这个实现在真实项目中是完全不可用的。最致命的问题是:如果一个进程获取锁之后崩溃了,没有执行release_lock
,那么这个锁将永远无法被释放,导致死锁。
一个常见的改进是为锁设置一个过期时间(TTL)。
# WARNING: Still not production-ready.
# ...
if conn.setnx(lock_name, "locked"):
conn.expire(lock_name, 30) # 设置30秒过期
return True
# ...
这里的坑在于setnx
和expire
是两个独立的操作,它们并非原子性的。如果在setnx
成功后,进程在执行expire
前崩溃,死锁问题依然存在。幸运的是,现代Redis版本的SET
命令已经支持原子化地设置key、value、和过期时间。
生产级Redis锁实现
一个相对可靠的Redis分布式锁实现,必须考虑原子性、锁的归属权以及锁续期问题。
# partition_lock_redis.py
import uuid
import time
import logging
from contextlib import contextmanager
from redis import Redis
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class RedisDistLock:
def __init__(self, redis_client: Redis, lock_key_prefix: str = "milvus:lock:partition:"):
"""
基于Redis的分布式锁实现。
:param redis_client: Redis客户端实例
:param lock_key_prefix: 锁键的前缀
"""
if not isinstance(redis_client, Redis):
raise TypeError("redis_client must be an instance of redis.Redis")
self.redis_client = redis_client
self.lock_key_prefix = lock_key_prefix
# 为每个锁实例生成一个唯一的标识,用于验证锁的归属权
self.lock_value = str(uuid.uuid4())
@contextmanager
def lock(self, resource_id: str, lease_time_sec: int = 30, block_timeout_sec: int = 10):
"""
使用上下文管理器获取和释放锁。
:param resource_id: 需要锁定的资源标识,例如 tenant_id/partition_name
:param lease_time_sec: 锁的租约时间(秒),防止死锁
:param block_timeout_sec: 阻塞等待获取锁的超时时间(秒)
"""
lock_key = f"{self.lock_key_prefix}{resource_id}"
acquired = False
start_time = time.monotonic()
try:
while time.monotonic() - start_time < block_timeout_sec:
# 使用 SET 命令的 NX 和 EX 选项来原子地获取锁
# NX: 只在键不存在时设置
# EX: 设置过期时间(秒)
if self.redis_client.set(lock_key, self.lock_value, ex=lease_time_sec, nx=True):
acquired = True
logging.info(f"Lock acquired for resource '{resource_id}' with value '{self.lock_value}'.")
yield self
break
# 短暂休眠,避免CPU空转
time.sleep(0.1)
if not acquired:
raise TimeoutError(f"Failed to acquire lock for resource '{resource_id}' within {block_timeout_sec} seconds.")
finally:
if acquired:
# 释放锁时必须验证锁的归属权,防止误删其他客户端的锁
# 使用Lua脚本保证 "GET and DELETE" 操作的原子性
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
try:
self.redis_client.eval(lua_script, 1, lock_key, self.lock_value)
logging.info(f"Lock released for resource '{resource_id}'.")
except Exception as e:
logging.error(f"Error releasing lock for resource '{resource_id}': {e}", exc_info=True)
这段代码解决了原子性和锁归属权的问题。然而,它引入了一个更棘手的架构难题:**lease_time_sec
(租约时间)的设定**。
- 如果设置得太短:一个耗时较长的操作(如加载一个巨大的Milvus Partition)可能还没执行完,锁就自动过期被释放了。此时,另一个服务实例会拿到锁,进入临界区,导致两个实例同时操作资源。
- 如果设置得太长:如果获取锁的节点真的崩溃了,这个资源将被锁定很长一段时间,影响服务的可用性。
虽然可以通过“看门狗”(Watchdog)机制,在持有锁的客户端上启动一个后台线程,定期为即将过期的锁续期,但这极大地增加了客户端实现的复杂度。更深层次的问题在于,Redis的主从复制是异步的。如果在Master节点获取锁后,数据还没来得及同步到Slave节点,Master就宕机了,此时Slave被提升为新的Master,但锁信息已经丢失,其他客户端就能立即获取到锁,破坏了锁的互斥性。
方案A评估
- 优点:
- 性能极高,Redis是内存数据库,加锁解锁速度快。
- 实现相对简单,依赖普遍存在的基础设施。
- 缺点:
- 可靠性存在理论上限。租约时间难以完美设定,是潜在的业务风险点。
- 在哨兵或集群模式下,Redis的故障转移(failover)可能导致锁丢失,破坏互斥性。
- 需要客户端实现复杂的“看门狗”逻辑来缓解租约问题。
对于我们管理Milvus Partition这种关键性操作,任何可能导致并发冲突的风险都应该被严肃对待。因此,基于Redis的方案在这种场景下,其可靠性短板是不可接受的。
方案B: 基于ZooKeeper的健壮锁实现
ZooKeeper(ZK)是一个专为分布式应用提供协调服务的软件,其核心是基于ZAB协议保证数据的一致性。分布式锁是ZK的经典应用场景。与Redis依赖TTL的临时性锁不同,ZK利用其临时节点(Ephemeral Nodes)和顺序节点(Sequential Nodes)的特性,构建出逻辑上无懈可击的分布式锁。
ZK锁的核心原理
- 根节点: 在ZK中创建一个持久的根节点,例如
/milvus_locks
。 - 尝试获取锁: 当一个客户端需要获取锁时,它会在
/milvus_locks
下创建一个临时顺序节点(EPHEMERAL_SEQUENTIAL),例如/milvus_locks/lock-0000000001
。 - 判断锁归属: 客户端获取
/milvus_locks
下的所有子节点,并检查自己创建的节点序号是否是最小的。如果是,则它成功获得了锁。 - 等待锁: 如果自己创建的节点序号不是最小的,说明锁已被其他客户端持有。客户端需要找到比自己序号小的前一个节点,并对其设置一个监视(Watch)。
- 释放锁与唤醒:
- 当客户端完成操作后,它只需关闭与ZK的会话,或者显式删除自己创建的临时节点。
- 由于是临时节点,如果客户端崩溃,该节点也会被ZK自动删除。
- 当一个节点被删除时,之前监视它的那个客户端会收到通知。收到通知后,该客户端会再次检查自己是否是序号最小的节点,如果是,则获取锁;如果不是,则继续监视新的前一个节点。
这个机制避免了“惊群效应”,因为每个等待者只监视其前一个节点,而不是所有节点都监视第一个节点。最关键的是,它彻底摆脱了对“租约时间”的依赖。锁的生命周期与客户端的会话严格绑定,这才是真正可靠的。
生产级ZK锁实现
我们将使用成熟的kazoo
库来实现ZK分布式锁。
# partition_lock_zk.py
import logging
import time
from contextlib import contextmanager
from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import NoNodeError, LockTimeout
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ZookeeperDistLock:
def __init__(self, zk_hosts: str, lock_path_prefix: str = "/milvus/locks/partition"):
"""
基于ZooKeeper的分布式锁实现。
:param zk_hosts: ZooKeeper服务器地址,例如 '127.0.0.1:2181,127.0.0.1:2182'
:param lock_path_prefix: 锁在ZK中的基础路径
"""
self.zk_client = KazooClient(hosts=zk_hosts)
self.lock_path_prefix = lock_path_prefix
self._is_connected = False
self._connect()
def _connect(self):
"""建立与ZK的连接并处理状态变更"""
try:
if self.zk_client.state != 'CONNECTED':
self.zk_client.start(timeout=10)
self.zk_client.add_listener(self._state_listener)
# 确保锁的根路径存在
self.zk_client.ensure_path(self.lock_path_prefix)
self._is_connected = True
logging.info(f"Successfully connected to ZooKeeper at {self.zk_client.hosts}")
except Exception as e:
logging.error(f"Failed to connect to ZooKeeper: {e}", exc_info=True)
self._is_connected = False
raise
def _state_listener(self, state):
if state == KazooState.LOST:
logging.warning("ZooKeeper session lost. Locks might have been released.")
self._is_connected = False
elif state == KazooState.SUSPENDED:
logging.warning("ZooKeeper connection suspended.")
self._is_connected = False
elif state == KazooState.CONNECTED:
logging.info("Reconnected to ZooKeeper.")
self._is_connected = True
else:
logging.info(f"ZooKeeper state changed to: {state}")
@contextmanager
def lock(self, resource_id: str, block_timeout_sec: int = 15):
"""
使用上下文管理器获取和释放锁。
:param resource_id: 资源标识,会成为ZK路径的一部分
:param block_timeout_sec: 阻塞等待超时时间
"""
if not self._is_connected:
raise ConnectionError("ZooKeeper client is not connected.")
# ZK路径不能包含'/',所以需要替换
safe_resource_id = resource_id.replace('/', '_')
lock_path = f"{self.lock_path_prefix}/{safe_resource_id}"
# kazoo 提供了现成的Lock秘方(recipe)
zk_lock = self.zk_client.Lock(lock_path, identifier=f"host-{self.zk_client.client_id[0]}")
acquired = False
try:
logging.info(f"Attempting to acquire ZK lock for resource '{resource_id}' at path '{lock_path}'.")
acquired = zk_lock.acquire(timeout=block_timeout_sec)
if acquired:
logging.info(f"ZK lock acquired for resource '{resource_id}'.")
yield self
else:
raise LockTimeout(f"Failed to acquire ZK lock for resource '{resource_id}' at path '{lock_path}' within {block_timeout_sec} seconds.")
except Exception as e:
logging.error(f"An error occurred with ZK lock for resource '{resource_id}': {e}", exc_info=True)
# 重新抛出异常,让上层知道操作失败
raise
finally:
if acquired:
try:
zk_lock.release()
logging.info(f"ZK lock released for resource '{resource_id}'.")
except NoNodeError:
# 如果会话丢失,节点可能已经不存在了,这是正常情况
logging.warning(f"ZK lock node for '{resource_id}' already gone, likely due to session expiry.")
except Exception as e:
logging.error(f"Error releasing ZK lock for resource '{resource_id}': {e}", exc_info=True)
def close(self):
"""关闭ZK客户端连接"""
if self.zk_client.state != 'CLOSED':
self.zk_client.close()
logging.info("ZooKeeper client closed.")
方案B评估
- 优点:
- 极高的可靠性: 基于ZK会话的临时节点机制,完美解决了死锁和锁提前释放的问题。
- 强一致性保证: ZK保证了锁操作的严格顺序和互斥性。
- 成熟的实现:
kazoo
等客户端库封装了复杂的锁逻辑,使用方便。
- 缺点:
- 性能开销: 相较于Redis,ZK的写操作需要通过共识协议,延迟更高,吞吐量更低。
- 运维复杂度: 引入并维护一个高可用的ZK集群比Redis更复杂。
最终选择与架构整合
在我们的场景中,Partition的加载/卸载操作频率不高,但操作的正确性至关重要。一次错误的并发操作可能导致数据服务长时间不可用,其业务影响远大于分布式锁本身带来的微小性能开销。因此,方案B,即基于ZooKeeper的分布式锁,是毫无疑问的正确选择。
我们将这个锁机制整合到PartitionManager
服务中。
graph TD subgraph "应用服务集群 (Stateless)" App1 -->|"acquire_lock('tenant_A/p1')"| ZK App2 -->|"acquire_lock('tenant_A/p1')"| ZK end subgraph "ZooKeeper集群" ZK1(ZK Server 1) ZK2(ZK Server 2) ZK3(ZK Server 3) ZK1 --- ZK2 --- ZK3 --- ZK1 end ZK(ZK Ensemble) --> App1 App1 -->|"lock acquired"| App1 App1 -->|"load('partition_A')"| Milvus(Milvus集群) App2 -->|"lock denied (waits)"| App2 style ZK fill:#9f9,stroke:#333,stroke-width:2px
最终的PartitionManager
核心逻辑如下:
# partition_manager.py
import logging
from pymilvus import utility, connections
from partition_lock_zk import ZookeeperDistLock
class MilvusPartitionManager:
def __init__(self, milvus_alias: str, zk_hosts: str):
self.milvus_alias = milvus_alias
# 确保到Milvus的连接已建立
connections.connect(alias=self.milvus_alias)
self.zk_lock_manager = ZookeeperDistLock(zk_hosts=zk_hosts)
def ensure_partition_loaded(self, collection_name: str, partition_name: str):
"""
确保一个Partition被加载,使用分布式锁保证操作的原子性和互斥性。
"""
# 检查是否已加载,这是一个乐观检查,可以减少不必要的加锁开销
if utility.has_partition(collection_name, partition_name, using=self.milvus_alias):
# 进一步检查加载状态
stats = utility.get_partition_statistics(collection_name, partition_name, using=self.milvus_alias)
# 在Milvus 2.x中,检查加载状态通常看是否有查询节点在服务
# 这是一个简化的检查,实际可能更复杂
if utility.get_loading_progress(collection_name, [partition_name], using=self.milvus_alias).get('loading_progress', '100%') == '100%':
logging.info(f"Partition '{partition_name}' in collection '{collection_name}' is already loaded.")
return
resource_id = f"{collection_name}/{partition_name}"
logging.info(f"Partition '{partition_name}' not loaded or loading, attempting to acquire lock.")
try:
with self.zk_lock_manager.lock(resource_id, block_timeout_sec=20):
# 双重检查:在获取锁后,再次确认Partition状态,
# 因为可能在你等待锁的过程中,前一个持有锁的进程已经加载完毕。
if utility.get_loading_progress(collection_name, [partition_name], using=self.milvus_alias).get('loading_progress', '100%') == '100%':
logging.info(f"Lock acquired, but partition '{partition_name}' was loaded by another process. Skipping.")
return
logging.info(f"Lock acquired. Loading partition '{partition_name}'...")
# 导入Collection和Partition相关的类
from pymilvus import Collection, Partition
collection = Collection(name=collection_name, using=self.milvus_alias)
collection.load([partition_name])
logging.info(f"Successfully loaded partition '{partition_name}'.")
except TimeoutError as e:
logging.error(f"Could not ensure partition '{partition_name}' is loaded due to lock timeout: {e}")
raise # 将异常上抛,让调用方处理
except Exception as e:
logging.error(f"An error occurred while loading partition '{partition_name}': {e}", exc_info=True)
raise
def close(self):
self.zk_lock_manager.close()
connections.disconnect(self.milvus_alias)
这个MilvusPartitionManager
通过组合ZookeeperDistLock
,提供了一个健壮的、对并发安全的ensure_partition_loaded
接口。双重检查锁定(Double-Checked Locking)模式在这里的应用,优化了性能,避免了在锁已经被其他进程加载完毕后的重复RPC调用。
当前架构的局限性与未来优化
尽管当前基于ZooKeeper的锁方案在可靠性上表现出色,但它并非没有缺点。当前的锁粒度是collection_name/partition_name
,这意味着对同一Partition的加载和卸载操作是互斥的,这是符合预期的。但是,如果管理操作变得更加复杂,例如,一个操作需要同时锁定多个Partition,那么当前的简单锁模型就需要演进为更复杂的事务性操作。
此外,整个系统的可靠性现在强依赖于ZooKeeper集群的健康状况。虽然ZK本身是高可用的,但它成为了系统架构中的一个关键依赖。在未来的演进中,可以考虑将这种协调逻辑下沉到Milvus自身。如果Milvus的未来版本能够在其Proxy层或RootCoord中提供原生的、幂等的load_partition
接口,那么应用层的分布式锁就可以被彻底移除,从而简化整体架构。这是一个值得向Milvus社区提出并推动的特性。