"""ChatGLM3 evaluator for multiple-choice and free-form QA test sets.

Restored from the added-file diff of ``evaluators/chatglm3.py``.
"""
import os
import re
from pathlib import Path
from typing import Tuple, Union

import torch
import typer
from peft import AutoPeftModelForCausalLM, PeftConfig, PeftModel, PeftModelForCausalLM
from tqdm import tqdm
from transformers import (
    AutoConfig,
    AutoModel,
    AutoModelForCausalLM,
    AutoTokenizer,
    PreTrainedModel,
    PreTrainedTokenizer,
    PreTrainedTokenizerFast,
)
from transformers.generation.logits_process import LogitsProcessor
from transformers.generation.utils import LogitsProcessorList

from evaluators.evaluator import Evaluator

# NOTE(review): this is a plain module variable; nothing in this file exports
# it to the environment, so on its own it does not restrict visible GPUs.
CUDA_VISIBLE_DEVICES = 0
ModelType = Union[PreTrainedModel, PeftModelForCausalLM]
TokenizerType = Union[PreTrainedTokenizer, PreTrainedTokenizerFast]

# Hub id of the base checkpoint used for both the plain and the adapter path.
_BASE_MODEL_NAME = "THUDM/chatglm3-6b"

# Answer-extraction patterns, compiled once instead of on every question.
_COT_ANSWER_RE = re.compile(r'所以答案是(.+?)。', re.M)
_ANSWER_PATTERNS = tuple(
    re.compile(pattern, re.M)
    for pattern in (
        r'([ABCD])是正确的',
        r'选项([ABCD])正确',
        r'答案为([ABCD])',
        r'答案是([ABCD])',
        r'答案([ABCD])',
        r'选择([ABCD])',
        r'答案:([ABCD])',
        r'选择答案([ABCD])',
    )
)
_CHOICE_LETTER_RE = re.compile(r'[ABCD]', re.M)


def _resolve_path(path: Union[str, Path]) -> Path:
    """Return *path* as an absolute ``Path`` with ``~`` expanded."""
    return Path(path).expanduser().resolve()


def load_model_and_tokenizer(model_dir: Union[str, Path]) -> Tuple[ModelType, TokenizerType]:
    """Load a model and its tokenizer from *model_dir*.

    If *model_dir* contains ``adapter_config.json`` it is treated as a PEFT
    adapter: the ChatGLM3 base checkpoint is loaded and the adapter is applied
    on top of it, and the tokenizer is loaded from the base model path recorded
    in the adapter config. Otherwise *model_dir* is loaded directly as a causal
    LM and the tokenizer is read from the same directory.

    Args:
        model_dir: Directory holding either a PEFT adapter or a full model.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    model_dir = _resolve_path(model_dir)
    if (model_dir / 'adapter_config.json').exists():
        # Parse the adapter config before the (much more expensive) base
        # model load; the parsed object itself is not needed afterwards.
        PeftConfig.from_pretrained(model_dir)
        base_model = AutoModel.from_pretrained(
            _BASE_MODEL_NAME, trust_remote_code=True, mirror="tuna", resume_download=True
        )
        model = PeftModel.from_pretrained(base_model, model_dir)
        tokenizer_dir = model.peft_config['default'].base_model_name_or_path
    else:
        model = AutoModelForCausalLM.from_pretrained(
            model_dir, trust_remote_code=True, device_map='auto'
        )
        tokenizer_dir = model_dir
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir, trust_remote_code=True)
    return model, tokenizer


class InvalidScoreLogitsProcessor(LogitsProcessor):
    """Replace a score tensor containing NaN/Inf with a fixed fallback."""

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        """Return *scores*, reset in place when any entry is NaN or infinite.

        On invalid input every score is zeroed and index 5 of the last
        dimension is set to 5e4, so that index dominates the distribution.
        """
        if torch.isnan(scores).any() or torch.isinf(scores).any():
            scores.zero_()
            scores[..., 5] = 5e4
        return scores


class ChatGLM_Evaluator(Evaluator):
    """Evaluator that answers questions through ChatGLM3's ``chat`` API."""

    def __init__(self, choices, k, model_name, device, finetune=None, finetune_method=None):
        """Load the model and tokenizer and put the model in eval mode.

        Args:
            choices: Choice labels; also used as column names of the data rows.
            k: Number of few-shot examples (``-1`` means the whole dev set).
            model_name: Name forwarded to the base ``Evaluator``.
            device: Device the half-precision model is moved to.
            finetune: Sub-directory of ``qlora/glm3/`` holding the fine-tuned
                weights; required when ``finetune_method == "qlora"``.
            finetune_method: ``"qlora"`` to load fine-tuned weights; any other
                value loads the stock ChatGLM3 checkpoint.

        Raises:
            ValueError: If ``finetune_method`` is ``"qlora"`` but ``finetune``
                is ``None``.
        """
        super().__init__(choices, model_name, k)
        # Try adding 'mirror="tuna"' and 'resume_download=True' if facing the
        # 'read timed out' problem, or directly clone the model.
        if finetune_method == "qlora":
            if finetune is None:
                raise ValueError('finetune must be given when finetune_method is "qlora"')
            model_dir = 'qlora/glm3/' + finetune
            # The tokenizer comes from the fine-tuned checkpoint, so the stock
            # tokenizer is not loaded at all on this path.
            model, self.tokenizer = load_model_and_tokenizer(model_dir)
            model = model.half().to(device)
            print("Model loaded! use GLM3 " + finetune)
        else:
            self.tokenizer = AutoTokenizer.from_pretrained(
                _BASE_MODEL_NAME, trust_remote_code=True, mirror="tuna"
            )
            model = AutoModel.from_pretrained(
                _BASE_MODEL_NAME, trust_remote_code=True, mirror="tuna", resume_download=True
            ).half().to(device)
            print("Model loaded! (GLM3)")
        self.model = model.eval()

    def eval_subject(self, subject_name, test_df, dev_df=None, few_shot=False, cot=False, save_result_dir=None):
        """Run the model over *test_df* and return the accuracy in percent.

        Args:
            subject_name: Subject label, used in the few-shot prompt and in
                the result file name.
            test_df: Questions to answer; each row is passed to
                ``format_example`` and its ``answer`` column is the gold label.
            dev_df: Examples for the few-shot prompt (used when *few_shot*).
            few_shot: Build the chat history from *dev_df* instead of the
                fixed instruction turn.
            cot: Forwarded to prompt formatting.
            save_result_dir: When set, per-row outputs are added to *test_df*
                as new columns and the frame is written there as CSV.

        Returns:
            Percentage of rows whose extracted answer equals the gold answer;
            ``0.0`` for an empty *test_df*.
        """
        if few_shot:
            history = self.generate_few_shot_prompt(subject_name, dev_df, cot=cot)
        else:
            history = [{'role': 'user',
                        'content': '接下来会提供给你一些选择题,请选出正确的答案,给出正确的选项即可。'},
                       {'role': 'assistant',
                        'content': '好的,我会尽力解答。'}]

        correct_num = 0
        result = []
        score = []
        answer_list = []
        for _, row in tqdm(test_df.iterrows(), total=len(test_df)):
            question = self.format_example(row, include_answer=False, cot=cot)
            # Each question gets its own copy of the shared history so the
            # chat call cannot carry turns over from one question to the next.
            response, _ = self.model.chat(self.tokenizer, question, max_length=300,
                                          do_sample=False, history=list(history))
            response = response.strip()
            # For ChatGLM, answer extraction is used in answer-only mode too.
            ans, _ = self.extract_cot_answer(row, response)
            if not few_shot:
                print(response, ans)
            # Compare against the row's own gold answer; the frame's index
            # labels are not guaranteed to be positions 0..n-1.
            correct = int(ans == row['answer'])
            correct_num += correct
            if save_result_dir:
                result.append(response)
                answer_list.append(ans)
                score.append(correct)

        total = len(test_df)
        correct_ratio = 100 * correct_num / total if total else 0.0

        if save_result_dir:
            test_df['model_output'] = result
            test_df['correctness'] = score
            test_df['model_answer'] = answer_list
            if few_shot:
                result_file_name = f'{subject_name}_{correct_ratio}_few_shot_test.csv'
            else:
                result_file_name = f'{subject_name}_{correct_ratio}_test.csv'
            test_df.to_csv(os.path.join(save_result_dir, result_file_name))

        return correct_ratio

    def eval_qa(self, subject_name, qa_df, save_result_dir=None):
        """Answer every ``question`` in *qa_df* and store the replies.

        Replies are written into the ``model_output`` column of *qa_df* in
        place. When *save_result_dir* is set the frame is also saved there as
        ``{subject_name}_qa_test_result.csv``.
        """
        history = [{'role': 'user',
                    'content': '接下来会给你一些一些汽车领域相关问题,请回答。'},
                   {'role': 'assistant',
                    'content': '好的,我会尽力解答。'}]
        for row_index, row in tqdm(qa_df.iterrows(), total=len(qa_df)):
            question = row['question']
            # Same per-question copy of the history as in eval_subject.
            response, _ = self.model.chat(self.tokenizer, question, max_length=300,
                                          do_sample=False, history=list(history))
            qa_df.loc[row_index, 'model_output'] = response.strip()
        if save_result_dir:
            result_file_name = f'{subject_name}_qa_test_result.csv'
            qa_df.to_csv(os.path.join(save_result_dir, result_file_name))

    def generate_few_shot_prompt(self, subject, dev_df, cot=False):
        """Build the few-shot chat history from the first ``k`` rows of *dev_df*.

        ``self.k == -1`` means "use every row"; a ``k`` larger than the dev
        set is clamped to the number of rows. The first example is always
        included and carries the subject instruction as a prefix.

        Returns:
            A flat list of ``{'role', 'content'}`` messages (user/assistant
            pairs, one pair per example).
        """
        n_rows = dev_df.shape[0]
        k = self.k
        if k == -1 or k > n_rows:
            k = n_rows

        message = []
        message.extend(self.format_example(
            dev_df.iloc[0, :], cot=cot,
            add_prompt=f"以下是中国关于{subject}考试的单项选择题,请选出其中的正确答案。\n\n"))
        for i in range(1, k):
            message.extend(self.format_example(dev_df.iloc[i, :], cot=cot))
        return message

    def format_example(self, line, include_answer=True, cot=False, add_prompt=''):
        """Format one data row as a prompt, optionally with its answer.

        Args:
            line: Row with ``question``, one entry per choice label and, when
                *include_answer*, ``answer`` (plus ``explanation`` if *cot*).
            include_answer: Return a user/assistant message pair instead of
                the bare prompt string.
            cot: Put the explanation before the answer in the assistant turn.
            add_prompt: Text placed in front of the question.

        Returns:
            The prompt string, or a two-element message list when
            *include_answer* is true.
        """
        example = add_prompt + line['question']
        for choice in self.choices:
            option_text = line[f'{choice}']
            example += f'\n{choice}. {option_text}'
        example += '\n答案:'
        if not include_answer:
            return example

        if cot:
            ans = "让我们一步一步思考,\n" + line["explanation"] + f"\n所以答案是{line['answer']}。"
        else:
            ans = line["answer"]
        return [
            {'role': 'user', 'content': example},
            {'role': 'assistant', 'content': ans},
        ]

    def extract_cot_answer(self, line, gen_ans):
        """Extract the chosen option from the generated text *gen_ans*.

        Strategies are tried in order: the explicit "所以答案是X。" phrase, a
        list of common answer phrasings, a response containing exactly one of
        the letters A-D, and finally a response containing the text of exactly
        one choice of *line*.

        Returns:
            ``(answer, direct)`` where ``direct`` is ``True`` only for the
            explicit phrase match; ``('-', False)`` when nothing matches.
        """
        explicit = _COT_ANSWER_RE.findall(gen_ans)
        if explicit and explicit[-1] in self.choices:
            return explicit[-1], True

        # Common phrasings, first pattern that matches wins.
        for pattern in _ANSWER_PATTERNS:
            found = pattern.search(gen_ans)
            if found:
                return found.group(1), False

        # Response contains exactly one choice letter.
        letters = _CHOICE_LETTER_RE.findall(gen_ans)
        if len(letters) == 1:
            return letters[0], False

        # Response contains the text of exactly one choice.
        matched = [c for c in self.choices if str(line[f'{c}']) in gen_ans]
        if len(matched) == 1:
            return matched[0], False
        return '-', False

    def generate_dist(self, model, tokenizer, query, history, num_beams=1, max_length=2048,
                      do_sample=False, top_p=0.7, temperature=0.95, logits_processor=None, **kwargs):
        """Pick a choice from the model's first-token scores.

        The prompt is built in "[Round i]" form from *history*, one generation
        is run with score output enabled, and the choice whose token has the
        highest first-step score is returned. The only call to this method in
        this file is commented out.

        Args:
            model: Model exposing ``generate`` and ``device``.
            tokenizer: Tokenizer matching *model*.
            query: Current question.
            history: Iterable of ``(query, response)`` pairs, or ``None``.
            num_beams, max_length, do_sample, top_p, temperature: Generation
                settings forwarded to ``model.generate``.
            logits_processor: Optional processor list; an
                ``InvalidScoreLogitsProcessor`` is appended to it.
            **kwargs: Extra generation arguments.

        Returns:
            The entry of ``self.choices`` with the highest score.
        """
        if history is None:
            history = []
        if logits_processor is None:
            logits_processor = LogitsProcessorList()
        logits_processor.append(InvalidScoreLogitsProcessor())
        # Honour the caller's max_length (the default stays 2048).
        gen_kwargs = {"num_beams": num_beams, "do_sample": do_sample, "top_p": top_p,
                      "max_length": max_length, "temperature": temperature,
                      "logits_processor": logits_processor, **kwargs}
        if not history:
            prompt = query
        else:
            # NOTE(review): this expects (query, response) pairs, whereas the
            # histories built elsewhere in this class are role/content dicts.
            # Confirm the format before re-enabling this path.
            prompt = ""
            for i, (old_query, response) in enumerate(history):
                prompt += "[Round {}]\n问:{}\n答:{}\n".format(i, old_query, response)
            prompt += "[Round {}]\n问:{}\n答:".format(len(history), query)
        inputs = tokenizer([prompt], return_tensors="pt")
        inputs = inputs.to(model.device)
        outputs = model.generate(**inputs, return_dict_in_generate=True, output_scores=True, **gen_kwargs)

        score = outputs.scores[0][0].tolist()
        # NOTE(review): hard-coded vocabulary ids, presumably the tokens for
        # the four choice labels. Verify against the tokenizer in use.
        choice_score = [score[167], score[333], score[251], score[416]]
        # First index holding the maximum, the same pick as a stable
        # descending sort.
        best_index = max(range(len(choice_score)), key=choice_score.__getitem__)
        return self.choices[best_index]