---
name: clickhouse-io
description: ClickHouse database patterns, query optimization, analytics, and data-engineering best practices for high-performance analytical workloads.
---
# ClickHouse Analytics Patterns
ClickHouse-specific patterns for high-performance analytics and data engineering.
## When to Activate
- Designing ClickHouse table schemas (MergeTree engine selection)
- Writing analytical queries (aggregations, window functions, joins)
- Optimizing query performance (partition pruning, projections, materialized views)
- Ingesting large volumes of data (batch inserts, Kafka integration)
- Migrating from PostgreSQL/MySQL to ClickHouse for analytics
- Implementing real-time dashboards or time-series analysis
## Overview
ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It is optimized for fast analytical queries over large datasets.
**Key characteristics:**
- Column-oriented storage
- Data compression
- Parallel query execution
- Distributed queries
- Real-time analytics
## Table Design Patterns
### MergeTree Engine (Most Common)
```sql
CREATE TABLE markets_analytics (
    date Date,
    market_id String,
    market_name String,
    volume UInt64,
    trades UInt32,
    unique_traders UInt32,
    avg_trade_size Float64,
    created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;
```
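To check whether a query actually benefits from the partition key and ordering key above, `EXPLAIN indexes = 1` reports which partitions and index granules survive pruning. A minimal sketch against the table above:

```sql
-- Shows the MinMax (partition) and primary-key index steps, and how
-- many parts/granules each one eliminates for this predicate
EXPLAIN indexes = 1
SELECT sum(volume)
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123';
```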
### ReplacingMergeTree (Deduplication)
```sql
-- For data that may contain duplicates (e.g. from multiple sources).
-- Rows are deduplicated on the sorting key during background merges;
-- `timestamp` acts as the version column, so the newest row wins.
CREATE TABLE user_events (
    event_id String,
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
) ENGINE = ReplacingMergeTree(timestamp)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id);
```
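One caveat worth a short example: deduplication only happens when parts merge in the background, so reads must either force merged semantics with `FINAL` (convenient but slower; see Best Practices below) or deduplicate explicitly. A sketch against the table above:

```sql
-- FINAL applies merge-time semantics at query time
SELECT event_type, count() AS events
FROM user_events FINAL
WHERE timestamp >= today() - INTERVAL 7 DAY
GROUP BY event_type;

-- Cheaper alternative: keep only the latest row per key with argMax
SELECT
    event_id,
    argMax(event_type, timestamp) AS event_type
FROM user_events
GROUP BY event_id;
```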
### AggregatingMergeTree (Pre-Aggregation)
```sql
-- For maintaining pre-aggregated metrics.
-- Note: countState() takes no arguments, so the column type is
-- AggregateFunction(count), not AggregateFunction(count, UInt32).
CREATE TABLE market_stats_hourly (
    hour DateTime,
    market_id String,
    total_volume AggregateFunction(sum, UInt64),
    total_trades AggregateFunction(count),
    unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);
-- Query the aggregated data
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
```
## Query Optimization Patterns
### Efficient Filtering
```sql
-- ✅ GOOD: filter on the partition/ordering key (date, market_id),
-- so partition pruning and the primary index can skip data
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123'
  AND volume > 1000
ORDER BY date DESC
LIMIT 100;
-- ❌ BAD: the selective filters are on non-indexed columns, and the
-- LIKE pattern forces a scan of every row that survives the date filter
SELECT *
FROM markets_analytics
WHERE volume > 1000
  AND market_name LIKE '%election%'
  AND date >= '2025-01-01';
```
### Aggregations
```sql
-- ✅ GOOD: use ClickHouse-native aggregate functions
SELECT
    toStartOfDay(created_at) AS day,
    market_id,
    sum(volume) AS total_volume,
    count() AS total_trades,
    uniq(trader_id) AS unique_traders,
    avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;
-- ✅ Use quantile for percentiles (approximate by default and far
-- cheaper than an exact computation; use quantileExact if needed)
SELECT
    quantile(0.50)(trade_size) AS median,
    quantile(0.95)(trade_size) AS p95,
    quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```
### Window Functions
```sql
-- Running totals per market
SELECT
    date,
    market_id,
    volume,
    sum(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```
## Data Insertion Patterns
### Batch Inserts (Strongly Recommended)
```typescript
import { ClickHouse } from 'clickhouse'

interface Trade {
  id: string
  market_id: string
  user_id: string
  amount: number
  timestamp: Date
}

const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD
  }
})

// ✅ Batch insert (efficient). Note: values are interpolated into the
// SQL string, so only use this with trusted/escaped data -- prefer a
// parameterized insert in production.
async function bulkInsertTrades(trades: Trade[]) {
  const values = trades.map(trade => `(
    '${trade.id}',
    '${trade.market_id}',
    '${trade.user_id}',
    ${trade.amount},
    '${trade.timestamp.toISOString()}'
  )`).join(',')
  await clickhouse.query(`
    INSERT INTO trades (id, market_id, user_id, amount, timestamp)
    VALUES ${values}
  `).toPromise()
}

// ❌ Single-row insert (slow)
async function insertTrade(trade: Trade) {
  // Never do this in a loop!
  await clickhouse.query(`
    INSERT INTO trades VALUES ('${trade.id}', ...)
  `).toPromise()
}
```
### Streaming Inserts
```typescript
// Continuous ingestion, sketched with the legacy `clickhouse` package's
// insert-stream API (writeRow/exec). `dataSource` is a placeholder for
// any async iterable yielding rows as arrays of column values.
async function streamInserts(dataSource: AsyncIterable<unknown[]>) {
  const stream = clickhouse.insert('INSERT INTO trades').stream()
  for await (const row of dataSource) {
    await stream.writeRow(row)
  }
  await stream.exec()
}
```
## Materialized Views
### Real-Time Aggregation
```sql
-- Materialized view feeding the hourly stats table
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;
-- Query the materialized data
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;
```
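One gotcha worth noting: a `TO` materialized view only transforms rows inserted after it is created. Rows already in `trades` must be backfilled manually, for example with a one-off insert like this sketch (the cutoff timestamp is hypothetical):

```sql
-- One-time backfill of pre-existing rows into the target table;
-- the -State combinators produce the AggregateFunction values the
-- AggregatingMergeTree table expects
INSERT INTO market_stats_hourly
SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
WHERE timestamp < '2025-01-01 00:00:00'  -- cutoff: rows the MV never saw
GROUP BY hour, market_id;
```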
## Performance Monitoring
### Query Performance
```sql
-- Find slow queries
SELECT
    query_id,
    user,
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_duration_ms > 1000
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
```
### Table Statistics
```sql
-- Check table sizes
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```
## Common Analytical Queries
### Time-Series Analysis
```sql
-- Daily active users (DAU)
SELECT
    toDate(timestamp) AS date,
    uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;
-- Retention analysis. signup_date must be the user's first-ever day,
-- so compute it as a window minimum over all of the user's events
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM (
    SELECT DISTINCT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM events
)
GROUP BY signup_date
ORDER BY signup_date DESC;
```
### Funnel Analysis
```sql
-- Conversion funnel across all of today's sessions (no GROUP BY:
-- we want whole-funnel totals, not one row per session)
SELECT
    countIf(step = 'viewed_market') AS viewed,
    countIf(step = 'clicked_trade') AS clicked,
    countIf(step = 'completed_trade') AS completed,
    round(clicked / viewed * 100, 2) AS view_to_click_rate,
    round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM (
    SELECT
        user_id,
        session_id,
        event_type AS step
    FROM events
    WHERE event_date = today()
);
```
### Cohort Analysis
```sql
-- User cohorts by signup month
SELECT
    toStartOfMonth(signup_date) AS cohort,
    toStartOfMonth(activity_date) AS month,
    dateDiff('month', cohort, month) AS months_since_signup,
    count(DISTINCT user_id) AS active_users
FROM (
    SELECT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date
    FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;
```
## Data Pipeline Patterns
### ETL Pattern
```typescript
// Extract, Transform, Load
async function etlPipeline() {
  // 1. Extract from the source (helper assumed to exist elsewhere)
  const rawData = await extractFromPostgres()
  // 2. Transform
  const transformed = rawData.map(row => ({
    date: new Date(row.created_at).toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: parseFloat(row.total_volume),
    trades: parseInt(row.trade_count)
  }))
  // 3. Load into ClickHouse (helper assumed to exist elsewhere)
  await bulkInsertToClickHouse(transformed)
}
// Run on a schedule
setInterval(etlPipeline, 60 * 60 * 1000) // hourly
```
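As an alternative to an application-level ETL loop, ClickHouse can pull from PostgreSQL directly with the `postgresql()` table function. A sketch; the connection details, credentials, and source columns are placeholders:

```sql
-- Reads remote PostgreSQL rows on the fly and loads them in one statement
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
SELECT id, market_id, user_id, amount, timestamp
FROM postgresql('pg-host:5432', 'app_db', 'trades', 'etl_user', 'etl_password')
WHERE timestamp >= now() - INTERVAL 1 HOUR;
```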
### Change Data Capture (CDC)
```typescript
// Listen for PostgreSQL changes and mirror them into ClickHouse
import { Client } from 'pg'

const pgClient = new Client({ connectionString: process.env.DATABASE_URL })
await pgClient.connect()
await pgClient.query('LISTEN market_updates')

pgClient.on('notification', async (msg) => {
  if (!msg.payload) return
  const update = JSON.parse(msg.payload)
  // NOTE: in production, buffer notifications and flush in batches
  // rather than inserting one row per event (see Best Practices)
  await clickhouse.insert(
    'INSERT INTO market_updates (market_id, event_type, timestamp, data)',
    [{
      market_id: update.id,
      event_type: update.operation, // INSERT, UPDATE, DELETE
      timestamp: new Date(),
      data: JSON.stringify(update.new_data)
    }]
  ).toPromise()
})
```
## Best Practices
### 1. Partitioning Strategy
- Partition by time (usually by month or by day), as sketched below.
- Avoid creating too many partitions (it hurts performance).
- Use a Date-typed expression for the partition key.
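A minimal illustration of the trade-off (table and column names are hypothetical): monthly partitions suit multi-year retention, while daily partitions suit short-lived, high-volume data that gets dropped by partition.

```sql
-- Monthly partitions: ~12 partitions per year, a good default
CREATE TABLE events_monthly (
    event_date Date,
    user_id String,
    payload String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id);

-- Daily partitions: cheap per-day drops, but ~30x more parts per year
CREATE TABLE events_daily (
    event_date Date,
    user_id String,
    payload String
) ENGINE = MergeTree()
PARTITION BY event_date
ORDER BY (event_date, user_id);
```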
### 2. Ordering Key
- Put the most frequently filtered columns first, as in the sketch below.
- Consider cardinality: lower-cardinality columns generally belong earlier in the key, which improves both index effectiveness and compression.
- The sort order directly affects compression ratios.
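A hedged sketch of the idea (hypothetical table): `event_type` has a handful of distinct values while `user_id` has millions, so the low-cardinality column leads the key.

```sql
-- Low-cardinality column first: long runs of identical values compress
-- well, and the sparse index narrows the scan quickly at each level
CREATE TABLE user_actions (
    event_date Date,
    event_type LowCardinality(String),  -- ~20 distinct values
    user_id String,                     -- millions of distinct values
    duration_ms UInt32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_type, user_id, event_date);
```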
### 3. Data Types
- Use the smallest type that fits (UInt32 vs UInt64), as in the sketch below.
- Use LowCardinality for strings with few distinct values.
- Use Enum for categorical data.
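A small sketch pulling these three rules together (the table is hypothetical):

```sql
CREATE TABLE page_views (
    view_date Date,                         -- Date (2 bytes) when time-of-day is not needed
    status UInt16,                          -- an HTTP status fits in 16 bits
    country LowCardinality(String),         -- a few hundred distinct values
    device Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
    duration_ms UInt32                      -- UInt64 would waste 4 bytes per row
) ENGINE = MergeTree()
ORDER BY (view_date, country);
```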
### 4. Things to Avoid
- SELECT * (name the columns you need).
- FINAL (let data merge in the background instead of forcing it at query time).
- Heavy JOINs (denormalize for analytics instead).
- Many small, frequent inserts (batch them instead); the sketch below contrasts the two.
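The last point is the most common operational mistake; a minimal contrast (values are illustrative):

```sql
-- ❌ Thousands of single-row INSERTs: every statement creates a new
-- data part on disk, and background merges cannot keep up
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
VALUES ('t1', 'm1', 'u1', 10.5, now());

-- ✅ One INSERT carrying the whole batch: a single new part per batch
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
VALUES
    ('t1', 'm1', 'u1', 10.5, now()),
    ('t2', 'm1', 'u2', 3.2, now()),
    ('t3', 'm2', 'u1', 7.8, now());
```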
### 5. Monitoring
- Track query performance.
- Monitor disk usage.
- Check merge activity (see the query below).
- Review the slow-query log.
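Query performance and table sizes are covered by the system.query_log and system.parts queries earlier in this document; merge activity can be watched live through system.merges:

```sql
-- Currently running background merges; long-running entries with low
-- progress can indicate merge pressure from too-frequent small inserts
SELECT
    database,
    table,
    elapsed,
    progress,
    num_parts,
    formatReadableSize(total_size_bytes_compressed) AS size
FROM system.merges
ORDER BY elapsed DESC;
```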
**Remember:** ClickHouse excels at analytical workloads. Design your tables around your query patterns, insert in batches, and lean on materialized views for real-time aggregation.