from typing import Iterable, Iterator, List, Optional, Set, Union
from pathlib import Path
from experimaestro import (
Param,
Config,
Task,
tqdm,
cache,
pathgenerator,
Annotated,
Meta,
)
from experimaestro.compat import cached_property
from datamaestro_text.data.ir import (
Adhoc,
AdhocAssessments,
Document,
DocumentStore,
Documents,
Topics,
)
from datamaestro_text.data.ir.trec import TrecAdhocAssessments
from datamaestro_text.data.ir.csv import Topics as CSVTopics
from xpmir.rankers import Retriever
from xpmir.utils.utils import easylog
# Module-level logger, built with the easylog helper imported from xpmir.utils.utils
logger = easylog()
class TopicFold(Topics):
    """Topic collection restricted to a given list of topic IDs"""

    ids: Param[List[str]]
    """IDs of the topics that are kept"""

    topics: Param[Topics]
    """The topic collection the selection is made from"""

    def iter(self):
        """Yield the topics of ``self.topics`` whose ``qid`` is listed in ``self.ids``"""
        # NOTE(review): other code in this module reads topic identifiers through
        # get_id(); confirm that `qid` is exposed by the wrapped topics
        selected = frozenset(self.ids)
        yield from (
            candidate for candidate in self.topics.iter() if candidate.qid in selected
        )
class AdhocAssessmentFold(AdhocAssessments):
    """Filter assessments by topic ID"""

    ids: Param[List[str]]
    """IDs of the topics whose assessments are kept"""

    qrels: Param[AdhocAssessments]
    """The collection of the assessments"""

    @cache("assessements.qrels")
    def trecpath(self, path):
        """Write the selected assessments to ``path`` and return ``path``

        One line ``<qid> 0 <docid> <rel>`` is written per assessment. Nothing is
        written when ``path`` is already a file. ``path`` is presumably supplied
        by the ``cache`` decorator (verify against experimaestro).
        """
        if not path.is_file():
            with path.open("wt") as fp:
                # self.iter() already restricts the assessments to self.ids, so
                # no further filtering is needed here
                for qrels in self.iter():
                    fp.writelines(
                        f"{qrels.qid} 0 {qrel.docid} {qrel.rel}\n"
                        for qrel in qrels.assessments
                    )
        return path

    def iter(self):
        """Yield the per-topic assessments of ``self.qrels`` whose ``qid`` is in ``self.ids``"""
        ids = set(self.ids)
        for qrels in self.qrels.iter():
            if qrels.qid in ids:
                yield qrels
def fold(ids: Iterable[str], dataset: Adhoc):
    """Returns a fold of a dataset, given topic ids

    Parameters:

    - ids: identifiers of the topics to keep; they are sorted before being
      stored in the fold configurations
    - dataset: the dataset whose topics and assessments are restricted

    Returns an ``Adhoc`` dataset made of a ``TopicFold``, an
    ``AdhocAssessmentFold`` and the documents of ``dataset``.
    """
    ids = sorted(ids)
    topics = TopicFold(topics=dataset.topics, ids=ids)
    # AdhocAssessmentFold declares its source collection under the name `qrels`
    qrels = AdhocAssessmentFold(qrels=dataset.assessments, ids=ids)
    return Adhoc(topics=topics, assessments=qrels, documents=dataset.documents)
class ConcatFold(Task):
    """
    Concatenation of several datasets to get a full dataset.
    """

    datasets: Param[List[Adhoc]]
    """The list of Adhoc datasets to concatenate"""

    assessments: Annotated[Path, pathgenerator("assessments.tsv")]
    """Generated assessments file"""

    topics: Annotated[Path, pathgenerator("topics.tsv")]
    """Generated topics file"""

    def task_outputs(self, dep) -> Adhoc:
        """Describe the dataset produced by this task

        The topics and assessments point to the generated files; the documents
        are those of the first dataset, which is why all the datasets must share
        the same document collection id.
        """
        # An Adhoc dataset exposes its collection as `documents`
        document_ids = {dataset.documents.id for dataset in self.datasets}
        assert (
            len(document_ids) == 1
        ), "At the moment only one set of documents supported."

        return Adhoc(
            id="",  # No need to have a more specific id since it is generated
            topics=dep(CSVTopics(id="", path=self.topics)),
            assessments=dep(TrecAdhocAssessments(id="", path=self.assessments)),
            documents=self.datasets[0].documents,
        )

    def execute(self):
        """Write the topics and the assessments of all the datasets"""
        # concat the topics
        topics = []
        for dataset in self.datasets:
            topics.extend(dataset.topics.iter())

        # Write topics and assessments
        ids = set()
        self.topics.parent.mkdir(parents=True, exist_ok=True)
        with self.topics.open("wt") as fp:
            for topic in topics:
                ids.add(topic.qid)
                # The tab is the field separator: replace it within the text
                text = topic.text.replace("\t", " ")
                fp.write(f"{topic.qid}\t{text}\n")

        with self.assessments.open("wt") as fp:
            for dataset in self.datasets:
                for qrels in dataset.assessments.iter():
                    # Only keep the assessments of the topics written above
                    if qrels.qid in ids:
                        for qrel in qrels.assessments:
                            fp.write(f"{qrels.qid} 0 {qrel.docid} {qrel.rel}\n")
class RandomFold(Task):
    """Extracts a random subset of topics from a dataset"""

    seed: Param[int]
    """Random seed used to compute the fold"""

    sizes: Param[List[float]]
    """Number of topics of each fold (or percentage if sums to 1)"""

    dataset: Param[Adhoc]
    """The Adhoc dataset from which a fold is extracted"""

    fold: Param[int]
    """Which fold should be taken"""

    exclude: Param[Optional[Topics]]
    """Exclude some topics from the random fold"""

    assessments: Annotated[Path, pathgenerator("assessments.tsv")]
    """Generated assessments file"""

    topics: Annotated[Path, pathgenerator("topics.tsv")]
    """Generated topics file"""

    def __validate__(self):
        # A negative index would silently wrap around when the fold boundaries
        # are looked up in execute()
        assert 0 <= self.fold < len(self.sizes)

    @staticmethod
    def folds(
        seed: int,
        sizes: List[float],
        dataset: Param[Adhoc],
        exclude: Param[Optional[Topics]] = None,
        submit=True,
    ):
        """Creates folds

        Parameters:

        - seed: random seed shared by all the folds
        - sizes: size of each fold; one fold is created per entry
        - dataset: the dataset from which the folds are extracted
        - exclude: topics left out of the folds (optional)
        - submit: if true (default), submits the fold tasks to experimaestro

        Returns a list with one entry per fold: the value returned by
        ``submit()`` when ``submit`` is true, the ``RandomFold`` itself otherwise.
        """
        folds = []
        for ix in range(len(sizes)):
            fold = RandomFold(
                seed=seed, sizes=sizes, dataset=dataset, exclude=exclude, fold=ix
            )
            if submit:
                fold = fold.submit()
            folds.append(fold)
        return folds

    def task_outputs(self, dep) -> Adhoc:
        """Describe the dataset produced by this task (generated topics and
        assessments, documents of the source dataset)"""
        return dep(
            Adhoc(
                id="",  # No need to have a more specific id since it is generated
                topics=dep(CSVTopics(id="", path=self.topics)),
                assessments=dep(TrecAdhocAssessments(id="", path=self.assessments)),
                documents=self.dataset.documents,
            )
        )

    def execute(self):
        """Shuffle the topics, select the requested fold and write its topics
        and assessments"""
        import numpy as np

        # Get topics
        badids = (
            set(topic.get_id() for topic in self.exclude.iter())
            if self.exclude
            else set()
        )
        topics = [
            topic
            for topic in self.dataset.topics.iter()
            if topic.get_id() not in badids
        ]
        random = np.random.RandomState(self.seed)
        random.shuffle(topics)

        # Get the fold (the leading 0 makes the cumulative sum below give the
        # start and end index of every fold)
        sizes = np.array([0.0] + self.sizes)
        s = sizes.sum()
        if abs(s - 1) < 1e-6:
            # Sizes are proportions: turn them into topic counts, then rescale
            # the rounded counts against the number of topics
            # NOTE(review): both statements are assumed to belong to this branch,
            # so that absolute sizes are used as given (confirm)
            sizes = np.round(len(topics) * sizes)
            sizes = np.round(len(topics) * sizes / sizes.sum())

        assert sizes[self.fold + 1] > 0
        indices = sizes.cumsum().astype(int)
        topics = topics[indices[self.fold] : indices[self.fold + 1]]

        # Write topics and assessments
        ids = set()
        self.topics.parent.mkdir(parents=True, exist_ok=True)
        with self.topics.open("wt") as fp:
            for topic in topics:
                ids.add(topic.get_id())
                # The tab is the field separator: replace it within the text
                text = topic.get_text().replace("\t", " ")
                fp.write(f"{topic.get_id()}\t{text}\n")

        with self.assessments.open("wt") as fp:
            for qrels in self.dataset.assessments.iter():
                # Only keep the assessments of the topics of this fold
                if qrels.topic_id in ids:
                    for qrel in qrels.assessments:
                        fp.write(f"{qrels.topic_id} 0 {qrel.doc_id} {qrel.rel}\n")
class DocumentSubset(Documents):
    """ID-based document selection: the documents of a store whose IDs are
    listed in a file"""

    base: Param[DocumentStore]
    """The full document store"""

    docids_path: Meta[Path]
    """Path to the file containing the document IDs"""

    in_memory: Meta[bool] = False
    """Whether to keep the documents returned by `document_ext` in memory"""

    def __len__(self):
        return len(self.docids)

    def __post_init__(self):
        super().__post_init__()
        # Documents already fetched from the base store (only filled when
        # in_memory is set)
        self.cache = {}

    @property
    def documentcount(self):
        """Number of document IDs in the subset"""
        return len(self.docids)

    def document_ext(self, docid: str):
        """Return what the base store's ``document_ext`` returns for ``docid``,
        going through the in-memory cache when ``in_memory`` is set"""
        if not self.in_memory:
            return self.base.document_ext(docid)

        # Test against None so that a falsy cached document is not fetched again
        if (doc := self.cache.get(docid)) is None:
            doc = self.base.document_ext(docid)
            self.cache[docid] = doc
        return doc

    def __getitem__(self, slice: Union[int, slice]):
        """Return one document (integer index) or a `DocumentSubsetSlice`
        (slice) based on the position of the IDs in the ID file"""
        docids = self.docids[slice]
        if isinstance(docids, list):
            return DocumentSubsetSlice(self, docids)
        return self.document_ext(docids)

    @cached_property
    def docids(self) -> List[str]:
        """The document IDs, one per line of ``docids_path`` (stripped)"""
        # Read document IDs
        with self.docids_path.open("rt") as fp:
            return [line.strip() for line in fp]

    def iter_ids(self):
        """Iterate over the document IDs, in file order"""
        yield from self.docids

    def iter(self) -> Iterator[Document]:
        """Iterate over the documents, reading their text from the base store"""
        for docid in self.iter_ids():
            content = self.base.document_text(docid)
            yield Document(docid, content)
class DocumentSubsetSlice:
    """A slice of a `DocumentSubset`

    Holds a list of document IDs and fetches the matching documents from the
    subset (through its ``document_ext`` method) when they are accessed.
    """

    def __init__(self, subset: "DocumentSubset", doc_ids: List[str]):
        self.subset = subset
        self.doc_ids = doc_ids

    def __iter__(self):
        for docid in self.doc_ids:
            yield self.subset.document_ext(docid)

    def __len__(self):
        return len(self.doc_ids)

    def __getitem__(self, ix):
        """Return one document (integer index) or a narrower
        `DocumentSubsetSlice` (slice index)"""
        if isinstance(ix, slice):
            # Slicing a list of IDs gives a list: wrap it rather than passing it
            # to document_ext, which expects a single ID
            return DocumentSubsetSlice(self.subset, self.doc_ids[ix])
        return self.subset.document_ext(self.doc_ids[ix])
class RetrieverBasedCollection(Task):
    """Builds a subset of documents based on the output of a set of retrievers
    and on relevance assessment.

    First get all the document based on the assessment then add the retrieved ones.
    """

    relevance_threshold: Param[float] = 0
    """Relevance threshold"""

    dataset: Param[Adhoc]
    """A dataset"""

    retrievers: Param[List[Retriever]]
    """Rankers"""

    keepRelevant: Param[bool] = True
    """Keep documents judged relevant"""

    keepNotRelevant: Param[bool] = False
    """Keep documents judged not relevant"""

    docids_path: Annotated[Path, pathgenerator("docids.txt")]
    """The file containing the document identifiers of the collection"""

    def __validate__(self):
        assert len(self.retrievers) > 0, "At least one retriever should be given"

    def task_outputs(self, dep) -> Adhoc:
        """Describe the produced dataset: same topics and assessments, documents
        restricted to the IDs written in ``docids_path``"""
        return Adhoc(
            id="",  # No need to have a more specific id since it is generated
            topics=self.dataset.topics,
            assessments=self.dataset.assessments,
            documents=dep(
                DocumentSubset(
                    id="", base=self.dataset.documents, docids_path=self.docids_path
                )
            ),
        )

    def execute(self):
        """Collect the IDs of the assessed and retrieved documents and write
        them, one per line, to ``docids_path``"""
        for retriever in self.retrievers:
            retriever.initialize()

        # Selected document IDs
        docids: Set[str] = set()

        # Assessments indexed by topic ID
        qrels_by_topic = {t.topic_id: t for t in self.dataset.assessments.iter()}

        # Retrieve all documents
        for topic in tqdm(
            self.dataset.topics.iter(), total=self.dataset.topics.count()
        ):
            qrels = qrels_by_topic.get(topic.get_id())
            if qrels is None:
                logger.warning(
                    "Skipping topic %s [%s], (no assessment)",
                    topic.get_id(),
                    topic.get_text(),
                )
                continue

            # Add (not) relevant documents
            if self.keepRelevant:
                docids.update(
                    a.doc_id
                    for a in qrels.assessments
                    if a.rel > self.relevance_threshold
                )

            if self.keepNotRelevant:
                docids.update(
                    a.doc_id
                    for a in qrels.assessments
                    if a.rel <= self.relevance_threshold
                )

            # Retrieve and add
            # already defined the numbers to retrieve inside the retriever, so
            # don't need to worry about the threshold here
            for retriever in self.retrievers:
                docids.update(
                    sd.document.get_id()
                    for sd in retriever.retrieve(topic.get_text())
                )

        # Write the document IDs, sorted so that the file content (and thus the
        # position of each document in the subset) does not depend on the
        # iteration order of the set
        self.docids_path.parent.mkdir(parents=True, exist_ok=True)
        with self.docids_path.open("wt") as fp:
            fp.writelines(f"{docid}\n" for docid in sorted(docids))
class TextStore(Config):
    """Associates an ID with a text"""

    def __getitem__(self, key: str) -> str:
        """Return the text associated with ``key``

        This base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError()
class MemoryTopicStore(TextStore):
    """Exposes the topics of a collection as an in-memory text store"""

    topics: Param[Topics]
    """The topics from which the store is built"""

    @cached_property
    def store(self):
        """Dictionary mapping each topic ID to the topic text"""
        mapping = {}
        for topic in self.topics.iter():
            mapping[topic.get_id()] = topic.text
        return mapping

    def __getitem__(self, key: str) -> str:
        """Return the text of the topic whose ID is ``key``"""
        text_by_id = self.store
        return text_by_id[key]