diff --git a/代码/code/bench.py b/代码/code/bench.py index 2d04e9e..54c9da3 100644 --- a/代码/code/bench.py +++ b/代码/code/bench.py @@ -1,46 +1,128 @@ """本地测量闭环:设置 infer.CONFIG,跑推理,同步计时,打印 AUC/PCOC/延迟/总分。 -不进提交包。在 AI Studio notebook(带 dataset/ 与 ckpt.pt)里运行: +不进提交包。**以子进程方式运行**(AI Studio 内核禁止 import torch): %cd /home/aistudio/code - !python bench.py # 默认配置基准 + !python bench.py --smoke 50 # 冒烟:只跑前 50 batch + !python bench.py # 默认基线 + !python bench.py --fp32 # FP32 天花板(Task 3) + !python bench.py --rebuild # 强制重建过滤缓存 -或在 notebook cell 里逐配置扫描: - - import bench - bench.run_once({"fp16": False, "expert_merge": False}) # FP32 参考跑 - bench.run_once({"signid_mode": "modulo"}) # 取模 vs clamp +关键设计——只保留“测试用户”的数据: +不同用户被因果 mask 完全隔离,非测试用户的前向输出不参与打分;过滤掉它们 +对测试样本的 AUC/PCOC 没有任何影响,却能把数据量从 924 万条降到一小部分, +避免 CTRTestSeqDataset 构造时 OOM。过滤后的数据缓存到磁盘,后续秒级复用。 """ import os import sys import time +from collections import defaultdict from pathlib import Path -# baseline 把依赖装在 --target 目录(非默认 site-packages),在 kernel 里 import -# 之前必须先把它加到 sys.path,否则 import torch 会 ModuleNotFoundError。 +# baseline 把依赖装在 --target 目录(非默认 site-packages),import 前先加 sys.path for _p in ("/home/aistudio/external-libraries", "/home/aistudio/libraries", os.path.abspath("../libraries"), os.path.abspath("./libraries")): if os.path.isdir(_p) and _p not in sys.path: sys.path.insert(0, _p) +import numpy as np import torch from torch.utils.data import DataLoader import infer # 同目录 -def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_per_slot=None): - """跑一次本地推理并打分。 +def _test_user_ids(test_csv): + """从 test.csv 读出所有测试用户 id(第 2 列 userid)。""" + users = set() + with open(test_csv) as f: + for line in f: + line = line.strip() + if not line: + continue + parts = line.split(",") + if len(parts) >= 2: + users.add(int(parts[1])) + return users - Args: - config_override: 覆盖 infer.CONFIG 的字典(如 {"fp16": False}) - batch_size: DataLoader 的 batch 大小(本地参考;评测端可能自有设定) - max_batches: 只跑前 N 个 batch(快速冒烟用),None=全量 - max_feasign_per_slot: 传给 CTRTestSeqDataset 的截断字典,None=不截断; - 默认沿用 baseline 的 {1: 2} - Returns: - infer._cal_score 的结果 dict + +def _load_filtered(history_dir, test_csv, test_users): + """流式读取所有文件,只保留 userid ∈ test_users 的记录(不持有完整字典,防 OOM)。 + + 解析逻辑与 infer.load_sample_files 完全一致,只是多了一道用户过滤。 """ + files = (sorted(history_dir.glob("*.csv")) if history_dir.exists() else []) + [test_csv] + print(f"[BENCH] 流式过滤加载 {len(files)} 个文件(仅保留 {len(test_users)} 个测试用户)...") + item_dict = {} + user_logs = defaultdict(list) + for fp in files: + has_clk = infer._detect_has_clk(fp) + min_parts = 5 if has_clk else 4 + kept = 0 + with open(fp) as f: + for line in f: + line = line.strip() + if not line: + continue + parts = line.split(",") + if len(parts) < min_parts: + continue + userid = int(parts[1]) + if userid not in test_users: + continue + logid = int(parts[0]) + adid = int(parts[2]) + if has_clk: + clk = int(parts[3]) + timestamp = int(parts[4]) + fs = 5 + else: + clk = 0 + timestamp = int(parts[3]) + fs = 4 + signs, slots = [], [] + for pair in parts[fs:]: + if ":" in pair: + s, sl = pair.split(":", 1) + signs.append(int(s)) + slots.append(int(sl)) + item_dict[logid] = { + "logid": logid, "userid": userid, "adid": adid, + "clk": clk, "timestamp": timestamp, + "signs": np.array(signs, dtype=np.int64), + "slots": np.array(slots, dtype=np.int64), + } + user_logs[userid].append((timestamp, logid)) + kept += 1 + print(f" {fp.name}: has_clk={has_clk}, kept={kept}") + + user_seq = {} + for u, logs in user_logs.items(): + logs.sort(key=lambda x: x[0]) + user_seq[u] = [lid for _, lid in logs] + print(f"[BENCH] 过滤后:{len(item_dict)} 条记录,{len(user_seq)} 个用户") + return item_dict, user_seq + + +def _get_data(cur, ref, rebuild=False): + """取过滤后的 (item_dict, user_seq),优先读磁盘缓存。""" + cache = cur / "bench_filtered_cache.pt" + test_csv = ref / "test.csv" + history = ref / "history" + if cache.exists() and not rebuild: + print(f"[BENCH] 读取过滤缓存:{cache}") + d = torch.load(cache, weights_only=False) + return d["item_dict"], d["user_seq"] + test_users = _test_user_ids(test_csv) + item_dict, user_seq = _load_filtered(history, test_csv, test_users) + torch.save({"item_dict": item_dict, "user_seq": user_seq}, cache) + print(f"[BENCH] 已缓存 -> {cache}") + return item_dict, user_seq + + +def run_once(config_override=None, batch_size=50, max_batches=None, + max_feasign_per_slot=None, rebuild=False): + """跑一次本地推理并打分。返回 infer._cal_score 的结果 dict。""" if config_override is None: config_override = {} if max_feasign_per_slot is None: @@ -51,20 +133,15 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_ cur = Path(__file__).parent ref = cur / "dataset" - history = ref / "history" test_csv = ref / "test.csv" label_file = ref / "label_data.txt" - # ----- 加载数据 ----- - files = (sorted(history.glob("*.csv")) if history.exists() else []) + [test_csv] - item_dict, user_seq = infer.load_sample_files(files) + # ----- 取数据(过滤+缓存)----- + item_dict, user_seq = _get_data(cur, ref, rebuild=rebuild) test_logids = infer.load_logids_from_file(test_csv) ds = infer.CTRTestSeqDataset( - test_logids_ordered=list(test_logids), - item_dict=item_dict, - user_seq=user_seq, - max_feasign_per_slot=max_feasign_per_slot, - max_ctx_len=None, + test_logids_ordered=list(test_logids), item_dict=item_dict, + user_seq=user_seq, max_feasign_per_slot=max_feasign_per_slot, max_ctx_len=None, ) loader = DataLoader( ds, batch_size=batch_size, shuffle=False, num_workers=0, @@ -76,6 +153,11 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_ if max_batches is not None and len(batches) >= max_batches: break + # 释放构造期内存,降低推理峰值 + del item_dict, user_seq, ds, loader + import gc + gc.collect() + # ----- 加载模型 ----- model, dev = infer.load_model(ckpt_path=None) @@ -100,10 +182,13 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_ # ----- 按 test.csv 顺序写 predict.txt 并打分 ----- order = [int(l.split(",")[0]) for l in open(test_csv) if l.strip()] + missing = [lid for lid in order if lid not in logid2p] + if missing: + print(f"[BENCH][WARN] {len(missing)} 个测试 logid 没预测到(前几个 {missing[:5]})") pred_path = cur / "predict.txt" with open(pred_path, "w") as f: for lid in order: - f.write(f"{logid2p[lid]}\n") + f.write(f"{logid2p.get(lid, 0.0)}\n") res = infer._cal_score(pred_path, label_file, default_latency=t_sum) print( @@ -117,7 +202,7 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_ def _parse_args(): import argparse - ap = argparse.ArgumentParser(description="CTI 推理测量闭环(以子进程方式跑:!python bench.py ...)") + ap = argparse.ArgumentParser(description="CTI 推理测量闭环(子进程跑:!python bench.py ...)") ap.add_argument("--smoke", type=int, default=None, help="只跑前 N 个 batch(冒烟)") ap.add_argument("--bs", type=int, default=50, help="batch_size(本地参考)") ap.add_argument("--fp32", action="store_true", help="FP32 天花板 = 关 fp16 + 关 expert 合并") @@ -129,6 +214,7 @@ def _parse_args(): help="逗号分隔的 keep_fp32_modules,如 linear,rep_encoder.input_norm") ap.add_argument("--feasign-none", action="store_true", help="不截断特征(max_feasign_per_slot=None)") + ap.add_argument("--rebuild", action="store_true", help="强制重建过滤缓存") return ap.parse_args() @@ -149,4 +235,4 @@ if __name__ == "__main__": if a.keep is not None: cfg["keep_fp32_modules"] = tuple(x for x in a.keep.split(",") if x) mf = None if a.feasign_none else {1: 2} - run_once(cfg, batch_size=a.bs, max_batches=a.smoke, max_feasign_per_slot=mf) + run_once(cfg, batch_size=a.bs, max_batches=a.smoke, max_feasign_per_slot=mf, rebuild=a.rebuild)