# evalscope/utils/io_utils.py
# I/O helpers for the evalscope package (jsonl/csv/yaml/json conversion and
# output-directory management).
import csv
import hashlib
import json
import jsonlines as jsonl
import os
import re
import yaml
from evalscope.constants import DumpMode
from evalscope.utils.logger import get_logger
logger = get_logger()
class OutputsStructure:
    """Lazily resolves (and optionally creates) the standard output
    sub-directories under a given root directory.

    Each of the ``*_dir`` properties returns the absolute sub-directory path,
    creating it on first access when ``is_make`` is True. Paths are cached so
    the filesystem is touched at most once per directory.
    """

    LOGS_DIR = 'logs'
    PREDICTIONS_DIR = 'predictions'
    REVIEWS_DIR = 'reviews'
    REPORTS_DIR = 'reports'
    CONFIGS_DIR = 'configs'

    def __init__(self, outputs_dir: str, is_make=True):
        # is_make: when True, directories are created on first property access.
        self.outputs_dir = outputs_dir
        self.is_make = is_make
        # Cache of resolved paths, one slot per property; None means "not yet resolved".
        self._dirs = dict.fromkeys(
            ('logs_dir', 'predictions_dir', 'reviews_dir', 'reports_dir', 'configs_dir'))

    def _get_dir(self, attr_name, dir_name):
        # Resolve the directory once, optionally create it, then serve from cache.
        cached = self._dirs[attr_name]
        if cached is None:
            cached = os.path.join(self.outputs_dir, dir_name)
            if self.is_make:
                os.makedirs(cached, exist_ok=True)
            self._dirs[attr_name] = cached
        return cached

    @property
    def logs_dir(self):
        return self._get_dir('logs_dir', OutputsStructure.LOGS_DIR)

    @property
    def predictions_dir(self):
        return self._get_dir('predictions_dir', OutputsStructure.PREDICTIONS_DIR)

    @property
    def reviews_dir(self):
        return self._get_dir('reviews_dir', OutputsStructure.REVIEWS_DIR)

    @property
    def reports_dir(self):
        return self._get_dir('reports_dir', OutputsStructure.REPORTS_DIR)

    @property
    def configs_dir(self):
        return self._get_dir('configs_dir', OutputsStructure.CONFIGS_DIR)
def jsonl_to_list(jsonl_file):
    """
    Read a jsonl file into a list of dicts.

    Args:
        jsonl_file: jsonl file path.

    Returns:
        list: one dict per line of the file (None lines are allowed,
        invalid lines raise).
    """
    with jsonl.open(jsonl_file, mode='r') as reader:
        return [record for record in reader.iter(type=dict, allow_none=True, skip_invalid=False)]
def jsonl_to_reader(jsonl_file):
    """
    Open a jsonl file and return its reader object.

    Note: the original implementation returned the reader from inside a
    ``with`` block, so the reader was already closed by the time the caller
    received it. The reader is now returned open; the caller is responsible
    for closing it (e.g. via ``with jsonl_to_reader(path) as reader:``).

    Args:
        jsonl_file: jsonl file path.

    Returns:
        reader: an open jsonl reader object.
    """
    return jsonl.open(jsonl_file, mode='r')
def dump_jsonl_data(data_list, jsonl_file, dump_mode=DumpMode.OVERWRITE):
    """
    Dump data to a jsonl file.

    Args:
        data_list: data to be dumped, e.g. [{'a': 'aaa'}, ...]. A single
            non-list item is wrapped into a one-element list.
        jsonl_file: jsonl file path (``~`` is expanded).
        dump_mode: DumpMode.OVERWRITE or DumpMode.APPEND.

    Raises:
        ValueError: if no output file is given or dump_mode is unrecognized.
    """
    if not jsonl_file:
        raise ValueError('output file must be provided.')
    jsonl_file = os.path.expanduser(jsonl_file)
    if not isinstance(data_list, list):
        data_list = [data_list]
    # Map the enum to the underlying file mode; fail loudly on unknown modes
    # instead of passing the enum object through to jsonlines.
    if dump_mode == DumpMode.OVERWRITE:
        mode = 'w'
    elif dump_mode == DumpMode.APPEND:
        mode = 'a'
    else:
        raise ValueError(f'Invalid dump_mode: {dump_mode}')
    with jsonl.open(jsonl_file, mode=mode) as writer:
        writer.write_all(data_list)
def jsonl_to_csv(jsonl_file, csv_file):
    """
    Convert a jsonl file to a csv file.

    The header is taken from the first record. Records missing a header key
    get an empty cell, and keys not present in the header are ignored —
    the previous implementation wrote raw ``dict.values()`` per row, which
    silently misaligned columns whenever rows had differing keys.

    Args:
        jsonl_file: jsonl file path.
        csv_file: csv file path.
    """
    data = jsonl_to_list(jsonl_file)
    if not data:
        logger.warning(f'No data found in {jsonl_file}.')
        return
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=list(data[0].keys()), restval='', extrasaction='ignore')
        writer.writeheader()
        writer.writerows(data)
def csv_to_list(csv_file) -> list:
    """
    Read a csv file into a list of row dicts.

    Args:
        csv_file: csv file path.

    Returns:
        list: one dict per data row, keyed by the header row.
    """
    with open(csv_file, 'r', encoding='utf-8') as f:
        return list(csv.DictReader(f))
def csv_to_jsonl(csv_file, jsonl_file):
    """
    Convert a csv file to a jsonl file.

    Args:
        csv_file: csv file path.
        jsonl_file: jsonl file path.
    """
    rows = csv_to_list(csv_file)
    if not rows:
        logger.warning(f'No data found in {csv_file}.')
        return
    dump_jsonl_data(rows, jsonl_file, dump_mode=DumpMode.OVERWRITE)
def yaml_to_dict(yaml_file) -> dict:
    """
    Read a yaml file into a dict.

    The file is opened with an explicit utf-8 encoding (the previous version
    relied on the platform default, which breaks on non-UTF-8 locales).

    Raises:
        yaml.YAMLError: if the file is not valid YAML (logged, then re-raised).
    """
    with open(yaml_file, 'r', encoding='utf-8') as f:
        try:
            content = yaml.safe_load(f)
        except yaml.YAMLError as e:
            logger.error(f'{e}')
            # Bare raise keeps the original traceback intact.
            raise
    return content
def dict_to_yaml(d: dict, yaml_file: str):
    """
    Dump a dict to a yaml file.

    Opens the file with utf-8 explicitly: ``allow_unicode=True`` emits raw
    non-ASCII characters, which could crash under a non-UTF-8 default locale.
    """
    with open(yaml_file, 'w', encoding='utf-8') as f:
        yaml.dump(d, f, default_flow_style=False, allow_unicode=True)
def json_to_dict(json_file) -> dict:
    """
    Read a json file into a dict.

    The file is opened with an explicit utf-8 encoding (JSON is UTF-8 by
    spec; the previous version relied on the platform default).

    Raises:
        json.JSONDecodeError: if the file is not valid JSON (logged, then re-raised).
    """
    with open(json_file, 'r', encoding='utf-8') as f:
        try:
            content = json.load(f)
        except json.JSONDecodeError as e:
            logger.error(f'{e}')
            # Bare raise keeps the original traceback intact.
            raise
    return content
def are_paths_same(path1, path2):
    """
    Return True if the two paths refer to the same filesystem location.

    Both paths are user-expanded, made absolute, and symlink-resolved
    before comparison.
    """

    def _canonical(path):
        # ~ expansion -> absolute path -> symlink resolution.
        return os.path.realpath(os.path.abspath(os.path.expanduser(path)))

    return _canonical(path1) == _canonical(path2)
def dict_to_json(d: dict, json_file: str):
    """
    Dump a dict to a json file.

    Opens the file with utf-8 explicitly: ``ensure_ascii=False`` writes raw
    non-ASCII characters, which raised UnicodeEncodeError under a non-UTF-8
    default locale in the previous version.
    """
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(d, f, indent=4, ensure_ascii=False)
def get_latest_folder_path(work_dir):
    """
    Return the most recent timestamped sub-folder of ``work_dir``.

    Folders must be named ``YYYYMMDD_HHMMSS``; the one with the latest
    timestamp wins. Returns None (and logs a warning) when no such folder
    exists. Uses ``logger`` instead of the previous ``print`` call, for
    consistency with the rest of this module.
    """
    from datetime import datetime

    timestamp_pattern = re.compile(r'^\d{8}_\d{6}$')
    # Only directories whose names match the timestamp format.
    timestamped_folders = [
        f for f in os.listdir(work_dir)
        if os.path.isdir(os.path.join(work_dir, f)) and timestamp_pattern.match(f)
    ]
    if not timestamped_folders:
        logger.warning(f'>> No timestamped folders found in {work_dir}!')
        return None
    latest_folder = max(
        timestamped_folders,
        key=lambda name: datetime.strptime(name, '%Y%m%d_%H%M%S'))
    return os.path.join(work_dir, latest_folder)
def gen_hash(name: str, bits: int = 32):
    """
    Return a prefix of the MD5 hex digest of ``name``.

    Note: ``bits`` counts hex characters (max 32), not binary bits.
    Not for security use — MD5 is only used here as a cheap stable hash.
    """
    digest = hashlib.md5(name.encode(encoding='UTF-8')).hexdigest()
    return digest[:bits]
def get_valid_list(input_list, candidate_list):
    """
    Partition ``input_list`` by membership in ``candidate_list``.

    Args:
        input_list: The input list.
        candidate_list: The candidate list.

    Returns:
        tuple: (valid_list, invalid_list) — items of ``input_list`` that are
        / are not in ``candidate_list``, preserving order and duplicates.
    """
    valid_list, invalid_list = [], []
    for item in input_list:
        # Single pass instead of two membership scans per item.
        (valid_list if item in candidate_list else invalid_list).append(item)
    return valid_list, invalid_list