Offline Computing
One-Sentence Summary
Offline computing is the "brain" of an advertising big data system: batch engines such as Hive and Spark run ETL, feature computation, report aggregation, and training-sample generation over massive historical data, forming the core of the data warehouse and data analytics.
Offline Computing Architecture
flowchart LR
subgraph Sources[Data Sources]
A1[Kafka<br/>real-time logs]
A2[HDFS<br/>historical data]
A3[MySQL<br/>business data]
end
subgraph Engine[Compute Engine]
B[Hive / Spark]
S[Scheduler<br/>Airflow/DS]
end
subgraph Output[Outputs]
C1[Data warehouse Hive tables]
C2[OLAP engine ClickHouse]
C3[Feature store HBase/Redis]
end
A1 & A2 & A3 --> B
S -.-> B
B --> C1 & C2 & C3
Data Warehouse Layering (Key Topic)
Layered Architecture
flowchart TD
ADS["ADS 应用数据层<br/>面向应用的聚合数据 (报表、API、导出)"]
DWS["DWS 汇总数据层<br/>轻度聚合,按主题域汇总 (日/周/月粒度)"]
DWD["DWD 明细数据层<br/>清洗后的明细数据,统一格式和口径"]
ODS["ODS 原始数据层<br/>原始日志数据,保持原貌"]
DIM["DIM 维度层<br/>维度表 (广告主、广告计划、广告位、地域等)"]
ODS --> DWD --> DWS --> ADS
DIM -.-> DWD
DIM -.-> DWS
Layer-by-Layer Examples for the Ad Data Warehouse
ODS Layer (Raw Data)
-- ODS: raw ad request log
CREATE TABLE ods_ad_request_log (
request_id STRING,
`timestamp` BIGINT, -- backticked: TIMESTAMP is a reserved keyword in Hive
user_id STRING,
device_id STRING,
ip STRING,
ad_slot_id STRING,
os STRING,
app_id STRING,
geo_city STRING,
raw_json STRING -- original JSON payload preserved
)
PARTITIONED BY (dt STRING, hour STRING)
STORED AS ORC;
-- Data flow: Kafka → Flink/Spark → HDFS → Hive external table
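If ods_ad_request_log is created as an external table over the HDFS landing path, as the data-flow note above suggests, the scheduler typically only needs to register newly landed partitions. A minimal sketch, assuming the usual `${date}` / `${hour}` template variables:

-- Register the hour that the ingestion job just wrote
ALTER TABLE ods_ad_request_log ADD IF NOT EXISTS
    PARTITION (dt = '${date}', hour = '${hour}');

-- Or sync all partitions from the directory layout in one shot
MSCK REPAIR TABLE ods_ad_request_log;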
DWD Layer (Detail Data)
-- DWD: ad impression detail table
CREATE TABLE dwd_ad_impression (
request_id STRING,
event_time TIMESTAMP,
user_id STRING,
device_id STRING,
ad_id BIGINT,
campaign_id BIGINT,
advertiser_id BIGINT,
creative_id BIGINT,
ad_slot_id STRING,
ad_format STRING, -- banner/feed/splash
os STRING,
city STRING,
province STRING,
network STRING, -- wifi/4g/5g
bid_price DECIMAL(10,4),
win_price DECIMAL(10,4),
ecpm DECIMAL(10,4),
pctr DOUBLE,
pcvr DOUBLE,
is_valid INT -- anti-fraud flag: 1 = valid, 0 = invalid
)
PARTITIONED BY (dt STRING)
STORED AS ORC;
-- ETL logic (sketched as a single statement below):
-- 1. Parse fields out of the ODS JSON
-- 2. Join dimension tables (ad and advertiser attributes)
-- 3. Clean the data (dedupe, normalize formats)
-- 4. Apply the anti-fraud flag
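A minimal sketch of that ETL as a single Hive statement. The JSON paths, the dim_ad lookup table, and the anti-fraud rule are illustrative assumptions (real pipelines are more involved), and `timestamp` is assumed to be epoch milliseconds:

-- Hypothetical ODS → DWD job for one day
WITH parsed AS (
    SELECT                                                                 -- 1. parse raw_json
        request_id,
        CAST(from_unixtime(CAST(`timestamp` / 1000 AS BIGINT)) AS TIMESTAMP) AS event_time,
        user_id,
        device_id,
        CAST(get_json_object(raw_json, '$.ad_id') AS BIGINT)              AS ad_id,
        ad_slot_id,
        get_json_object(raw_json, '$.ad_format')                          AS ad_format,
        os,
        geo_city                                                           AS city,
        get_json_object(raw_json, '$.geo_province')                        AS province,
        get_json_object(raw_json, '$.network')                             AS network,
        CAST(get_json_object(raw_json, '$.bid_price') AS DECIMAL(10,4))    AS bid_price,
        CAST(get_json_object(raw_json, '$.win_price') AS DECIMAL(10,4))    AS win_price,
        CAST(get_json_object(raw_json, '$.ecpm') AS DECIMAL(10,4))         AS ecpm,
        CAST(get_json_object(raw_json, '$.pctr') AS DOUBLE)                AS pctr,
        CAST(get_json_object(raw_json, '$.pcvr') AS DOUBLE)                AS pcvr,
        ROW_NUMBER() OVER (PARTITION BY request_id ORDER BY `timestamp` DESC) AS rn  -- 3. dedupe
    FROM ods_ad_request_log
    WHERE dt = '${date}'
)
INSERT OVERWRITE TABLE dwd_ad_impression PARTITION (dt = '${date}')
SELECT
    p.request_id, p.event_time, p.user_id, p.device_id,
    p.ad_id, d.campaign_id, d.advertiser_id, d.creative_id,                -- 2. enrich from dimensions
    p.ad_slot_id, p.ad_format, p.os, p.city, p.province, p.network,
    p.bid_price, p.win_price, p.ecpm, p.pctr, p.pcvr,
    CASE WHEN p.user_id IS NULL OR p.device_id IS NULL THEN 0 ELSE 1 END   AS is_valid  -- 4. placeholder anti-fraud rule
FROM parsed p
LEFT JOIN dim_ad d ON p.ad_id = d.ad_id
WHERE p.rn = 1;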
DWS Layer (Summary Data)
-- DWS: daily ad performance summary table
CREATE TABLE dws_ad_performance_daily (
campaign_id BIGINT,
advertiser_id BIGINT,
ad_slot_id STRING,
os STRING,
city STRING,
-- base metrics
impressions BIGINT,
clicks BIGINT,
conversions BIGINT,
cost DECIMAL(12,2),
revenue DECIMAL(12,2),
-- derived metrics
ctr DOUBLE,
cvr DOUBLE,
ecpm DOUBLE,
cpa DOUBLE
)
PARTITIONED BY (dt STRING)
STORED AS ORC;
-- Aggregation SQL (DWD → DWS):
INSERT OVERWRITE TABLE dws_ad_performance_daily PARTITION (dt = '${date}')
SELECT
i.campaign_id,
i.advertiser_id,
i.ad_slot_id,
i.os,
i.city,
COUNT(DISTINCT i.request_id) as impressions,
COUNT(DISTINCT c.request_id) as clicks,
COUNT(DISTINCT cv.request_id) as conversions,
-- SUMs assume at most one click/conversion row per impression after DWD dedup; otherwise pre-aggregate to avoid join fan-out
SUM(i.win_price) / 1000 as cost, -- win_price assumed to be CPM-denominated (price per 1000 impressions)
SUM(cv.revenue) as revenue,
COUNT(DISTINCT c.request_id) / COUNT(DISTINCT i.request_id) as ctr,
COUNT(DISTINCT cv.request_id) / NULLIF(COUNT(DISTINCT c.request_id), 0) as cvr,
SUM(i.win_price) / COUNT(DISTINCT i.request_id) as ecpm, -- = cost / impressions * 1000
SUM(i.win_price) / NULLIF(COUNT(DISTINCT cv.request_id), 0) / 1000 as cpa
FROM dwd_ad_impression i
LEFT JOIN dwd_ad_click c ON i.request_id = c.request_id AND c.dt = '${date}'
LEFT JOIN dwd_ad_conversion cv ON c.click_id = cv.click_id AND cv.dt = '${date}'
WHERE i.dt = '${date}' AND i.is_valid = 1
GROUP BY i.campaign_id, i.advertiser_id, i.ad_slot_id, i.os, i.city;
ADS Layer (Application Data)
-- ADS: advertiser report (served to product UIs / APIs)
CREATE TABLE ads_advertiser_report (
advertiser_id BIGINT,
advertiser_name STRING,
total_cost DECIMAL(12,2),
total_impressions BIGINT,
total_clicks BIGINT,
total_conversions BIGINT,
avg_ctr DOUBLE,
avg_cvr DOUBLE,
avg_cpa DOUBLE,
roas DOUBLE
)
PARTITIONED BY (dt STRING);
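A minimal sketch of the DWS → ADS roll-up. The dim_advertiser table used for advertiser_name is an assumption (the name could equally come from dim_campaign), and ROAS is taken as revenue / cost:

INSERT OVERWRITE TABLE ads_advertiser_report PARTITION (dt = '${date}')
SELECT
    d.advertiser_id,
    a.advertiser_name,
    SUM(d.cost)                                    AS total_cost,
    SUM(d.impressions)                             AS total_impressions,
    SUM(d.clicks)                                  AS total_clicks,
    SUM(d.conversions)                             AS total_conversions,
    SUM(d.clicks) / NULLIF(SUM(d.impressions), 0)  AS avg_ctr,
    SUM(d.conversions) / NULLIF(SUM(d.clicks), 0)  AS avg_cvr,
    SUM(d.cost) / NULLIF(SUM(d.conversions), 0)    AS avg_cpa,
    SUM(d.revenue) / NULLIF(SUM(d.cost), 0)        AS roas
FROM dws_ad_performance_daily d
LEFT JOIN dim_advertiser a ON d.advertiser_id = a.advertiser_id
WHERE d.dt = '${date}'
GROUP BY d.advertiser_id, a.advertiser_name;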
DIM Dimension Tables
-- Campaign dimension table
CREATE TABLE dim_campaign (
campaign_id BIGINT,
campaign_name STRING,
advertiser_id BIGINT,
advertiser_name STRING,
industry STRING, -- advertiser industry vertical
budget_daily DECIMAL(12,2),
bid_type STRING, -- cpc/cpm/ocpm
target_cpa DECIMAL(10,2),
status STRING, -- active/paused/deleted
create_time TIMESTAMP,
update_time TIMESTAMP
);
-- Ad slot dimension table
CREATE TABLE dim_ad_slot (
ad_slot_id STRING,
slot_name STRING,
app_id STRING,
app_name STRING,
slot_type STRING, -- banner/feed/splash/interstitial
slot_size STRING, -- 640x100
floor_price DECIMAL(10,4)
);
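Dimension tables usually originate from the MySQL business database (the MySQL source in the architecture diagram). A hedged sketch of a daily rebuild, assuming the MySQL tables are first mirrored into Hive as daily snapshots (the mirror table names ods_campaign_mirror / ods_advertiser_mirror are illustrative):

-- Hypothetical daily rebuild of dim_campaign from ODS mirrors of the MySQL tables
INSERT OVERWRITE TABLE dim_campaign
SELECT
    c.campaign_id, c.campaign_name,
    c.advertiser_id, a.advertiser_name,
    a.industry, c.budget_daily, c.bid_type, c.target_cpa,
    c.status, c.create_time, c.update_time
FROM ods_campaign_mirror c
LEFT JOIN ods_advertiser_mirror a ON c.advertiser_id = a.advertiser_id
WHERE c.dt = '${date}' AND a.dt = '${date}';  -- mirrors assumed to be dt-partitioned full snapshots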
Compute Engines
Hive
Best suited for:
- Large-scale ETL (TB scale)
- Complex SQL queries
- Data warehouse construction
Pros: SQL-friendly, mature ecosystem, stable
Cons: high latency (minutes), not suited to iterative computation
Execution engines:
- MapReduce (obsolete)
- Tez (moderate performance)
- Spark (recommended; best performance)
SET hive.execution.engine=spark;
Spark
Best suited for:
- Complex ETL
- Feature computation
- Training-sample generation
- Iterative computation (ML)
Pros: fast in-memory computation, rich APIs, ML support
Cons: heavy memory usage, complex tuning
Spark SQL (Scala) example:
import org.apache.spark.sql.functions._   // count, sum, ...
// assumes an active SparkSession named `spark`
import spark.implicits._                   // enables the $"col" syntax

val impressions = spark.read.table("dwd_ad_impression").filter($"dt" === "2024-01-15")
val clicks = spark.read.table("dwd_ad_click")
  .filter($"dt" === "2024-01-15")
  .select("request_id", "click_id")        // keep only the columns the join needs

val result = impressions
  .join(clicks, Seq("request_id"), "left")
  .groupBy("campaign_id", "os")
  .agg(
    count("*").as("impressions"),          // every impression row survives the left join
    count($"click_id").as("clicks"),       // count() skips nulls, so unmatched impressions add 0
    sum("win_price").as("cost")
  )
Presto / Trino
Best suited for:
- Interactive queries (responses in seconds)
- Ad-hoc analysis
- Federated queries across data sources
Pros: fast queries, connectors for many data sources
Cons: not suited to large-scale ETL, memory-bound
Typical usage:
- Data analysts query Hive tables through Presto
- Operations staff connect BI tools (Superset/Metabase) to Presto
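A hypothetical ad-hoc Trino/Presto query illustrating the cross-source angle: joining the Hive DWS table with the advertiser table in the MySQL business database. Catalog and schema names are placeholders:

-- Yesterday's top 20 advertisers by spend, enriched with names from MySQL
SELECT
    m.advertiser_name,
    SUM(h.cost)    AS spend,
    SUM(h.revenue) AS revenue
FROM hive.ad_dw.dws_ad_performance_daily h
JOIN mysql.ad_business.advertiser m
    ON h.advertiser_id = m.advertiser_id
WHERE h.dt = '2024-01-15'
GROUP BY m.advertiser_name
ORDER BY spend DESC
LIMIT 20;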
Scheduling Systems
Airflow
# Example DAG for the ad data warehouse
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'ad_data_warehouse',
default_args=default_args,
schedule_interval='0 3 * * *',  # daily at 03:00
start_date=datetime(2024, 1, 1),
)
# Task 1: ODS → DWD (impressions)
ods_to_dwd_impression = HiveOperator(
task_id='ods_to_dwd_impression',
hql='sql/dwd_ad_impression.sql',
hive_cli_conn_id='hive_default',
dag=dag,
)
# Task 2: ODS → DWD (clicks)
ods_to_dwd_click = HiveOperator(
task_id='ods_to_dwd_click',
hql='sql/dwd_ad_click.sql',
dag=dag,
)
# Task 3: DWD → DWS
dwd_to_dws = HiveOperator(
task_id='dwd_to_dws_performance',
hql='sql/dws_ad_performance_daily.sql',
dag=dag,
)
# Task 4: DWS → ADS
dws_to_ads = HiveOperator(
task_id='dws_to_ads_report',
hql='sql/ads_advertiser_report.sql',
dag=dag,
)
# Dependencies
[ods_to_dwd_impression, ods_to_dwd_click] >> dwd_to_dws >> dws_to_ads
Scheduler Comparison
| System | Highlights | Typical Fit |
| --- | --- | --- |
| Airflow | Python-defined DAGs, active community | General-purpose scheduling |
| DolphinScheduler | Developed in China, strong visual UI | Common in Chinese companies |
| Azkaban | Open-sourced by LinkedIn, simple | Small to medium scale |
| Oozie | Hadoop-native | Hadoop ecosystem |
Data Quality
Validation Rules
-- Row-count check: compare today against the previous day
SELECT dt, today_cnt, yesterday_cnt, change_rate
FROM (
    SELECT
        dt,
        cnt as today_cnt,
        LAG(cnt) OVER (ORDER BY dt) as yesterday_cnt,
        (cnt - LAG(cnt) OVER (ORDER BY dt)) / LAG(cnt) OVER (ORDER BY dt) as change_rate
    FROM (
        SELECT dt, COUNT(*) as cnt
        FROM dwd_ad_impression
        WHERE dt IN ('${yesterday}', '${date}')
        GROUP BY dt
    ) daily
) t
WHERE dt = '${date}'           -- keep the row whose LAG() saw yesterday's count
  AND ABS(change_rate) > 0.3;  -- alert when the day-over-day change exceeds 30%
-- Null-value check
SELECT
COUNT(*) as total,
SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) as null_user_cnt,
SUM(CASE WHEN ad_id IS NULL THEN 1 ELSE 0 END) as null_ad_cnt
FROM dwd_ad_impression
WHERE dt = '${date}';
-- Reconciliation: real-time vs. offline totals
SELECT
    a.dt,
    a.impressions as realtime_imp,
    b.impressions as offline_imp,
    ABS(a.impressions - b.impressions) / b.impressions as diff_rate
FROM realtime_report a          -- assumed to be one row per dt
JOIN (
    SELECT dt, SUM(impressions) as impressions
    FROM dws_ad_performance_daily
    GROUP BY dt
) b ON a.dt = b.dt
WHERE ABS(a.impressions - b.impressions) / b.impressions > 0.01;  -- alert when the gap exceeds 1%
Data Lineage
Tracing how data flows from source to target:
ads_advertiser_report
← dws_ad_performance_daily
← dwd_ad_impression + dwd_ad_click + dwd_ad_conversion
← ods_ad_request_log + ods_ad_click_log + ods_ad_conversion_log
← Kafka (ads.impression.server + ads.click.server + ads.conversion.s2s)
Uses:
- Troubleshooting: quickly trace anomalous report numbers back to their source
- Impact analysis: assess downstream impact before changing an upstream table
- Compliance auditing: track how data is used and where it flows
Typical Offline Jobs in Advertising
| Task | Frequency | Engine | Notes |
| --- | --- | --- | --- |
| Log ingestion | Hourly | Spark | Kafka → ODS |
| DWD ETL | T+1 | Hive/Spark | ODS → DWD |
| DWS aggregation | T+1 | Hive/Spark | DWD → DWS |
| Report generation | T+1 | Hive/Spark | DWS → ADS |
| User profiling | T+1 | Spark | Tag/label computation |
| Sample generation | T+1 | Spark | Training samples |
| Feature backfill | T+1 | Spark | Offline features |
| Data export | T+1 | Sqoop/DataX | Hive → MySQL/ClickHouse |
| Reconciliation | T+1 | Hive | Cross-source comparison |
How This Maps to a Big Data Engineer's Day-to-Day Work
- Warehouse modeling: design and maintain the layered architecture of the ad data warehouse
- ETL development: write Hive SQL / Spark jobs
- Scheduling: configure and maintain the scheduler DAGs
- Data quality: develop and maintain validation rules
- Performance tuning: SQL optimization, data-skew handling, small-file compaction (see the sketch after this list)
- Requirement delivery: build new reports and new metrics
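For the performance-tuning bullet, a minimal sketch of commonly used Hive session settings for skew and small files; the thresholds are illustrative defaults, not recommendations for any particular cluster:

-- GROUP BY skew: aggregate on the map side and split the shuffle into two stages
SET hive.map.aggr = true;
SET hive.groupby.skewindata = true;

-- Join skew: handle heavily skewed keys with a follow-up map join
SET hive.optimize.skewjoin = true;
SET hive.skewjoin.key = 100000;                 -- rows per key before it counts as skewed

-- Small files: merge small output files when the job finishes
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.smallfiles.avgsize = 134217728;  -- trigger a merge when avg output file < 128 MB
SET hive.merge.size.per.task = 268435456;       -- aim for ~256 MB per merged file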
Common Interview Questions
- What are the responsibilities of each data warehouse layer (ODS/DWD/DWS/ADS)?
- What are the core tables of an ad data warehouse, and how would you design them?
- How do Hive and Spark differ, and which scenarios suit each?
- How do you ensure offline data quality?
- How do you handle data skew?
- How does the scheduler guarantee task dependencies and retries?
Recommended Reading