import csv
import hashlib
import json
import os
import re

import jsonlines as jsonl
import yaml

from evalscope.constants import DumpMode
from evalscope.utils.logger import get_logger

logger = get_logger()


class OutputsStructure:
    """Lazily-created directory layout for evaluation outputs."""

    LOGS_DIR = 'logs'
    PREDICTIONS_DIR = 'predictions'
    REVIEWS_DIR = 'reviews'
    REPORTS_DIR = 'reports'
    CONFIGS_DIR = 'configs'

    def __init__(self, outputs_dir: str, is_make: bool = True):
        self.outputs_dir = outputs_dir
        self.is_make = is_make
        self._dirs = {
            'logs_dir': None,
            'predictions_dir': None,
            'reviews_dir': None,
            'reports_dir': None,
            'configs_dir': None,
        }

    def _get_dir(self, attr_name, dir_name):
        # Resolve the directory lazily; create it on first access when is_make is True.
        if self._dirs[attr_name] is None:
            dir_path = os.path.join(self.outputs_dir, dir_name)
            if self.is_make:
                os.makedirs(dir_path, exist_ok=True)
            self._dirs[attr_name] = dir_path
        return self._dirs[attr_name]

    @property
    def logs_dir(self):
        return self._get_dir('logs_dir', OutputsStructure.LOGS_DIR)

    @property
    def predictions_dir(self):
        return self._get_dir('predictions_dir', OutputsStructure.PREDICTIONS_DIR)

    @property
    def reviews_dir(self):
        return self._get_dir('reviews_dir', OutputsStructure.REVIEWS_DIR)

    @property
    def reports_dir(self):
        return self._get_dir('reports_dir', OutputsStructure.REPORTS_DIR)

    @property
    def configs_dir(self):
        return self._get_dir('configs_dir', OutputsStructure.CONFIGS_DIR)


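# Illustrative usage (a sketch; 'outputs/run1' is a hypothetical path):
#
#   outputs = OutputsStructure(outputs_dir='outputs/run1')
#   outputs.reports_dir  # -> 'outputs/run1/reports', created on first access
#
# With is_make=False the paths are computed but never created on disk.

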
def jsonl_to_list(jsonl_file):
    """
    Read jsonl file to list.

    Args:
        jsonl_file: jsonl file path.

    Returns:
        list: list of lines. Each line is a dict.
    """
    res_list = []
    with jsonl.open(jsonl_file, mode='r') as reader:
        for line in reader.iter(type=dict, allow_none=True, skip_invalid=False):
            res_list.append(line)
    return res_list


def jsonl_to_reader(jsonl_file):
    """
    Open jsonl file and return a reader object.

    Args:
        jsonl_file: jsonl file path.

    Returns:
        reader: jsonl reader object. The caller is responsible for closing it.
    """
    # Note: returning from inside a `with` block would hand back a reader whose
    # underlying file is already closed, so open without a context manager here.
    return jsonl.open(jsonl_file, mode='r')


def dump_jsonl_data(data_list, jsonl_file, dump_mode=DumpMode.OVERWRITE):
    """
    Dump data to jsonl file.

    Args:
        data_list: data list to be dumped. e.g. [{'a': 'aaa'}, ...]
        jsonl_file: jsonl file path.
        dump_mode: dump mode. It can be DumpMode.OVERWRITE or DumpMode.APPEND.
    """
    if not jsonl_file:
        raise ValueError('output file must be provided.')

    jsonl_file = os.path.expanduser(jsonl_file)

    if not isinstance(data_list, list):
        data_list = [data_list]

    if dump_mode == DumpMode.OVERWRITE:
        dump_mode = 'w'
    elif dump_mode == DumpMode.APPEND:
        dump_mode = 'a'
    else:
        raise ValueError(f'Unsupported dump_mode: {dump_mode}')

    with jsonl.open(jsonl_file, mode=dump_mode) as writer:
        writer.write_all(data_list)


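# Illustrative usage (a sketch; the file path is hypothetical):
#
#   dump_jsonl_data([{'id': 1}, {'id': 2}], '~/tmp/preds.jsonl')
#   dump_jsonl_data({'id': 3}, '~/tmp/preds.jsonl', dump_mode=DumpMode.APPEND)
#   jsonl_to_list(os.path.expanduser('~/tmp/preds.jsonl'))
#   # -> [{'id': 1}, {'id': 2}, {'id': 3}]
#
# A single dict is wrapped into a one-element list before writing.

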
def jsonl_to_csv(jsonl_file, csv_file):
    """
    Convert jsonl file to csv file.

    Args:
        jsonl_file: jsonl file path.
        csv_file: csv file path.
    """
    data = jsonl_to_list(jsonl_file)
    if not data:
        logger.warning(f'No data found in {jsonl_file}.')
        return

    # Assumes every line shares the keys (and key order) of the first line.
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(data[0].keys())  # Write header
        for item in data:
            writer.writerow(item.values())


def csv_to_list(csv_file) -> list:
    """
    Read csv file to list.

    Args:
        csv_file: csv file path.

    Returns:
        list: list of lines. Each line is a dict.
    """
    res_list = []
    with open(csv_file, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            res_list.append(row)
    return res_list


def csv_to_jsonl(csv_file, jsonl_file):
    """
    Convert csv file to jsonl file.

    Args:
        csv_file: csv file path.
        jsonl_file: jsonl file path.
    """
    data = csv_to_list(csv_file)
    if not data:
        logger.warning(f'No data found in {csv_file}.')
        return

    dump_jsonl_data(data, jsonl_file, dump_mode=DumpMode.OVERWRITE)


def yaml_to_dict(yaml_file) -> dict:
    """
    Read yaml file to dict.
    """
    with open(yaml_file, 'r') as f:
        try:
            content = yaml.safe_load(f)
        except yaml.YAMLError as e:
            logger.error(f'{e}')
            raise

    return content


def dict_to_yaml(d: dict, yaml_file: str):
    """
    Dump dict to yaml file.
    """
    with open(yaml_file, 'w') as f:
        yaml.dump(d, f, default_flow_style=False, allow_unicode=True)


def json_to_dict(json_file) -> dict:
    """
    Read json file to dict.
    """
    with open(json_file, 'r') as f:
        try:
            content = json.load(f)
        except json.JSONDecodeError as e:
            logger.error(f'{e}')
            raise

    return content


def are_paths_same(path1, path2):
    """
    Check if two paths refer to the same location.
    """
    real_path1 = os.path.realpath(os.path.abspath(os.path.expanduser(path1)))
    real_path2 = os.path.realpath(os.path.abspath(os.path.expanduser(path2)))

    return real_path1 == real_path2


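# Illustrative checks (a sketch; the paths are hypothetical):
#
#   are_paths_same('~/outputs', os.path.expanduser('~/outputs'))  # True
#   are_paths_same('a/b/../c', 'a/c')                             # True
#
# Symlinks are also resolved (os.path.realpath) before comparison.

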
def dict_to_json(d: dict, json_file: str):
    """
    Dump dict to json file.
    """
    with open(json_file, 'w') as f:
        json.dump(d, f, indent=4, ensure_ascii=False)


def get_latest_folder_path(work_dir):
    from datetime import datetime

    # Get all subdirectories in the work_dir
    folders = [f for f in os.listdir(work_dir) if os.path.isdir(os.path.join(work_dir, f))]

    # Timestamp format: YYYYMMDD_HHMMSS
    timestamp_pattern = re.compile(r'^\d{8}_\d{6}$')

    # Keep only the timestamped folders
    timestamped_folders = [f for f in folders if timestamp_pattern.match(f)]

    if not timestamped_folders:
        logger.warning(f'No timestamped folders found in {work_dir}!')
        return None

    # Parse a folder name into a datetime for comparison
    def parse_timestamp(folder_name):
        return datetime.strptime(folder_name, '%Y%m%d_%H%M%S')

    # Find the latest folder
    latest_folder = max(timestamped_folders, key=parse_timestamp)

    return os.path.join(work_dir, latest_folder)


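# Illustrative usage (a sketch; the directory layout is hypothetical):
#
#   # outputs/ contains 20240101_120000/, 20240315_093000/, misc/
#   get_latest_folder_path('outputs')  # -> 'outputs/20240315_093000'
#
# Folders not matching YYYYMMDD_HHMMSS (like 'misc/') are ignored.

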
def gen_hash(name: str, bits: int = 32):
    """
    Generate a stable hash string for the given name.

    Note: despite its name, `bits` is the number of hex characters kept
    (the full MD5 hex digest is 32 characters). MD5 is used here only as
    a naming fingerprint, not for security.
    """
    return hashlib.md5(name.encode(encoding='UTF-8')).hexdigest()[:bits]


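# Illustrative usage (a sketch):
#
#   gen_hash('my-model-prompt')     # 32-char MD5 hex digest
#   gen_hash('my-model-prompt', 8)  # first 8 hex characters, e.g. for short IDs

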
def get_valid_list(input_list, candidate_list):
    """
    Split input_list into valid and invalid items based on candidate_list.

    Args:
        input_list: The input list.
        candidate_list: The candidate list.

    Returns:
        valid_list: Items from input_list that are in candidate_list.
        invalid_list: Items from input_list that are not in candidate_list.
    """
    return ([i for i in input_list if i in candidate_list],
            [i for i in input_list if i not in candidate_list])
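

# Illustrative usage (a sketch; the dataset names are hypothetical):
#
#   valid, invalid = get_valid_list(['arc', 'mmlu', 'foo'], ['arc', 'mmlu', 'gsm8k'])
#   # valid   -> ['arc', 'mmlu']
#   # invalid -> ['foo']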