构建高可用Milvus分区动态管理服务中的分布式锁架构权衡


在一个典型的多租户AI服务中,我们使用Milvus作为核心的向量检索引擎。为了隔离租户数据并优化资源利用,我们采用了动态加载和卸载Partition的策略。应用服务是无状态、可水平扩展的,这意味着任何一个服务实例都可能接收到来自某个租户的请求,并需要确保其对应的Partition已被加载到Milvus中。这个场景看似简单,却隐藏着一个典型的分布式协调问题。

如果不对Partition的加载操作进行协调,多个服务实例可能在同一时刻,针对同一个未加载的Partition,并发地向Milvus发起load_partition请求。这不仅会造成Milvus侧不必要的重复加载开销,更严重的是,它可能导致应用服务内部状态不一致,甚至在加载和卸载操作交织时引发竞态条件,导致服务异常。

graph TD
    subgraph "应用服务集群 (Stateless)"
        App1 -->|"load('partition_A')"| Milvus
        App2 -->|"load('partition_A')"| Milvus
        App3 -->|"load('partition_A')"| Milvus
    end
    subgraph "负载均衡"
        LB(Nginx/ALB) --> App1
        LB --> App2
        LB --> App3
    end
    UserRequest("用户请求 (tenant_A)") --> LB
    Milvus(Milvus集群)

    style Milvus fill:#f9f,stroke:#333,stroke-width:2px
    linkStyle 0,1,2 stroke-width:2px,stroke:red,stroke-dasharray: 5 5;

上图清晰地展示了问题的根源:多个实例在缺乏协调机制的情况下,对共享资源(Milvus中的Partition状态)进行了并发写操作。要解决这个问题,我们需要引入一个分布式锁,确保在任意时刻,只有一个服务实例能够执行针对特定Partition的管理操作。

方案A: 基于Redis的轻量级锁实现

在分布式锁的选型中,Redis因其高性能和简单性,通常是第一个被考虑的方案。最基础的实现依赖于SETNX (SET if Not eXists) 命令。

初步实现与陷阱

一个天真的实现可能如下:

# WARNING: This is a naive implementation with critical flaws.
import redis
import time

def acquire_lock(conn: redis.Redis, lock_name, acquire_timeout=10):
    end_time = time.time() + acquire_timeout
    while time.time() < end_time:
        if conn.setnx(lock_name, "locked"):
            # 成功获取锁
            return True
        time.sleep(0.01)
    return False

def release_lock(conn: redis.Redis, lock_name):
    conn.delete(lock_name)

这个实现在真实项目中是完全不可用的。最致命的问题是:如果一个进程获取锁之后崩溃了,没有执行release_lock,那么这个锁将永远无法被释放,导致死锁。

一个常见的改进是为锁设置一个过期时间(TTL)。

# WARNING: Still not production-ready.
# ...
if conn.setnx(lock_name, "locked"):
    conn.expire(lock_name, 30) # 设置30秒过期
    return True
# ...

这里的坑在于setnxexpire是两个独立的操作,它们并非原子性的。如果在setnx成功后,进程在执行expire前崩溃,死锁问题依然存在。幸运的是,现代Redis版本的SET命令已经支持原子化地设置key、value、和过期时间。

生产级Redis锁实现

一个相对可靠的Redis分布式锁实现,必须考虑原子性、锁的归属权以及锁续期问题。

# partition_lock_redis.py
import uuid
import time
import logging
from contextlib import contextmanager
from redis import Redis

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class RedisDistLock:
    def __init__(self, redis_client: Redis, lock_key_prefix: str = "milvus:lock:partition:"):
        """
        基于Redis的分布式锁实现。

        :param redis_client: Redis客户端实例
        :param lock_key_prefix: 锁键的前缀
        """
        if not isinstance(redis_client, Redis):
            raise TypeError("redis_client must be an instance of redis.Redis")
        self.redis_client = redis_client
        self.lock_key_prefix = lock_key_prefix
        # 为每个锁实例生成一个唯一的标识,用于验证锁的归属权
        self.lock_value = str(uuid.uuid4())

    @contextmanager
    def lock(self, resource_id: str, lease_time_sec: int = 30, block_timeout_sec: int = 10):
        """
        使用上下文管理器获取和释放锁。

        :param resource_id: 需要锁定的资源标识,例如 tenant_id/partition_name
        :param lease_time_sec: 锁的租约时间(秒),防止死锁
        :param block_timeout_sec: 阻塞等待获取锁的超时时间(秒)
        """
        lock_key = f"{self.lock_key_prefix}{resource_id}"
        acquired = False
        start_time = time.monotonic()

        try:
            while time.monotonic() - start_time < block_timeout_sec:
                # 使用 SET 命令的 NX 和 EX 选项来原子地获取锁
                # NX: 只在键不存在时设置
                # EX: 设置过期时间(秒)
                if self.redis_client.set(lock_key, self.lock_value, ex=lease_time_sec, nx=True):
                    acquired = True
                    logging.info(f"Lock acquired for resource '{resource_id}' with value '{self.lock_value}'.")
                    yield self
                    break
                # 短暂休眠,避免CPU空转
                time.sleep(0.1)
            
            if not acquired:
                raise TimeoutError(f"Failed to acquire lock for resource '{resource_id}' within {block_timeout_sec} seconds.")
        
        finally:
            if acquired:
                # 释放锁时必须验证锁的归属权,防止误删其他客户端的锁
                # 使用Lua脚本保证 "GET and DELETE" 操作的原子性
                lua_script = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("del", KEYS[1])
                else
                    return 0
                end
                """
                try:
                    self.redis_client.eval(lua_script, 1, lock_key, self.lock_value)
                    logging.info(f"Lock released for resource '{resource_id}'.")
                except Exception as e:
                    logging.error(f"Error releasing lock for resource '{resource_id}': {e}", exc_info=True)

这段代码解决了原子性和锁归属权的问题。然而,它引入了一个更棘手的架构难题:**lease_time_sec(租约时间)的设定**。

  • 如果设置得太短:一个耗时较长的操作(如加载一个巨大的Milvus Partition)可能还没执行完,锁就自动过期被释放了。此时,另一个服务实例会拿到锁,进入临界区,导致两个实例同时操作资源。
  • 如果设置得太长:如果获取锁的节点真的崩溃了,这个资源将被锁定很长一段时间,影响服务的可用性。

虽然可以通过“看门狗”(Watchdog)机制,在持有锁的客户端上启动一个后台线程,定期为即将过期的锁续期,但这极大地增加了客户端实现的复杂度。更深层次的问题在于,Redis的主从复制是异步的。如果在Master节点获取锁后,数据还没来得及同步到Slave节点,Master就宕机了,此时Slave被提升为新的Master,但锁信息已经丢失,其他客户端就能立即获取到锁,破坏了锁的互斥性。

方案A评估

  • 优点:
    • 性能极高,Redis是内存数据库,加锁解锁速度快。
    • 实现相对简单,依赖普遍存在的基础设施。
  • 缺点:
    • 可靠性存在理论上限。租约时间难以完美设定,是潜在的业务风险点。
    • 在哨兵或集群模式下,Redis的故障转移(failover)可能导致锁丢失,破坏互斥性。
    • 需要客户端实现复杂的“看门狗”逻辑来缓解租约问题。

对于我们管理Milvus Partition这种关键性操作,任何可能导致并发冲突的风险都应该被严肃对待。因此,基于Redis的方案在这种场景下,其可靠性短板是不可接受的。

方案B: 基于ZooKeeper的健壮锁实现

ZooKeeper(ZK)是一个专为分布式应用提供协调服务的软件,其核心是基于ZAB协议保证数据的一致性。分布式锁是ZK的经典应用场景。与Redis依赖TTL的临时性锁不同,ZK利用其临时节点(Ephemeral Nodes)顺序节点(Sequential Nodes)的特性,构建出逻辑上无懈可击的分布式锁。

ZK锁的核心原理

  1. 根节点: 在ZK中创建一个持久的根节点,例如 /milvus_locks
  2. 尝试获取锁: 当一个客户端需要获取锁时,它会在/milvus_locks下创建一个临时顺序节点(EPHEMERAL_SEQUENTIAL),例如/milvus_locks/lock-0000000001
  3. 判断锁归属: 客户端获取/milvus_locks下的所有子节点,并检查自己创建的节点序号是否是最小的。如果是,则它成功获得了锁。
  4. 等待锁: 如果自己创建的节点序号不是最小的,说明锁已被其他客户端持有。客户端需要找到比自己序号小的前一个节点,并对其设置一个监视(Watch)
  5. 释放锁与唤醒:
    • 当客户端完成操作后,它只需关闭与ZK的会话,或者显式删除自己创建的临时节点。
    • 由于是临时节点,如果客户端崩溃,该节点也会被ZK自动删除。
    • 当一个节点被删除时,之前监视它的那个客户端会收到通知。收到通知后,该客户端会再次检查自己是否是序号最小的节点,如果是,则获取锁;如果不是,则继续监视新的前一个节点。

这个机制避免了“惊群效应”,因为每个等待者只监视其前一个节点,而不是所有节点都监视第一个节点。最关键的是,它彻底摆脱了对“租约时间”的依赖。锁的生命周期与客户端的会话严格绑定,这才是真正可靠的。

生产级ZK锁实现

我们将使用成熟的kazoo库来实现ZK分布式锁。

# partition_lock_zk.py
import logging
import time
from contextlib import contextmanager
from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import NoNodeError, LockTimeout

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class ZookeeperDistLock:
    def __init__(self, zk_hosts: str, lock_path_prefix: str = "/milvus/locks/partition"):
        """
        基于ZooKeeper的分布式锁实现。

        :param zk_hosts: ZooKeeper服务器地址,例如 '127.0.0.1:2181,127.0.0.1:2182'
        :param lock_path_prefix: 锁在ZK中的基础路径
        """
        self.zk_client = KazooClient(hosts=zk_hosts)
        self.lock_path_prefix = lock_path_prefix
        self._is_connected = False
        self._connect()

    def _connect(self):
        """建立与ZK的连接并处理状态变更"""
        try:
            if self.zk_client.state != 'CONNECTED':
                self.zk_client.start(timeout=10)
                self.zk_client.add_listener(self._state_listener)
                # 确保锁的根路径存在
                self.zk_client.ensure_path(self.lock_path_prefix)
                self._is_connected = True
                logging.info(f"Successfully connected to ZooKeeper at {self.zk_client.hosts}")
        except Exception as e:
            logging.error(f"Failed to connect to ZooKeeper: {e}", exc_info=True)
            self._is_connected = False
            raise

    def _state_listener(self, state):
        if state == KazooState.LOST:
            logging.warning("ZooKeeper session lost. Locks might have been released.")
            self._is_connected = False
        elif state == KazooState.SUSPENDED:
            logging.warning("ZooKeeper connection suspended.")
            self._is_connected = False
        elif state == KazooState.CONNECTED:
            logging.info("Reconnected to ZooKeeper.")
            self._is_connected = True
        else:
            logging.info(f"ZooKeeper state changed to: {state}")

    @contextmanager
    def lock(self, resource_id: str, block_timeout_sec: int = 15):
        """
        使用上下文管理器获取和释放锁。

        :param resource_id: 资源标识,会成为ZK路径的一部分
        :param block_timeout_sec: 阻塞等待超时时间
        """
        if not self._is_connected:
            raise ConnectionError("ZooKeeper client is not connected.")

        # ZK路径不能包含'/',所以需要替换
        safe_resource_id = resource_id.replace('/', '_')
        lock_path = f"{self.lock_path_prefix}/{safe_resource_id}"
        
        # kazoo 提供了现成的Lock秘方(recipe)
        zk_lock = self.zk_client.Lock(lock_path, identifier=f"host-{self.zk_client.client_id[0]}")
        acquired = False
        
        try:
            logging.info(f"Attempting to acquire ZK lock for resource '{resource_id}' at path '{lock_path}'.")
            acquired = zk_lock.acquire(timeout=block_timeout_sec)
            
            if acquired:
                logging.info(f"ZK lock acquired for resource '{resource_id}'.")
                yield self
            else:
                raise LockTimeout(f"Failed to acquire ZK lock for resource '{resource_id}' at path '{lock_path}' within {block_timeout_sec} seconds.")
        
        except Exception as e:
            logging.error(f"An error occurred with ZK lock for resource '{resource_id}': {e}", exc_info=True)
            # 重新抛出异常,让上层知道操作失败
            raise
        
        finally:
            if acquired:
                try:
                    zk_lock.release()
                    logging.info(f"ZK lock released for resource '{resource_id}'.")
                except NoNodeError:
                    # 如果会话丢失,节点可能已经不存在了,这是正常情况
                    logging.warning(f"ZK lock node for '{resource_id}' already gone, likely due to session expiry.")
                except Exception as e:
                    logging.error(f"Error releasing ZK lock for resource '{resource_id}': {e}", exc_info=True)

    def close(self):
        """关闭ZK客户端连接"""
        if self.zk_client.state != 'CLOSED':
            self.zk_client.close()
            logging.info("ZooKeeper client closed.")

方案B评估

  • 优点:
    • 极高的可靠性: 基于ZK会话的临时节点机制,完美解决了死锁和锁提前释放的问题。
    • 强一致性保证: ZK保证了锁操作的严格顺序和互斥性。
    • 成熟的实现: kazoo等客户端库封装了复杂的锁逻辑,使用方便。
  • 缺点:
    • 性能开销: 相较于Redis,ZK的写操作需要通过共识协议,延迟更高,吞吐量更低。
    • 运维复杂度: 引入并维护一个高可用的ZK集群比Redis更复杂。

最终选择与架构整合

在我们的场景中,Partition的加载/卸载操作频率不高,但操作的正确性至关重要。一次错误的并发操作可能导致数据服务长时间不可用,其业务影响远大于分布式锁本身带来的微小性能开销。因此,方案B,即基于ZooKeeper的分布式锁,是毫无疑问的正确选择

我们将这个锁机制整合到PartitionManager服务中。

graph TD
    subgraph "应用服务集群 (Stateless)"
        App1 -->|"acquire_lock('tenant_A/p1')"| ZK
        App2 -->|"acquire_lock('tenant_A/p1')"| ZK
    end
    
    subgraph "ZooKeeper集群"
        ZK1(ZK Server 1)
        ZK2(ZK Server 2)
        ZK3(ZK Server 3)
        ZK1 --- ZK2 --- ZK3 --- ZK1
    end
    
    ZK(ZK Ensemble) --> App1
    
    App1 -->|"lock acquired"| App1
    App1 -->|"load('partition_A')"| Milvus(Milvus集群)
    App2 -->|"lock denied (waits)"| App2

    style ZK fill:#9f9,stroke:#333,stroke-width:2px

最终的PartitionManager核心逻辑如下:

# partition_manager.py
import logging
from pymilvus import utility, connections
from partition_lock_zk import ZookeeperDistLock

class MilvusPartitionManager:
    def __init__(self, milvus_alias: str, zk_hosts: str):
        self.milvus_alias = milvus_alias
        # 确保到Milvus的连接已建立
        connections.connect(alias=self.milvus_alias)
        self.zk_lock_manager = ZookeeperDistLock(zk_hosts=zk_hosts)

    def ensure_partition_loaded(self, collection_name: str, partition_name: str):
        """
        确保一个Partition被加载,使用分布式锁保证操作的原子性和互斥性。
        """
        # 检查是否已加载,这是一个乐观检查,可以减少不必要的加锁开销
        if utility.has_partition(collection_name, partition_name, using=self.milvus_alias):
            # 进一步检查加载状态
            stats = utility.get_partition_statistics(collection_name, partition_name, using=self.milvus_alias)
            # 在Milvus 2.x中,检查加载状态通常看是否有查询节点在服务
            # 这是一个简化的检查,实际可能更复杂
            if utility.get_loading_progress(collection_name, [partition_name], using=self.milvus_alias).get('loading_progress', '100%') == '100%':
                 logging.info(f"Partition '{partition_name}' in collection '{collection_name}' is already loaded.")
                 return

        resource_id = f"{collection_name}/{partition_name}"
        logging.info(f"Partition '{partition_name}' not loaded or loading, attempting to acquire lock.")
        
        try:
            with self.zk_lock_manager.lock(resource_id, block_timeout_sec=20):
                # 双重检查:在获取锁后,再次确认Partition状态,
                # 因为可能在你等待锁的过程中,前一个持有锁的进程已经加载完毕。
                if utility.get_loading_progress(collection_name, [partition_name], using=self.milvus_alias).get('loading_progress', '100%') == '100%':
                    logging.info(f"Lock acquired, but partition '{partition_name}' was loaded by another process. Skipping.")
                    return

                logging.info(f"Lock acquired. Loading partition '{partition_name}'...")
                # 导入Collection和Partition相关的类
                from pymilvus import Collection, Partition
                collection = Collection(name=collection_name, using=self.milvus_alias)
                collection.load([partition_name])
                logging.info(f"Successfully loaded partition '{partition_name}'.")

        except TimeoutError as e:
            logging.error(f"Could not ensure partition '{partition_name}' is loaded due to lock timeout: {e}")
            raise  # 将异常上抛,让调用方处理
        except Exception as e:
            logging.error(f"An error occurred while loading partition '{partition_name}': {e}", exc_info=True)
            raise

    def close(self):
        self.zk_lock_manager.close()
        connections.disconnect(self.milvus_alias)

这个MilvusPartitionManager通过组合ZookeeperDistLock,提供了一个健壮的、对并发安全的ensure_partition_loaded接口。双重检查锁定(Double-Checked Locking)模式在这里的应用,优化了性能,避免了在锁已经被其他进程加载完毕后的重复RPC调用。

当前架构的局限性与未来优化

尽管当前基于ZooKeeper的锁方案在可靠性上表现出色,但它并非没有缺点。当前的锁粒度是collection_name/partition_name,这意味着对同一Partition的加载和卸载操作是互斥的,这是符合预期的。但是,如果管理操作变得更加复杂,例如,一个操作需要同时锁定多个Partition,那么当前的简单锁模型就需要演进为更复杂的事务性操作。

此外,整个系统的可靠性现在强依赖于ZooKeeper集群的健康状况。虽然ZK本身是高可用的,但它成为了系统架构中的一个关键依赖。在未来的演进中,可以考虑将这种协调逻辑下沉到Milvus自身。如果Milvus的未来版本能够在其Proxy层或RootCoord中提供原生的、幂等的load_partition接口,那么应用层的分布式锁就可以被彻底移除,从而简化整体架构。这是一个值得向Milvus社区提出并推动的特性。


  目录