Split the traceback into layers and add weighted matching

Pan Qiancheng 2025-04-22 14:34:05 +08:00
parent 42307b2303
commit b6f7ddbcfd
1 changed file with 68 additions and 105 deletions


@@ -3,148 +3,111 @@ from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
# 1. Initialize the model
# SentenceTransformer 'all-MiniLM-L6-v2' is a solid choice: fast and effective
# Its output vectors are typically L2-normalized
# Initialize the model
model = SentenceTransformer('all-MiniLM-L6-v2')
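# Sanity-check sketch (illustrative, not part of this commit): all-MiniLM-L6-v2 ships with
# a Normalize module, so its embeddings should have unit L2 norm; a plain dot product then
# equals cosine similarity, which is what the matching code below relies on.
probe = model.encode(["IndexError: Target 15 is out of bounds."])
print("probe norm:", np.linalg.norm(probe[0]))  # expected to be ~1.0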
# 2. Mock error database (contains the raw, uncleaned error messages)
# In a real application this data might come from files or a database
# Mock error database
error_db = [
{
"error": """Traceback (most recent call last):\n  File \"image_error.py\", line 55, in <module>\n    loss = criterion(outputs, labels)\n  File \"module.py\", line 1553, in _wrapped_call_impl\n    return self._call_impl(*args, **kwargs)\n  File \"module.py\", line 1562, in _call_impl\n    return forward_call(*args, **kwargs)\n  File \"loss.py\", line 1188, in forward\n    return F.cross_entropy(input, target, weight=self.weight,\n  File \"functional.py\", line 3104, in cross_entropy\n    return cross_entropy_loss(input, target, ...)\nIndexError: Target 15 is out of bounds.\n""",
"error": """Traceback (most recent call last):\n File \"image_error.py\", line 55, in <module>\n loss = criterion(outputs, labels)\n File \"module.py\", line 1553, in _wrapped_call_impl\n return self._call_impl(*args, **kwargs)\n File \"module.py\", line 1562, in _call_impl\n return forward_call(*args, **kwargs)\n File \"loss.py\", line 1188, in forward\n return F.cross_entropy(input, target, weight=self.weight,\n File \"functional.py\", line 3104, in cross_entropy\n return cross_entropy_loss(input, target, ...)\nIndexError: Target 15 is out of bounds.\n""",
"solution": "可能是 labels 中的值超出了分类数量的范围。检查模型最后一层的输出维度是否正确。"
},
{
"error": """Traceback (most recent call last):\n  File \"rnn_error.py\", line 1, in <module>\n    import torch\nModuleNotFoundError: No module named 'torch'\n""",
"solution": "检查路径是否存在,或模型文件是否已正确保存。 (更准确的解决方案应该是检查是否安装了torch)" # 修正更准确的解决方案
"error": """Traceback (most recent call last):\n File \"rnn_error.py\", line 1, in <module>\n import torch\nModuleNotFoundError: No module named 'torch'\n""",
"solution": "检查路径是否存在,或模型文件是否已正确保存。"
},
{
"error": """Traceback (most recent call last):\n  File \"cnn_success.py\", line 51, in <module>\n    loss.backward()\n  File \"_tensor.py\", line 521, in backward\n    torch.autograd.backward(...)\n  File \"__init__.py\", line 289, in backward\n    _engine_run_backward(...)\n  File \"graph.py\", line 768, in _engine_run_backward\n    return run_backward(...)\nKeyboardInterrupt\n""",
"solution": "程序被用户中断 (KeyboardInterrupt)。这不是一个典型的代码错误,可能是手动停止了程序。" # 修正更准确的解决方案
"error": """Traceback (most recent call last):\n File \"cnn_success.py\", line 51, in <module>\n loss.backward()\n File \"_tensor.py\", line 521, in backward\n torch.autograd.backward(...)\n File \"__init__.py\", line 289, in backward\n _engine_run_backward(...)\n File \"graph.py\", line 768, in _engine_run_backward\n return run_backward(...)\nKeyboardInterrupt\n""",
"solution": "程序被用户中断 (KeyboardInterrupt)。这不是一个典型的代码错误,可能是手动停止了程序。"
},
{
"error": """Traceback (most recent call last):\n  File \"cnn_success.py\", line 1, in <module>\n    import torch\nModuleNotFoundError: No module named 'torch'\n""",
"error": """Traceback (most recent call last):\n File \"cnn_success.py\", line 1, in <module>\n import torch\nModuleNotFoundError: No module named 'torch'\n""",
"solution": "找不到模块torch, 请检查是否已安装 PyTorch。"
}
]
# 3. Error message cleaning function
def clean_traceback(traceback_string):
"""
Clean a traceback string: remove file paths, keeping the file name, line number, function name, and the final error message.
"""
cleaned = traceback_string
# 0. Remove \r characters to normalize newlines (done first so it does not affect later regex matching)
cleaned = cleaned.replace('\r', '')
# 1. Remove the Traceback header
# Use the re.MULTILINE flag so ^ matches the start of every line
cleaned = traceback_string.replace('\r', '')
cleaned = re.sub(r'^Traceback \(most recent call last\):[\n]+', '', cleaned, flags=re.MULTILINE)
# --- Fix ---
# 2. Clean the File and line info
# New regular expression:
# ^ *File \"(.*?)\", line (\d+) - matches the line start, File ", the path (Group 1), ", line , and the line number (Group 2)
# (, in .*?)?$ - optional group (Group 3): matches ", in " followed by everything up to the end of the line (.*?$)
# So Group 3 captures the whole ", in function_name" / ", in " / ", in <module>" part, or is None
file_line_regex = re.compile(r'^ *File \"(.*?)\", line (\d+)(, in .*?)?$', re.MULTILINE)
def replace_file_line(match):
# Groups: 1=full_path, 2=line_num, 3=optional_in_part (e.g., ", in function_name")
full_path = match.group(1)
line_num = match.group(2)
# Group 3 is the whole optional ", in ..." part, or None
function_part = match.group(3) or '' # empty string if Group 3 is absent
# Extract the file name from the full path
last_sep_index = max(full_path.rfind('/'), full_path.rfind('\\'))
filename = full_path[last_sep_index + 1:] if last_sep_index != -1 else full_path
# Reassemble the string, keeping only the file name, line number, and the captured optional ", in ..." part
# Format: File "filename", line line_number(, in function_name)
function_part = match.group(3) or ''
filename = full_path.split('/')[-1].split('\\')[-1]
return f' File "{filename}", line {line_num}{function_part}'
# Apply the replacement function to every matched File line
cleaned = file_line_regex.sub(replace_file_line, cleaned)
# -------------
# 3. Remove unstable memory addresses / object representations
# Should run after the File-line cleaning so it does not clobber file paths
cleaned = re.sub(r'<.* at 0x[0-9a-fA-F]+>', '<...>', cleaned)
# 4. Collapse consecutive blank lines and strip leading/trailing whitespace
cleaned = re.sub(r'\n\s*\n+', '\n', cleaned).strip()
return cleaned
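# Usage sketch (illustrative, not part of this commit): clean_traceback strips directories
# but keeps the file name, line number, function name, and the final error line.
_demo_tb = 'Traceback (most recent call last):\n  File "/tmp/demo/app.py", line 7, in main\n    run()\nValueError: bad value\n'
print(clean_traceback(_demo_tb))
# Expected output, roughly:
# File "app.py", line 7, in main
#     run()
# ValueError: bad value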
# Clean the error messages in the database
cleaned_error_db = []
def split_traceback_layers(traceback_string):
"""将traceback清洗后按行分割为多个frame每一层都可以独立向量化匹配"""
lines = traceback_string.strip().split("\n")
return [line.strip() for line in lines if line.strip()]
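# Follow-up sketch (illustrative, not part of this commit): splitting the cleaned demo
# traceback yields one layer per non-empty line; each layer is embedded separately below.
print(split_traceback_layers(clean_traceback(_demo_tb)))
# ['File "app.py", line 7, in main', 'run()', 'ValueError: bad value']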
# Build the layered vector database
layered_error_db = []
for entry in error_db:
cleaned_error = clean_traceback(entry["error"])
cleaned_error_db.append({
"cleaned_error": cleaned_error,
"original_error": entry["error"], # 保留原始错误信息以便展示
cleaned = clean_traceback(entry["error"])
layers = split_traceback_layers(cleaned)
embeddings = model.encode(layers)
layered_error_db.append({
"layers": layers,
"vectors": embeddings,
"solution": entry["solution"]
})
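# Illustrative check (not part of this commit): each entry now stores one vector per traceback
# line, so an entry with N cleaned lines holds an (N, 384) matrix for all-MiniLM-L6-v2.
for idx, e in enumerate(layered_error_db):
    print(idx, len(e["layers"]), e["vectors"].shape)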
# 4. Vectorize the cleaned error messages and build the index
# This SentenceTransformer model outputs L2-normalized vectors
embeddings = np.array([model.encode(e['cleaned_error']) for e in cleaned_error_db])
# Query error message
user_error = """Traceback (most recent call last):\n File \"train_model.py\", line 72, in <module>\n loss = loss_function(predictions, ground_truth)\n File \"core.py\", line 442, in _call_function\n return self._execute(*args, **kwargs)\n File \"core.py\", line 450, in _execute\n return forward_execution(*args, **kwargs)\n File \"loss_functions.py\", line 205, in forward\n return F.cross_entropy(input, target, weight=self.weight,\n File \"functional.py\", line 3104, in cross_entropy\n return cross_entropy_loss(input, target, ...)\nIndexError: Target 15 is out of bounds.\n"""
# user_error = """Traceback (most recent call last):\n File \"image_error.py\", line 55, in <module>\n loss = criterion(outputs, labels)\n File \"module.py\", line 1553, in _wrapped_call_impl\n return self._call_impl(*args, **kwargs)\n File \"module.py\", line 1570, in _call_impl\n return forward_call(*args, **kwargs)\n File \"loss.py\", line 1200, in forward\n return F.mse_loss(input, target, reduction=self.reduction)\n File \"functional.py\", line 2301, in mse_loss\n return mean_squared_error_loss(input, target, ...)\nValueError: The size of input tensor must match the size of target tensor.\n"""
cleaned_user = clean_traceback(user_error)
user_layers = split_traceback_layers(cleaned_user)
user_vectors = model.encode(user_layers)
# FAISS index with L2 distance, used here for cosine-similarity search over normalized vectors.
# For L2-normalized vectors u, v: ||u - v||^2 = ||u||^2 + ||v||^2 - 2 * (u . v) = 2 - 2 * cosine_similarity,
# so cosine_similarity = 1 - D^2 / 2, where D is the L2 distance (D lies in [0, 2] for unit vectors).
# Note that faiss.IndexFlatL2 returns the *squared* L2 distance D^2, so with the value D_sq
# returned by FAISS: cosine_similarity = 1 - D_sq / 2.
# A more direct alternative is an inner-product (IP) index, which yields cosine similarity
# directly for normalized vectors, but an L2 index is also commonly used with normalized embeddings.
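# Worked example (illustrative, not part of this commit): for unit vectors, squared L2 distance
# and cosine similarity are interchangeable via cos = 1 - d2 / 2.
_u = np.array([1.0, 0.0])
_v = np.array([np.cos(np.pi / 3), np.sin(np.pi / 3)])  # unit vector at 60 degrees
_d2 = np.sum((_u - _v) ** 2)                           # squared L2 distance = 1.0
print(np.dot(_u, _v), 1 - _d2 / 2)                     # both are ~0.5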
# Compute the layer-by-layer matching score (average of per-layer max similarities)
# def compute_layered_similarity(user_vecs, db_vecs):
# score = 0.0
# for u_vec in user_vecs:
# sims = np.dot(db_vecs, u_vec)
# score += np.max(sims)
# return score / len(user_vecs) if len(user_vecs) > 0 else 0.0
def compute_layered_similarity(user_vecs, db_vecs, layer_weights=None):
score = 0.0
weighted_score = 0.0
index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(embeddings)
# Default weights (layers lower in the traceback weigh more)
if layer_weights is None:
layer_weights = np.linspace(0.1, 1, len(user_vecs)) # early lines get small weights, later lines (deepest frames and the final error) get large weights
# 5. User error (test query) - use the original, uncleaned error message
user_error = """Traceback (most recent call last):\r\n  File \"/root/bash_tests/image_error.py\", line 55, in <module>\r\n    loss = criterion(outputs, labels)\r\n  File \"/root/miniconda3/envs/qc-d2l/lib/python3.10/site-packages/torch/nn/modules/module.py\", line 1553, in _wrapped_call_impl\r\n    return self._call_impl(*args, **kwargs)\r\n  File \"/root/miniconda3/envs/qc-d2l/lib/python3.10/site-packages/torch/nn/modules/module.py\", line 1562, in _call_impl\r\n    return forward_call(*args, **kwargs)\r\n  File \"/root/miniconda3/envs/qc-d2l/lib/python3.10/site-packages/torch/nn/modules/loss.py\", line 1188, in forward\r\n    return F.cross_entropy(input, target, weight=self.weight,\r\n  File \"/root/miniconda3/envs/qc-d2l/lib/python3.10/site-packages/torch/nn/functional.py\", line 3104, in cross_entropy\r\n    return torch._C._nn.cross_entropy_loss(input, target, weight, _Reduction.get_enum(reduction), ignore_index, label_smoothing)\r\nIndexError: Target 15 is out of bounds.\r\n"""
# user_error = "Unknown command 'python'"
for i, u_vec in enumerate(user_vecs):
sims = np.dot(db_vecs, u_vec) # similarity of this layer against every DB layer
max_sim = np.max(sims) # best match for this layer
# 6. Clean the user query and vectorize it
cleaned_user_error = clean_traceback(user_error)
weighted_score += max_sim * layer_weights[i] # multiply the similarity by this layer's weight
# Compute the weighted average score
return weighted_score / np.sum(layer_weights) if len(user_vecs) > 0 else 0.0
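# Toy usage sketch (hypothetical values, not part of this commit): two user layers matched
# against three DB layers; np.linspace(0.1, 1, n) gives later layers more weight.
_toy_user = np.array([[1.0, 0.0], [0.0, 1.0]])
_toy_db = np.array([[1.0, 0.0], [0.707, 0.707], [0.0, 1.0]])
print(compute_layered_similarity(_toy_user, _toy_db))  # ~1.0: every user layer has a perfect match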
# Find the most similar entry in the layered database
best_match = None
best_score = -1
for db_entry in layered_error_db:
score = compute_layered_similarity(user_vectors, db_entry["vectors"])
if score > best_score:
best_score = score
best_match = db_entry
# Print the match result
print("--- Cleaned user error message ---")
print(cleaned_user_error)
print("-------------------------")
query_vec = model.encode(cleaned_user_error)
# FAISS search expects a 2D array, even for a single query
query_vec = np.array([query_vec])
# 7. Search the index for the most similar error
# k=1 returns the single most similar result
D, I = index.search(query_vec, k=1)
# 8. Print the most similar error, its solution, and the match score
# D contains the squared L2 distances, I contains the indices of the matched entries in the database
matched_index = I[0][0]
distance_sq = D[0][0] # squared L2 distance (faiss.IndexFlatL2 returns squared distances)
# For L2-normalized vectors, Cosine Similarity = 1 - squared_L2_distance / 2
cosine_similarity = 1 - distance_sq / 2
# 8. Print the most similar error, its solution, and the match score
matched_entry = cleaned_error_db[matched_index]
print("\n--- 最相似的已知错误信息 ---")
print(matched_entry['cleaned_error'])
print(cleaned_user)
print("\n--- 最相似的错误堆栈 ---")
print("\n".join(best_match["layers"]))
print("\n--- 建议的解决方案 ---")
print(matched_entry['solution'])
print(f"\n--- 匹配度 (Cosine Similarity): {cosine_similarity:.4f}")
print(best_match["solution"])
print(f"\n--- 匹配度 (分层匹配平均最大相似度): {best_score:.4f}")