基于 ELK Stack 构建 MongoDB 慢查询日志的实时解析与诊断平台


我们的生产环境 MongoDB 集群开始出现无法预测的性能抖动。高峰期,部分核心服务的响应时间会从几十毫秒飙升到数秒。问题在于,这些抖动是间歇性的,当我们介入排查时,系统负载早已恢复正常。传统的排查方式——SSH到各个节点,手动 grep mongod.log 文件中的慢查询——在这种规模和动态性下已经彻底失效。我们需要一个能够持续捕获、结构化解析并提供即时查询能力的系统。

技术选型几乎没有争议:ELK Stack (Elasticsearch, Logstash, Kibana)。Filebeat 负责从各个 MongoDB 节点上轻量级地采集日志,Logstash 作为中枢进行重量级的解析和转换,Elasticsearch 提供强大的索引和聚合能力,Kibana 则用于可视化和交互式探索。挑战不在于选型,而在于实现一个生产级的、能处理 MongoDB 复杂且非结构化慢查询日志格式的可靠数据管道。

整体架构

整个数据流的设计思路是解耦和专业化。每个组件只做自己最擅长的事情。

graph TD
    subgraph MongoDB集群
        MongoNode1[MongoDB节点1
/var/log/mongodb/mongod.log] MongoNode2[MongoDB节点2
/var/log/mongodb/mongod.log] MongoNode3[MongoDB节点3
/var/log/mongodb/mongod.log] end subgraph 日志采集层 Filebeat1[Filebeat] Filebeat2[Filebeat] Filebeat3[Filebeat] end subgraph 解析与缓冲层 Logstash[Logstash Pipeline] end subgraph 存储与索引层 Elasticsearch[Elasticsearch Cluster] end subgraph 可视化与分析层 Kibana[Kibana] end subgraph 工程师 DevOps[DevOps/SRE] end MongoNode1 --> Filebeat1 MongoNode2 --> Filebeat2 MongoNode3 --> Filebeat3 Filebeat1 --> Logstash Filebeat2 --> Logstash Filebeat3 --> Logstash Logstash --> Elasticsearch Elasticsearch --> Kibana Kibana --> DevOps

步骤一:配置 MongoDB 输出慢查询日志

首先,必须让 MongoDB 开始记录我们需要的日志。直接在 mongod.conf 文件中进行配置,这比在运行时通过 db.setProfilingLevel() 命令修改更适合生产环境,因为它可以保证实例重启后配置依然生效。

# /etc/mongod.conf

# ... 其他配置 ...

# 日志配置
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log

# 慢查询配置
operationProfiling:
  mode: slowOp # 'off', 'slowOp', or 'all'
  slowOpThresholdMs: 100 # 记录执行时间超过100ms的操作
  # rateLimit: 100 # 可选,限制每秒记录的慢查询数量,防止日志爆炸

这里的关键是 operationProfiling 部分:

  • mode: slowOp:只记录超过阈值的慢操作。在生产环境中,all 模式会产生海量日志,通常只在调试特定问题时短暂开启。
  • slowOpThresholdMs: 100:阈值设定是一个权衡。太低会淹没在无用信息中,太高则可能错过潜在的性能问题。100ms 是一个相对合理的起点,后续可以根据业务的 P95/P99 响应时间进行调整。

配置修改后,需要重启 mongod 服务使其生效。

步骤二:使用 Filebeat 采集与预处理日志

Filebeat 的职责是“监视”日志文件,并将新增内容发送到 Logstash。对于 MongoDB 的慢查询日志,最大的挑战在于它们通常是多行的。一个慢查询记录可能横跨数十行,必须将它们合并成一个事件(event)再发送。

这是我们的 filebeat.yml 核心配置:

# /etc/filebeat/filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/mongodb/mongod.log

  # --- 多行处理是关键 ---
  # MongoDB 日志行以 ISO8601 格式的时间戳开头
  # 如果一行不以此格式开头,就将其附加到前一行
  multiline.type: pattern
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3,9}(\+|-)[0-9]{2:2}'
  multiline.negate: true
  multiline.match: after

  # 添加自定义字段,用于在 ELK 中区分日志来源
  fields:
    env: production
    app: mongodb
    cluster: user-data-cluster-1
  fields_under_root: true # 将自定义字段置于顶层

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

# 输出到 Logstash,而不是直接到 Elasticsearch
# 这强制所有日志都经过 Logstash 的解析处理
output.logstash:
  hosts: ["logstash-node1:5044", "logstash-node2:5044"]
  loadbalance: true
  # 启用 TLS/SSL 加密传输是生产环境的最佳实践
  # ssl.enabled: true
  # ssl.certificate_authorities: ["/etc/pki/tls/certs/logstash-ca.crt"]

配置解析:

  • multiline.*: 这是整个 Filebeat 配置的灵魂。我们使用正则表达式 ^\[0-9]{4}-... 来匹配 MongoDB 日志行的起始时间戳。negate: truematch: after 组合起来的含义是:“如果一行 匹配这个时间戳模式,那么它属于上一行,请把它合并进去”。这确保了完整的慢查询块作为一个单一事件被发送。
  • fieldsfields_under_root: 我们为日志注入了丰富的元数据。env, app, cluster 这些标签在后续的查询和仪表盘制作中至关重要,它们允许我们筛选特定环境或集群的慢查询。
  • output.logstash: 我们配置了负载均衡到两个 Logstash 节点,以提高可用性和吞吐量。

步骤三:Logstash 重量级解析管道

这是整个流程中最复杂、也最有价值的部分。Logstash 的任务是将 Filebeat 发来的纯文本日志块,解析成一个包含数十个字段的、结构化的 JSON 文档。

一个典型的 MongoDB 慢查询日志条目可能长这样:

2023-10-27T10:15:30.123+08:00 I COMMAND  [conn12345] command my_db.my_collection command: find { find: "my_collection", filter: { user_id: 123456 }, sort: { created_at: -1 }, limit: 1, projection: { ... } } planSummary: COLLSCAN keysExamined:0 docsExamined:1000000 cursorExhausted:1 numYields:7812 nreturned:1 reslen:432 locks:{ Global: { acquireCount: { r: 15626 } }, Database: { acquireCount: { r: 7813 } }, Collection: { acquireCount: { r: 7813 } } } protocol:op_msg 500ms

我们需要从中提取出 timestamp, component, connection, namespace, command, planSummary, keysExamined, docsExamined, locks, 和 duration 等关键信息。

下面是我们的 logstash/pipeline/mongodb-slow.conf 配置文件,包含详尽的注释。

# logstash/pipeline/mongodb-slow.conf

input {
  beats {
    port => 5044
    # ssl => true
    # ssl_certificate => "/etc/logstash/pki/tls/certs/logstash.crt"
    # ssl_key => "/etc/logstash/pki/tls/private/logstash.key"
  }
}

filter {
  # 第一步:使用 Grok 正则表达式进行初步解析
  # 这是整个管道最核心也是最脆弱的部分。
  grok {
    # 匹配 MongoDB 3.x/4.x/5.x 的慢查询日志格式
    # 这个模式经过多次迭代,能够处理绝大多数情况
    match => { "message" => "(?<mongo_timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}\.%{INT}(?:[+-]%{HOUR}:?%{MINUTE}))\s+%{WORD:log_level}\s+%{WORD:component}\s+\[%{DATA:context}\]\s+%{GREEDYDATA:mongo_log_message}" }
    tag_on_failure => ["_grokparsefailure_initial"]
  }

  # 如果初步解析成功,继续深度解析 mongo_log_message 部分
  if !("_grokparsefailure_initial" in [tags]) {
    grok {
      # 这是一个多模式的 Grok,因为 command 和 query 的日志格式略有不同
      # 它会按顺序尝试匹配,直到成功为止
      match => { "mongo_log_message" => [
        "command\s+%{GREEDYDATA:mongo_namespace}\s+command:\s+%{GREEDYDATA:command_details}\s+planSummary:\s+%{WORD:plan_summary}(?:\s+%{GREEDYDATA:plan_details})?\s+keysExamined:%{NUMBER:keys_examined:int}\s+docsExamined:%{NUMBER:docs_examined:int}\s+%{GREEDYDATA:query_stats}\s+%{NUMBER:duration_ms:int}ms",
        "query\s+%{GREEDYDATA:mongo_namespace}\s+query:\s+%{GREEDYDATA:command_details}\s+planSummary:\s+%{WORD:plan_summary}(?:\s+%{GREEDYDATA:plan_details})?\s+keysExamined:%{NUMBER:keys_examined:int}\s+docsExamined:%{NUMBER:docs_examined:int}\s+%{GREEDYDATA:query_stats}\s+%{NUMBER:duration_ms:int}ms",
        "getmore\s+%{GREEDYDATA:mongo_namespace}\s+getMore:\s+%{GREEDYDATA:command_details}\s+planSummary:\s+%{WORD:plan_summary}(?:\s+%{GREEDYDATA:plan_details})?\s+keysExamined:%{NUMBER:keys_examined:int}\s+docsExamined:%{NUMBER:docs_examined:int}\s+%{GREEDYDATA:query_stats}\s+%{NUMBER:duration_ms:int}ms"
      ]}
      tag_on_failure => ["_grokparsefailure_details"]
    }
  }

  # 第二步:数据类型转换和清理
  if !("_grokparsefailure_details" in [tags]) {
    mutate {
      # 将字符串字段转换为正确的数值类型,便于在 Kibana 中进行聚合计算
      convert => {
        "keys_examined" => "integer"
        "docs_examined" => "integer"
        "duration_ms" => "integer"
      }
      # 清理掉不再需要的原始日志字段
      remove_field => ["message", "mongo_log_message"]
    }

    # 第三步:解析内嵌的 JSON/BSON 字符串
    # command_details 字段通常是一个 BSON 的字符串表示,需要用 json filter 来解析
    # 这里是一个常见的坑:MongoDB 的日志输出不是严格的 JSON,需要先做一些字符串替换
    mutate {
      # 例如:将 unquoted keys 转换为 quoted keys, { key: val } -> { "key": val }
      # 这需要根据实际日志格式进行微调
      gsub => [
        "command_details", "{(\w+):", '{"\1":'
      ]
    }
    json {
      source => "command_details"
      target => "command"
      # 如果解析失败,不要抛出异常,而是添加一个 tag
      tag_on_failure => "_jsonparsefailure_command"
    }

    # 第四步:时间戳处理
    # 使用从日志中解析出的时间戳覆盖 Logstash 默认的 @timestamp
    date {
      match => [ "mongo_timestamp", "ISO8601" ]
      target => "@timestamp"
    }
    
    # 第五步: 提取数据库名和集合名
    if [mongo_namespace] {
        grok {
            match => { "mongo_namespace" => "(?<db_name>[^.]+)\.(?<collection_name>[^ ]+)"}
        }
    }
  }

  # 错误处理:对于所有解析失败的日志,我们不丢弃,而是打上标签,发送到特定的 index
  # 这样便于我们后续分析是哪些日志格式导致了解析失败,从而改进 Grok 表达式
  if "_grokparsefailure_initial" in [tags] or "_grokparsefailure_details" in [tags] {
    mutate {
      add_field => { "parse_error" => true }
    }
  }
}

output {
  if [parse_error] {
    # 解析失败的日志进入独立的索引
    elasticsearch {
      hosts => ["http://elasticsearch-node1:9200"]
      index => "mongodb-slow-logs-failed-%{+YYYY.MM.dd}"
      # user => "elastic"
      # password => "changeme"
    }
  } else {
    # 解析成功的日志进入主索引
    elasticsearch {
      hosts => ["http://elasticsearch-node1:9200"]
      index => "mongodb-slow-logs-%{+YYYY.MM.dd}"
      # template => "/etc/logstash/templates/mongodb_slow_logs_template.json"
      # template_name => "mongodb-slow-logs"
      # template_overwrite => true
    }
  }
}

这个管道的设计思想:

  1. 分层解析 (Layered Grok): 先用一个简单的 Grok 提取出公共部分,再用一个复杂的多模式 Grok 解析核心的慢查询信息。这提高了可维护性。
  2. 健壮的错误处理: 每一步解析失败都会添加一个特定的 tag (_grokparsefailure_*)。在 output 阶段,我们根据是否存在这些 tag,将成功和失败的日志分发到不同的 Elasticsearch 索引。这使得我们不会因为少数格式异常的日志而丢失大量正常日志,并且可以轻松地排查解析失败的原因。
  3. 数据类型转换: mutate filter 中的 convert 操作至关重要。将 duration_ms 等字段转为 integer,才能在 Kibana 中进行数值排序、计算平均值或绘制直方图。
  4. 处理内嵌结构: json filter 是处理 MongoDB 日志中 BSON-like 字符串的关键。真实项目中,gsub 的正则表达式可能需要不断调试和完善,以处理各种边缘情况。
  5. 时间戳覆盖: 使用日志本身的时间戳,而不是日志到达 Logstash 的时间。这保证了事件时间的准确性。

步骤四:Elasticsearch 索引模板

在数据写入 Elasticsearch 之前,定义一个索引模板 (Index Template) 是生产环境的必要步骤。模板可以确保新创建的索引有正确的 settings 和 mappings。如果不这么做,Elasticsearch 会动态猜测字段类型,这常常导致错误(例如,它可能把一个本应是 keyword 的字段映射成 text,导致无法进行精确匹配和聚合)。

通过 Kibana Dev Tools 或 curl 命令应用此模板:

PUT _index_template/mongodb_slow_logs_template
{
  "index_patterns": ["mongodb-slow-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "mongodb-logs-policy",
      "index.lifecycle.rollover_alias": "mongodb-slow-logs"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "env": { "type": "keyword" },
        "app": { "type": "keyword" },
        "cluster": { "type": "keyword" },
        "host": {
          "properties": {
            "name": { "type": "keyword" }
          }
        },
        "log_level": { "type": "keyword" },
        "component": { "type": "keyword" },
        "context": { "type": "keyword" },
        "mongo_namespace": { "type": "keyword" },
        "db_name": { "type": "keyword" },
        "collection_name": { "type": "keyword" },
        "plan_summary": { "type": "keyword" },
        "duration_ms": { "type": "long" },
        "keys_examined": { "type": "long" },
        "docs_examined": { "type": "long" },
        "nreturned": { "type": "long" },
        "command": { "type": "object", "enabled": false },
        "query_stats": { "type": "text", "index": false }
      }
    }
  }
}

模板关键点:

  • index_patterns: 应用到所有以 mongodb-slow-logs- 开头的索引。
  • settings: 定义分片、副本数量,并关联了一个 ILM (Index Lifecycle Management) 策略,用于自动管理索引的生命周期(例如,30天后转为 warm 节点,90天后删除)。
  • mappings: 这是核心。
    • keyword 类型用于需要精确匹配、排序和聚合的字段,如 env, plan_summary
    • long 类型用于数值字段,如 duration_ms
    • command: 这是一个特别的处理。命令的结构千变万化,为它定义精确的 mapping 几乎不可能。将其类型设置为 objectenabled: false,意味着 Elasticsearch 会存储这个 JSON 对象,但不会为它的子字段创建索引。我们仍然可以在 Kibana 的文档视图中看到完整命令,但无法对其中的字段(如 filter.user_id)进行搜索。这是一个性能和功能之间的权衡。
    • query_stats: 这个字段内容杂乱,我们不关心其内容,因此设置 index: false 来节省存储和索引开销。

最终成果:在 Kibana 中诊断问题

当这一切都部署完毕后,我们获得了一个强大的诊断平台。在 Kibana Discover 界面,我们可以用 KQL (Kibana Query Language) 进行复杂的查询:

  • 查找所有全表扫描 (COLLSCAN) 的慢查询:
    plan_summary: COLLSCAN and docs_examined > 10000

  • 查找特定集合上耗时最长的查询:
    collection_name: "users" and duration_ms > 1000

  • 统计每个集合的慢查询次数:
    在 Visualize 模块,创建一个条形图,Y 轴是 Count of documents,X 轴是 Terms aggregation on collection_name.keyword

我们建立了一个仪表盘,包含了几个关键视图:

  1. 慢查询总数和平均耗时的时间序列图。
  2. Top 10 慢查询的集合。
  3. Top 10 慢查询的操作类型 (command.find, command.update, etc.)。
  4. plan_summary 的饼图,可以一眼看出 COLLSCAN 占了多大比例。
  5. 一个数据表格,列出最近的慢查询详情,按 duration_ms 降序排列。

现在,当性能抖动发生时,我们不再是盲目地捞日志,而是在这个仪表盘上,通过下钻和筛选,几分钟内就能定位到具体的查询语句、执行计划和影响范围。

局限性与未来迭代方向

这个方案并非一劳永逸。Grok 解析的脆弱性是其最大的弱点。如果未来 MongoDB 版本更改了日志格式,整个解析管道就可能失效,需要重新编写和测试 Grok 模式。一个重要的改进方向是,在支持 JSON 格式日志输出的 MongoDB 新版本(4.4+)中,切换到 JSON 日志。这将极大地简化 Logstash 的配置,用一个 json filter 替换掉所有复杂的 grokmutate 操作,从而提高管道的稳定性和性能。

其次,当前的管道没有显式的背压(backpressure)处理机制。如果 MongoDB 突然产生海量慢查询(例如,由于一个错误的部署),可能会压垮 Logstash 实例。未来的迭代可以考虑在 Logstash 中启用持久化队列(Persistent Queues),或者引入一个消息队列(如 Kafka 或 RabbitMQ)作为 Filebeat 和 Logstash 之间的缓冲层,以增强系统的韧性。

最后,此平台目前仅用于事后分析。下一步是将其与告警系统集成。利用 Elasticsearch 的告警功能,我们可以创建规则,例如“如果在5分钟内,plan_summary:COLLSCANdocs_examined > 100000 的日志出现超过10次,则立即通过 PagerDuty 发出告警”,从而实现从被动诊断到主动预防的转变。


  目录