
1.1 本文目标:教会开发者如何用可复用的工具,根据CDN/流媒体服务的推流(publish)与断流(unpublish)事件或访问日志,准确计算直播时长并导出统计报告。
1.2 适用场景:自建推流(如 nginx-rtmp)、第三方CDN回调、或CDN访问日志(CSV/JSON)等。
2.1 数据来源:优先使用CDN或流 media server 提供的“publish/unpublish”事件回调;其次用访问日志(每条包含 stream_id、timestamp、event/type 或播放请求)。
2.2 思路要点:按 stream_id 聚合,按时间排序配对 start/end;如果没有明确 end,则用最后心跳或超时(例如 gap > 60s)视为结束。
3.1 推荐字段:stream_id, event(publish/unpublish/keepalive), timestamp (ISO8601或UNIX秒), client_ip, app, vhost。
3.2 数据预处理:统一时区(UTC)、去重(重复回调),把日志导成 CSV 或按天分片存储,方便批处理。
4.1 基本规则:每个 publish 对应一个 session;duration = end_time - start_time;统计按天/小时汇总。
4.2 边界情况:无 end -> 用最后见到的播放请求时间或 start + DEFAULT_TIMEOUT(建议60-120秒);重复 publish(短时间内)视为同一 session 或分割,需配置 gap_threshold(如 30s)。
5.1 示例说明:假设输入为 CSV,列为 timestamp,stream_id,event(publish/unpublish)。下面为伪代码步骤:
5.2 伪代码: 1) 读入CSV并按 (stream_id, timestamp) 排序; 2) 维护 dict active_sessions = {};当 event == 'publish' 且 stream_id 不在 active_sessions,记录 start_time;当 event == 'unpublish' 且在 active_sessions,计算 duration = end-start,写入结果并删除 active_sessions;如果遇到重复 publish,检测是否 gap < threshold 合并。
5.3 代码片段(简化): for row in rows: sid = row['stream_id']; t = parse(row['timestamp']); e = row['event'] if e == 'publish': if sid not in active: active[sid]=t else: if t - active[sid] > gap_threshold: record(active[sid], t); active[sid]=t elif e == 'unpublish': if sid in active: record(active[sid], t); del active[sid] # 处理残余 active for sid,start in active.items(): record(start, start + timeout)
6.1 数据库设计(关系型示例):table sessions(id, stream_id, start_ts, end_ts, duration_seconds, source_log)。按 start_ts 建索引,方便按天统计。
6.2 定时任务:用 cron 或 Airflow 每分钟或每小时解析当日日志,写入 sessions 表;再用 SQL 汇总每日/每小时时长: SELECT stream_id, SUM(duration_seconds) FROM sessions WHERE DATE(start_ts)=? GROUP BY stream_id
6.3 API 暴露:可用 Flask/Express 提供查询接口,支持 stream_id、date 范围、分页导出 CSV/JSON。
7.1 答:把访问日志按 stream_id 排序,连续请求之间若 gap 小于阈值(例如 gap_threshold=30s 或根据播放分片时长调整),视为同一会话。遇到长时间无请求则视为断开,结束会话并记录 end_time 为最后一次请求时间或最后请求时间+分片时长。
8.1 答:两种策略:一是设置合理的超时时间(例如 60-120s)自动关闭残留会话;二是结合播放器心跳或 CDN 的 play 请求(端到端结合),用最后一次有效播放请求作为结束时间以降低误差。
9.1 答:分片处理日志(按天或按流量分块),使用并行消费者(如 multiprocessing 或分布式任务队列),将中间结果写入批量插入的 DB,定期归档老数据;关键字段建索引,避免全表扫描,必要时用时间窗口的流式计算(例如 Kafka + Flink)。