|
import torch |
|
|
|
from transformers.utils import ModelOutput |
|
from typing import Any, Dict |
|
|
|
|
|
def update_causal_attention_mask(attention_mask, cache=False):
    """
    Grows a causal attention mask by one position for the next generation step.

    A row of ones is appended along dim 2 (the new token may attend to every
    existing position) and a column is appended along dim 3 that is zero
    everywhere except its last entry (only the new token attends to itself).

    Parameters:
        attention_mask (torch.Tensor): Current causal attention mask of shape
            (batch, heads, q, k); the usual case is (1, 1, n, n).
        cache (bool): When True, only the row belonging to the new token is
            returned instead of the full mask.

    Returns:
        torch.Tensor: Updated mask of shape (batch, heads, q+1, k+1), or the
        last row only, of shape (batch, heads, 1, k+1), when `cache` is True.
    """
    # Read every dimension from the input instead of assuming (1, 1, n, n), so
    # batched, multi-head, and non-square masks (e.g. the single row returned
    # by a previous call with cache=True) can be extended as well.
    batch, heads, num_rows, num_cols = attention_mask.shape
    device = attention_mask.device

    # No dtype is passed, so the dtype of the result keeps following
    # torch.cat's type promotion between the input mask and these tensors.
    new_row = torch.full((batch, heads, 1, num_cols), 1, device=device)
    new_col = torch.full((batch, heads, num_rows + 1, 1), 0, device=device)

    # Only the newly added query position may attend to the newly added key.
    new_col[:, :, -1, :] = 1

    attention_mask = torch.cat([attention_mask, new_row], dim=2)
    attention_mask = torch.cat([attention_mask, new_col], dim=3)

    if cache:
        # With a KV cache only the new token is fed forward, so only its row
        # of the mask is needed.
        return attention_mask[:, :, -1:, :]
    return attention_mask
|
|
|
|
|
def _aki_update_model_kwargs_for_generation( |
|
self, |
|
outputs: ModelOutput, |
|
model_kwargs: Dict[str, Any], |
|
is_encoder_decoder: bool = False, |
|
standardize_cache_format: bool = False, |
|
num_new_tokens: int = 1, |
|
) -> Dict[str, Any]: |
|
|
|
model_kwargs["past_key_values"] = self._extract_past_from_model_output( |
|
outputs, standardize_cache_format=standardize_cache_format |
|
) |
|
if getattr(outputs, "state", None) is not None: |
|
model_kwargs["state"] = outputs.state |
|
|
|
|
|
if "token_type_ids" in model_kwargs: |
|
token_type_ids = model_kwargs["token_type_ids"] |
|
model_kwargs["token_type_ids"] = torch.cat([token_type_ids, token_type_ids[:, -1].unsqueeze(-1)], dim=-1) |
|
|
|
if not is_encoder_decoder: |
|
|
|
if "attention_mask" in model_kwargs: |
|
|
|
attention_mask = model_kwargs["attention_mask"] |
|
|
|
model_kwargs["attention_mask"] = torch.full((1, attention_mask.shape[-1]+1), 1, device=attention_mask.device) |
|
else: |
|
|
|
if "decoder_attention_mask" in model_kwargs: |
|
decoder_attention_mask = model_kwargs["decoder_attention_mask"] |
|
model_kwargs["decoder_attention_mask"] = torch.cat( |
|
[decoder_attention_mask, decoder_attention_mask.new_ones((decoder_attention_mask.shape[0], 1))], |
|
dim=-1, |
|
) |
|
|
|
if ( |
|
model_kwargs.get("use_cache", True) |
|
and "cache_position" in model_kwargs |
|
and model_kwargs["cache_position"] is not None |
|
): |
|
model_kwargs["cache_position"] = model_kwargs["cache_position"][-1:] + num_new_tokens |
|
|
|
|
|
position_ids = torch.arange(model_kwargs["past_key_values"][0][0].shape[2]+1, device=model_kwargs["attention_mask"].device).unsqueeze(0) |
|
if model_kwargs.get("past_key_values", None) is not None: |
|
position_ids = position_ids[:, -1:] |
|
|
|
model_kwargs["position_ids"] = position_ids |
|
|
|
return model_kwargs |