# vector_match_tests/vector_tests.py

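"""Layered vector matching of Python tracebacks (test script).

Each traceback in a small mock error database is cleaned, split into per-frame
lines, embedded with all-MiniLM-L6-v2, and indexed with FAISS. A query
traceback is first matched against per-entry mean ("aggregate") vectors to
shortlist candidates, then re-scored frame by frame with depth-weighted
maximum similarities to pick the best match and its suggested solution.
"""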

import re
import time
import logging
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
# Configure logging
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
# Initialize the embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')
# Mock error database
error_db = [
    {
        "error": "Traceback (most recent call last):\n File \"image_error.py\", line 55, in <module>\n loss = criterion(outputs, labels)\n File \"module.py\", line 1553, in _wrapped_call_impl\n return self._call_impl(*args, **kwargs)\n File \"module.py\", line 1562, in _call_impl\n return forward_call(*args, **kwargs)\n File \"loss.py\", line 1188, in forward\n return F.cross_entropy(input, target, weight=self.weight,\n File \"functional.py\", line 3104, in cross_entropy\n return cross_entropy_loss(input, target, ...)\nIndexError: Target 15 is out of bounds.\n",
        "solution": "The values in labels may exceed the number of classes. Check that the output dimension of the model's last layer is correct."
    },
    {
        "error": "Traceback (most recent call last):\n File \"rnn_error.py\", line 1, in <module>\n import torch\nModuleNotFoundError: No module named 'torch'\n",
        "solution": "Check whether the path exists, or whether the model file was saved correctly."
    },
    {
        "error": "Traceback (most recent call last):\n File \"cnn_success.py\", line 51, in <module>\n loss.backward()\n File \"_tensor.py\", line 521, in backward\n torch.autograd.backward(...)\n File \"__init__.py\", line 289, in backward\n _engine_run_backward(...)\n File \"graph.py\", line 768, in _engine_run_backward\n return run_backward(...)\nKeyboardInterrupt\n",
        "solution": "The program was interrupted by the user (KeyboardInterrupt). This is not a typical code error; the program was probably stopped manually."
    },
    {
        "error": "Traceback (most recent call last):\n File \"cnn_success.py\", line 1, in <module>\n import torch\nModuleNotFoundError: No module named 'torch'\n",
        "solution": "Module torch was not found; check whether PyTorch is installed."
    },
    {
        "error": "Traceback (most recent call last):\n File \"rnn_error.py\", line 40, in <module>\n loss = criterion(output, label.unsqueeze(0))\n File \"module.py\", line 1553, in _wrapped_call_impl\n return self._call_impl(*args, **kwargs)\n File \"module.py\", line 1562, in _call_impl\n return forward_call(*args, **kwargs)\n File \"loss.py\", line 1188, in forward\n return F.cross_entropy(input, target, weight=self.weight,\n File \"functional.py\", line 3104, in cross_entropy\n return cross_entropy_loss(input, target, ...)\nValueError: Expected input batch_size (2) to match target batch_size (1).\n",
        "solution": "Check whether the shapes of the model output and the labels match. Make sure they agree along the batch_size dimension."
    },
    {
        "error": "Syntax error in command",
        "solution": "Check the code for syntax errors. There may be a missing bracket, quote, or other syntax issue."
    }
]
def clean_traceback(traceback_string):
    """Normalize a traceback so only its stable parts remain.

    Strips carriage returns and the "Traceback (most recent call last):" header,
    reduces file paths to basenames, masks object reprs containing memory
    addresses, and collapses blank lines.
    """
    cleaned = traceback_string.replace('\r', '')
    cleaned = re.sub(r'^Traceback \(most recent call last\):[\n]+', '', cleaned, flags=re.MULTILINE)
    file_line_regex = re.compile(r'^ *File \"(.*?)\", line (\d+)(, in .*?)?$', re.MULTILINE)

    def replace_file_line(match):
        full_path = match.group(1)
        line_num = match.group(2)
        function_part = match.group(3) or ''
        filename = full_path.split('/')[-1].split('\\')[-1]
        return f' File "{filename}", line {line_num}{function_part}'

    cleaned = file_line_regex.sub(replace_file_line, cleaned)
    cleaned = re.sub(r'<.* at 0x[0-9a-fA-F]+>', '<...>', cleaned)
    cleaned = re.sub(r'\n\s*\n+', '\n', cleaned).strip()
    return cleaned
def split_traceback_layers(traceback_string):
    """Split a cleaned traceback into per-frame lines so each layer can be vectorized and matched independently."""
    lines = traceback_string.strip().split("\n")
    return [line.strip() for line in lines if line.strip()]
# Timing: database construction
start_db_time = time.time()
# Build the aggregate (coarse) index
aggregate_vectors = []
for entry in error_db:
    cleaned = clean_traceback(entry["error"])
    layers = split_traceback_layers(cleaned)
    embeddings = model.encode(layers)
    aggregate_vectors.append(np.mean(embeddings, axis=0))  # the mean vector represents the whole error
aggregate_vectors = np.array(aggregate_vectors).astype('float32')
aggregate_index = faiss.IndexFlatIP(aggregate_vectors.shape[1])
aggregate_index.add(aggregate_vectors)
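# Note (assumption about defaults): model.encode is not asked to normalize its
# output here, so IndexFlatIP returns raw inner products rather than cosine
# similarities. If cosine similarity is wanted, encode with
# normalize_embeddings=True or apply faiss.normalize_L2 to the vectors before
# adding them to the index.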
# Build the layered vector database, one FAISS index per entry
layered_error_db = []
for entry, agg_vec in zip(error_db, aggregate_vectors):
    cleaned = clean_traceback(entry["error"])
    layers = split_traceback_layers(cleaned)
    embeddings = model.encode(layers)
    index = faiss.IndexFlatIP(embeddings.shape[1])  # inner-product index over this entry's frames
    index.add(embeddings.astype('float32'))  # add the frame vectors to the index
    layered_error_db.append({
        "layers": layers,
        "vectors": embeddings,          # keep the raw frame vectors
        "index": index,                 # per-entry FAISS index
        "solution": entry["solution"],
        "agg_vector": agg_vec           # aggregate (mean) vector
    })
end_db_time = time.time()
logging.info(f"Database build time: {end_db_time - start_db_time:.2f} s")
# Query error messages (test cases)
# Identical to a database entry
# user_error = """Traceback (most recent call last):\n File \"image_error.py\", line 55, in <module>\n loss = criterion(outputs, labels)\n File \"module.py\", line 1553, in _wrapped_call_impl\n return self._call_impl(*args, **kwargs)\n File \"module.py\", line 1562, in _call_impl\n return forward_call(*args, **kwargs)\n File \"loss.py\", line 1188, in forward\n return F.cross_entropy(input, target, weight=self.weight,\n File \"functional.py\", line 3104, in cross_entropy\n return cross_entropy_loss(input, target, ...)\nIndexError: Target 15 is out of bounds.\n"""
# Completely different
# user_error = """"Traceback (most recent call last):\n File \"D:\\Develop\\Projects\\bash-hook\\matches\\perf_2.py\", line 94, in <module>\n D, I = index.search(query_vec, k=1)\n File \"C:\\Users\\qcqcqc\\.conda\\envs\\sentence_transformers\\lib\\site-packages\\faiss\\class_wrappers.py\", line 329, in replacement_search\n assert d == self.d\nAssertionError\n"""
# Same top frames, different bottom (deepest) frames
# user_error = """Traceback (most recent call last):\n File \"image_error.py\", line 55, in <module>\n loss = criterion(outputs, labels)\n File \"module.py\", line 1553, in _wrapped_call_impl\n return self._call_impl(*args, **kwargs)\n File \"module.py\", line 1570, in _call_impl\n return forward_call(*args, **kwargs)\n File \"loss.py\", line 1200, in forward\n return F.mse_loss(input, target, reduction=self.reduction)\n File \"functional.py\", line 2301, in mse_loss\n return mean_squared_error_loss(input, target, ...)\nValueError: The size of input tensor must match the size of target tensor.\n"""
# Same bottom frames, different top frames
user_error = """Traceback (most recent call last):\n File \"train_model.py\", line 72, in <module>\n loss = loss_function(predictions, ground_truth)\n File \"core.py\", line 442, in _call_function\n return self._execute(*args, **kwargs)\n File \"core.py\", line 450, in _execute\n return forward_execution(*args, **kwargs)\n File \"loss_functions.py\", line 205, in forward\n return F.cross_entropy(input, target, weight=self.weight,\n File \"functional.py\", line 3104, in cross_entropy\n return cross_entropy_loss(input, target, ...)\nIndexError: Target 15 is out of bounds.\n"""
# No error message
# user_error = "success."
# Layer-by-layer match score (average of per-layer maximum similarities)
def compute_layered_similarity_avg(user_vecs, db_index, layer_weights=None):
    """Weighted average of each query frame's best similarity within db_index."""
    weighted_score = 0.0
    if layer_weights is None:
        layer_weights = np.ones(len(user_vecs))  # equal weight (1) for every layer by default
    for i, u_vec in enumerate(user_vecs):
        u_vec = u_vec.astype('float32').reshape(1, -1)
        sim, _ = db_index.search(u_vec, k=1)  # best similarity between this layer and the entry's frames
        max_sim = sim[0][0]
        weighted_score += max_sim * layer_weights[i]
    return weighted_score / np.sum(layer_weights) if len(user_vecs) > 0 else 0.0
def compute_layered_similarity_sco(user_vecs, db_index, layer_weights=None):
    """Sum of per-layer maximum similarities with weights that grow with frame depth."""
    weighted_score = 0.0
    if layer_weights is None:
        # Non-linear weights: deeper layers get progressively larger weight
        layer_weights = np.logspace(0, 1, len(user_vecs))
        layer_weights /= np.sum(layer_weights)  # normalize the weights to sum to 1
    for i, u_vec in enumerate(user_vecs):
        u_vec = u_vec.astype('float32').reshape(1, -1)
        sim, _ = db_index.search(u_vec, k=1)  # best similarity between this layer and the entry's frames
        max_sim = sim[0][0]
        weighted_score += max_sim * layer_weights[i]
    return weighted_score if len(user_vecs) > 0 else 0.0
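# Note: compute_layered_similarity_avg weights every frame equally, while
# compute_layered_similarity_sco uses np.logspace weights so deeper frames
# (those closest to the raised exception) dominate the score. Only the _sco
# variant is used for re-ranking below.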
# Timing: query
start_query_time = time.time()
cleaned_user = clean_traceback(user_error)
user_layers = split_traceback_layers(cleaned_user)
user_vectors = model.encode(user_layers)
user_agg_vector = np.mean(user_vectors, axis=0).astype('float32').reshape(1, -1)
# Coarse retrieval: search the aggregate index for the k closest entries
k = 5
sim, indices = aggregate_index.search(user_agg_vector, k)
# Print the indices of the retrieved entries
print(f"Indices of the top {k} closest entries: {indices[0]}")
# Fine-grained search for the best match among the candidates
best_match = None
best_score = -1
best_match_index = -1  # index of the most similar error traceback
for idx in indices[0]:
    db_entry = layered_error_db[idx]
    score = compute_layered_similarity_sco(user_vectors, db_entry["index"])
    if score > best_score:
        best_score = score
        best_match = db_entry
        best_match_index = idx  # update the index of the most similar traceback
end_query_time = time.time()
logging.info(f"Query time: {end_query_time - start_query_time:.2f} s")
# Print the match result
print("--- Cleaned user error message ---")
print(cleaned_user)
print("\n--- Most similar error traceback ---")
print("\n".join(best_match["layers"]))
print("\n--- Suggested solution ---")
print(best_match["solution"])
print(f"\n--- Match score (layered weighted max similarity): {best_score:.4f}")
print(f"\n--- Index of the most similar error traceback: {best_match_index}")