Source code for xpmir.evaluation

import logging
import sys
from itertools import chain
from attrs import define
from datamaestro_ir.interfaces.trec import write_run_dict
import pandas as pd
from pathlib import Path
from typing import DefaultDict, Dict, List, Protocol, Union, Optional
import ir_measures

from experimaestro import (
    Task,
    Param,
    Meta,
    field,
    pathgenerator,
    Annotated,
    tags,
    TagDict,
)
from datamaestro_ir.data import (
    Adhoc,
    AdhocAssessments,
    AdhocRun,
    AdhocRunDict,
    Documents,
    AdhocResults,
)
from datamaestro_ir.data.trec import TrecAdhocRun, TrecAdhocResults
from datamaestro_ir.transforms import TopicWrapper

from xpm_torch.configuration import FabricConfiguration

import xpmir.measures as m
from xpmir.measures import Measure
from xpmir.metrics import evaluator
from xpmir.rankers import Retriever, RunRetriever
from experimaestro.launchers import Launcher


logger = logging.getLogger(__name__)


def get_evaluator(metrics: List[ir_measures.Metric], assessments: AdhocAssessments):
    qrels = {
        assessedTopic.topic_id: {r.doc_id: r.rel for r in assessedTopic.assessments}
        for assessedTopic in assessments.iter()
    }
    return evaluator(metrics, qrels)


[docs] class BaseEvaluation(Task): """Base class for evaluation tasks""" measures: Param[List[Measure]] = field( default=[m.AP, m.P @ 20, m.nDCG, m.nDCG @ 20, m.RR], ignore_default=True ) """List of metrics""" aggregated: Annotated[Path, pathgenerator("aggregated.txt")] """Path for aggregated results""" detailed: Annotated[Path, pathgenerator("detailed.dat")] """Path for detailed results""" with_run: Param[bool] = field(default=False, ignore_default=True) """Saves the run together with the evaluation""" run_path: Annotated[Path, pathgenerator("run.txt")] """Path to save the run (TREC format). Only used if with_run is True""" def task_outputs(self, dep): results = dep( TrecAdhocResults.C( id="", results=self.aggregated, detailed=self.detailed, metrics=self.measures, ) ) if self.with_run: return results, dep(TrecAdhocRun.C(id="", path=self.run_path)) return results def _execute(self, run: AdhocRunDict, assessments): """Evaluate an IR ad-hoc run with trec-eval""" from experimaestro import taskglobals if taskglobals.Env.instance().slave: logging.info("Slave process: skipping evaluation write") return if self.with_run: logging.info("Writing the run") write_run_dict(run, self.run_path) evaluator = get_evaluator([measure() for measure in self.measures], assessments) def print_line(fp, measure, scope, value): fp.write("{:25s} {:10s} {:.4f}\n".format(measure, scope, value)) with self.detailed.open("w") as fp: for metric in evaluator.iter_calc(run): print_line(fp, str(metric.measure), metric.query_id, metric.value) with self.aggregated.open("w") as fp: for key, value in evaluator.calc_aggregate(run).items(): print_line(fp, str(key), "all", value)
def get_run(retriever: Retriever, dataset: Adhoc) -> AdhocRunDict: """Returns the scored documents for each topic in a dataset""" results = retriever.retrieve_all( {topic["id"]: topic for topic in dataset.topics.iter()} ) return { qid: {sd.document["id"]: sd.score for sd in scoredocs} for qid, scoredocs in results.items() } def evaluate(retriever: Retriever, dataset: Adhoc, measures: List[str], details=False): """Evaluate a retriever on a given dataset :param retriever: The retriever to evaluate :param dataset: The dataset on which to evaluate :param measures: The list of measures to compute (using ir_measures) :param details: if query-level metrics should be reported, defaults to False :return: The metrics (if details is False) or a tuple (metrics, detailed metrics) """ evaluator = get_evaluator( [ir_measures.parse_measure(m) for m in measures], dataset.assessments ) run = get_run(retriever, dataset) aggregators = {m: m.aggregator() for m in evaluator.measures} details = DefaultDict(lambda: {}) if details else None for metric in evaluator.iter_calc(run): aggregators[metric.measure].add(metric.value) if details is not None: details[str(metric.measure)][metric.query_id] = metric.value metrics = {str(m): agg.result() for m, agg in aggregators.items()} if details is not None: return metrics, details return metrics
[docs] class RunEvaluation(BaseEvaluation, Task): """Evaluate a run""" run: Param[TrecAdhocRun] assessments: Param[AdhocAssessments] def execute(self): run = ir_measures.read_trec_run(self.run.path, self.assessments) return self._execute(run)
[docs] class Evaluate(BaseEvaluation, Task): """Evaluate a retriever directly (without generating the run explicitly)""" dataset: Param[Adhoc] """The dataset for retrieval""" retriever: Param[Retriever] """The retriever to evaluate""" topic_wrapper: Param[Optional[TopicWrapper]] = field( default=None, ignore_default=True ) """Topic extractor""" fabric_config: Meta[FabricConfiguration] = field( default_factory=FabricConfiguration.C ) """Runtime configuration, managed by Fabric""" def execute(self): # 1. Initialize Fabric first fabric = self.fabric_config.get_fabric() fabric.launch() # 2. Initialize the retriever (loads the model) self.retriever.initialize() # 3. Wrap necessary children with fabric self.retriever.setup_with_fabric(fabric) run = get_run(self.retriever, self.dataset) if fabric.world_size > 1: fabric.barrier() self._execute(run, self.dataset.assessments)
class RetrieverFactory(Protocol): """Generates a retriever for a given dataset""" def __call__(self, dataset: Documents, key: str = None) -> Retriever: ...
[docs] class Evaluations: """Holds experiment results for several models on one dataset""" dataset: Adhoc measures: List[Measure] results: List[BaseEvaluation] per_tag: Dict[TagDict, AdhocResults] topic_wrapper: Optional[TopicWrapper] def __init__( self, dataset: Adhoc, measures: List[Measure], *, topic_wrapper: Optional[TopicWrapper] = None, ): self.dataset = dataset self.measures = measures self.results = [] self.per_tags = {} self.topic_wrapper = topic_wrapper
[docs] def evaluate_retriever( self, key: str, retriever: Union[Retriever, RetrieverFactory], launcher: Launcher = None, *, init_tasks=[], with_run=False, ) -> "EvaluationResult": """Evaluates a retriever :param key: test collection key :param retriever: the retriever (or the retriever factory) """ if not isinstance(retriever, Retriever): retriever = retriever(self.dataset.documents, key=key) task = Evaluate.C( retriever=retriever, measures=self.measures, dataset=self.dataset, topic_wrapper=self.topic_wrapper, with_run=with_run, ).tag("dataset", key) evaluation = task.submit(launcher=launcher, init_tasks=init_tasks) run = None if with_run: evaluation, run = evaluation self.add(evaluation) # Use retriever tags retriever_tags = tags(evaluation) if retriever_tags: self.per_tags[retriever_tags] = evaluation return EvaluationResult(key, evaluation, run, task)
def add(self, *results: BaseEvaluation): self.results.extend(results) def output_results_per_tag(self, file=sys.stdout): return self.to_dataframe().to_markdown(file) def to_dataframe(self) -> pd.DataFrame: # Get all the tags tags = list( set(chain(*[tags_dict.keys() for tags_dict in self.per_tags.keys()])) ) tags.sort() assert len(tags) > 0, ( "No tags found, please tag your evaluations to convert results to dataframe" ) # Get all the results and metrics to_process = [] metrics = set() for tags_dict, evaluate in self.per_tags.items(): try: results = evaluate.get_results() metrics.update(results.keys()) to_process.append((tags_dict, results)) except FileNotFoundError: logger.error("Cannot retrieve evaluation results for %s", tags_dict) # Sort metrics metrics = list(metrics) metrics.sort() # Table header columns = [] for tag in tags: columns.append(["tag", tag]) for metric in metrics: columns.append(["metric", metric]) # Output the results rows = [] for tags_dict, results in to_process: row = [] # tag values for k in tags: row.append(str(tags_dict.get(k, ""))) # metric values for metric in metrics: row.append(results.get(metric, "")) rows.append(row) index = pd.MultiIndex.from_tuples(columns) return pd.DataFrame(rows, columns=index)
@define class EvaluationResult: key: str """Dataset identifier""" result: AdhocResults """Results""" run: Optional[AdhocRun] """The run (if available)""" task: Evaluate """The task for this result""" class FutureEvaluationResult: """An evaluation that needs to be run""" key: str """Dataset identifier""" def __init__(self, key: str, evaluations: "Evaluations"): self.key = key self._evaluations = evaluations self._result: Optional[EvaluationResult] = None @property def result(self) -> AdhocResults: """Results""" return self.result.result @property def run(self) -> Optional[AdhocRun]: """The run (if available)""" return self.result.run @property def task(self) -> Evaluate: return self.result.result def evaluate(self, retriever: Retriever, **kwargs): self._result = self._evaluations.evaluate_retriever( self.key, retriever, **kwargs ) @property def dataset(self) -> "Adhoc": return self._evaluations.dataset AnyEvaluationResult = FutureEvaluationResult | EvaluationResult
[docs] class EvaluationsCollection: """A collection of evaluation This is useful to group all the evaluations to be conducted, and then to call the :py:meth:`evaluate_retriever` """ collection: Dict[str, Evaluations] per_model: Dict[str, List[AnyEvaluationResult]] """List of results per model""" def __init__(self, **collection: Evaluations): self.collection = collection self.per_model = {}
[docs] def evaluate_retriever( self, retriever: Union[Retriever, RetrieverFactory], launcher: Launcher = None, model_id: Optional[str] = None, overwrite: bool = False, with_run: bool = False, init_tasks=[], ) -> list[EvaluationResult]: """Evaluate a retriever for all the evaluations in this collection (the tasks are submitted to the experimaestro scheduler) :param with_run: should the run be preserved (default False). Note that this changes the experiment ID. """ if model_id is not None and not overwrite: assert model_id not in self.per_model, ( f"Model with ID `{model_id}` was already evaluated" ) results = [] for key, evaluations in self.collection.items(): result = evaluations.evaluate_retriever( key, retriever, launcher, init_tasks=init_tasks, with_run=with_run ) results.append(result) # Adds to per model results if model_id is not None: self.per_model[model_id] = results return results
def evaluations(self, model_id: str): """Returns a list of dataset to evaluate""" results: list[FutureEvaluationResult] = [] for key, evaluations in self.collection.items(): result = FutureEvaluationResult(key, evaluations) yield result results.append(result) # Adds to per model results if model_id is not None: self.per_model[model_id] = results def output_results(self, file=sys.stdout): """Print all the results""" for key, dsevaluations in self.collection.items(): print(f"## Dataset {key}\n", file=file) # noqa: T201 for evaluation in dsevaluations.results: with evaluation.results.open("rt") as fp: results = [f"- {line}" for line in fp.readlines()] results = "".join(results) print( # noqa: T201 f"### Results for {evaluation.__xpm__.tags()}" f"""\n{results}\n""", file=file, )
[docs] def to_dataframe(self) -> pd.DataFrame: """Returns a Pandas dataframe""" all_data = [] for key, evaluations in self.collection.items(): data = evaluations.to_dataframe() data["dataset"] = key all_data.append(data) return pd.concat(all_data, ignore_index=True)
def output_results_per_tag(self, file=sys.stdout): """Outputs the results for each collection, based on the retriever tags to build the table """ # Loop over all collections for key, evaluations in self.collection.items(): print(f"## Dataset {key}\n", file=file) # noqa: T201 evaluations.output_results_per_tag(file) def output_model_results(self, model_id: str, file=sys.stdout): """Outputs the result of a model over various datasets (in markdown format) :param model_id: The model id, as given by :meth:`evaluate_retriever` :param file: The output stream, defaults to sys.stdout """ all_results = {} all_metrics = set() for key, evaluation in self.per_model[model_id]: all_results[key] = evaluation.get_results() all_metrics.update(all_results[key].keys()) all_metrics = sorted(all_metrics) file.write(f"| Dataset | {' | '.join(all_metrics)} |\n") # noqa: E221 file.write(f"|----| {'---|---'.join('' for _ in all_metrics)}---|\n") for key, values in all_results.items(): file.write(f"| {key}") for metric in all_metrics: value = values.get(metric, "") file.write(f" | {value}") file.write(" |\n")
class MultiRunRetrieverFactory(RetrieverFactory): """A factory that returns the appropriate `RunRetriever` for a given dataset""" def __init__(self, retriever_name: str): self.retriever_name = retriever_name self.runs: Dict[str, AdhocRun] = {} self.documents: Dict[str, Documents] = {} def add_run(self, key: str, documents: Documents, run: AdhocRun): """Register a run for a given document collection""" if key in self.runs.keys(): logger.warning( f"{key} Retrival run already stored for {self.retriever_name}" ) self.runs[key] = run self.documents[key] = documents def __call__(self, dataset: Documents, key: str = None) -> RunRetriever: # Try to find the run by key first, then by dataset ID run = self.runs.get(key) if key else None if run is None: # Fallback to dataset ID if key not provided or not found # This is less specific but better than nothing for k, docs in self.documents.items(): if docs.id == dataset.id: run = self.runs[k] break if run is None: raise KeyError( f"No run found for dataset key='{key}' or id='{dataset.id}'" f"Available: {','.join(self.runs.keys())}" ) return RunRetriever.C(run=run, documents=dataset).tag( "first_stage", self.retriever_name ) @classmethod def from_results(cls, name: str, results: List) -> "MultiRunRetrieverFactory": factory = cls(name) for res in results: factory.add_run(res.key, res.task.dataset.documents, res.run) return factory