我们的生产环境 MongoDB 集群开始出现无法预测的性能抖动。高峰期,部分核心服务的响应时间会从几十毫秒飙升到数秒。问题在于,这些抖动是间歇性的,当我们介入排查时,系统负载早已恢复正常。传统的排查方式——SSH到各个节点,手动 grep
mongod.log
文件中的慢查询——在这种规模和动态性下已经彻底失效。我们需要一个能够持续捕获、结构化解析并提供即时查询能力的系统。
技术选型几乎没有争议:ELK Stack (Elasticsearch, Logstash, Kibana)。Filebeat 负责从各个 MongoDB 节点上轻量级地采集日志,Logstash 作为中枢进行重量级的解析和转换,Elasticsearch 提供强大的索引和聚合能力,Kibana 则用于可视化和交互式探索。挑战不在于选型,而在于实现一个生产级的、能处理 MongoDB 复杂且非结构化慢查询日志格式的可靠数据管道。
整体架构
整个数据流的设计思路是解耦和专业化。每个组件只做自己最擅长的事情。
graph TD subgraph MongoDB集群 MongoNode1[MongoDB节点1
/var/log/mongodb/mongod.log] MongoNode2[MongoDB节点2
/var/log/mongodb/mongod.log] MongoNode3[MongoDB节点3
/var/log/mongodb/mongod.log] end subgraph 日志采集层 Filebeat1[Filebeat] Filebeat2[Filebeat] Filebeat3[Filebeat] end subgraph 解析与缓冲层 Logstash[Logstash Pipeline] end subgraph 存储与索引层 Elasticsearch[Elasticsearch Cluster] end subgraph 可视化与分析层 Kibana[Kibana] end subgraph 工程师 DevOps[DevOps/SRE] end MongoNode1 --> Filebeat1 MongoNode2 --> Filebeat2 MongoNode3 --> Filebeat3 Filebeat1 --> Logstash Filebeat2 --> Logstash Filebeat3 --> Logstash Logstash --> Elasticsearch Elasticsearch --> Kibana Kibana --> DevOps
步骤一:配置 MongoDB 输出慢查询日志
首先,必须让 MongoDB 开始记录我们需要的日志。直接在 mongod.conf
文件中进行配置,这比在运行时通过 db.setProfilingLevel()
命令修改更适合生产环境,因为它可以保证实例重启后配置依然生效。
# /etc/mongod.conf
# ... 其他配置 ...
# 日志配置
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod.log
# 慢查询配置
operationProfiling:
mode: slowOp # 'off', 'slowOp', or 'all'
slowOpThresholdMs: 100 # 记录执行时间超过100ms的操作
# rateLimit: 100 # 可选,限制每秒记录的慢查询数量,防止日志爆炸
这里的关键是 operationProfiling
部分:
-
mode: slowOp
:只记录超过阈值的慢操作。在生产环境中,all
模式会产生海量日志,通常只在调试特定问题时短暂开启。 -
slowOpThresholdMs: 100
:阈值设定是一个权衡。太低会淹没在无用信息中,太高则可能错过潜在的性能问题。100ms 是一个相对合理的起点,后续可以根据业务的 P95/P99 响应时间进行调整。
配置修改后,需要重启 mongod
服务使其生效。
步骤二:使用 Filebeat 采集与预处理日志
Filebeat 的职责是“监视”日志文件,并将新增内容发送到 Logstash。对于 MongoDB 的慢查询日志,最大的挑战在于它们通常是多行的。一个慢查询记录可能横跨数十行,必须将它们合并成一个事件(event)再发送。
这是我们的 filebeat.yml
核心配置:
# /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/mongodb/mongod.log
# --- 多行处理是关键 ---
# MongoDB 日志行以 ISO8601 格式的时间戳开头
# 如果一行不以此格式开头,就将其附加到前一行
multiline.type: pattern
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3,9}(\+|-)[0-9]{2:2}'
multiline.negate: true
multiline.match: after
# 添加自定义字段,用于在 ELK 中区分日志来源
fields:
env: production
app: mongodb
cluster: user-data-cluster-1
fields_under_root: true # 将自定义字段置于顶层
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
# 输出到 Logstash,而不是直接到 Elasticsearch
# 这强制所有日志都经过 Logstash 的解析处理
output.logstash:
hosts: ["logstash-node1:5044", "logstash-node2:5044"]
loadbalance: true
# 启用 TLS/SSL 加密传输是生产环境的最佳实践
# ssl.enabled: true
# ssl.certificate_authorities: ["/etc/pki/tls/certs/logstash-ca.crt"]
配置解析:
-
multiline.*
: 这是整个 Filebeat 配置的灵魂。我们使用正则表达式^\[0-9]{4}-...
来匹配 MongoDB 日志行的起始时间戳。negate: true
和match: after
组合起来的含义是:“如果一行 不 匹配这个时间戳模式,那么它属于上一行,请把它合并进去”。这确保了完整的慢查询块作为一个单一事件被发送。 -
fields
和fields_under_root
: 我们为日志注入了丰富的元数据。env
,app
,cluster
这些标签在后续的查询和仪表盘制作中至关重要,它们允许我们筛选特定环境或集群的慢查询。 -
output.logstash
: 我们配置了负载均衡到两个 Logstash 节点,以提高可用性和吞吐量。
步骤三:Logstash 重量级解析管道
这是整个流程中最复杂、也最有价值的部分。Logstash 的任务是将 Filebeat 发来的纯文本日志块,解析成一个包含数十个字段的、结构化的 JSON 文档。
一个典型的 MongoDB 慢查询日志条目可能长这样:
2023-10-27T10:15:30.123+08:00 I COMMAND [conn12345] command my_db.my_collection command: find { find: "my_collection", filter: { user_id: 123456 }, sort: { created_at: -1 }, limit: 1, projection: { ... } } planSummary: COLLSCAN keysExamined:0 docsExamined:1000000 cursorExhausted:1 numYields:7812 nreturned:1 reslen:432 locks:{ Global: { acquireCount: { r: 15626 } }, Database: { acquireCount: { r: 7813 } }, Collection: { acquireCount: { r: 7813 } } } protocol:op_msg 500ms
我们需要从中提取出 timestamp
, component
, connection
, namespace
, command
, planSummary
, keysExamined
, docsExamined
, locks
, 和 duration
等关键信息。
下面是我们的 logstash/pipeline/mongodb-slow.conf
配置文件,包含详尽的注释。
# logstash/pipeline/mongodb-slow.conf
input {
beats {
port => 5044
# ssl => true
# ssl_certificate => "/etc/logstash/pki/tls/certs/logstash.crt"
# ssl_key => "/etc/logstash/pki/tls/private/logstash.key"
}
}
filter {
# 第一步:使用 Grok 正则表达式进行初步解析
# 这是整个管道最核心也是最脆弱的部分。
grok {
# 匹配 MongoDB 3.x/4.x/5.x 的慢查询日志格式
# 这个模式经过多次迭代,能够处理绝大多数情况
match => { "message" => "(?<mongo_timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}\.%{INT}(?:[+-]%{HOUR}:?%{MINUTE}))\s+%{WORD:log_level}\s+%{WORD:component}\s+\[%{DATA:context}\]\s+%{GREEDYDATA:mongo_log_message}" }
tag_on_failure => ["_grokparsefailure_initial"]
}
# 如果初步解析成功,继续深度解析 mongo_log_message 部分
if !("_grokparsefailure_initial" in [tags]) {
grok {
# 这是一个多模式的 Grok,因为 command 和 query 的日志格式略有不同
# 它会按顺序尝试匹配,直到成功为止
match => { "mongo_log_message" => [
"command\s+%{GREEDYDATA:mongo_namespace}\s+command:\s+%{GREEDYDATA:command_details}\s+planSummary:\s+%{WORD:plan_summary}(?:\s+%{GREEDYDATA:plan_details})?\s+keysExamined:%{NUMBER:keys_examined:int}\s+docsExamined:%{NUMBER:docs_examined:int}\s+%{GREEDYDATA:query_stats}\s+%{NUMBER:duration_ms:int}ms",
"query\s+%{GREEDYDATA:mongo_namespace}\s+query:\s+%{GREEDYDATA:command_details}\s+planSummary:\s+%{WORD:plan_summary}(?:\s+%{GREEDYDATA:plan_details})?\s+keysExamined:%{NUMBER:keys_examined:int}\s+docsExamined:%{NUMBER:docs_examined:int}\s+%{GREEDYDATA:query_stats}\s+%{NUMBER:duration_ms:int}ms",
"getmore\s+%{GREEDYDATA:mongo_namespace}\s+getMore:\s+%{GREEDYDATA:command_details}\s+planSummary:\s+%{WORD:plan_summary}(?:\s+%{GREEDYDATA:plan_details})?\s+keysExamined:%{NUMBER:keys_examined:int}\s+docsExamined:%{NUMBER:docs_examined:int}\s+%{GREEDYDATA:query_stats}\s+%{NUMBER:duration_ms:int}ms"
]}
tag_on_failure => ["_grokparsefailure_details"]
}
}
# 第二步:数据类型转换和清理
if !("_grokparsefailure_details" in [tags]) {
mutate {
# 将字符串字段转换为正确的数值类型,便于在 Kibana 中进行聚合计算
convert => {
"keys_examined" => "integer"
"docs_examined" => "integer"
"duration_ms" => "integer"
}
# 清理掉不再需要的原始日志字段
remove_field => ["message", "mongo_log_message"]
}
# 第三步:解析内嵌的 JSON/BSON 字符串
# command_details 字段通常是一个 BSON 的字符串表示,需要用 json filter 来解析
# 这里是一个常见的坑:MongoDB 的日志输出不是严格的 JSON,需要先做一些字符串替换
mutate {
# 例如:将 unquoted keys 转换为 quoted keys, { key: val } -> { "key": val }
# 这需要根据实际日志格式进行微调
gsub => [
"command_details", "{(\w+):", '{"\1":'
]
}
json {
source => "command_details"
target => "command"
# 如果解析失败,不要抛出异常,而是添加一个 tag
tag_on_failure => "_jsonparsefailure_command"
}
# 第四步:时间戳处理
# 使用从日志中解析出的时间戳覆盖 Logstash 默认的 @timestamp
date {
match => [ "mongo_timestamp", "ISO8601" ]
target => "@timestamp"
}
# 第五步: 提取数据库名和集合名
if [mongo_namespace] {
grok {
match => { "mongo_namespace" => "(?<db_name>[^.]+)\.(?<collection_name>[^ ]+)"}
}
}
}
# 错误处理:对于所有解析失败的日志,我们不丢弃,而是打上标签,发送到特定的 index
# 这样便于我们后续分析是哪些日志格式导致了解析失败,从而改进 Grok 表达式
if "_grokparsefailure_initial" in [tags] or "_grokparsefailure_details" in [tags] {
mutate {
add_field => { "parse_error" => true }
}
}
}
output {
if [parse_error] {
# 解析失败的日志进入独立的索引
elasticsearch {
hosts => ["http://elasticsearch-node1:9200"]
index => "mongodb-slow-logs-failed-%{+YYYY.MM.dd}"
# user => "elastic"
# password => "changeme"
}
} else {
# 解析成功的日志进入主索引
elasticsearch {
hosts => ["http://elasticsearch-node1:9200"]
index => "mongodb-slow-logs-%{+YYYY.MM.dd}"
# template => "/etc/logstash/templates/mongodb_slow_logs_template.json"
# template_name => "mongodb-slow-logs"
# template_overwrite => true
}
}
}
这个管道的设计思想:
- 分层解析 (Layered Grok): 先用一个简单的 Grok 提取出公共部分,再用一个复杂的多模式 Grok 解析核心的慢查询信息。这提高了可维护性。
- 健壮的错误处理: 每一步解析失败都会添加一个特定的 tag (
_grokparsefailure_*
)。在 output 阶段,我们根据是否存在这些 tag,将成功和失败的日志分发到不同的 Elasticsearch 索引。这使得我们不会因为少数格式异常的日志而丢失大量正常日志,并且可以轻松地排查解析失败的原因。 - 数据类型转换:
mutate
filter 中的convert
操作至关重要。将duration_ms
等字段转为 integer,才能在 Kibana 中进行数值排序、计算平均值或绘制直方图。 - 处理内嵌结构:
json
filter 是处理 MongoDB 日志中 BSON-like 字符串的关键。真实项目中,gsub
的正则表达式可能需要不断调试和完善,以处理各种边缘情况。 - 时间戳覆盖: 使用日志本身的时间戳,而不是日志到达 Logstash 的时间。这保证了事件时间的准确性。
步骤四:Elasticsearch 索引模板
在数据写入 Elasticsearch 之前,定义一个索引模板 (Index Template) 是生产环境的必要步骤。模板可以确保新创建的索引有正确的 settings 和 mappings。如果不这么做,Elasticsearch 会动态猜测字段类型,这常常导致错误(例如,它可能把一个本应是 keyword
的字段映射成 text
,导致无法进行精确匹配和聚合)。
通过 Kibana Dev Tools 或 curl
命令应用此模板:
PUT _index_template/mongodb_slow_logs_template
{
"index_patterns": ["mongodb-slow-logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "mongodb-logs-policy",
"index.lifecycle.rollover_alias": "mongodb-slow-logs"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"env": { "type": "keyword" },
"app": { "type": "keyword" },
"cluster": { "type": "keyword" },
"host": {
"properties": {
"name": { "type": "keyword" }
}
},
"log_level": { "type": "keyword" },
"component": { "type": "keyword" },
"context": { "type": "keyword" },
"mongo_namespace": { "type": "keyword" },
"db_name": { "type": "keyword" },
"collection_name": { "type": "keyword" },
"plan_summary": { "type": "keyword" },
"duration_ms": { "type": "long" },
"keys_examined": { "type": "long" },
"docs_examined": { "type": "long" },
"nreturned": { "type": "long" },
"command": { "type": "object", "enabled": false },
"query_stats": { "type": "text", "index": false }
}
}
}
}
模板关键点:
-
index_patterns
: 应用到所有以mongodb-slow-logs-
开头的索引。 -
settings
: 定义分片、副本数量,并关联了一个 ILM (Index Lifecycle Management) 策略,用于自动管理索引的生命周期(例如,30天后转为 warm 节点,90天后删除)。 mappings
: 这是核心。-
keyword
类型用于需要精确匹配、排序和聚合的字段,如env
,plan_summary
。 -
long
类型用于数值字段,如duration_ms
。 -
command
: 这是一个特别的处理。命令的结构千变万化,为它定义精确的 mapping 几乎不可能。将其类型设置为object
并enabled: false
,意味着 Elasticsearch 会存储这个 JSON 对象,但不会为它的子字段创建索引。我们仍然可以在 Kibana 的文档视图中看到完整命令,但无法对其中的字段(如filter.user_id
)进行搜索。这是一个性能和功能之间的权衡。 -
query_stats
: 这个字段内容杂乱,我们不关心其内容,因此设置index: false
来节省存储和索引开销。
-
最终成果:在 Kibana 中诊断问题
当这一切都部署完毕后,我们获得了一个强大的诊断平台。在 Kibana Discover 界面,我们可以用 KQL (Kibana Query Language) 进行复杂的查询:
查找所有全表扫描 (COLLSCAN) 的慢查询:
plan_summary: COLLSCAN and docs_examined > 10000
查找特定集合上耗时最长的查询:
collection_name: "users" and duration_ms > 1000
统计每个集合的慢查询次数:
在 Visualize 模块,创建一个条形图,Y 轴是 Count of documents,X 轴是 Terms aggregation oncollection_name.keyword
。
我们建立了一个仪表盘,包含了几个关键视图:
- 慢查询总数和平均耗时的时间序列图。
- Top 10 慢查询的集合。
- Top 10 慢查询的操作类型 (
command.find
,command.update
, etc.)。 -
plan_summary
的饼图,可以一眼看出COLLSCAN
占了多大比例。 - 一个数据表格,列出最近的慢查询详情,按
duration_ms
降序排列。
现在,当性能抖动发生时,我们不再是盲目地捞日志,而是在这个仪表盘上,通过下钻和筛选,几分钟内就能定位到具体的查询语句、执行计划和影响范围。
局限性与未来迭代方向
这个方案并非一劳永逸。Grok 解析的脆弱性是其最大的弱点。如果未来 MongoDB 版本更改了日志格式,整个解析管道就可能失效,需要重新编写和测试 Grok 模式。一个重要的改进方向是,在支持 JSON 格式日志输出的 MongoDB 新版本(4.4+)中,切换到 JSON 日志。这将极大地简化 Logstash 的配置,用一个 json
filter 替换掉所有复杂的 grok
和 mutate
操作,从而提高管道的稳定性和性能。
其次,当前的管道没有显式的背压(backpressure)处理机制。如果 MongoDB 突然产生海量慢查询(例如,由于一个错误的部署),可能会压垮 Logstash 实例。未来的迭代可以考虑在 Logstash 中启用持久化队列(Persistent Queues),或者引入一个消息队列(如 Kafka 或 RabbitMQ)作为 Filebeat 和 Logstash 之间的缓冲层,以增强系统的韧性。
最后,此平台目前仅用于事后分析。下一步是将其与告警系统集成。利用 Elasticsearch 的告警功能,我们可以创建规则,例如“如果在5分钟内,plan_summary:COLLSCAN
且 docs_examined > 100000
的日志出现超过10次,则立即通过 PagerDuty 发出告警”,从而实现从被动诊断到主动预防的转变。