fix: bench.py 只保留测试用户数据(流式过滤+磁盘缓存),解决 OOM 与 16min 重载
不同用户被因果mask隔离,过滤非测试用户对测试样本AUC/PCOC零影响。 流式加载只持有测试用户记录,避免 CTRTestSeqDataset 构造期 OOM; 过滤结果缓存到 bench_filtered_cache.pt,后续秒级复用。 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+117
-31
@@ -1,46 +1,128 @@
|
|||||||
"""本地测量闭环:设置 infer.CONFIG,跑推理,同步计时,打印 AUC/PCOC/延迟/总分。
|
"""本地测量闭环:设置 infer.CONFIG,跑推理,同步计时,打印 AUC/PCOC/延迟/总分。
|
||||||
|
|
||||||
不进提交包。在 AI Studio notebook(带 dataset/ 与 ckpt.pt)里运行:
|
不进提交包。**以子进程方式运行**(AI Studio 内核禁止 import torch):
|
||||||
|
|
||||||
%cd /home/aistudio/code
|
%cd /home/aistudio/code
|
||||||
!python bench.py # 默认配置基准
|
!python bench.py --smoke 50 # 冒烟:只跑前 50 batch
|
||||||
|
!python bench.py # 默认基线
|
||||||
|
!python bench.py --fp32 # FP32 天花板(Task 3)
|
||||||
|
!python bench.py --rebuild # 强制重建过滤缓存
|
||||||
|
|
||||||
或在 notebook cell 里逐配置扫描:
|
关键设计——只保留“测试用户”的数据:
|
||||||
|
不同用户被因果 mask 完全隔离,非测试用户的前向输出不参与打分;过滤掉它们
|
||||||
import bench
|
对测试样本的 AUC/PCOC 没有任何影响,却能把数据量从 924 万条降到一小部分,
|
||||||
bench.run_once({"fp16": False, "expert_merge": False}) # FP32 参考跑
|
避免 CTRTestSeqDataset 构造时 OOM。过滤后的数据缓存到磁盘,后续秒级复用。
|
||||||
bench.run_once({"signid_mode": "modulo"}) # 取模 vs clamp
|
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from collections import defaultdict
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
# baseline 把依赖装在 --target 目录(非默认 site-packages),在 kernel 里 import
|
# baseline 把依赖装在 --target 目录(非默认 site-packages),import 前先加 sys.path
|
||||||
# 之前必须先把它加到 sys.path,否则 import torch 会 ModuleNotFoundError。
|
|
||||||
for _p in ("/home/aistudio/external-libraries", "/home/aistudio/libraries",
|
for _p in ("/home/aistudio/external-libraries", "/home/aistudio/libraries",
|
||||||
os.path.abspath("../libraries"), os.path.abspath("./libraries")):
|
os.path.abspath("../libraries"), os.path.abspath("./libraries")):
|
||||||
if os.path.isdir(_p) and _p not in sys.path:
|
if os.path.isdir(_p) and _p not in sys.path:
|
||||||
sys.path.insert(0, _p)
|
sys.path.insert(0, _p)
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
import torch
|
import torch
|
||||||
from torch.utils.data import DataLoader
|
from torch.utils.data import DataLoader
|
||||||
|
|
||||||
import infer # 同目录
|
import infer # 同目录
|
||||||
|
|
||||||
|
|
||||||
def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_per_slot=None):
|
def _test_user_ids(test_csv):
|
||||||
"""跑一次本地推理并打分。
|
"""从 test.csv 读出所有测试用户 id(第 2 列 userid)。"""
|
||||||
|
users = set()
|
||||||
|
with open(test_csv) as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
parts = line.split(",")
|
||||||
|
if len(parts) >= 2:
|
||||||
|
users.add(int(parts[1]))
|
||||||
|
return users
|
||||||
|
|
||||||
Args:
|
|
||||||
config_override: 覆盖 infer.CONFIG 的字典(如 {"fp16": False})
|
def _load_filtered(history_dir, test_csv, test_users):
|
||||||
batch_size: DataLoader 的 batch 大小(本地参考;评测端可能自有设定)
|
"""流式读取所有文件,只保留 userid ∈ test_users 的记录(不持有完整字典,防 OOM)。
|
||||||
max_batches: 只跑前 N 个 batch(快速冒烟用),None=全量
|
|
||||||
max_feasign_per_slot: 传给 CTRTestSeqDataset 的截断字典,None=不截断;
|
解析逻辑与 infer.load_sample_files 完全一致,只是多了一道用户过滤。
|
||||||
默认沿用 baseline 的 {1: 2}
|
|
||||||
Returns:
|
|
||||||
infer._cal_score 的结果 dict
|
|
||||||
"""
|
"""
|
||||||
|
files = (sorted(history_dir.glob("*.csv")) if history_dir.exists() else []) + [test_csv]
|
||||||
|
print(f"[BENCH] 流式过滤加载 {len(files)} 个文件(仅保留 {len(test_users)} 个测试用户)...")
|
||||||
|
item_dict = {}
|
||||||
|
user_logs = defaultdict(list)
|
||||||
|
for fp in files:
|
||||||
|
has_clk = infer._detect_has_clk(fp)
|
||||||
|
min_parts = 5 if has_clk else 4
|
||||||
|
kept = 0
|
||||||
|
with open(fp) as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
parts = line.split(",")
|
||||||
|
if len(parts) < min_parts:
|
||||||
|
continue
|
||||||
|
userid = int(parts[1])
|
||||||
|
if userid not in test_users:
|
||||||
|
continue
|
||||||
|
logid = int(parts[0])
|
||||||
|
adid = int(parts[2])
|
||||||
|
if has_clk:
|
||||||
|
clk = int(parts[3])
|
||||||
|
timestamp = int(parts[4])
|
||||||
|
fs = 5
|
||||||
|
else:
|
||||||
|
clk = 0
|
||||||
|
timestamp = int(parts[3])
|
||||||
|
fs = 4
|
||||||
|
signs, slots = [], []
|
||||||
|
for pair in parts[fs:]:
|
||||||
|
if ":" in pair:
|
||||||
|
s, sl = pair.split(":", 1)
|
||||||
|
signs.append(int(s))
|
||||||
|
slots.append(int(sl))
|
||||||
|
item_dict[logid] = {
|
||||||
|
"logid": logid, "userid": userid, "adid": adid,
|
||||||
|
"clk": clk, "timestamp": timestamp,
|
||||||
|
"signs": np.array(signs, dtype=np.int64),
|
||||||
|
"slots": np.array(slots, dtype=np.int64),
|
||||||
|
}
|
||||||
|
user_logs[userid].append((timestamp, logid))
|
||||||
|
kept += 1
|
||||||
|
print(f" {fp.name}: has_clk={has_clk}, kept={kept}")
|
||||||
|
|
||||||
|
user_seq = {}
|
||||||
|
for u, logs in user_logs.items():
|
||||||
|
logs.sort(key=lambda x: x[0])
|
||||||
|
user_seq[u] = [lid for _, lid in logs]
|
||||||
|
print(f"[BENCH] 过滤后:{len(item_dict)} 条记录,{len(user_seq)} 个用户")
|
||||||
|
return item_dict, user_seq
|
||||||
|
|
||||||
|
|
||||||
|
def _get_data(cur, ref, rebuild=False):
|
||||||
|
"""取过滤后的 (item_dict, user_seq),优先读磁盘缓存。"""
|
||||||
|
cache = cur / "bench_filtered_cache.pt"
|
||||||
|
test_csv = ref / "test.csv"
|
||||||
|
history = ref / "history"
|
||||||
|
if cache.exists() and not rebuild:
|
||||||
|
print(f"[BENCH] 读取过滤缓存:{cache}")
|
||||||
|
d = torch.load(cache, weights_only=False)
|
||||||
|
return d["item_dict"], d["user_seq"]
|
||||||
|
test_users = _test_user_ids(test_csv)
|
||||||
|
item_dict, user_seq = _load_filtered(history, test_csv, test_users)
|
||||||
|
torch.save({"item_dict": item_dict, "user_seq": user_seq}, cache)
|
||||||
|
print(f"[BENCH] 已缓存 -> {cache}")
|
||||||
|
return item_dict, user_seq
|
||||||
|
|
||||||
|
|
||||||
|
def run_once(config_override=None, batch_size=50, max_batches=None,
|
||||||
|
max_feasign_per_slot=None, rebuild=False):
|
||||||
|
"""跑一次本地推理并打分。返回 infer._cal_score 的结果 dict。"""
|
||||||
if config_override is None:
|
if config_override is None:
|
||||||
config_override = {}
|
config_override = {}
|
||||||
if max_feasign_per_slot is None:
|
if max_feasign_per_slot is None:
|
||||||
@@ -51,20 +133,15 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_
|
|||||||
|
|
||||||
cur = Path(__file__).parent
|
cur = Path(__file__).parent
|
||||||
ref = cur / "dataset"
|
ref = cur / "dataset"
|
||||||
history = ref / "history"
|
|
||||||
test_csv = ref / "test.csv"
|
test_csv = ref / "test.csv"
|
||||||
label_file = ref / "label_data.txt"
|
label_file = ref / "label_data.txt"
|
||||||
|
|
||||||
# ----- 加载数据 -----
|
# ----- 取数据(过滤+缓存)-----
|
||||||
files = (sorted(history.glob("*.csv")) if history.exists() else []) + [test_csv]
|
item_dict, user_seq = _get_data(cur, ref, rebuild=rebuild)
|
||||||
item_dict, user_seq = infer.load_sample_files(files)
|
|
||||||
test_logids = infer.load_logids_from_file(test_csv)
|
test_logids = infer.load_logids_from_file(test_csv)
|
||||||
ds = infer.CTRTestSeqDataset(
|
ds = infer.CTRTestSeqDataset(
|
||||||
test_logids_ordered=list(test_logids),
|
test_logids_ordered=list(test_logids), item_dict=item_dict,
|
||||||
item_dict=item_dict,
|
user_seq=user_seq, max_feasign_per_slot=max_feasign_per_slot, max_ctx_len=None,
|
||||||
user_seq=user_seq,
|
|
||||||
max_feasign_per_slot=max_feasign_per_slot,
|
|
||||||
max_ctx_len=None,
|
|
||||||
)
|
)
|
||||||
loader = DataLoader(
|
loader = DataLoader(
|
||||||
ds, batch_size=batch_size, shuffle=False, num_workers=0,
|
ds, batch_size=batch_size, shuffle=False, num_workers=0,
|
||||||
@@ -76,6 +153,11 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_
|
|||||||
if max_batches is not None and len(batches) >= max_batches:
|
if max_batches is not None and len(batches) >= max_batches:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# 释放构造期内存,降低推理峰值
|
||||||
|
del item_dict, user_seq, ds, loader
|
||||||
|
import gc
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
# ----- 加载模型 -----
|
# ----- 加载模型 -----
|
||||||
model, dev = infer.load_model(ckpt_path=None)
|
model, dev = infer.load_model(ckpt_path=None)
|
||||||
|
|
||||||
@@ -100,10 +182,13 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_
|
|||||||
|
|
||||||
# ----- 按 test.csv 顺序写 predict.txt 并打分 -----
|
# ----- 按 test.csv 顺序写 predict.txt 并打分 -----
|
||||||
order = [int(l.split(",")[0]) for l in open(test_csv) if l.strip()]
|
order = [int(l.split(",")[0]) for l in open(test_csv) if l.strip()]
|
||||||
|
missing = [lid for lid in order if lid not in logid2p]
|
||||||
|
if missing:
|
||||||
|
print(f"[BENCH][WARN] {len(missing)} 个测试 logid 没预测到(前几个 {missing[:5]})")
|
||||||
pred_path = cur / "predict.txt"
|
pred_path = cur / "predict.txt"
|
||||||
with open(pred_path, "w") as f:
|
with open(pred_path, "w") as f:
|
||||||
for lid in order:
|
for lid in order:
|
||||||
f.write(f"{logid2p[lid]}\n")
|
f.write(f"{logid2p.get(lid, 0.0)}\n")
|
||||||
|
|
||||||
res = infer._cal_score(pred_path, label_file, default_latency=t_sum)
|
res = infer._cal_score(pred_path, label_file, default_latency=t_sum)
|
||||||
print(
|
print(
|
||||||
@@ -117,7 +202,7 @@ def run_once(config_override=None, batch_size=50, max_batches=None, max_feasign_
|
|||||||
|
|
||||||
def _parse_args():
|
def _parse_args():
|
||||||
import argparse
|
import argparse
|
||||||
ap = argparse.ArgumentParser(description="CTI 推理测量闭环(以子进程方式跑:!python bench.py ...)")
|
ap = argparse.ArgumentParser(description="CTI 推理测量闭环(子进程跑:!python bench.py ...)")
|
||||||
ap.add_argument("--smoke", type=int, default=None, help="只跑前 N 个 batch(冒烟)")
|
ap.add_argument("--smoke", type=int, default=None, help="只跑前 N 个 batch(冒烟)")
|
||||||
ap.add_argument("--bs", type=int, default=50, help="batch_size(本地参考)")
|
ap.add_argument("--bs", type=int, default=50, help="batch_size(本地参考)")
|
||||||
ap.add_argument("--fp32", action="store_true", help="FP32 天花板 = 关 fp16 + 关 expert 合并")
|
ap.add_argument("--fp32", action="store_true", help="FP32 天花板 = 关 fp16 + 关 expert 合并")
|
||||||
@@ -129,6 +214,7 @@ def _parse_args():
|
|||||||
help="逗号分隔的 keep_fp32_modules,如 linear,rep_encoder.input_norm")
|
help="逗号分隔的 keep_fp32_modules,如 linear,rep_encoder.input_norm")
|
||||||
ap.add_argument("--feasign-none", action="store_true",
|
ap.add_argument("--feasign-none", action="store_true",
|
||||||
help="不截断特征(max_feasign_per_slot=None)")
|
help="不截断特征(max_feasign_per_slot=None)")
|
||||||
|
ap.add_argument("--rebuild", action="store_true", help="强制重建过滤缓存")
|
||||||
return ap.parse_args()
|
return ap.parse_args()
|
||||||
|
|
||||||
|
|
||||||
@@ -149,4 +235,4 @@ if __name__ == "__main__":
|
|||||||
if a.keep is not None:
|
if a.keep is not None:
|
||||||
cfg["keep_fp32_modules"] = tuple(x for x in a.keep.split(",") if x)
|
cfg["keep_fp32_modules"] = tuple(x for x in a.keep.split(",") if x)
|
||||||
mf = None if a.feasign_none else {1: 2}
|
mf = None if a.feasign_none else {1: 2}
|
||||||
run_once(cfg, batch_size=a.bs, max_batches=a.smoke, max_feasign_per_slot=mf)
|
run_once(cfg, batch_size=a.bs, max_batches=a.smoke, max_feasign_per_slot=mf, rebuild=a.rebuild)
|
||||||
|
|||||||
Reference in New Issue
Block a user