Source code for xpmir.text.huggingface
import os
import re
import logging
import torch.nn as nn
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union
import torch
from experimaestro.compat import cached_property
from experimaestro import Param, Constant, deprecate
from xpmir.distributed import DistributableModel
from xpmir.learning.optim import ModuleInitMode, ModuleInitOptions
from xpmir.text.encoders import (
Encoder,
TokensEncoder,
DualTextEncoder,
TextEncoder,
TripletTextEncoder,
EncoderOutput,
RepresentationOutput,
)
from xpmir.utils.utils import easylog
from xpmir.learning.context import TrainerContext, TrainState
from xpmir.learning.parameters import ParametersIterator
from .tokenizers import ( # noqa: F401
HFTokenizer,
HFTokenizerBase,
HFTokenizerAdapter,
HFStringTokenizer,
HFListTokenizer,
)
from .encoders import ( # noqa: F401
HFModel,
HFTokensEncoder,
HFCLSEncoder,
)
from .base import HFMaskedLanguageModel # noqa: F401
try:
from transformers import (
AutoModel,
AutoTokenizer,
AutoConfig,
DataCollatorForLanguageModeling,
AutoModelForMaskedLM,
)
except Exception:
logging.error("Install huggingface transformers to use these configurations")
raise
from xpmir.text import TokenizedTexts
# Module-level logger, obtained through the project's `easylog` helper
logger = easylog()
# Emit records of level INFO and above for this module
logger.setLevel(logging.INFO)
class BaseTransformer(Encoder):
    """Base transformer class from Huggingface

    Holds a HuggingFace model (built through :attr:`automodel`), its
    configuration and its tokenizer, and provides the tokenization helpers
    shared by the encoders defined in this module.
    """

    model_id: Param[str] = "bert-base-uncased"
    """Model ID from huggingface"""

    trainable: Param[bool]
    """Whether BERT parameters should be trained"""

    layer: Param[int] = 0
    """Layer to use (0 is the last, -1 to use them all)"""

    dropout: Param[Optional[float]] = 0
    """(deprecated) Define a dropout for all the layers"""

    CLS: int
    SEP: int

    @cached_property
    def tokenizer(self):
        """The (fast) HuggingFace tokenizer for ``model_id``, loaded once"""
        return AutoTokenizer.from_pretrained(self.model_id, use_fast=True)

    @property
    def pad_tokenid(self) -> int:
        """ID of the padding token"""
        return self.tokenizer.pad_token_id

    @cached_property
    def config(self):
        """The HuggingFace configuration for ``model_id``, loaded once"""
        return AutoConfig.from_pretrained(self.model_id)

    @property
    def automodel(self):
        """HuggingFace auto-class used to build the model (overridable)"""
        return AutoModel

    def __initialize__(self, options: ModuleInitOptions):
        """Initialize the HuggingFace transformer

        Args:
            options: loader options
        """
        super().__initialize__(options)

        # Override the dropout probabilities only when a non-zero value was
        # given (`None` is allowed by the parameter type and means "keep the
        # values of the model configuration")
        if self.dropout is not None and self.dropout != 0:
            self.config.hidden_dropout_prob = self.dropout
            self.config.attention_probs_dropout_prob = self.dropout

        # `os.environ` is a mapping, not a callable: read the variable with
        # `.get` and interpret it as a boolean flag
        offline = os.environ.get("HF_HUB_OFFLINE", "")
        local_files_only = offline.strip().upper() in ("1", "ON", "YES", "TRUE")

        if options.mode == ModuleInitMode.NONE or options.mode == ModuleInitMode.RANDOM:
            # Architecture only: weights are not loaded from the checkpoint
            self.model = self.automodel.from_config(self.config)
        else:
            self.model = self.automodel.from_pretrained(
                self.model_id, config=self.config, local_files_only=local_files_only
            )

        # Loads the tokenizer
        self._tokenizer = AutoTokenizer.from_pretrained(
            self.model_id, use_fast=True, local_files_only=local_files_only
        )

        self.CLS = self.tokenizer.cls_token_id
        self.SEP = self.tokenizer.sep_token_id

        if self.trainable:
            self.model.train()
        else:
            self.model.eval()

    def parameters(self, recurse=True):
        """Returns the parameters, or nothing when the model is not trainable"""
        if self.trainable:
            return super().parameters(recurse)
        return []

    def train(self, mode: bool = True):
        """Switch the wrapped model between training and evaluation mode

        Training mode is only propagated when ``trainable`` is set;
        evaluation mode is always propagated. Returns ``self`` so that the
        call can be chained, as with ``torch.nn.Module.train``.
        """
        # We should not make this layer trainable unless asked
        if not mode or self.trainable:
            self.model.train(mode)
        return self

    def tokenize(self, text):
        """Splits a text into tokens with the HuggingFace tokenizer"""
        return self.tokenizer.tokenize(text)

    def tok2id(self, tok):
        """Returns the vocabulary ID of a token (``KeyError`` if unknown)"""
        return self.tokenizer.vocab[tok]

    def static(self):
        """Returns True when the model is not trained"""
        return not self.trainable

    def batch_tokenize(
        self,
        texts: Union[List[str], List[Tuple[str, str]]],
        batch_first=True,
        maxlen=None,
        mask=False,
    ) -> TokenizedTexts:
        """Tokenizes a batch of texts (or of pairs of texts)

        Args:
            texts: the texts (or text pairs) to tokenize
            batch_first: must be True (only supported layout)
            maxlen: maximum number of tokens; capped by the tokenizer's
                ``model_max_length`` (which is used when ``maxlen`` is None)
            mask: whether the attention mask should be returned

        Returns:
            A ``TokenizedTexts`` built from: None, the token IDs (moved to
            the model device), the lengths, the attention mask (if any) and
            the token type IDs (if any)
        """
        if maxlen is None:
            maxlen = self.tokenizer.model_max_length
        else:
            maxlen = min(maxlen, self.tokenizer.model_max_length)

        assert batch_first, "Batch first is the only option"

        r = self.tokenizer(
            list(texts),
            max_length=maxlen,
            truncation=True,
            padding=True,
            return_tensors="pt",
            return_length=True,
            return_attention_mask=mask,
        )
        return TokenizedTexts(
            None,
            r["input_ids"].to(self.device),
            r["length"],
            r.get("attention_mask", None),
            r.get("token_type_ids", None),
        )

    def id2tok(self, idx):
        """Converts a token ID (int or tensor of IDs) into token string(s)

        A 0-dimensional tensor gives one token; any other tensor gives a
        (possibly nested) list of tokens.
        """
        if torch.is_tensor(idx):
            if len(idx.shape) == 0:
                return self.id2tok(idx.item())
            return [self.id2tok(x) for x in idx]
        # `convert_ids_to_tokens` is the public HuggingFace API for this
        # (tokenizer objects do not expose a public `id_to_token`)
        return self.tokenizer.convert_ids_to_tokens(idx)

    def lexicon_size(self) -> int:
        """Vocabulary size as reported by the underlying fast tokenizer"""
        return self.tokenizer._tokenizer.get_vocab_size()

    def maxtokens(self) -> int:
        """Maximum number of tokens handled by the model"""
        # NOTE(review): HuggingFace tokenizers expose `model_max_length` (as
        # used in `batch_tokenize`); `max_model_length` looks like a typo, in
        # which case the fallback below is always taken. Confirm before
        # changing: callers may rely on the configuration value.
        try:
            return self.tokenizer.max_model_length
        except Exception:
            logging.warning(
                "No `max_model_length` in the tokenizer, "
                "defaulting to `model.config.max_position_embeddings` instead"
            )
            return self.config.max_position_embeddings

    def dim(self):
        """Hidden size given by the model configuration"""
        return self.config.hidden_size

    @property
    def vocab_size(self) -> int:
        """Returns the size of the vocabulary"""
        return self.tokenizer.vocab_size
class TransformerTokensEncoder(BaseTransformer, TokensEncoder):
    """A tokens encoder based on HuggingFace"""

    def forward(self, toks: TokenizedTexts, all_outputs=False):
        """Runs the transformer on already tokenized texts

        Args:
            toks: the tokenized texts (IDs and optional attention mask)
            all_outputs: when True, returns the full model output instead of
                the last hidden state only
        """
        ids = toks.ids.to(self.device)

        # The attention mask is optional: only move it when present
        attention_mask = None
        if toks.mask is not None:
            attention_mask = toks.mask.to(self.device)

        outputs = self.model(ids, attention_mask=attention_mask)
        return outputs if all_outputs else outputs.last_hidden_state
class SentenceTransformerTextEncoder(TextEncoder):
    """A Sentence Transformers text encoder"""

    model_id: Param[str] = "sentence-transformers/all-MiniLM-L6-v2"

    def __initialize__(self, options: ModuleInitOptions):
        """Loads the Sentence Transformers model identified by ``model_id``"""
        super().__initialize__(options)

        # Imported here so that the dependency is only needed when this
        # encoder is actually initialized
        import sentence_transformers

        self.model = sentence_transformers.SentenceTransformer(self.model_id)

    def forward(self, texts: List[str]) -> torch.Tensor:
        """Encodes the texts with the model's ``encode`` method"""
        encoded = self.model.encode(texts)
        return encoded
class OneHotHuggingFaceEncoder(TextEncoder):
    """A tokenizer which encodes the tokens into 0 and 1 vector
    1 represents the text contains the token and 0 otherwise"""

    model_id: Param[str] = "bert-base-uncased"
    """Model ID from huggingface"""

    maxlen: Param[Optional[int]] = None
    """Max length for texts"""

    version: Constant[int] = 2

    def __initialize__(self, options: ModuleInitOptions):
        """Loads the tokenizer and caches the special token IDs

        Args:
            options: loader options (forwarded to the parent class, as done
                by the other encoders of this module)
        """
        super().__initialize__(options)

        self._tokenizer = AutoTokenizer.from_pretrained(self.model_id, use_fast=True)
        self.CLS = self._tokenizer.cls_token_id
        self.SEP = self._tokenizer.sep_token_id
        self.PAD = self._tokenizer.pad_token_id

        # Empty parameter whose only purpose is to track the module device
        self._dummy_params = nn.Parameter(torch.Tensor())

    @property
    def device(self):
        """Device on which the module currently lives"""
        return self._dummy_params.device

    @cached_property
    def tokenizer(self):
        """The tokenizer loaded during initialization"""
        return self._tokenizer

    def batch_tokenize(self, texts):
        """Returns the padded token IDs of the texts (truncated to ``maxlen``)"""
        r = self.tokenizer(
            list(texts),
            max_length=self.maxlen,
            truncation=True,
            padding=True,
            return_tensors="pt",
        )
        return r["input_ids"]

    def forward(self, texts: List[str]) -> torch.Tensor:
        """Returns a batch x vocab tensor"""
        tokenized_ids = self.batch_tokenize(texts)
        batch_size = len(texts)

        x = torch.zeros(batch_size, self.dimension)
        # Row i gets a 1 in every column whose token ID appears in text i
        x[torch.arange(batch_size).unsqueeze(-1), tokenized_ids] = 1
        # Special tokens are not part of the representation
        x[:, [self.PAD, self.SEP, self.CLS]] = 0
        return x.to(self.device)

    @property
    def dimension(self):
        """Size of the output vectors (the tokenizer vocabulary size)"""
        return self.tokenizer.vocab_size

    def static(self):
        return False
@deprecate
class HuggingfaceTokenizer(OneHotHuggingFaceEncoder):
    """The old encoder for one hot

    Deprecated name: adds nothing to ``OneHotHuggingFaceEncoder``.
    """
class TransformerEncoder(BaseTransformer, TextEncoder, DistributableModel):
    """Encodes using the [CLS] token"""

    maxlen: Param[Optional[int]] = None

    def forward(self, texts: List[str], maxlen=None):
        """Encodes the texts, returning the hidden state of the first token

        Args:
            texts: the texts to encode
            maxlen: optional maximum number of tokens for this call. When
                None, ``self.maxlen`` is used and, if that is None as well,
                the value returned by ``maxtokens()``. This keyword is the
                one passed by ``TransformerTextEncoderAdapter.forward``.
        """
        if maxlen is None:
            maxlen = self.maxlen if self.maxlen is not None else self.maxtokens()

        tokenized = self.batch_tokenize(texts, maxlen=maxlen, mask=True)

        # Gradients are only computed when the encoder is trainable
        with torch.set_grad_enabled(torch.is_grad_enabled() and self.trainable):
            y = self.model(tokenized.ids, attention_mask=tokenized.mask.to(self.device))

        # Assumes that [CLS] is the first token
        return y.last_hidden_state[:, 0]

    @property
    def dimension(self):
        """Size of the output vectors"""
        return self.dim()

    def with_maxlength(self, maxlen: int):
        """Returns an adapter of this encoder using another maximum length"""
        return TransformerTextEncoderAdapter(encoder=self, maxlen=maxlen)

    def distribute_models(self, update):
        """Replaces the model by the result of ``update`` applied to it"""
        self.model = update(self.model)
class TransformerTextEncoderAdapter(TextEncoder, DistributableModel):
    """Wraps a ``TransformerEncoder`` and calls it with its own ``maxlen``"""

    encoder: Param[TransformerEncoder]
    maxlen: Param[Optional[int]] = None

    def __initialize__(self, options: ModuleInitOptions):
        """Initializes the wrapped encoder with the same options"""
        wrapped = self.encoder
        wrapped.__initialize__(options)

    @property
    def dimension(self):
        """Dimension of the wrapped encoder"""
        wrapped = self.encoder
        return wrapped.dimension

    def forward(self, texts: List[str], maxlen=None):
        """Encodes the texts with the wrapped encoder

        The adapter's ``maxlen`` parameter is forwarded as the ``maxlen``
        keyword of the wrapped encoder's ``forward``; the ``maxlen`` argument
        of this method is not used.
        """
        wrapped = self.encoder
        return wrapped.forward(texts, maxlen=self.maxlen)

    def static(self):
        wrapped = self.encoder
        return wrapped.static()

    @property
    def vocab_size(self):
        """Vocabulary size of the wrapped encoder"""
        wrapped = self.encoder
        return wrapped.vocab_size

    def distribute_models(self, update):
        """Replaces the wrapped encoder's model by ``update(model)``"""
        wrapped = self.encoder
        wrapped.model = update(wrapped.model)
class DualTransformerEncoder(BaseTransformer, DualTextEncoder):
    """Encodes the (query, document pair) using the [CLS] token
    maxlen: Maximum length of the query document pair (in tokens) or None if
    using the transformer limit
    """

    maxlen: Param[Optional[int]] = None

    version: Constant[int] = 2

    def forward(self, texts: List[Tuple[str, str]]) -> EncoderOutput:
        """Encodes (query, document) pairs

        Returns a ``RepresentationOutput`` wrapping the last hidden state of
        the first token of each pair.
        """
        limit = self.maxtokens() if self.maxlen is None else self.maxlen
        tokenized = self.batch_tokenize(texts, maxlen=limit, mask=True)

        grad_enabled = torch.is_grad_enabled() and self.trainable
        with torch.set_grad_enabled(grad_enabled):
            # Token type IDs are only given to the model when the tokenizer
            # produced them
            extra = {}
            type_ids = tokenized.token_type_ids
            if type_ids is not None:
                extra["token_type_ids"] = type_ids.to(self.device)

            attention_mask = tokenized.mask.to(self.device)
            y = self.model(tokenized.ids, attention_mask=attention_mask, **extra)

        # Assumes that [CLS] is the first token
        first_token = y.last_hidden_state[:, 0]
        return RepresentationOutput(first_token)

    @property
    def dimension(self) -> int:
        """Hidden size read from the loaded model's configuration"""
        model_config = self.model.config
        return model_config.hidden_size
class DualDuoBertTransformerEncoder(BaseTransformer, TripletTextEncoder):
    """Vector encoding of a (query, document, document) triplet
    Be like: [cls] query [sep] doc1 [sep] doc2 [sep]
    """

    maxlen_query: Param[int] = 64
    """Maximum length for the query, the first document and the second one"""

    maxlen_doc: Param[int] = 224
    """Maximum length for the query, the first document and the second one"""

    def __initialize__(self, options: ModuleInitOptions):
        """Initializes the model, making sure three token types are available"""
        super().__initialize__(options)

        # Add an extra token type
        embeddings = self.model.embeddings
        weights = embeddings.token_type_embeddings.weight.data
        if len(weights) < 3:
            logger.info("Adding an extra token type in transformer")
            # The new token type starts with an all-zero embedding
            extra_row = torch.zeros(1, weights.shape[1])
            weights = torch.cat((weights, extra_row))
            embeddings.token_type_embeddings = nn.Embedding.from_pretrained(
                weights, freeze=False
            )

    def batch_tokenize(
        self,
        texts: List[Tuple[str, str, str]],
        batch_first=True,
        mask=False,
    ) -> TokenizedTexts:
        """Tokenizes (query, document 1, document 2) triplets

        The three texts are tokenized separately, then concatenated (the
        first token of each document is dropped) and padded to the length of
        the longest sequence of the batch. Token types are 0 for the query,
        1 for the first document, 2 for the second one and 0 for padding.

        Args:
            texts: the triplets to tokenize
            batch_first: must be True (only supported layout)
            mask: whether the attention mask should be returned
        """
        assert batch_first, "Batch first is the only option"

        def encode(position: int, max_length: int):
            # Tokenizes the texts found at `position` in every triplet
            return self.tokenizer(
                [triplet[position] for triplet in texts],
                max_length=max_length,
                truncation=True,
            )["input_ids"]

        query_ids = encode(0, self.maxlen_query)
        doc1_ids = encode(1, self.maxlen_doc)
        doc2_ids = encode(2, self.maxlen_doc)

        # Per-sample lengths: (query, document 1, document 2, total); the
        # documents lose their first token when concatenated
        sizes = []
        for q, d1, d2 in zip(query_ids, doc1_ids, doc2_ids):
            q_len, d1_len, d2_len = len(q), len(d1) - 1, len(d2) - 1
            sizes.append((q_len, d1_len, d2_len, q_len + d1_len + d2_len))

        # Length of the longest concatenated sequence (0 for an empty batch)
        maxlen = max([0, *(total for _, _, _, total in sizes)])

        pad_id = self.pad_tokenid
        all_ids, all_masks, all_types, all_lengths = [], [], [], []
        for q, d1, d2, (q_len, d1_len, d2_len, total) in zip(
            query_ids, doc1_ids, doc2_ids, sizes
        ):
            padding = maxlen - total
            all_ids.append(q + d1[1:] + d2[1:] + [pad_id] * padding)
            all_masks.append([1] * total + [0] * padding)
            all_types.append(
                [0] * q_len + [1] * d1_len + [2] * d2_len + [0] * padding
            )
            all_lengths.append(total)

        new_input_ids = torch.Tensor(all_ids).long()
        new_attention_mask = torch.Tensor(all_masks).long()
        new_token_type_ids = torch.Tensor(all_types).long()
        new_length = torch.Tensor(all_lengths).long()

        return TokenizedTexts(
            None,
            new_input_ids.to(self.device),
            new_length,
            new_attention_mask if mask else None,
            new_token_type_ids.to(self.device),
        )

    def forward(self, texts: List[Tuple[str, str, str]]):
        """Encodes the triplets, returning the hidden state of the first token"""
        tokenized = self.batch_tokenize(texts, mask=True)

        grad_enabled = torch.is_grad_enabled() and self.trainable
        with torch.set_grad_enabled(grad_enabled):
            y = self.model(
                tokenized.ids,
                token_type_ids=tokenized.token_type_ids,
                attention_mask=tokenized.mask.to(self.device),
            )

        # Assumes that [CLS] is the first token: the slice keeps, for each
        # input, the hidden state at position 0
        return y.last_hidden_state[:, 0]

    @property
    def dimension(self) -> int:
        """Hidden size read from the loaded model's configuration"""
        model_config = self.model.config
        return model_config.hidden_size

    # def distribute_models(self, update):
    #     self.model = update(self.model)
@dataclass
class MLMModelOutput:
    """Format for the output of the model during Masked Language Modeling"""

    # Scores of the model; annotated as a generic tensor since logits are
    # real-valued (not integer) scores
    logits: torch.Tensor

    # Labels returned by the masking step
    labels: torch.Tensor
class MLMEncoder(BaseTransformer, DistributableModel):
    """Implementation of the encoder for the Masked Language Modeling task"""

    maxlen: Param[Optional[int]] = None

    mlm_probability: Param[float] = 0.2
    """Probability to mask tokens"""

    datacollator: DataCollatorForLanguageModeling = None

    @property
    def automodel(self):
        """Builds the model through ``AutoModelForMaskedLM``"""
        return AutoModelForMaskedLM

    def __initialize__(self, options: ModuleInitOptions):
        """Initializes the model, then the collator used to mask tokens"""
        super().__initialize__(options)

        collator_options = dict(
            tokenizer=self.tokenizer,
            mlm_probability=self.mlm_probability,
            return_tensors="pt",
        )
        self.datacollator = DataCollatorForLanguageModeling(**collator_options)

    def forward(self, texts: List[str], info: TrainerContext = None) -> MLMModelOutput:
        """Masks the tokenized texts and runs the masked language model

        Returns the logits of the model together with the labels produced by
        the masking step.
        """
        tokenized = self.batch_tokenize(texts, mask=True)

        # The collator is given a CPU tensor; its result holds the masked
        # inputs (index 0) and the labels (index 1)
        masked = self.datacollator.torch_mask_tokens(tokenized.ids.cpu())
        labels = masked[1]

        grad_enabled = torch.is_grad_enabled() and self.trainable
        with torch.set_grad_enabled(grad_enabled):
            y = self.model(
                input_ids=masked[0].to(self.device),
                labels=labels.to(self.device),
                attention_mask=tokenized.mask.to(self.device),
            )

        # Maybe easier to simply returns the object returned by the BertForMaskedLM?
        return MLMModelOutput(logits=y.logits, labels=labels)

    @property
    def dimension(self) -> int:
        """Hidden size given by the model configuration"""
        return self.config.hidden_size

    def distribute_models(self, update):
        """Replaces the model by the result of ``update`` applied to it"""
        self.model = update(self.model)
class LayerSelector(ParametersIterator):
    """This class can be used to pick some of the transformer layers"""

    # For freezing everything except the embeddings
    re_layer: Param[str] = r"""(?:encoder|transformer)\.layer\.(\d+)\."""

    transformer: Param[BaseTransformer]
    """The model for which layers are selected"""

    pick_layers: Param[int] = 0
    """Counting from the first processing layers (can be negative, i.e. -1 meaning
    until the last layer excluded, etc. / 0 means no layer)"""

    select_embeddings: Param[bool] = False
    """Whether to pick the embeddings layer"""

    select_feed_forward: Param[bool] = False
    """Whether to pick the feed forward of Transformer layers"""

    def __post_init__(self):
        # Compile the layer pattern once
        self._re_layer = re.compile(self.re_layer)

    def __validate__(self):
        """Rejects a configuration that would select no parameter at all"""
        selects_something = (
            self.select_embeddings
            or self.select_feed_forward
            or self.pick_layers != 0
        )
        if not selects_something:
            raise AssertionError("The layer selector will select nothing")

    @cached_property
    def nlayers(self):
        """Highest layer index found in the parameter names (0 if none)"""
        indices = (
            int(m.group(1))
            for name, _ in self.transformer.model.named_parameters()
            if (m := self._re_layer.search(name))
        )
        return max(indices, default=0)

    def should_pick(self, name: str) -> bool:
        """Returns True when the parameter called ``name`` is selected"""
        if self.select_embeddings and "embeddings." in name:
            return True

        if self.select_feed_forward and "intermediate" in name:
            return True

        if self.pick_layers == 0:
            return False

        m = self._re_layer.search(name)
        if m is None:
            # Not a parameter of a numbered layer
            return False

        layer = int(m.group(1))
        if self.pick_layers < 0:
            # Negative values count from the highest layer index
            return layer <= self.nlayers + self.pick_layers
        return layer < self.pick_layers

    def iter(self):
        """Yields (prefixed name, parameter, selected?) for each parameter"""
        for name, params in self.transformer.model.named_parameters():
            selected = self.should_pick(name)
            yield f"model.{name}", params, selected

    def after(self, state: TrainState):
        """Freezes, on the first call only, the parameters to be frozen"""
        # NOTE(review): `_initialized` and `should_freeze` are not defined in
        # the visible part of this class -- confirm they are provided by
        # `ParametersIterator`
        if self._initialized:
            return

        self._initialized = True
        for name, param in self.transformer.model.named_parameters():
            if self.should_freeze(name):
                logger.info("Freezing layer %s", name)
                param.requires_grad = False
class TransformerTokensEncoderWithMLMOutput(TransformerTokensEncoder):
    """Transformer that output logits over the vocabulary"""

    @property
    def automodel(self):
        """Builds the model through ``AutoModelForMaskedLM``"""
        model_class = AutoModelForMaskedLM
        return model_class