import sys
from collections import defaultdict
from itertools import chain
import pandas as pd
from pathlib import Path
from typing import Dict, List, Protocol, Union, Tuple, Optional
import ir_measures
from experimaestro import Task, Param, pathgenerator, Annotated, tags, TagDict
from datamaestro_text.data.ir import (
Adhoc,
AdhocAssessments,
Documents,
AdhocResults,
IDItem,
)
from datamaestro_text.data.ir.trec import TrecAdhocRun, TrecAdhocResults
from datamaestro_text.transforms.ir import TopicWrapper
from xpmir.measures import Measure
import xpmir.measures as m
from xpmir.metrics import evaluator
from xpmir.rankers import Retriever
from experimaestro.launchers import Launcher
def get_evaluator(metrics: List[ir_measures.Metric], assessments: AdhocAssessments):
qrels = {
assessedTopic.topic_id: {r.doc_id: r.rel for r in assessedTopic.assessments}
for assessedTopic in assessments.iter()
}
return evaluator(metrics, qrels)
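# `get_evaluator` builds the nested qrels mapping expected by ir_measures,
# i.e. {query_id: {doc_id: relevance}}, and returns an evaluator exposing
# `iter_calc` and `calc_aggregate` (see their use below). Illustrative sketch,
# assuming `assessments` and a `run` dictionary already exist:
#
#     ev = get_evaluator([ir_measures.AP, ir_measures.nDCG @ 10], assessments)
#     aggregated = ev.calc_aggregate(run)  # {measure: value}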
class BaseEvaluation(Task):
"""Base class for evaluation tasks"""
measures: Param[List[Measure]] = [m.AP, m.P @ 20, m.nDCG, m.nDCG @ 20, m.RR]
"""List of metrics"""
aggregated: Annotated[Path, pathgenerator("aggregated.txt")]
"""Path for aggregated results"""
detailed: Annotated[Path, pathgenerator("detailed.dat")]
"""Path for detailed results"""
def task_outputs(self, dep):
return dep(
TrecAdhocResults(
id="",
results=self.aggregated,
detailed=self.detailed,
metrics=self.measures,
)
)
def _execute(self, run, assessments):
"""Evaluate an IR ad-hoc run with trec-eval"""
evaluator = get_evaluator([measure() for measure in self.measures], assessments)
def print_line(fp, measure, scope, value):
fp.write("{:25s} {:10s} {:.4f}\n".format(measure, scope, value))
with self.detailed.open("w") as fp:
for metric in evaluator.iter_calc(run):
print_line(fp, str(metric.measure), metric.query_id, metric.value)
with self.aggregated.open("w") as fp:
for key, value in evaluator.calc_aggregate(run).items():
print_line(fp, str(key), "all", value)
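# Both output files follow the `print_line` format above: one
# "measure scope value" line per entry, where scope is a query id in the
# detailed file and "all" in the aggregated file. Illustrative line (the value
# is made up):
#
#     nDCG@20                   all        0.4321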
def get_run(retriever: Retriever, dataset: Adhoc):
"""Returns the scored documents for each topic in a dataset"""
results = retriever.retrieve_all(
{topic[IDItem].id: topic for topic in dataset.topics.iter()}
)
return {
qid: {sd.document[IDItem].id: sd.score for sd in scoredocs}
for qid, scoredocs in results.items()
}
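# The returned run uses the nested shape consumed by ir_measures evaluators,
# {query_id: {doc_id: score}}. Illustrative sketch with made-up identifiers:
#
#     {"q1": {"d12": 11.3, "d7": 9.8}, "q2": {"d3": 4.2}}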
def evaluate(retriever: Retriever, dataset: Adhoc, measures: List[str], details=False):
"""Evaluate a retriever on a given dataset
:param retriever: The retriever to evaluate
:param dataset: The dataset on which to evaluate
:param measures: The list of measures to compute (using ir_measures)
    :param details: whether query-level metrics should be reported, defaults to False
:return: The metrics (if details is False) or a tuple (metrics, detailed metrics)
"""
evaluator = get_evaluator(
[ir_measures.parse_measure(m) for m in measures], dataset.assessments
)
run = get_run(retriever, dataset)
aggregators = {m: m.aggregator() for m in evaluator.measures}
    details = defaultdict(dict) if details else None
for metric in evaluator.iter_calc(run):
aggregators[metric.measure].add(metric.value)
if details is not None:
details[str(metric.measure)][metric.query_id] = metric.value
metrics = {str(m): agg.result() for m, agg in aggregators.items()}
if details is not None:
return metrics, details
return metrics
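# Usage sketch for `evaluate` (the retriever and dataset names are
# assumptions, and the retriever is expected to be already initialized):
#
#     metrics = evaluate(bm25_retriever, adhoc_dataset, ["AP", "nDCG@10"])
#     metrics, per_query = evaluate(
#         bm25_retriever, adhoc_dataset, ["AP", "nDCG@10"], details=True
#     )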
class RunEvaluation(BaseEvaluation, Task):
"""Evaluate a run"""
run: Param[TrecAdhocRun]
assessments: Param[AdhocAssessments]
    def execute(self):
        run = ir_measures.read_trec_run(self.run.path)
        return self._execute(run, self.assessments)
class Evaluate(BaseEvaluation, Task):
"""Evaluate a retriever directly (without generating the run explicitly)"""
dataset: Param[Adhoc]
"""The dataset for retrieval"""
retriever: Param[Retriever]
"""The retriever to evaluate"""
topic_wrapper: Param[Optional[TopicWrapper]] = None
"""Topic extractor"""
def execute(self):
self.retriever.initialize()
run = get_run(self.retriever, self.dataset)
self._execute(run, self.dataset.assessments)
class RetrieverFactory(Protocol):
"""Generates a retriever for a given dataset"""
def __call__(self, dataset: Documents) -> Retriever:
...
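# Any callable with this signature can serve as a factory; hypothetical
# sketch (`make_retriever_for` is an assumption, not part of this module):
#
#     def my_factory(documents: Documents) -> Retriever:
#         # build and return a Retriever bound to this document collection
#         return make_retriever_for(documents)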
class Evaluations:
"""Holds experiment results for several models
on one dataset"""
dataset: Adhoc
measures: List[Measure]
    results: List[AdhocResults]
    per_tags: Dict[TagDict, AdhocResults]
topic_wrapper: Optional[TopicWrapper]
def __init__(
self,
dataset: Adhoc,
measures: List[Measure],
*,
topic_wrapper: Optional[TopicWrapper] = None,
):
self.dataset = dataset
self.measures = measures
self.results = []
self.per_tags = {}
self.topic_wrapper = topic_wrapper
    def evaluate_retriever(
self,
retriever: Union[Retriever, RetrieverFactory],
        launcher: Optional[Launcher] = None,
init_tasks=[],
) -> Tuple[Retriever, AdhocResults]:
"""Evaluates a retriever"""
if not isinstance(retriever, Retriever):
retriever = retriever(self.dataset.documents)
evaluation: AdhocResults = Evaluate(
retriever=retriever,
measures=self.measures,
dataset=self.dataset,
topic_wrapper=self.topic_wrapper,
).submit(launcher=launcher, init_tasks=init_tasks)
self.add(evaluation)
# Use retriever tags
retriever_tags = tags(retriever)
if retriever_tags:
self.per_tags[retriever_tags] = evaluation
return retriever, evaluation
    def add(self, *results: AdhocResults):
self.results.extend(results)
def output_results_per_tag(self, file=sys.stdout):
return self.to_dataframe().to_markdown(file)
def to_dataframe(self) -> pd.DataFrame:
# Get all the tags
tags = list(
set(chain(*[tags_dict.keys() for tags_dict in self.per_tags.keys()]))
)
tags.sort()
# Get all the results and metrics
to_process = []
metrics = set()
for tags_dict, evaluate in self.per_tags.items():
results = evaluate.get_results()
metrics.update(results.keys())
to_process.append((tags_dict, results))
# Table header
columns = []
for tag in tags:
columns.append(["tag", tag])
for metric in metrics:
columns.append(["metric", metric])
# Output the results
rows = []
for tags_dict, results in to_process:
row = []
# tag values
for k in tags:
row.append(str(tags_dict.get(k, "")))
# metric values
for metric in metrics:
row.append(results.get(metric, ""))
rows.append(row)
index = pd.MultiIndex.from_tuples(columns)
return pd.DataFrame(rows, columns=index)
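# Sketch of a single-dataset evaluation (dataset and retriever names are
# assumptions; this is meant to run within an experimaestro experiment so the
# submitted tasks get scheduled):
#
#     evals = Evaluations(trec_covid, [m.AP, m.nDCG @ 10])
#     evals.evaluate_retriever(bm25_retriever)
#     evals.output_results_per_tag()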
class EvaluationsCollection:
"""A collection of evaluation
This is useful to group all the evaluations to be conducted, and then
to call the :py:meth:`evaluate_retriever`
"""
collection: Dict[str, Evaluations]
per_model: Dict[str, List[Tuple[str, AdhocResults]]]
"""List of results per model"""
def __init__(self, **collection: Evaluations):
self.collection = collection
self.per_model = {}
    def evaluate_retriever(
self,
retriever: Union[Retriever, RetrieverFactory],
        launcher: Optional[Launcher] = None,
        model_id: Optional[str] = None,
overwrite: bool = False,
init_tasks=[],
):
"""Evaluate a retriever for all the evaluations in this collection (the
tasks are submitted to the experimaestro scheduler)"""
if model_id is not None and not overwrite:
assert (
model_id not in self.per_model
), f"Model with ID `{model_id}` was already evaluated"
results = []
for key, evaluations in self.collection.items():
_retriever, result = evaluations.evaluate_retriever(
retriever, launcher, init_tasks=init_tasks
)
results.append((key, result))
# Adds to per model results
if model_id is not None:
self.per_model[model_id] = results
return results
def output_results(self, file=sys.stdout):
"""Print all the results"""
for key, dsevaluations in self.collection.items():
print(f"## Dataset {key}\n", file=file) # noqa: T201
for evaluation in dsevaluations.results:
with evaluation.results.open("rt") as fp:
results = [f"- {line}" for line in fp.readlines()]
results = "".join(results)
print( # noqa: T201
f"### Results for {evaluation.__xpm__.tags()}" f"""\n{results}\n""",
file=file,
)
    def to_dataframe(self) -> pd.DataFrame:
"""Returns a Pandas dataframe"""
all_data = []
for key, evaluations in self.collection.items():
data = evaluations.to_dataframe()
data["dataset"] = key
all_data.append(data)
return pd.concat(all_data, ignore_index=True)
def output_results_per_tag(self, file=sys.stdout):
"""Outputs the results for each collection, based on the retriever tags
to build the table
"""
# Loop over all collections
for key, evaluations in self.collection.items():
print(f"## Dataset {key}\n", file=file) # noqa: T201
evaluations.output_results_per_tag(file)
def output_model_results(self, model_id: str, file=sys.stdout):
"""Outputs the result of a model over various datasets (in markdown format)
:param model_id: The model id, as given by :meth:`evaluate_retriever`
:param file: The output stream, defaults to sys.stdout
"""
all_results = {}
all_metrics = set()
for key, evaluation in self.per_model[model_id]:
all_results[key] = evaluation.get_results()
all_metrics.update(all_results[key].keys())
all_metrics = sorted(all_metrics)
file.write(f"| Dataset | {' | '.join(all_metrics)} |\n")
file.write(f"|----| {'---|---'.join('' for _ in all_metrics)}---|\n")
for key, values in all_results.items():
file.write(f"| {key}")
for metric in all_metrics:
value = values.get(metric, "")
file.write(f" | {value}")
file.write(" |\n")