# Copyright (c) Alibaba, Inc. and its affiliates. # flake8: noqa import argparse import json import os import pandas as pd import plotly.graph_objects as go import re import seaborn as sns import streamlit as st import yaml def generate_color_palette(n): palette = sns.color_palette('hls', n) return palette.as_hex() def read_yaml(yaml_file) -> dict: """ Read yaml file to dict. """ with open(yaml_file, 'r') as f: try: stream = yaml.safe_load(f) except yaml.YAMLError as e: print(e) raise e return stream def read_jsonl(input_file): all_data = [] with open(input_file, 'r') as input_jsonl: for line in input_jsonl: data = json.loads(line) all_data.append(data) return all_data def cat_view(df, category, models): cat_df = df[df['category'] == category] if category != 'all' else df model_scores = {} for model in models: model_a_scores = (cat_df[cat_df['model_a'] == model]['scores'].apply(lambda x: x[0]).sum()) model_b_scores = (cat_df[cat_df['model_b'] == model]['scores'].apply(lambda x: x[1]).sum()) # calculate count of occurrences for model_a and model_b model_a_counts = cat_df[cat_df['model_a'] == model].shape[0] model_b_counts = cat_df[cat_df['model_b'] == model].shape[0] # calculate average scores for each model model_scores[model] = (model_a_scores + model_b_scores) / (model_a_counts + model_b_counts) return dict( category=category, count=cat_df['question_id'].nunique(), **dict(model_scores), ) def get_color(value): good_thresholds = [0.2, 0.1, 0.05] bad_thresholds = [-0.2, -0.1, -0.05] good_colors = ['#32CD32', '#98FF98', '#D0F0C0'] bad_colors = ['#FF6347', '#FA8072', '#ffcccb'] color = '' for i in range(len(good_thresholds)): if value > good_thresholds[i]: color = good_colors[i] break for i in range(len(bad_thresholds)): if value < bad_thresholds[i]: color = bad_colors[i] break return 'background-color: %s' % color if color else '' def get_category_map(category_file): if not category_file or not os.path.exists(category_file): return dict() category_mapping = read_yaml(category_file) return category_mapping def get_category_group(category_map, cat): for key, value in category_map.items(): if cat in value or '*' in value: return key return cat def show_table_view(df): models = df['model_a'].unique().tolist() for model in df['model_b'].unique().tolist(): if model not in models: models.append(model) catogories = df['category'].unique().tolist() # Calculate the average score for each category cat_data = [cat_view(df, category, models) for category in catogories] cat_df = pd.DataFrame(cat_data) cat_df.sort_values(by=models[0], ascending=False, inplace=True, ignore_index=True) # Add total data total_data = cat_view(df, 'all', models) cat_df.loc[cat_df.shape[0]] = [ '总计', total_data['count'], *[total_data[model] for model in models], ] # Render as link for each category cat_df['category'] = cat_df['category'].apply( lambda x: '{}'.format( x, models[0], models[1], x), ) # Format the table if len(models) == 2: cat_df['score diff'] = cat_df.apply(lambda x: (x[models[1]] - x[models[0]]) / x[models[0]], axis=1) cat_df.rename( columns={ 'category': '类别', 'count': 'Case数量', 'score diff': '分差百分比', }, inplace=True, ) style_df = cat_df.style.format({ **dict(zip(models, ['{:.2f}' for model in models])), '分差百分比': '{:.2%}', }).applymap( get_color, subset=['分差百分比']) else: cat_df.rename( columns={ 'category': '类别', 'count': 'Case数量', }, inplace=True, ) def format(row): p = '{:.2%}'.format((row[model] - row[baseline_model]) / row[baseline_model]) p = '+' + p if p[0] != '-' else p return f'{row[model]:.2f} ({p})' def color(value): match = re.search(r'\((.*?)\)', value) return get_color(float(match.group(1).strip('%')) / 100) if match else '' baseline_model = models[0] for model in models[1:]: cat_df[model] = cat_df.apply( format, axis=1, ) cat_df[baseline_model] = cat_df[baseline_model].apply(lambda x: '{:.2f}'.format(x)) style_df = cat_df.style.applymap(color, subset=models[1:]) # df_html = style_df.to_html(escape=False, index=False) # TODO df_html = style_df.to_html() st.markdown(df_html, unsafe_allow_html=True) def show_radar_chart(df): score_list = [] for index, row in df.iterrows(): score_list.append(dict(model=row['model_a'], category=row['category'], score=row['scores'][0])) score_list.append(dict(model=row['model_b'], category=row['category'], score=row['scores'][1])) score_df = pd.DataFrame(score_list) df_agg = score_df.groupby(['model', 'category'])['score'].mean().reset_index() pivot_df = df_agg.pivot(index='model', columns='category', values='score').fillna(0) categories = pivot_df.columns.tolist() fig = go.Figure() num_models = len(pivot_df.index.tolist()) color_palette = generate_color_palette(num_models) color_dict = dict(zip(pivot_df.index.tolist(), color_palette)) for model in pivot_df.index.tolist(): model_values = pivot_df[pivot_df.index == model].values.tolist()[0] model_values.append(model_values[0]) # Make the data cyclic fig.add_trace( go.Scatterpolar( r=model_values, theta=categories + [categories[0]], # Make the categories cyclic fill='none', name=model, line=dict(color=color_dict[model]), )) fig.update_layout(polar=dict(radialaxis=dict(visible=True, range=[0, 10])), showlegend=True) st.plotly_chart(fig) def show_single_result(df, category, model_a, model_b): categories = df['category'].unique().tolist() model_names = df['model_a'].unique().tolist() col1, col2 = st.columns([1, 3]) with col1: category = st.selectbox( '选择类别', categories, index=categories.index(category) if category in categories else 0, ) df = df[df['category'] == category] with col2: ques = st.selectbox('选择问题', df['question'].unique().tolist()) col1, col2 = st.columns(2) with col1: model_a_options = [model_a] model_a = st.selectbox( '选择模型A', model_a_options, index=model_a_options.index(model_a), ) with col2: model_b = st.selectbox('选择模型B', [m for m in model_names if m != model_a]) with st.container(): st.markdown( """
{model_a} 回答
{output_a}{model_b} 回答
{output_b}