# xpmir.text.huggingface
import re
import logging
import torch.nn as nn
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union
import torch
from experimaestro.compat import cached_property
from experimaestro import Param, Constant, deprecate
from xpmir.distributed import DistributableModel
from xpmir.learning.optim import ModuleInitMode, ModuleInitOptions
from xpmir.text.encoders import (
Encoder,
TokensEncoder,
DualTextEncoder,
TextEncoder,
TripletTextEncoder,
)
from xpmir.utils.utils import easylog
from xpmir.learning.context import TrainerContext, TrainState
from xpmir.learning.parameters import ParametersIterator
from .tokenizers import ( # noqa: F401
HFTokenizer,
HFTokenizerBase,
HFTokenizerAdapter,
HFStringTokenizer,
HFListTokenizer,
)
from .encoders import ( # noqa: F401
HFModel,
HFTokensEncoder,
HFCLSEncoder,
)
from .base import HFMaskedLanguageModel # noqa: F401
try:
from transformers import (
AutoModel,
AutoTokenizer,
AutoConfig,
DataCollatorForLanguageModeling,
AutoModelForMaskedLM,
)
except Exception:
logging.error("Install huggingface transformers to use these configurations")
raise
from xpmir.text import TokenizedTexts
logger = easylog()
logger.setLevel(logging.INFO)
class BaseTransformer(Encoder):
"""Base transformer class from Huggingface"""
model_id: Param[str] = "bert-base-uncased"
"""Model ID from huggingface"""
trainable: Param[bool]
"""Whether BERT parameters should be trained"""
layer: Param[int] = 0
"""Layer to use (0 is the last, -1 to use them all)"""
    dropout: Param[Optional[float]] = 0
    """(deprecated) Dropout to apply to all the layers"""
CLS: int
SEP: int
@cached_property
def tokenizer(self):
return AutoTokenizer.from_pretrained(self.model_id, use_fast=True)
@property
def pad_tokenid(self) -> int:
return self.tokenizer.pad_token_id
@property
def automodel(self):
return AutoModel
def __initialize__(self, options: ModuleInitOptions):
"""Initialize the HuggingFace transformer
Args:
options: loader options
"""
super().__initialize__(options)
# Load the model configuration
config = AutoConfig.from_pretrained(self.model_id)
if self.dropout != 0:
config.hidden_dropout_prob = self.dropout
config.attention_probs_dropout_prob = self.dropout
if options.mode == ModuleInitMode.NONE or options.mode == ModuleInitMode.RANDOM:
self.model = self.automodel.from_config(config)
else:
self.model = self.automodel.from_pretrained(self.model_id, config=config)
# Loads the tokenizer
self._tokenizer = AutoTokenizer.from_pretrained(self.model_id, use_fast=True)
self.CLS = self.tokenizer.cls_token_id
self.SEP = self.tokenizer.sep_token_id
if self.trainable:
self.model.train()
else:
self.model.eval()
def parameters(self, recurse=True):
if self.trainable:
return super().parameters(recurse)
return []
    def train(self, mode: bool = True):
        # We should not make this layer trainable unless asked: the model is
        # only put in training mode when it is trainable; evaluation mode is
        # always propagated
        self.model.train(mode and self.trainable)
def tokenize(self, text):
return self.tokenizer.tokenize(text)
def tok2id(self, tok):
return self.tokenizer.vocab[tok]
def static(self):
return not self.trainable
def batch_tokenize(
self,
texts: Union[List[str], List[Tuple[str, str]]],
batch_first=True,
maxlen=None,
mask=False,
) -> TokenizedTexts:
if maxlen is None:
maxlen = self.tokenizer.model_max_length
else:
maxlen = min(maxlen, self.tokenizer.model_max_length)
assert batch_first, "Batch first is the only option"
r = self.tokenizer(
list(texts),
max_length=maxlen,
truncation=True,
padding=True,
return_tensors="pt",
return_length=True,
return_attention_mask=mask,
)
return TokenizedTexts(
None,
r["input_ids"].to(self.device),
r["length"],
r.get("attention_mask", None),
r.get("token_type_ids", None), # if r["token_type_ids"] else None
)
def id2tok(self, idx):
if torch.is_tensor(idx):
if len(idx.shape) == 0:
return self.id2tok(idx.item())
return [self.id2tok(x) for x in idx]
return self.tokenizer.id_to_token(idx)
def lexicon_size(self) -> int:
return self.tokenizer._tokenizer.get_vocab_size()
def maxtokens(self) -> int:
return self.tokenizer.model_max_length
def dim(self):
return self.model.config.hidden_size
@property
def vocab_size(self) -> int:
"""Returns the size of the vocabulary"""
return self.tokenizer.vocab_size
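
# Usage sketch for batch_tokenize (hypothetical setup: `encoder` is an
# already-initialized BaseTransformer subclass); the returned TokenizedTexts
# mirrors the HuggingFace tokenizer output, with ids moved to the encoder
# device:
#
#   tokenized = encoder.batch_tokenize(["a query", "another query"], mask=True)
#   tokenized.ids   # (batch, seq_len) long tensor of token ids
#   tokenized.mask  # (batch, seq_len) attention mask (since mask=True)
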
class TransformerTokensEncoder(BaseTransformer, TokensEncoder):
"""A tokens encoder based on HuggingFace"""
def forward(self, toks: TokenizedTexts, all_outputs=False):
outputs = self.model(
toks.ids.to(self.device),
attention_mask=toks.mask.to(self.device) if toks.mask is not None else None,
)
if all_outputs:
return outputs
return outputs.last_hidden_state
class SentenceTransformerTextEncoder(TextEncoder):
"""A Sentence Transformers text encoder"""
model_id: Param[str] = "sentence-transformers/all-MiniLM-L6-v2"
def __initialize__(self, options: ModuleInitOptions):
super().__initialize__(options)
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer(self.model_id)
    def forward(self, texts: List[str]) -> torch.Tensor:
        # convert_to_tensor makes encode() return a tensor, matching the
        # annotated return type
        return self.model.encode(texts, convert_to_tensor=True)
class OneHotHuggingFaceEncoder(TextEncoder):
    """Encodes a text as a binary vector over the vocabulary

    A component is 1 when the corresponding token occurs in the text,
    and 0 otherwise"""
model_id: Param[str] = "bert-base-uncased"
"""Model ID from huggingface"""
maxlen: Param[Optional[int]] = None
"""Max length for texts"""
version: Constant[int] = 2
def __initialize__(self, options: ModuleInitOptions):
        super().__initialize__(options)
self._tokenizer = AutoTokenizer.from_pretrained(self.model_id, use_fast=True)
self.CLS = self._tokenizer.cls_token_id
self.SEP = self._tokenizer.sep_token_id
self.PAD = self._tokenizer.pad_token_id
self._dummy_params = nn.Parameter(torch.Tensor())
@property
def device(self):
return self._dummy_params.device
@cached_property
def tokenizer(self):
return self._tokenizer
def batch_tokenize(self, texts):
r = self.tokenizer(
list(texts),
max_length=self.maxlen,
truncation=True,
padding=True,
return_tensors="pt",
)
return r["input_ids"]
def forward(self, texts: List[str]) -> torch.Tensor:
"""Returns a batch x vocab tensor"""
tokenized_ids = self.batch_tokenize(texts)
batch_size = len(texts)
x = torch.zeros(batch_size, self.dimension)
x[torch.arange(batch_size).unsqueeze(-1), tokenized_ids] = 1
x[:, [self.PAD, self.SEP, self.CLS]] = 0
return x.to(self.device)
@property
def dimension(self):
return self.tokenizer.vocab_size
def static(self):
return False
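
# Minimal sketch of the indexing trick used in OneHotHuggingFaceEncoder.forward
# above, with an arbitrary vocabulary size of 6: broadcasting a column of row
# indices against the (batch, seq_len) matrix of token ids sets one entry per
# occurring token.
#
#   ids = torch.tensor([[1, 3, 3], [2, 4, 0]])
#   x = torch.zeros(2, 6)
#   x[torch.arange(2).unsqueeze(-1), ids] = 1
#   # x[0] is now [0, 1, 0, 1, 0, 0] and x[1] is [1, 0, 1, 0, 1, 0]
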
@deprecate
class HuggingfaceTokenizer(OneHotHuggingFaceEncoder):
"""The old encoder for one hot"""
pass
class TransformerEncoder(BaseTransformer, TextEncoder, DistributableModel):
"""Encodes using the [CLS] token"""
maxlen: Param[Optional[int]] = None
def forward(self, texts: List[str], maxlen=None):
tokenized = self.batch_tokenize(texts, maxlen=maxlen or self.maxlen, mask=True)
with torch.set_grad_enabled(torch.is_grad_enabled() and self.trainable):
y = self.model(tokenized.ids, attention_mask=tokenized.mask.to(self.device))
# Assumes that [CLS] is the first token
return y.last_hidden_state[:, 0]
@property
def dimension(self):
return self.dim()
def with_maxlength(self, maxlen: int):
return TransformerTextEncoderAdapter(encoder=self, maxlen=maxlen)
def distribute_models(self, update):
self.model = update(self.model)
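
# Usage sketch (hypothetical `encoder`, an initialized TransformerEncoder):
# forward() returns the last-layer representation of the first ([CLS]) token,
# one vector per input text.
#
#   embeddings = encoder(["first text", "second text"])
#   # embeddings.shape == (2, encoder.dimension)
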
class TransformerTextEncoderAdapter(TextEncoder, DistributableModel):
encoder: Param[TransformerEncoder]
maxlen: Param[Optional[int]] = None
def __initialize__(self, options: ModuleInitOptions):
self.encoder.__initialize__(options)
@property
def dimension(self):
return self.encoder.dimension
    def forward(self, texts: List[str], maxlen=None):
        return self.encoder.forward(texts, maxlen=maxlen or self.maxlen)
def static(self):
return self.encoder.static()
@property
def vocab_size(self):
return self.encoder.vocab_size
def distribute_models(self, update):
self.encoder.model = update(self.encoder.model)
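
# Usage sketch: with_maxlength wraps an existing encoder without copying its
# weights, only overriding the maximum sequence length used at tokenization
# time.
#
#   short_encoder = encoder.with_maxlength(128)
#   # short_encoder.dimension == encoder.dimension
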
class DualTransformerEncoder(BaseTransformer, DualTextEncoder):
    """Encodes a (query, document) pair using the [CLS] token"""

    maxlen: Param[Optional[int]] = None
    """Maximum length of the query/document pair (in tokens), or None to
    use the transformer limit"""
version: Constant[int] = 2
def forward(self, texts: List[Tuple[str, str]]):
tokenized = self.batch_tokenize(texts, maxlen=self.maxlen, mask=True)
with torch.set_grad_enabled(torch.is_grad_enabled() and self.trainable):
kwargs = {}
if tokenized.token_type_ids is not None:
kwargs["token_type_ids"] = tokenized.token_type_ids.to(self.device)
y = self.model(
tokenized.ids, attention_mask=tokenized.mask.to(self.device), **kwargs
)
# Assumes that [CLS] is the first token
return y.last_hidden_state[:, 0]
@property
def dimension(self) -> int:
return self.model.config.hidden_size
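
# Usage sketch (hypothetical `dual_encoder`, an initialized
# DualTransformerEncoder): each (query, document) pair is tokenized as a
# single "[CLS] query [SEP] document [SEP]" sequence, with token_type_ids
# separating the two segments when the underlying model provides them.
#
#   y = dual_encoder([("a query", "a matching document")])
#   # y.shape == (1, dual_encoder.dimension)
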
class DualDuoBertTransformerEncoder(BaseTransformer, TripletTextEncoder):
    """Vector encoding of a (query, document, document) triplet

    The input is encoded as: [CLS] query [SEP] doc1 [SEP] doc2 [SEP]
    """

    maxlen_query: Param[int] = 64
    """Maximum length (in tokens) for the query"""

    maxlen_doc: Param[int] = 224
    """Maximum length (in tokens) for each of the two documents"""
def __initialize__(self, options: ModuleInitOptions):
super().__initialize__(options)
# Add an extra token type
data = self.model.embeddings.token_type_embeddings.weight.data
if len(data) < 3:
logger.info("Adding an extra token type in transformer")
data = torch.cat((data, torch.zeros(1, data.shape[1])))
self.model.embeddings.token_type_embeddings = nn.Embedding.from_pretrained(
data, freeze=False
)
def batch_tokenize(
self,
texts: List[Tuple[str, str, str]],
batch_first=True,
mask=False,
) -> TokenizedTexts:
assert batch_first, "Batch first is the only option"
query = self.tokenizer(
[triplet[0] for triplet in texts],
max_length=self.maxlen_query,
truncation=True,
)
document_1 = self.tokenizer(
[triplet[1] for triplet in texts],
max_length=self.maxlen_doc,
truncation=True,
)
document_2 = self.tokenizer(
[triplet[2] for triplet in texts],
max_length=self.maxlen_doc,
truncation=True,
)
new_input_ids = []
new_attention_mask = []
new_token_type_ids = []
        # Each entry stores [query_length, document_1_length,
        # document_2_length, total_length]
        length_factory = []
        new_length = []
maxlen = 0
batch_size = len(query["input_ids"])
        # Compute the maximum total length over the three texts and store the lengths
for index in range(batch_size):
query_length = len(query["input_ids"][index])
document_1_length = len(document_1["input_ids"][index]) - 1
document_2_length = len(document_2["input_ids"][index]) - 1
total_length_at_index = query_length + document_1_length + document_2_length
if total_length_at_index > maxlen:
maxlen = total_length_at_index
length_factory.append(
[
query_length,
document_1_length,
document_2_length,
total_length_at_index,
]
)
for index in range(batch_size):
new_input_ids.append(
query["input_ids"][index]
+ document_1["input_ids"][index][1:]
+ document_2["input_ids"][index][1:]
+ [self.pad_tokenid] * (maxlen - length_factory[index][3])
)
new_attention_mask.append(
[1] * length_factory[index][3]
+ [0] * (maxlen - length_factory[index][3])
)
new_token_type_ids.append(
[0] * length_factory[index][0]
+ [1] * length_factory[index][1]
+ [2] * length_factory[index][2]
+ [0] * (maxlen - length_factory[index][3])
)
new_length.append(length_factory[index][3])
        new_input_ids = torch.tensor(new_input_ids, dtype=torch.long)
        new_attention_mask = torch.tensor(new_attention_mask, dtype=torch.long)
        new_token_type_ids = torch.tensor(new_token_type_ids, dtype=torch.long)
        new_length = torch.tensor(new_length, dtype=torch.long)
return TokenizedTexts(
None,
new_input_ids.to(self.device),
new_length,
new_attention_mask if mask else None,
new_token_type_ids.to(self.device),
)
def forward(self, texts: List[Tuple[str, str, str]]):
tokenized = self.batch_tokenize(texts, mask=True)
with torch.set_grad_enabled(torch.is_grad_enabled() and self.trainable):
y = self.model(
tokenized.ids,
token_type_ids=tokenized.token_type_ids,
attention_mask=tokenized.mask.to(self.device),
)
# Assumes that [CLS] is the first token
        # shape of y.last_hidden_state: (len(texts), sequence length, dimension)
return y.last_hidden_state[:, 0]
@property
def dimension(self) -> int:
return self.model.config.hidden_size
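
# Layout sketch for one triplet produced by batch_tokenize above (symbolic
# tokens: q* = query, d* = first document, e* = second document). The query
# keeps its [CLS] and [SEP], both documents drop their leading [CLS], and the
# extra token type (index 2) marks the second document:
#
#   input_ids       [CLS] q1 q2 [SEP] d1 [SEP] e1 e2 [SEP] [PAD]
#   token_type_ids    0   0  0    0    1    1   2  2    2    0
#   attention_mask    1   1  1    1    1    1   1  1    1    0
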
@dataclass
class MLMModelOutput:
"""Format for the output of the model during Masked Language Modeling"""
    logits: torch.Tensor
labels: torch.Tensor
class MLMEncoder(BaseTransformer, DistributableModel):
"""Implementation of the encoder for the Masked Language Modeling task"""
maxlen: Param[Optional[int]] = None
mlm_probability: Param[float] = 0.2
"""Probability to mask tokens"""
    datacollator: Optional[DataCollatorForLanguageModeling] = None
@property
def automodel(self):
return AutoModelForMaskedLM
def __initialize__(self, options: ModuleInitOptions):
super().__initialize__(options)
self.datacollator = DataCollatorForLanguageModeling(
tokenizer=self.tokenizer,
mlm_probability=self.mlm_probability,
return_tensors="pt",
)
    def forward(
        self, texts: List[str], info: Optional[TrainerContext] = None
    ) -> MLMModelOutput:
tokenized = self.batch_tokenize(texts, mask=True)
masked = self.datacollator.torch_mask_tokens(tokenized.ids.cpu())
with torch.set_grad_enabled(torch.is_grad_enabled() and self.trainable):
y = self.model(
input_ids=masked[0].to(self.device),
labels=masked[1].to(self.device),
attention_mask=tokenized.mask.to(self.device),
)
        # It might be easier to simply return the object produced by the masked LM model
return MLMModelOutput(logits=y.logits, labels=masked[1])
@property
def dimension(self) -> int:
        return self.model.config.hidden_size
def distribute_models(self, update):
self.model = update(self.model)
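
# Masking sketch: DataCollatorForLanguageModeling.torch_mask_tokens returns a
# (masked_inputs, labels) pair in which labels equal -100 everywhere except at
# the masked positions (where they keep the original token id), so that the
# cross-entropy computed by the HuggingFace masked-LM head only scores masked
# tokens:
#
#   inputs, labels = datacollator.torch_mask_tokens(token_ids)
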
class LayerSelector(ParametersIterator):
    """This class can be used to pick some of the transformer layers"""

    re_layer: Param[str] = r"""(?:encoder|transformer)\.layer\.(\d+)\."""
    """Regular expression matching the transformer layers (the first group
    captures the layer index); can be used e.g. to freeze everything except
    the embeddings"""
transformer: Param[BaseTransformer]
"""The model for which layers are selected"""
    pick_layers: Param[int] = 0
    """Number of layers to select, counting from the first processing layer
    (0 selects no layer; a negative value excludes the last layers, e.g. -1
    selects every layer but the last one)"""
select_embeddings: Param[bool] = False
"""Whether to pick the embeddings layer"""
select_feed_forward: Param[bool] = False
"""Whether to pick the feed forward of Transformer layers"""
    def __post_init__(self):
        self._re_layer = re.compile(self.re_layer)
        self._initialized = False
def __validate__(self):
if (
not (self.select_embeddings or self.select_feed_forward)
and self.pick_layers == 0
):
raise AssertionError("The layer selector will select nothing")
    @cached_property
    def nlayers(self):
        """Number of transformer layers found in the model"""
        count = 0
        for name, _ in self.transformer.model.named_parameters():
            if m := self._re_layer.search(name):
                count = max(count, int(m.group(1)) + 1)
        return count
def should_pick(self, name: str) -> bool:
if self.select_embeddings and ("embeddings." in name):
return True
if self.select_feed_forward and ("intermediate" in name):
return True
if self.pick_layers != 0:
if m := self._re_layer.search(name):
layer = int(m.group(1))
                if self.pick_layers < 0:
                    return layer < self.nlayers + self.pick_layers
return layer < self.pick_layers
return False
def iter(self):
for name, params in self.transformer.model.named_parameters():
yield f"model.{name}", params, self.should_pick(name)
def after(self, state: TrainState):
if not self._initialized:
self._initialized = True
for name, param in self.transformer.model.named_parameters():
                if self.should_pick(name):
logger.info("Freezing layer %s", name)
param.requires_grad = False
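
# Selection sketch: parameter names in a BERT-like model look like
# "encoder.layer.3.attention.self.query.weight"; re_layer captures the layer
# index so that should_pick can compare it against pick_layers (hypothetical
# configuration below).
#
#   selector = LayerSelector(transformer=encoder, pick_layers=-1)
#   # selects every transformer layer except the last one
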
class TransformerTokensEncoderWithMLMOutput(TransformerTokensEncoder):
    """Transformer that outputs logits over the vocabulary"""
@property
def automodel(self):
return AutoModelForMaskedLM