我们团队维护的一个高流量Ruby on Rails应用,每天会产生数千万级别的特定领域事件——比如功能开关的评估、搜索查询的详细参数、用户关键行为路径节点等。这些数据对于理解用户行为和驱动业务决策至关重要。最初,我们尝试将这些事件作为自定义指标或结构化日志推送到Datadog。很快,成本就失控了。Datadog在APM、基础设施监控和标准日志管理上表现出色,但将其用作海量自定义事件的分析平台,其成本模型让我们难以承受。我们需要一个方案,既能满足对这些事件的实时分析需求,又能将成本控制在合理范围内。
初步构想与技术选型权衡
核心思路是构建一个并行的、专门处理这类“低优先级但高容量”事件的内部管道。我们将继续使用Datadog负责核心的应用性能监控(APM)、基础设施指标和关键错误报警。而这个新管道则专注于收集、存储和分析海量的业务事件。
技术选型过程中的思考如下:
事件生产者与缓冲 (Producer & Buffer): 应用程序是Ruby on Rails,所以生产者自然是Ruby。直接在Web请求周期内同步发送事件是不可接受的,会严重影响响应时间。因此,必须采用异步处理。我们已经重度使用Sidekiq,所以利用它来创建一个专门的队列处理事件批处理是理所当然的选择。
数据存储与分析引擎 (Storage & Analytics):
- PostgreSQL? 我们已经有生产级的PG集群。但对于这种写密集、分析查询为主的场景,PG并不是最优解。它的行式存储、索引开销以及在聚合大规模数据时的性能都无法与专门的分析型数据库相比。
- Elasticsearch? ELK栈是日志分析的常见选择。但我们的场景更偏向于结构化数据的聚合统计,而非全文搜索。虽然Elasticsearch可以胜任,但在存储成本和纯粹的聚合查询速度上,它通常不如列式数据库。
- ClickHouse? 这似乎是为我们的问题量身定做的。作为一个开源的列式数据库管理系统,它为在线分析处理(OLAP)场景设计。其核心优势是:极高的数据压缩率(列式存储的天然优势)、闪电般的聚合查询速度、以及惊人的写入吞吐能力。对于不可变的事件流数据,ClickHouse的
MergeTree系列表引擎是完美匹配。
数据可视化前端 (Frontend): 我们需要一个简单的内部仪表盘来展示ClickHouse中的数据。
- React/Vue? 对于一个内部工具,引入完整的前端框架显得过重。我们不需要复杂的状态管理、虚拟DOM。这会增加构建复杂度和维护成本。
- 纯HTML + 轻量JS? 可行,但手写CSS来构建一个数据密集型、可维护的界面会很痛苦。
- PostCSS? 这是一个绝佳的折中方案。我们可以继续使用简单的服务器端渲染(如Rails的ERB或Sinatra),同时利用PostCSS及其强大的插件生态(如
postcss-nesting,autoprefixer,tailwindcss)来编写现代化、模块化的CSS。它没有运行时开销,最终产物只是一个高度优化的原生CSS文件。这完全符合我们对这个内部工具“轻量、高效、易维护”的要求。
元监控 (Meta-Monitoring): 我们构建了一个新的数据管道,但谁来保证这个管道自身的健康?答案是Datadog。我们不能放弃我们现有的、成熟的监控体系。我们将使用Datadog来监控这个新管道的关键组件:Sidekiq队列深度、Ruby生产者应用的性能、以及ClickHouse集群的健康状况和数据同步延迟。这种“混合可观测性”模型让我们能取长补短。
最终的架构决策是:Ruby应用通过Sidekiq将事件异步批量推送到ClickHouse。一个轻量级的Sinatra应用(或Rails内的独立Engine)负责查询ClickHouse并通过ERB模板渲染前端页面,页面的样式由PostCSS构建。同时,Datadog Agent监控着整个管道的健康状态。
第一步:Ruby端的事件生产者与Sidekiq批处理
首先,我们定义一个统一的事件发送接口。一个简单的Service Object是理想的实现。
# app/services/event_tracker.rb
require 'net/http'
require 'uri'
require 'json'
require 'concurrent'
# NOTE: 这是一个单例服务,用于在应用各处记录事件。
# 它内部使用一个线程安全的缓冲区来暂存事件,
# 并通过Sidekiq定期将缓冲区内容刷入ClickHouse。
class EventTracker
include Singleton
# ClickHouse的配置,从环境变量读取
CLICKHOUSE_HOST = ENV.fetch('CLICKHOUSE_HOST', 'localhost')
CLICKHOUSE_PORT = ENV.fetch('CLICKHOUSE_PORT', '8123')
CLICKHOUSE_USER = ENV.fetch('CLICKHOUSE_USER', 'default')
CLICKHOUSE_PASSWORD = ENV.fetch('CLICKHOUSE_PASSWORD', '')
CLICKHOUSE_TABLE = ENV.fetch('CLICKHOUSE_TABLE', 'default.events')
# 缓冲区配置
# 在真实项目中,这个大小需要根据事件频率和内存占用仔细调优
BUFFER_FLUSH_SIZE = 1000
BUFFER_FLUSH_INTERVAL = 10.seconds # seconds
def initialize
# @buffer是一个线程安全的数组,用于暂存事件
@buffer = Concurrent::Array.new
@lock = Mutex.new
# 启动一个定时任务,定期刷新缓冲区
# 这里的实现比较简单,在生产环境中可能需要更健壮的后台任务调度器
# 但由于我们最终会用Sidekiq,这里的定时器只是一个概念验证
# 在Rails初始化时启动这个逻辑
end
# 公共接口,用于记录事件
# @param event_name [String] 事件名称
# @param properties [Hash] 事件属性
def track(event_name, properties = {})
event = {
event_name: event_name.to_s,
properties: properties.to_json, # 将properties序列化为JSON字符串
timestamp: Time.now.utc.iso8601(6), # ISO8601格式,带微秒
host: Socket.gethostname
}
@buffer << event
# 当缓冲区达到阈值时,触发异步任务
flush_async if @buffer.size >= BUFFER_FLUSH_SIZE
end
# 强制刷新缓冲区,可用于应用关闭前的优雅处理
def flush
events_to_flush = nil
@lock.synchronize do
return if @buffer.empty?
events_to_flush = @buffer.dup
@buffer.clear
end
# 这里的关键是:将实际的网络IO操作交给Sidekiq
ClickHouseEventWorker.perform_async(events_to_flush) if events_to_flush&.any?
end
private
def flush_async
# 在真实项目中,这里应该有逻辑防止在短时间内频繁触发
# 例如使用Redis锁来做节流
@lock.synchronize do
return if @buffer.size < BUFFER_FLUSH_SIZE
events_to_flush = @buffer.slice!(0, BUFFER_FLUSH_SIZE)
ClickHouseEventWorker.perform_async(events_to_flush)
end
end
end
# 使用方式:
# EventTracker.instance.track('user_signed_up', { user_id: 123, plan: 'premium' })
接下来是处理实际发送逻辑的Sidekiq Worker。它负责将一批事件通过HTTP接口发送给ClickHouse。
# app/workers/click_house_event_worker.rb
class ClickHouseEventWorker
include Sidekiq::Worker
# 配置Sidekiq重试策略
# 对于数据管道,我们希望有重试,但不能无限重试
# 失败的任务会进入Dead Job Queue,需要后续手动或自动处理
sidekiq_options queue: 'low_priority', retry: 5
# 从EventTracker获取配置
CLICKHOUSE_URI = URI::HTTP.build(
host: EventTracker::CLICKHOUSE_HOST,
port: EventTracker::CLICKHOUSE_PORT,
query: {
# 指定目标表和输入格式
query: "INSERT INTO #{EventTracker::CLICKHOUSE_TABLE} FORMAT JSONEachRow"
}.to_query
)
def perform(events)
return if events.empty?
# 将事件数组转换为ClickHouse的JSONEachRow格式
# 即每个JSON对象占一行
payload = events.map(&:to_json).join("\n")
http = Net::HTTP.new(CLICKHOUSE_URI.host, CLICKHOUSE_URI.port)
http.use_ssl = (CLICKHOUSE_URI.scheme == 'https')
# 设置合理的超时,防止长时间阻塞worker
http.read_timeout = 15
http.open_timeout = 5
request = Net::HTTP::Post.new(CLICKHOUSE_URI.request_uri)
request.body = payload
request.basic_auth(EventTracker::CLICKHOUSE_USER, EventTracker::CLICKHOUSE_PASSWORD)
request['Content-Type'] = 'application/x-ndjson'
begin
response = http.request(request)
# 必须检查响应码,ClickHouse在失败时也会返回200 OK,但body里有错误信息
unless response.is_a?(Net::HTTPSuccess) && !response.body.include?('Exception')
# 记录详细错误日志,以便排查
Rails.logger.error "Failed to send events to ClickHouse. Status: #{response.code}, Body: #{response.body}"
# 主动抛出异常,触发Sidekiq的重试机制
raise "ClickHouseIngestionError: #{response.code} #{response.body}"
end
# 可以在这里记录成功发送的事件数量作为监控指标
Rails.logger.info "Successfully sent #{events.count} events to ClickHouse."
rescue Net::ReadTimeout, Net::OpenTimeout, Net::HTTPBadResponse, Errno::ECONNREFUSED => e
Rails.logger.error "Network error while sending events to ClickHouse: #{e.class} - #{e.message}"
# 再次抛出异常以利用Sidekiq重试
raise e
end
end
end
这里的关键点在于错误处理和批处理格式。我们使用 JSONEachRow 格式,它在性能和易用性之间取得了很好的平衡。Sidekiq的重试机制为我们提供了基础的数据发送可靠性保障。
第二步:ClickHouse的表结构设计
ClickHouse的性能很大程度上取决于表结构的设计。对于事件数据,MergeTree 是不二之选。
-- 在ClickHouse中执行此DDL
CREATE TABLE default.events (
-- 我们使用UUID作为事件的唯一标识,这在排查问题时非常有用
`event_id` UUID DEFAULT generateUUIDv4(),
-- 事件名称,例如 'user_signed_up'
-- LowCardinality对于基数不高的字符串能极大优化存储和查询性能
`event_name` LowCardinality(String),
-- 使用DateTime64来存储带毫秒或微秒精度的时间戳
`timestamp` DateTime64(6, 'UTC'),
-- 事件相关的属性,存储为JSON字符串。
-- 在查询时可以使用ClickHouse的JSON函数进行解析
`properties` String,
-- 记录事件来源的主机名,便于区分来源
`host` LowCardinality(String),
-- 为了数据管理,我们添加一个仅用于分区的日期列
`event_date` Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
-- 按月分区。这是管理数据生命周期(TTL)和提升查询性能的关键
PARTITION BY toYYYYMM(event_date)
-- 排序键是查询优化的核心。将最常用于过滤和聚合的列放在前面
-- event_name 通常是查询的第一个过滤条件
-- timestamp 通常用于范围查询
ORDER BY (event_name, timestamp)
-- 设置数据自动过期,例如保留最近90天的数据
TTL event_date + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;
这个表结构的设计包含了几个最佳实践:
-
LowCardinality(String): 对于像event_name或host这类取值范围有限的字符串,使用LowCardinality可以将其转换为字典编码,大幅减少存储空间和加快查询速度。 -
PARTITION BY: 按月分区使得删除旧数据(TTL)或对特定月份进行查询时效率极高,ClickHouse只需处理相关的分区目录,而不是扫描整张表。 -
ORDER BY: 排序键(PRIMARY KEY在MergeTree中的实际含义)决定了数据在磁盘上的物理存储顺序。查询时如果WHERE条件命中了排序键的前缀,ClickHouse可以快速跳过不相关的数据块,性能提升是数量级的。
第三步:使用PostCSS构建轻量级数据仪表盘
我们选择Sinatra作为后端Web框架,因为它足够轻量。
# dashboard.rb
require 'sinatra'
require 'clickhouse' # 使用 `clickhouse-ruby` gem
# 全局配置ClickHouse连接
# 在生产中,这些应该来自环境变量
CLICKHOUSE_CONFIG = {
host: 'localhost',
port: 8123,
database: 'default'
}
Clickhouse.establish_connection(CLICKHOUSE_CONFIG)
get '/' do
# 查询最近1小时每分钟的事件总数
@events_per_minute = Clickhouse.connection.select_rows(
query: %{
SELECT toStartOfMinute(timestamp) AS minute, count()
FROM default.events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute ASC
}
).map { |row| { minute: row[0], count: row[1] } }
# 查询最近发生的100个事件
@latest_events = Clickhouse.connection.select_all(
query: %{
SELECT event_name, timestamp, properties, host
FROM default.events
ORDER BY timestamp DESC
LIMIT 100
}
)
erb :index
end
视图views/index.erb会渲染这些数据。但重点在于CSS的构建。我们的项目结构可能如下:
.
├── dashboard.rb
├── views/
│ ├── index.erb
│ └── layout.erb
├── public/
│ └── css/
│ └── app.css # 这是编译后的产物
└── assets/
└── css/
├── app.css
├── _variables.css
└── components/
├── _table.css
└── _chart.css
package.json定义了我们的构建脚本:
{
"name": "dashboard-frontend",
"version": "1.0.0",
"scripts": {
"build": "postcss assets/css/app.css -o public/css/app.css",
"watch": "postcss assets/css/app.css -o public/css/app.css --watch"
},
"devDependencies": {
"autoprefixer": "^10.4.16",
"cssnano": "^6.0.1",
"postcss": "^8.4.31",
"postcss-cli": "^10.1.0",
"postcss-import": "^15.1.0",
"postcss-nesting": "^12.0.1"
}
}
postcss.config.js配置了我们使用的插件:
// postcss.config.js
module.exports = {
plugins: [
require('postcss-import'), // 允许使用 @import 语法来组织CSS文件
require('postcss-nesting'), // 允许使用类似Sass的嵌套语法
require('autoprefixer'), // 自动添加浏览器厂商前缀
...(process.env.NODE_ENV === 'production' ? [require('cssnano')] : []) // 生产环境启用压缩
]
};
我们的CSS源文件assets/css/app.css:
/* assets/css/app.css */
@import "./_variables.css";
@import "./components/_table.css";
@import "./components/_chart.css";
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
background-color: var(--color-background);
color: var(--color-text);
margin: 0;
padding: 2rem;
}
一个组件样式文件assets/css/components/_table.css的例子,展示了postcss-nesting的威力:
/* assets/css/components/_table.css */
.data-table {
width: 100%;
border-collapse: collapse;
font-size: 0.9em;
& th, & td {
padding: 0.75rem 1rem;
border-bottom: 1px solid var(--color-border);
text-align: left;
}
& thead th {
background-color: var(--color-header-bg);
font-weight: 600;
}
& tbody tr {
&:hover {
background-color: var(--color-row-hover);
}
}
/* 针对属性列的特殊样式,内容可能是长JSON */
& .properties-cell {
max-width: 400px;
white-space: pre-wrap; /* 保持JSON格式 */
word-break: break-all;
font-family: monospace;
font-size: 0.85em;
background-color: var(--color-code-bg);
}
}
运行npm run build后,我们会得到一个单一、优化过的public/css/app.css文件。这种方法的优点是极致的性能和简单性。页面加载的CSS极小,没有任何JS运行时框架的开销,对于一个数据展示工具来说,响应速度飞快。
第四步:Datadog的元监控集成
现在管道已经建立,我们需要确保它的稳定。Datadog Agent提供了自定义检查的功能,非常适合这个场景。我们编写一个简单的Python脚本,让Agent定期执行。
# /etc/datadog-agent/checks.d/clickhouse_pipeline.py
from datadog_checks.base import AgentCheck
import requests
import time
class ClickHousePipelineCheck(AgentCheck):
def check(self, instance):
ch_host = instance.get('clickhouse_host', 'localhost')
ch_port = instance.get('clickhouse_port', 8123)
table = instance.get('table', 'default.events')
# 1. 检查数据注入延迟
query_lag = f"SELECT max(timestamp) FROM {table}"
try:
response = requests.get(
f"http://{ch_host}:{ch_port}/",
params={'query': query_lag},
timeout=5
)
response.raise_for_status()
# ClickHouse返回的时间戳格式是 'YYYY-MM-DD HH:MM:SS.ffffff'
max_ts_str = response.text.strip()
# 兼容不带小数秒的情况
if '.' in max_ts_str:
max_timestamp = datetime.strptime(max_ts_str, '%Y-%m-%d %H:%M:%S.%f').timestamp()
else:
max_timestamp = datetime.strptime(max_ts_str, '%Y-%m-%d %H:%M:%S').timestamp()
current_timestamp = time.time()
ingestion_lag = current_timestamp - max_timestamp
self.gauge('pipeline.clickhouse.ingestion_lag_seconds', ingestion_lag, tags=['table:' + table])
except Exception as e:
self.service_check('pipeline.clickhouse.can_connect', AgentCheck.CRITICAL, message=str(e))
self.log.error(f"Error checking ClickHouse ingestion lag: {e}")
else:
self.service_check('pipeline.clickhouse.can_connect', AgentCheck.OK)
# 2. 还可以监控Sidekiq队列深度 (通过Sidekiq API 或 Redis)
# ... 此处省略Sidekiq监控实现细节
对应的配置文件:
# /etc/datadog-agent/conf.d/clickhouse_pipeline.yaml
init_config:
instances:
- clickhouse_host: localhost
clickhouse_port: 8123
table: default.events
重启Datadog Agent后,我们就会在Datadog中看到一个新的指标pipeline.clickhouse.ingestion_lag_seconds。我们可以基于这个指标设置监控器,例如“如果数据注入延迟超过5分钟,则发送P1级别警报”。这形成了一个完美的闭环:我们用Datadog来保障我们为降低Datadog成本而构建的新系统的稳定性。
最终成果
我们成功构建了一个高吞吐、低成本的混合可观测性管道。
graph TD
subgraph "Ruby on Rails App"
A[Controller/Model] -- generates event --> B(EventTracker Service);
B -- buffers & triggers --> C{Sidekiq};
end
subgraph "Data Pipeline"
C -- async batch insert --> D[ClickHouse Server];
end
subgraph "Internal Dashboard"
E[Sinatra App] -- queries --> D;
F[Browser] -- requests page --> E;
E -- renders ERB with PostCSS assets --> F;
end
subgraph "Datadog Monitoring"
G[Datadog Agent] -- custom check queries --> D;
G -- monitors --> C;
G -- reports metrics --> H((Datadog Platform));
end
这个架构让我们能够以极低的成本捕获和分析海量的自定义业务事件。ClickHouse的查询性能使得我们的内部仪表盘响应迅速,PostCSS则保证了前端代码的轻量和可维护性。最重要的是,通过Datadog对这个新管道进行元监控,我们确保了整个系统的可靠性,而没有牺牲我们已有的、成熟的SRE工作流程。
局限性与未来展望
这个方案并非没有缺点。首先,它增加了运维复杂性,团队需要具备ClickHouse的运维能力。当前的仪表盘功能相对简单,如果需要更复杂的交互和可视化,可能还是需要引入一个轻量级的JS库(如Alpine.js或Stimulus)。
数据 schema 的演进也是一个挑战。如果事件的properties结构频繁变更,依赖JSON字符串的方式可能会变得脆弱。未来可以考虑引入一个schema注册中心,或者使用更结构化的格式(如Protobuf)来规范事件数据。
对于数据管道的可靠性,目前依赖Sidekiq的重试机制。在对数据可靠性要求更高的场景下,可以在应用和ClickHouse之间引入一个消息队列(如Kafka或RabbitMQ)作为更持久的缓冲区,以防止Sidekiq彻底失败时的数据丢失。