Files
CTI-Inference-Opt/代码/code/bench.py
T
2026-06-17 20:20:50 +08:00

421 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""本地测量闭环:设置 infer.CONFIG,跑推理,同步计时,打印 AUC/PCOC/延迟/总分。
不进提交包。**以子进程方式运行**(AI Studio 内核禁止 import torch):
%cd /home/aistudio/code
!python bench.py --diag # 诊断:序列长度分布 + sign-id 超界比例
!python bench.py --smoke 50 # 冒烟:只跑前 50 batch
!python bench.py # 默认基线
!python bench.py --fp32 # FP32 天花板
!python bench.py --rebuild # 强制重建过滤缓存
只保留“测试用户”的数据:不同用户被因果 mask 完全隔离,非测试用户的前向输出
不参与打分;过滤掉它们对测试样本的 AUC/PCOC 没有任何影响,却能把数据量从
924 万条降到一小部分。
缓存用**文本 CSV**而非 pickle:容器 cgroup 内存有限,pickle.dump 大对象的 memo
会瞬间撑爆内存被静默 OOM-kill;逐行写 CSV 内存几乎不涨,再用 load_sample_files
读回,稳。
"""
import os
import sys
import time
from collections import defaultdict
from pathlib import Path
# baseline 把依赖装在 --target 目录(非默认 site-packages),import 前先加 sys.path
for _p in ("/home/aistudio/external-libraries", "/home/aistudio/libraries",
os.path.abspath("../libraries"), os.path.abspath("./libraries")):
if os.path.isdir(_p) and _p not in sys.path:
sys.path.insert(0, _p)
import numpy as np
import torch
from torch.utils.data import DataLoader
import infer # 同目录
def _test_user_ids(test_csv):
"""从 test.csv 读出所有测试用户 id(第 2 列 userid)。"""
users = set()
with open(test_csv) as f:
for line in f:
line = line.strip()
if not line:
continue
parts = line.split(",")
if len(parts) >= 2:
users.add(int(parts[1]))
return users
def _stream_build(ref, cache_csv_path=None):
"""流式过滤:构建 item_dict/user_seq;若给 cache_csv_path,同时把保留的历史行
原样逐行写入(低内存文本缓存,test.csv 直接复用、不进缓存)。
"""
test_csv = ref / "test.csv"
history = ref / "history"
test_users = _test_user_ids(test_csv)
files = (sorted(history.glob("*.csv")) if history.exists() else []) + [test_csv]
print(f"[BENCH] 流式过滤加载 {len(files)} 个文件(仅保留 {len(test_users)} 个测试用户)...")
item_dict = {}
user_logs = defaultdict(list)
cf = open(cache_csv_path, "w") if cache_csv_path else None
try:
for fp in files:
has_clk = infer._detect_has_clk(fp)
min_parts = 5 if has_clk else 4
is_test = (Path(fp).name == test_csv.name)
kept = 0
with open(fp) as f:
for raw in f:
line = raw.strip()
if not line:
continue
parts = line.split(",")
if len(parts) < min_parts:
continue
userid = int(parts[1])
if userid not in test_users:
continue
if cf is not None and not is_test: # 只缓存历史行
cf.write(raw if raw.endswith("\n") else raw + "\n")
logid = int(parts[0])
adid = int(parts[2])
if has_clk:
clk = int(parts[3])
timestamp = int(parts[4])
fs = 5
else:
clk = 0
timestamp = int(parts[3])
fs = 4
signs, slots = [], []
for pair in parts[fs:]:
if ":" in pair:
s, sl = pair.split(":", 1)
signs.append(int(s))
slots.append(int(sl))
item_dict[logid] = {
"logid": logid, "userid": userid, "adid": adid,
"clk": clk, "timestamp": timestamp,
"signs": np.array(signs, dtype=np.int64),
"slots": np.array(slots, dtype=np.int64),
}
user_logs[userid].append((timestamp, logid))
kept += 1
print(f" {Path(fp).name}: has_clk={has_clk}, kept={kept}")
finally:
if cf is not None:
cf.flush()
os.fsync(cf.fileno())
cf.close()
user_seq = {}
for u, logs in user_logs.items():
logs.sort(key=lambda x: x[0])
user_seq[u] = [lid for _, lid in logs]
print(f"[BENCH] 过滤后:{len(item_dict)} 条记录,{len(user_seq)} 个用户")
if cache_csv_path:
print(f"[BENCH] 已缓存历史行 -> {cache_csv_path}(下次快速读取)")
return item_dict, user_seq
def _get_data(cur, ref, rebuild=False):
"""取过滤后的 (item_dict, user_seq),优先读 CSV 缓存。"""
cache_csv = cur / "cache_filtered_history.csv"
test_csv = ref / "test.csv"
if cache_csv.exists() and not rebuild:
print(f"[BENCH] 读取过滤缓存(CSV){cache_csv}")
try:
return infer.load_sample_files([str(cache_csv), str(test_csv)])
except Exception as e:
print(f"[BENCH][WARN] 缓存读取失败({e}),重新构建")
return _stream_build(ref, cache_csv_path=str(cache_csv))
def run_diag(rebuild=False):
"""诊断:测试用户序列长度分布 + sign-id 是否超界(判断上下文与 modulo 的价值)。"""
cur = Path(__file__).parent
ref = cur / "dataset"
item_dict, user_seq = _get_data(cur, ref, rebuild=rebuild)
lens = np.array([len(v) for v in user_seq.values()]) if user_seq else np.array([0])
print(f"[DIAG] 测试用户数={len(user_seq)} 总记录数={len(item_dict)}")
print(f"[DIAG] 每用户序列长度 min/median/mean/max = "
f"{int(lens.min())}/{int(np.median(lens))}/{lens.mean():.1f}/{int(lens.max())}")
print(f"[DIAG] 序列长度>1 的用户占比 = {(lens > 1).mean():.1%}")
VOCAB = 5_000_000
mx, over, tot = 0, 0, 0
for rec in item_dict.values():
s = rec["signs"]
if s.size:
m = int(s.max())
if m > mx:
mx = m
over += int((s >= VOCAB).sum())
tot += int(s.size)
print(f"[DIAG] max_sign_id={mx} vocab={VOCAB} "
f"超界sign占比={over}/{tot}={(over / max(tot, 1)):.2%}")
def run_profile(config_override=None, n=20, batch_size=50, rebuild=False):
"""用 torch.profiler 剖析前 n 个 batch,打印按 CUDA 耗时排序的算子表,定位真正瓶颈。"""
if config_override is None:
config_override = {}
infer.CONFIG.update(config_override)
cur = Path(__file__).parent
ref = cur / "dataset"
item_dict, user_seq = _get_data(cur, ref, rebuild=rebuild)
test_logids = infer.load_logids_from_file(ref / "test.csv")
ds = infer.CTRTestSeqDataset(
test_logids_ordered=list(test_logids), item_dict=item_dict,
user_seq=user_seq, max_feasign_per_slot={1: 2}, max_ctx_len=None)
loader = DataLoader(ds, batch_size=batch_size, shuffle=False, num_workers=0,
collate_fn=infer.make_collate_fn(ds.max_slot_id))
batches = []
for b in loader:
batches.append(infer.move_batch_to_device(b, torch.device("cpu")))
if len(batches) >= n:
break
del item_dict, user_seq, ds, loader
import gc
gc.collect()
model, dev = infer.load_model(ckpt_path=None)
cuda = (dev.type == "cuda")
from torch.profiler import profile, ProfilerActivity
acts = [ProfilerActivity.CPU] + ([ProfilerActivity.CUDA] if cuda else [])
with torch.inference_mode():
warm = infer.move_batch_to_device(batches[0], dev) # 预热(触发任何首次编译)
model(warm)
if cuda:
torch.cuda.synchronize()
with profile(activities=acts) as prof:
for b in batches:
b = infer.move_batch_to_device(b, dev)
model(b)
if cuda:
torch.cuda.synchronize()
sort_key = "cuda_time_total" if cuda else "cpu_time_total"
print(prof.key_averages().table(sort_by=sort_key, row_limit=25))
def run_once(config_override=None, batch_size=50, max_batches=None,
max_feasign_per_slot=None, rebuild=False):
"""跑一次本地推理并打分。返回 infer._cal_score 的结果 dict。"""
if config_override is None:
config_override = {}
if max_feasign_per_slot is None:
max_feasign_per_slot = {1: 2}
# precompute_rep: 从已加载的过滤 batches 自建缓存(测 gather);
# eval_precompute: 走真正的评测路径(load_model 流式过滤自动预计算)
want_precompute = bool(config_override.pop("precompute_rep", False))
eval_precompute = bool(config_override.pop("eval_precompute", False))
infer.CONFIG.update(config_override)
infer.CONFIG["sync_timing"] = True
infer.CONFIG["precompute_rep"] = eval_precompute # True 时让 load_model 自动预计算
cur = Path(__file__).parent
ref = cur / "dataset"
test_csv = ref / "test.csv"
label_file = ref / "label_data.txt"
item_dict, user_seq = _get_data(cur, ref, rebuild=rebuild)
test_logids = infer.load_logids_from_file(test_csv)
ds = infer.CTRTestSeqDataset(
test_logids_ordered=list(test_logids), item_dict=item_dict,
user_seq=user_seq, max_feasign_per_slot=max_feasign_per_slot, max_ctx_len=None,
)
loader = DataLoader(
ds, batch_size=batch_size, shuffle=False, num_workers=0,
collate_fn=infer.make_collate_fn(ds.max_slot_id),
)
# load_model 先于 batch 构建,使 collate_fn 能拿到模型就地算 rep(镜像评测流程)
model, dev = infer.load_model(ckpt_path=None)
cuda = (dev.type == "cuda")
batches = []
for b in loader:
batches.append(infer.move_batch_to_device(b, torch.device("cpu")))
if max_batches is not None and len(batches) >= max_batches:
break
del item_dict, user_seq, ds, loader
import gc
gc.collect()
if eval_precompute and model._rep_cache is not None:
print(f"[BENCH] eval-path rep cache (load_model): {model._rep_cache[0].numel()} items")
# 本地从已建好的 batches 构造 rep 缓存(复用 batches、省内存;不计入计时)
if want_precompute and not eval_precompute:
lc, ec = [], []
with torch.inference_mode():
for b in batches:
bb = infer.move_batch_to_device(b, dev)
rep = model.rep_encoder(bb)
lc.append(bb["logid"].to(dev))
ec.append(rep)
logids = torch.cat(lc)
emb = torch.cat(ec)
order = torch.argsort(logids)
model._rep_cache = (logids[order].contiguous(), emb[order].contiguous())
print(f"[BENCH] rep cache built from batches: {logids.numel()} items")
logid2p = {}
logid2logit = {}
t_sum = 0.0
with torch.inference_mode():
for b in batches:
b = infer.move_batch_to_device(b, dev)
pm = b["pred_mask"].bool()
if cuda:
torch.cuda.synchronize()
t0 = time.time()
logits, _ = model(b)
probs = torch.sigmoid(logits.squeeze(-1))
if cuda:
torch.cuda.synchronize()
t_sum += time.time() - t0
lg = logits.squeeze(-1)
for lid, p, lv in zip(b["logid"][pm].cpu().tolist(),
probs[pm].cpu().tolist(), lg[pm].cpu().tolist()):
logid2p[lid] = p
logid2logit[lid] = lv
order = [int(l.split(",")[0]) for l in open(test_csv) if l.strip()]
missing = [lid for lid in order if lid not in logid2p]
if missing:
print(f"[BENCH][WARN] {len(missing)} 个测试 logid 没预测到(前几个 {missing[:5]}")
pred_path = cur / "predict.txt"
with open(pred_path, "w") as f:
for lid in order:
f.write(f"{logid2p.get(lid, 0.0)}\n")
res = infer._cal_score(pred_path, label_file, default_latency=t_sum)
print(
f"[BENCH] cfg={config_override} bs={batch_size}"
f"{'' if max_batches is None else f' (first {max_batches} batches)'}"
f" -> AUC={res['auc']:.5f} PCOC={res['pcoc']:.4f}"
f" lat={res['latency']:.2f}s score={res['score_all']:.2f}"
)
# 拟合 PCOC 校准 logit_bias(使 mean(sigmoid(logit+b))=mean(label)
try:
ol = np.array([logid2logit.get(lid, 0.0) for lid in order], dtype=np.float64)
labels = infer._read_label(str(label_file))
ml = float(labels.mean())
lo, hi = -3.0, 3.0
for _ in range(60):
mid = 0.5 * (lo + hi)
if (1.0 / (1.0 + np.exp(-(ol + mid)))).mean() > ml:
hi = mid
else:
lo = mid
print(f"[BENCH] 建议 logit_bias={0.5*(lo+hi):.4f}PCOC→1.0,免费+~0.34分)")
except Exception as e:
print(f"[BENCH] logit_bias 拟合跳过: {e}")
return res
def _parse_args():
import argparse
ap = argparse.ArgumentParser(description="CTI 推理测量闭环(子进程跑:!python bench.py ...")
ap.add_argument("--diag", action="store_true", help="只跑诊断,不推理")
ap.add_argument("--smoke", type=int, default=None, help="只跑前 N 个 batch(冒烟)")
ap.add_argument("--bs", type=int, default=50, help="batch_size(本地参考)")
ap.add_argument("--fp32", action="store_true", help="FP32 天花板 = 关 fp16 + 关 expert 合并")
ap.add_argument("--no-fp16", action="store_true", help="关闭半精度")
ap.add_argument("--no-merge", action="store_true", help="关闭 expert 合并")
ap.add_argument("--signid", choices=["clamp", "modulo"], default=None, help="sign-id 处理方式")
ap.add_argument("--merge-th", type=float, default=None, help="expert 合并余弦阈值")
ap.add_argument("--keep", type=str, default=None,
help="逗号分隔的 keep_fp32_modules,如 linear,rep_encoder.input_norm")
ap.add_argument("--feasign-none", action="store_true",
help="不截断特征(max_feasign_per_slot=None")
ap.add_argument("--attn", choices=["sdpa", "chunked", "triton", "flex", "varlen"], default=None,
help="注意力:sdpa=稠密, chunked=分块SDPA, triton=varlen flash kernel, flex/varlen=对照")
ap.add_argument("--chunk-users", type=int, default=None, help="chunked 模式每块用户数")
ap.add_argument("--triton-bm", type=int, default=None, help="Triton query 块大小(32/64/128)")
ap.add_argument("--moe", choices=["dense", "loop"], default=None,
help="MoE实现:dense=向量化(新), loop=逐expert循环(原)")
ap.add_argument("--compile", action="store_true", help="开启 torch.compile")
ap.add_argument("--emb-fp16", action="store_true", help="Embedding表转FP16(查表带宽减半,测AUC)")
ap.add_argument("--dedup-emb", action="store_true", help="查表前对sign去重(减少大表随机访存)")
ap.add_argument("--emb-bag", action="store_true", help="F.embedding_bag 融合查表+池化")
ap.add_argument("--no-moe-baddbmm", action="store_true", help="关闭 MoE baddbmm(用 einsum 对照)")
ap.add_argument("--no-skip-moe-loss", action="store_true", help="不跳过 moe_loss(对照)")
ap.add_argument("--logit-bias", type=float, default=None, help="PCOC校准:logit偏移(本地验证PCOC→1.0)")
ap.add_argument("--sparse-pool", action="store_true", help="稀疏矩阵乘做池化(段内高重复时省)")
ap.add_argument("--precompute-rep", action="store_true",
help="预计算RepEncoder缓存,model(batch)跳过embedding层(从batches自建)")
ap.add_argument("--eval-precompute", action="store_true",
help="走评测路径:load_model 流式过滤自动预计算(本地验证不OOM)")
ap.add_argument("--no-collate-rep", action="store_true",
help="关闭 collate 内算 rep(用于对照基准)")
ap.add_argument("--no-movedev-rep", action="store_true",
help="关闭 move_batch_to_device 内算 rep(用于对照基准)")
ap.add_argument("--profile", type=int, default=None, metavar="N",
help="剖析前 N 个 batch,打印按 CUDA 耗时排序的算子表(定位瓶颈)")
ap.add_argument("--rebuild", action="store_true", help="强制重建过滤缓存")
return ap.parse_args()
if __name__ == "__main__":
a = _parse_args()
if a.diag:
run_diag(rebuild=a.rebuild)
sys.exit(0)
cfg = {}
if a.fp32:
cfg["fp16"] = False
cfg["expert_merge"] = False
if a.no_fp16:
cfg["fp16"] = False
if a.no_merge:
cfg["expert_merge"] = False
if a.signid:
cfg["signid_mode"] = a.signid
if a.merge_th is not None:
cfg["merge_threshold"] = a.merge_th
if a.keep is not None:
cfg["keep_fp32_modules"] = tuple(x for x in a.keep.split(",") if x)
if a.attn is not None:
cfg["attn"] = a.attn
if a.chunk_users is not None:
cfg["chunk_users"] = a.chunk_users
if a.triton_bm is not None:
cfg["triton_block_m"] = a.triton_bm
if a.moe is not None:
cfg["vectorize_moe"] = (a.moe == "dense")
if a.emb_fp16:
cfg["emb_fp16"] = True
if a.dedup_emb:
cfg["dedup_embedding"] = True
if a.emb_bag:
cfg["use_embedding_bag"] = True
if a.no_moe_baddbmm:
cfg["moe_baddbmm"] = False
if a.no_skip_moe_loss:
cfg["skip_moe_loss"] = False
if a.logit_bias is not None:
cfg["logit_bias"] = a.logit_bias
if a.sparse_pool:
cfg["sparse_pool"] = True
if a.precompute_rep:
cfg["precompute_rep"] = True
if a.eval_precompute:
cfg["eval_precompute"] = True
if a.no_collate_rep:
cfg["collate_rep"] = False
if a.no_movedev_rep:
cfg["movedev_rep"] = False
if a.compile:
cfg["compile"] = True
if a.profile is not None:
run_profile(cfg, n=a.profile, batch_size=a.bs, rebuild=a.rebuild)
sys.exit(0)
mf = None if a.feasign_none else {1: 2}
run_once(cfg, batch_size=a.bs, max_batches=a.smoke, max_feasign_per_slot=mf, rebuild=a.rebuild)