Init commit. Add Evaluators and support ChatGLM/ChatGLM2.

main
PeterAlbus 10 months ago
parent 620be4a685
commit 299beda00e

7
.gitignore vendored

@ -160,3 +160,10 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
/.idea/
/THUDM/
/THUDM/chatglm-6b/
/lora/
/ptuning/
/logs/
/data/

@ -1,3 +1,12 @@
# LLM_Evaluator # LLM_Evaluator
A simple program to evaluate large language model. A simple program to evaluate large language model.
## Recommend Requirements
- Python 3.8
- torch 1.13.1+cu117
- transformers 4.33.2
- accelerate 0.26.1
- tqdm 4.66.1
- openai 1.10.0

@ -0,0 +1,97 @@
import os
import argparse
import pandas as pd
import torch
# from evaluators.chatgpt import ChatGPT_Evaluator
from evaluators.chatglm import ChatGLM_Evaluator
from evaluators.chatglm2 import ChatGLM_Evaluator as ChatGLM2_Evaluator
import time
choices = ["A", "B", "C", "D"]
def main(args):
if "turbo" in args.model_name or "gpt-4" in args.model_name:
print("Not supported yet")
return -1
# evaluator=ChatGPT_Evaluator(
# choices=choices,
# k=args.ntrain,
# api_key=args.openai_key,
# model_name=args.model_name
# )
elif "chatglm2" in args.model_name:
if args.cuda_device:
os.environ["CUDA_VISIBLE_DEVICES"] = args.cuda_device
device = torch.device("cuda")
if args.finetune:
fine_tune_model = args.finetune
else:
fine_tune_model = None
evaluator=ChatGLM2_Evaluator(
choices=choices,
k=args.ntrain,
model_name=args.model_name,
device=device,
finetune=fine_tune_model
)
elif "chatglm" in args.model_name:
if args.cuda_device:
os.environ["CUDA_VISIBLE_DEVICES"] = args.cuda_device
device = torch.device("cuda")
if args.finetune:
fine_tune_model = args.finetune
else:
fine_tune_model = None
evaluator=ChatGLM_Evaluator(
choices=choices,
k=args.ntrain,
model_name=args.model_name,
device=device,
finetune=fine_tune_model
)
else:
print("Unknown model name")
return -1
if not os.path.exists(r"logs"):
os.mkdir(r"logs")
run_date=time.strftime('%Y-%m-%d_%H-%M-%S',time.localtime(time.time()))
if args.finetune:
fine_tune_model_name = args.finetune
else:
fine_tune_model_name = 'original'
save_result_dir=os.path.join(r"logs",f"{args.model_name}_{fine_tune_model_name}_{run_date}")
os.mkdir(save_result_dir)
subject_list = ['computer_architecture', 'car_knowledge', 'car_use', 'car_market']
for subject_name in subject_list:
print(subject_name)
# subject_name=args.subject
val_file_path=os.path.join('data/val',f'{subject_name}_val.csv')
val_df=pd.read_csv(val_file_path)
if args.few_shot:
dev_file_path=os.path.join('data/dev',f'{subject_name}_dev.csv')
dev_df=pd.read_csv(dev_file_path)
correct_ratio = evaluator.eval_subject(subject_name, val_df, dev_df, few_shot=args.few_shot,save_result_dir=save_result_dir,cot=args.cot)
else:
correct_ratio = evaluator.eval_subject(subject_name, val_df, few_shot=args.few_shot,save_result_dir=save_result_dir)
print("Acc:",correct_ratio)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ntrain", "-k", type=int, default=5)
parser.add_argument("--openai_key", type=str,default="xxx")
parser.add_argument("--minimax_group_id", type=str,default="xxx")
parser.add_argument("--minimax_key", type=str,default="xxx")
parser.add_argument("--few_shot", action="store_true")
parser.add_argument("--model_name",type=str)
parser.add_argument("--cot",action="store_true")
# parser.add_argument("--subject","-s",type=str,default="operating_system")
parser.add_argument("--cuda_device", type=str)
parser.add_argument("--finetune", type=str)
args = parser.parse_args()
main(args)

@ -0,0 +1,164 @@
import os
import re
from tqdm import tqdm
import torch
from transformers import AutoTokenizer, AutoModel, AutoConfig
from transformers.generation.logits_process import LogitsProcessor
from transformers.generation.utils import LogitsProcessorList
from evaluators.evaluator import Evaluator
class InvalidScoreLogitsProcessor(LogitsProcessor):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
if torch.isnan(scores).any() or torch.isinf(scores).any():
scores.zero_()
scores[..., 5] = 5e4
return scores
class ChatGLM_Evaluator(Evaluator):
def __init__(self, choices, k, model_name, device, finetune=None):
super(ChatGLM_Evaluator, self).__init__(choices, model_name, k)
# try adding 'mirror="tuna"' and 'resume_download=True' if facing the 'read timed out' problem
# or directly clone the model
self.tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm-6b", trust_remote_code=True, mirror="tuna")
if finetune:
CHECKPOINT_PATH="ptuning/" + finetune
config = AutoConfig.from_pretrained("THUDM/chatglm-6b", trust_remote_code=True, pre_seq_len=128)
self.model = AutoModel.from_pretrained("THUDM/chatglm-6b", config=config, trust_remote_code=True)
prefix_state_dict = torch.load(os.path.join(CHECKPOINT_PATH, "pytorch_model.bin"))
new_prefix_state_dict = {}
for k, v in prefix_state_dict.items():
if k.startswith("transformer.prefix_encoder."):
new_prefix_state_dict[k[len("transformer.prefix_encoder."):]] = v
self.model.transformer.prefix_encoder.load_state_dict(new_prefix_state_dict)
self.model = self.model.half().to(device)
self.model.transformer.prefix_encoder.float()
print("Model loaded! use GLM + " + finetune)
else:
self.model = AutoModel.from_pretrained("THUDM/chatglm-6b", trust_remote_code=True, mirror="tuna", resume_download=True).half().to(device)
print("Model loaded!(GLM)")
# self.model = self.model.eval()
def eval_subject(self, subject_name, test_df, dev_df=None, few_shot=False, cot=False, save_result_dir=None):
correct_num = 0
if save_result_dir:
if few_shot:
result = []
score = []
if few_shot:
history = self.generate_few_shot_prompt(subject_name, dev_df, cot=cot)
else:
history = []
answers = list(test_df['answer'])
for row_index, row in tqdm(test_df.iterrows(), total=len(test_df)):
question = self.format_example(row, include_answer=False, cot=cot)
if few_shot:
response, _ = self.model.chat(self.tokenizer, question, do_sample=False, history=history)
response = response.strip()
# For ChatGLM, we use answer extraction in answer-only mode too.
ans, direct_extract = self.extract_cot_answer(row, response)
else: # zero-shot by extracting answer from distribution
ans = self.generate_dist(self.model, self.tokenizer, question, do_sample=False, max_length=2048, history=history)
if ans == answers[row_index]:
correct_num += 1
correct = 1
else:
correct = 0
if save_result_dir:
if few_shot:
result.append(response)
score.append(correct)
correct_ratio = 100*correct_num/len(answers)
if save_result_dir:
if few_shot:
test_df['model_output'] = result
test_df['correctness'] = score
test_df.to_csv(os.path.join(save_result_dir, f'{subject_name}_test.csv'))
return correct_ratio
def generate_few_shot_prompt(self, subject, dev_df, cot=False):
message = []
k = self.k
if self.k == -1:
k = dev_df.shape[0]
message.append(self.format_example(dev_df.iloc[0, :], cot=cot, add_prompt=f"以下是中国关于{subject}考试的单项选择题,请选出其中的正确答案。\n\n"))
for i in range(1, k):
message.append(self.format_example(dev_df.iloc[i, :], cot=cot))
return message
def format_example(self, line, include_answer=True, cot=False, add_prompt=''):
example = add_prompt + line['question']
# print(example)
for choice in self.choices:
example += f'\n{choice}. {line[f"{choice}"]}'
example += '\n答案:'
if include_answer:
if cot:
ans = "让我们一步一步思考,\n" + line["explanation"] + f"\n所以答案是{line['answer']}"
else:
ans = line["answer"]
m = (example, ans)
return m
return example
def extract_cot_answer(self, line, gen_ans):
m = re.findall(r'所以答案是(.+?)。', gen_ans, re.M)
if len(m) > 0 and m[-1] in self.choices:
return m[-1], True
answer_patterns = [
r'([ABCD])是正确的',
r'选项([ABCD])正确',
r'答案为([ABCD])',
r'答案是([ABCD])',
r'答案([ABCD])',
r'选择([ABCD])',
r'答案:([ABCD])',
r'选择答案([ABCD])'
]
# RE extraction
for answer_pattern in answer_patterns:
m = re.search(answer_pattern, gen_ans, re.M)
if m:
answer = m.group(1)
return answer, False
# only containing one choice-character
m = re.findall(r'[ABCD]', gen_ans, re.M)
if len(m) == 1:
answer = m[0]
return answer, False
answer_word_counter = 0
# only containing one choice-context
for c in self.choices:
if str(line[f'{c}']) in gen_ans:
answer = c
answer_word_counter += 1
if answer_word_counter == 1:
return answer, False
return '-', False
def generate_dist(self, model, tokenizer, query, history, num_beams=1, max_length=2048,
do_sample=False, top_p=0.7, temperature=0.95, logits_processor=None, **kwargs):
if history is None:
history = []
if logits_processor is None:
logits_processor = LogitsProcessorList()
logits_processor.append(InvalidScoreLogitsProcessor())
gen_kwargs = {"num_beams": num_beams, "do_sample": do_sample, "top_p": top_p, "max_length": 2048,
"temperature": temperature, "logits_processor": logits_processor, **kwargs}
if not history:
prompt = query
else:
prompt = ""
for i, (old_query, response) in enumerate(history):
prompt += "[Round {}]\n问:{}\n答:{}\n".format(i, old_query, response)
prompt += "[Round {}]\n问:{}\n答:".format(len(history), query)
inputs = tokenizer([prompt], return_tensors="pt")
inputs = inputs.to(model.device)
outputs = model.generate(**inputs, return_dict_in_generate=True, output_scores=True, **gen_kwargs)
score = outputs.scores[0][0].tolist()
choice_score = [score[167], score[333], score[251], score[416]]
ranked_index = [index for index, value in sorted(list(enumerate(choice_score)), key=lambda x:x[1], reverse=True)]
return self.choices[ranked_index[0]]

@ -0,0 +1,155 @@
import os
import re
from tqdm import tqdm
import torch
from transformers import AutoTokenizer, AutoModel, AutoConfig
from transformers.generation.logits_process import LogitsProcessor
from transformers.generation.utils import LogitsProcessorList
from evaluators.evaluator import Evaluator
from peft import PeftModel
class InvalidScoreLogitsProcessor(LogitsProcessor):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
if torch.isnan(scores).any() or torch.isinf(scores).any():
scores.zero_()
scores[..., 5] = 5e4
return scores
class ChatGLM_Evaluator(Evaluator):
def __init__(self, choices, k, model_name, device, finetune=None):
super(ChatGLM_Evaluator, self).__init__(choices, model_name, k)
# try adding 'mirror="tuna"' and 'resume_download=True' if facing the 'read timed out' problem
# or directly clone the model
self.tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm2-6b", trust_remote_code=True, mirror="tuna")
self.model = AutoModel.from_pretrained("THUDM/chatglm2-6b", trust_remote_code=True, mirror="tuna", resume_download=True).half().to(device)
if finetune:
peft_model_id = "lora/" + finetune
self.model = PeftModel.from_pretrained(self.model, peft_model_id)
print("Model loaded! use GLM2" + finetune)
else:
print("Model loaded!(GLM2)")
# self.model = self.model.eval()
def eval_subject(self, subject_name, test_df, dev_df=None, few_shot=False, cot=False, save_result_dir=None):
correct_num = 0
if save_result_dir:
if few_shot:
result = []
score = []
if few_shot:
history = self.generate_few_shot_prompt(subject_name, dev_df, cot=cot)
else:
history = []
answers = list(test_df['answer'])
for row_index, row in tqdm(test_df.iterrows(), total=len(test_df)):
question = self.format_example(row, include_answer=False, cot=cot)
if few_shot:
response, _ = self.model.chat(self.tokenizer, question, do_sample=False, history=history)
response = response.strip()
# For ChatGLM, we use answer extraction in answer-only mode too.
ans, direct_extract = self.extract_cot_answer(row, response)
else: # zero-shot by extracting answer from distribution
ans = self.generate_dist(self.model, self.tokenizer, question, do_sample=False, max_length=2048, history=history)
if ans == answers[row_index]:
correct_num += 1
correct = 1
else:
correct = 0
if save_result_dir:
if few_shot:
result.append(response)
score.append(correct)
correct_ratio = 100*correct_num/len(answers)
if save_result_dir:
if few_shot:
test_df['model_output'] = result
test_df['correctness'] = score
test_df.to_csv(os.path.join(save_result_dir, f'{subject_name}_{correct_ratio}_test.csv'))
return correct_ratio
def generate_few_shot_prompt(self, subject, dev_df, cot=False):
message = []
k = self.k
if self.k == -1:
k = dev_df.shape[0]
message.append(self.format_example(dev_df.iloc[0, :], cot=cot, add_prompt=f"以下是中国关于{subject}考试的单项选择题,请选出其中的正确答案。\n\n"))
for i in range(1, k):
message.append(self.format_example(dev_df.iloc[i, :], cot=cot))
return message
def format_example(self, line, include_answer=True, cot=False, add_prompt=''):
example = add_prompt + line['question']
# print(example)
for choice in self.choices:
example += f'\n{choice}. {line[f"{choice}"]}'
example += '\n答案:'
if include_answer:
if cot:
ans = "让我们一步一步思考,\n" + line["explanation"] + f"\n所以答案是{line['answer']}"
else:
ans = line["answer"]
m = (example, ans)
return m
return example
def extract_cot_answer(self, line, gen_ans):
m = re.findall(r'所以答案是(.+?)。', gen_ans, re.M)
if len(m) > 0 and m[-1] in self.choices:
return m[-1], True
answer_patterns = [
r'([ABCD])是正确的',
r'选项([ABCD])正确',
r'答案为([ABCD])',
r'答案是([ABCD])',
r'答案([ABCD])',
r'选择([ABCD])',
r'答案:([ABCD])',
r'选择答案([ABCD])'
]
# RE extraction
for answer_pattern in answer_patterns:
m = re.search(answer_pattern, gen_ans, re.M)
if m:
answer = m.group(1)
return answer, False
# only containing one choice-character
m = re.findall(r'[ABCD]', gen_ans, re.M)
if len(m) == 1:
answer = m[0]
return answer, False
answer_word_counter = 0
# only containing one choice-context
for c in self.choices:
if str(line[f'{c}']) in gen_ans:
answer = c
answer_word_counter += 1
if answer_word_counter == 1:
return answer, False
return '-', False
def generate_dist(self, model, tokenizer, query, history, num_beams=1, max_length=2048,
do_sample=False, top_p=0.7, temperature=0.95, logits_processor=None, **kwargs):
if history is None:
history = []
if logits_processor is None:
logits_processor = LogitsProcessorList()
logits_processor.append(InvalidScoreLogitsProcessor())
gen_kwargs = {"num_beams": num_beams, "do_sample": do_sample, "top_p": top_p, "max_length": 2048,
"temperature": temperature, "logits_processor": logits_processor, **kwargs}
if not history:
prompt = query
else:
prompt = ""
for i, (old_query, response) in enumerate(history):
prompt += "[Round {}]\n问:{}\n答:{}\n".format(i, old_query, response)
prompt += "[Round {}]\n问:{}\n答:".format(len(history), query)
inputs = tokenizer([prompt], return_tensors="pt")
inputs = inputs.to(model.device)
outputs = model.generate(**inputs, return_dict_in_generate=True, output_scores=True, **gen_kwargs)
score = outputs.scores[0][0].tolist()
choice_score = [score[167], score[333], score[251], score[416]]
ranked_index = [index for index, value in sorted(list(enumerate(choice_score)), key=lambda x:x[1], reverse=True)]
return self.choices[ranked_index[0]]

@ -0,0 +1,169 @@
import os
from tqdm import tqdm
import openai
from evaluators.evaluator import Evaluator
from time import sleep
import re
class ChatGPT_Evaluator(Evaluator):
def __init__(self, choices, k, api_key,model_name):
super(ChatGPT_Evaluator, self).__init__(choices, model_name, k)
openai.api_key = api_key
def format_example(self,line,include_answer=True,cot=False):
example=line['question']
for choice in self.choices:
example+=f'\n{choice}. {line[f"{choice}"]}'
example+='\n答案:'
if include_answer:
if cot:
ans=line["answer"]
content="让我们一步一步思考,\n"+line["explanation"]+f"\n所以答案是{ans}"
return [
{"role":"user","content":example},
{"role":"assistant","content":content}
]
else:
return [
{"role":"user","content":example},
{"role":"assistant","content":line["answer"]}
]
else:
return [
{"role":"user","content":example},
]
def generate_few_shot_prompt(self, subject, dev_df, cot=False):
prompt=[
{
"role":"system",
"content":f"你是一个中文人工智能助手,以下是中国关于{subject}考试的单项选择题,请选出其中的正确答案。"
}
]
k=self.k
if self.k==-1:
k=dev_df.shape[0]
for i in range(k):
tmp=self.format_example(dev_df.iloc[i,:],include_answer=True,cot=cot)
if i==0:
tmp[0]["content"]=f"以下是中国关于{subject}考试的单项选择题,请选出其中的正确答案。\n\n"+tmp[0]["content"]
prompt+=tmp
return prompt
def eval_subject(self, subject_name, test_df, dev_df=None, few_shot=False, save_result_dir=None,cot=False):
correct_num = 0
if save_result_dir:
result = []
score=[]
if few_shot:
few_shot_prompt = self.generate_few_shot_prompt(subject_name, dev_df,cot=cot)
else:
few_shot_prompt=[
{
"role":"system",
"content":f"你是一个中文人工智能助手,以下是中国关于{subject_name}考试的单项选择题,请选出其中的正确答案。"
}
]
answers = list(test_df['answer'])
for row_index, row in tqdm(test_df.iterrows(),total=len(test_df)):
question = self.format_example(row, include_answer=False)
full_prompt = few_shot_prompt + question
if not few_shot:
full_prompt[-1]["content"]=f"以下是中国关于{subject_name}考试的单项选择题,请选出其中的正确答案。\n\n"+full_prompt[-1]["content"]
response=None
timeout_counter=0
while response is None and timeout_counter<=30:
try:
response = openai.ChatCompletion.create(
model=self.model_name,
messages=full_prompt,
temperature=0.
)
except Exception as msg:
if "timeout=600" in str(msg):
timeout_counter+=1
print(msg)
sleep(5)
continue
if response==None:
response_str=""
else:
response_str = response['choices'][0]['message']['content']
#print(response_str)
if cot:
ans_list=re.findall(r"答案是(.+?)。",response_str)
if len(ans_list)==0:
ans_list=re.findall(r"答案为(.+?)。",response_str)
if len(ans_list)==0:
ans_list=re.findall(r"选项(.+?)是正确的。",response_str)
if len(ans_list)==0:
correct=0
else:
if self.exact_match(ans_list[-1],row["answer"]):
correct_num+=1
correct=1
else:
correct=0
else:
response_str=response_str.strip()
if few_shot:
if len(response_str)>0:
if self.exact_match(response_str,row["answer"]):
correct_num+=1
correct=1
else:
correct=0
else:
correct=0
else:
if len(response_str)>0:
ans_list=self.extract_ans(response_str)
if len(ans_list)>0 and (ans_list[-1]==row["answer"]):
correct_num+=1
correct=1
else:
correct=0
else:
correct=0
if save_result_dir:
result.append(response_str)
score.append(correct)
correct_ratio = 100*correct_num/len(answers)
if save_result_dir:
test_df['model_output']=result
test_df["correctness"]=score
test_df.to_csv(os.path.join(save_result_dir, f'{subject_name}_val.csv'),encoding="utf-8",index=False)
return correct_ratio
def extract_ans(self,response_str):
pattern=[
r"^选([A-D])",
r"^选项([A-D])",
r"答案是\s?选?项?\s?([A-D])",
r"答案为\s?选?项?\s?([A-D])",
r"答案应为\s?选?项?\s?([A-D])",
r"答案选\s?选?项?\s?([A-D])",
r"答案是:\s?选?项?\s?([A-D])",
r"答案应该是:\s?选?项?\s?([A-D])",
r"正确的一项是\s?([A-D])",
r"答案为:\s?选?项?\s?([A-D])",
r"答案应为:\s?选?项?\s?([A-D])",
r"答案:\s?选?项?\s?([A-D])",
r"答案是:\s?选?项?\s?([A-D])",
r"答案应该是:\s?选?项?\s?([A-D])",
r"答案为:\s?选?项?\s?([A-D])",
r"答案应为:\s?选?项?\s?([A-D])",
r"答案:\s?选?项?\s?([A-D])",
]
ans_list=[]
if response_str[0] in ["A",'B','C','D']:
ans_list.append(response_str[0])
for p in pattern:
if len(ans_list)==0:
ans_list=re.findall(p,response_str)
else:
break
return ans_list

@ -0,0 +1,47 @@
import re
import string
class Evaluator:
def __init__(self, choices, model_name, k=-1):
self.choices = choices
self.model_name = model_name
self.k = k
self.puncs = list(string.punctuation)
def format_example(self, line, include_answer=True):
example = line['question']
# print(example)
for choice in self.choices:
example += f'\n{choice}. {line[f"{choice}"]}'
example += '\n答案:'
if include_answer:
example += f'{line["answer"]}\n\n'
return example
def generate_few_shot_prompt(self, subject, dev_df):
prompt = f"以下是中国关于{subject}考试的单项选择题,请选出其中的正确答案。\n\n"
k = self.k
if self.k == -1:
k = dev_df.shape[0]
for i in range(k):
prompt += self.format_example(dev_df.iloc[i, :])
return prompt
def eval_subject(self, subject_name, test_df, dev_df=None, few_shot=False, save_result_dir=None):
pass
def normalize_answer(self,s):
def white_space_fix(text):
return ' '.join(text.split())
def remove_punc(text):
exclude=set(self.puncs)
return ''.join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
return white_space_fix(remove_punc(lower(s)))
def exact_match(self,pred, target):
return self.normalize_answer(pred)==self.normalize_answer(target)

@ -0,0 +1,37 @@
import pandas as pd
# 读取CSV文件
df = pd.read_csv("data/val/car_use_val.csv")
correct_num = 0
total_num = 0
# 遍历每一行并生成选择题文本
for index, row in df.iterrows():
question_text = row['question']
options = [row['A'], row['B'], row['C'], row['D']]
answer = row['answer']
# 生成选择题文本
question_text = f"{question_text}\n"
for i, option in enumerate(options):
question_text += f"{chr(65 + i)}. {option}\n"
# 打印生成的选择题
print(f"问题 {index + 1}:")
print(question_text)
user_answer = input("请输入你的答案: ")
df.loc[index, 'user_answer'] = user_answer
print(f"答案: {answer}\n")
total_num += 1
if user_answer == answer:
print("回答正确!\n")
correct_num += 1
else:
print("回答错误!\n")
# 计算正确率
correct_ratio = 100 * correct_num / total_num
print(f"正确率: {correct_ratio}%")
#结果保存到文件
df.to_csv("logs/car_use_val_gpt3.5_" + str(correct_ratio) + ".csv")
Loading…
Cancel
Save