# Copyright 2022-2024 MTS (Mobile Telesystems)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import typing as tp
from copy import deepcopy
import implicit.gpu
import numpy as np
from implicit.cpu.als import AlternatingLeastSquares as CPUAlternatingLeastSquares
from implicit.gpu.als import AlternatingLeastSquares as GPUAlternatingLeastSquares
from implicit.utils import check_random_state
from scipy import sparse
from tqdm.auto import tqdm
from rectools.dataset import Dataset, Features
from rectools.exceptions import NotFittedError
from .rank import Distance
from .vector import Factors, VectorModel
AVAILABLE_RECOMMEND_METHODS = ("loop",)
AnyAlternatingLeastSquares = tp.Union[CPUAlternatingLeastSquares, GPUAlternatingLeastSquares]
[docs]class ImplicitALSWrapperModel(VectorModel):
"""
Wrapper for `implicit.als.AlternatingLeastSquares`
with possibility to use explicit features and GPU support.
See https://implicit.readthedocs.io/en/latest/als.html for details of base model.
Parameters
----------
model : AnyAlternatingLeastSquares
Base model that will be used.
verbose : int, default 0
Degree of verbose output. If 0, no output will be provided.
fit_features_together: bool, default False
Whether fit explicit features together with latent features or not.
Used only if explicit features are present in dataset.
See documentations linked above for details.
"""
recommends_for_warm = False
recommends_for_cold = False
u2i_dist = Distance.DOT
i2i_dist = Distance.COSINE
def __init__(self, model: AnyAlternatingLeastSquares, verbose: int = 0, fit_features_together: bool = False):
super().__init__(verbose=verbose)
self.model: AnyAlternatingLeastSquares
self._model = model # for refit; TODO: try to do it better
self.fit_features_together = fit_features_together
self.use_gpu = isinstance(model, GPUAlternatingLeastSquares)
if not self.use_gpu:
self.n_threads = model.num_threads
def _fit(self, dataset: Dataset) -> None: # type: ignore
self.model = deepcopy(self._model)
ui_csr = dataset.get_user_item_matrix(include_weights=True).astype(np.float32)
if self.fit_features_together:
fit_als_with_features_together_inplace(
self.model,
ui_csr,
dataset.get_hot_user_features(),
dataset.get_hot_item_features(),
self.verbose,
)
else:
fit_als_with_features_separately_inplace(
self.model,
ui_csr,
dataset.get_hot_user_features(),
dataset.get_hot_item_features(),
self.verbose,
)
def _get_users_factors(self, dataset: Dataset) -> Factors:
return Factors(get_users_vectors(self.model))
def _get_items_factors(self, dataset: Dataset) -> Factors:
return Factors(get_items_vectors(self.model))
[docs] def get_vectors(self) -> tp.Tuple[np.ndarray, np.ndarray]:
"""
Return user and item vector representations from fitted model.
Returns
-------
(np.ndarray, np.ndarray)
User and item embeddings.
Shapes are (n_users, n_factors) and (n_items, n_factors).
"""
if not self.is_fitted:
raise NotFittedError(self.__class__.__name__)
return get_users_vectors(self.model), get_items_vectors(self.model)
[docs]def get_users_vectors(model: AnyAlternatingLeastSquares) -> np.ndarray:
"""
Get users vectors from ALS model as numpy array
Parameters
----------
model : AnyAlternatingLeastSquares
Model to get vectors from. Can be CPU or GPU model
Returns
-------
np.ndarray
User vectors
"""
if isinstance(model, GPUAlternatingLeastSquares): # pragma: no cover
return model.user_factors.to_numpy()
return model.user_factors
[docs]def get_items_vectors(model: AnyAlternatingLeastSquares) -> np.ndarray:
"""
Get items vectors from ALS model as numpy array
Parameters
----------
model : AnyAlternatingLeastSquares
Model to get vectors from. Can be CPU or GPU model
Returns
-------
np.ndarray
Item vectors
"""
if isinstance(model, GPUAlternatingLeastSquares): # pragma: no cover
return model.item_factors.to_numpy()
return model.item_factors
[docs]def fit_als_with_features_separately_inplace(
model: AnyAlternatingLeastSquares,
ui_csr: sparse.csr_matrix,
user_features: tp.Optional[Features],
item_features: tp.Optional[Features],
verbose: int = 0,
) -> None:
"""
Fit ALS model with explicit features, explicit features fit separately from latent.
Parameters
----------
model: AnyAlternatingLeastSquares
Base model to fit.
ui_csr : sparse.csr_matrix
Matrix of interactions.
user_features : (SparseFeatures | DenseFeatures), optional
Explicit user features.
item_features : (SparseFeatures | DenseFeatures), optional
Explicit item features.
verbose : int
Whether to print output.
"""
iu_csr = ui_csr.T.tocsr(copy=False)
model.fit(ui_csr, show_progress=verbose > 0)
user_factors_chunks = [get_users_vectors(model)]
item_factors_chunks = [get_items_vectors(model)]
if user_features is not None:
user_feature_factors = user_features.get_dense()
item_factors_paired_to_user_features = _fit_paired_factors(model, iu_csr, user_feature_factors)
user_factors_chunks.append(user_feature_factors)
item_factors_chunks.append(item_factors_paired_to_user_features)
if item_features is not None:
item_feature_factors = item_features.get_dense()
user_factors_paired_to_item_features = _fit_paired_factors(model, ui_csr, item_feature_factors)
item_factors_chunks.append(item_feature_factors)
user_factors_chunks.append(user_factors_paired_to_item_features)
user_factors = np.hstack(user_factors_chunks)
item_factors = np.hstack(item_factors_chunks)
if isinstance(model, GPUAlternatingLeastSquares): # pragma: no cover
user_factors = implicit.gpu.Matrix(user_factors)
item_factors = implicit.gpu.Matrix(item_factors)
model.user_factors = user_factors
model.item_factors = item_factors
def _fit_paired_factors(
model: AnyAlternatingLeastSquares, xy_csr: sparse.csr_matrix, y_factors: np.ndarray
) -> np.ndarray:
features_model_params = {
"factors": y_factors.shape[1],
"regularization": model.regularization,
"alpha": model.alpha,
"dtype": model.dtype,
"iterations": 1,
"random_state": model.random_state,
}
if isinstance(model, GPUAlternatingLeastSquares): # pragma: no cover
features_model = GPUAlternatingLeastSquares(**features_model_params)
features_model.item_factors = implicit.gpu.Matrix(y_factors)
features_model.fit(xy_csr)
x_factors = features_model.user_factors.to_numpy()
else:
features_model_params.update(
{
"num_threads": model.num_threads,
"use_native": model.use_native,
"use_cg": model.use_cg,
}
)
features_model = CPUAlternatingLeastSquares(**features_model_params)
features_model.item_factors = y_factors.copy()
features_model.fit(xy_csr)
x_factors = features_model.user_factors
return x_factors
def _init_latent_factors_cpu(
model: CPUAlternatingLeastSquares, n_users: int, n_items: int
) -> tp.Tuple[np.ndarray, np.ndarray]:
"""Logic is copied and pasted from original implicit library code"""
random_state = check_random_state(model.random_state)
if model.user_factors is None:
user_latent_factors = random_state.random((n_users, model.factors)) * 0.01
else:
user_latent_factors = model.user_factors
if model.item_factors is None:
item_latent_factors = random_state.random((n_items, model.factors)) * 0.01
else:
item_latent_factors = model.item_factors
return user_latent_factors, item_latent_factors
def _init_latent_factors_gpu(
model: GPUAlternatingLeastSquares, n_users: int, n_items: int
) -> tp.Tuple[np.ndarray, np.ndarray]: # pragma: no cover
"""Logic is copied and pasted from original implicit library code"""
random_state = check_random_state(model.random_state)
if model.user_factors is None:
user_latent_factors = random_state.uniform(
low=-0.5 / model.factors, high=0.5 / model.factors, size=(n_users, model.factors)
)
else:
user_latent_factors = model.user_factors.to_numpy()
if model.item_factors is None:
item_latent_factors = random_state.uniform(
low=-0.5 / model.factors, high=0.5 / model.factors, size=(n_items, model.factors)
)
else:
item_latent_factors = model.item_factors.to_numpy()
return user_latent_factors, item_latent_factors
[docs]def fit_als_with_features_together_inplace(
model: AnyAlternatingLeastSquares,
ui_csr: sparse.csr_matrix,
user_features: tp.Optional[Features],
item_features: tp.Optional[Features],
verbose: int = 0,
) -> None:
"""
Fit ALS model with explicit features, explicit features fit together with latent.
Parameters
----------
model: AnyAlternatingLeastSquares
Base model to fit.
ui_csr : sparse.csr_matrix
Matrix of interactions.
user_features : (SparseFeatures | DenseFeatures), optional
Explicit user features.
item_features : (SparseFeatures | DenseFeatures), optional
Explicit item features.
verbose : int
Whether to print output.
"""
n_users, n_items = ui_csr.shape
# Prepare explicit factors
user_explicit_factors: np.ndarray
if user_features is None:
user_explicit_factors = np.array([]).reshape((n_users, 0))
else:
user_explicit_factors = user_features.get_dense()
n_user_explicit_factors = user_explicit_factors.shape[1]
item_explicit_factors: np.ndarray
if item_features is None:
item_explicit_factors = np.array([]).reshape((n_items, 0))
else:
item_explicit_factors = item_features.get_dense()
n_item_explicit_factors = item_explicit_factors.shape[1]
# Prepare latent factors with the same math logic as in implicit library
if isinstance(model, GPUAlternatingLeastSquares): # pragma: no cover
user_latent_factors, item_latent_factors = _init_latent_factors_gpu(model, n_users, n_items)
else:
user_latent_factors, item_latent_factors = _init_latent_factors_cpu(model, n_users, n_items)
# Fix number of factors
n_latent_factors = model.factors
model.factors = n_latent_factors + n_user_explicit_factors + n_item_explicit_factors
# Prepare paired factors
user_factors_paired_to_items = np.zeros((n_users, n_item_explicit_factors))
item_factors_paired_to_users = np.zeros((n_items, n_user_explicit_factors))
# Make full factors
user_factors = np.hstack(
(
user_explicit_factors,
user_latent_factors,
user_factors_paired_to_items,
)
).astype(model.dtype)
item_factors = np.hstack(
(
item_factors_paired_to_users,
item_latent_factors,
item_explicit_factors,
)
).astype(model.dtype)
# Give the positive examples more weight if asked for (implicit library logic copy)
ui_csr = model.alpha * ui_csr
if isinstance(model, GPUAlternatingLeastSquares): # pragma: no cover
_fit_combined_factors_on_gpu_inplace(
model,
ui_csr,
user_factors,
item_factors,
n_user_explicit_factors,
n_item_explicit_factors,
verbose,
)
else:
_fit_combined_factors_on_cpu_inplace(
model,
ui_csr,
user_factors,
item_factors,
n_user_explicit_factors,
n_item_explicit_factors,
verbose,
)
def _fit_combined_factors_on_cpu_inplace(
model: CPUAlternatingLeastSquares,
ui_csr: sparse.csr_matrix,
user_factors: np.ndarray,
item_factors: np.ndarray,
n_user_explicit_factors: int,
n_item_explicit_factors: int,
verbose: int,
) -> None:
n_factors = user_factors.shape[1]
user_explicit_factors = user_factors[:, :n_user_explicit_factors].copy()
item_explicit_factors = item_factors[:, n_factors - n_item_explicit_factors :].copy()
iu_csr = ui_csr.T.tocsr(copy=False)
# invalidate cached norms and squared factors
model._item_norms = model._user_norms = None # pylint: disable=protected-access
model._YtY = None # pylint: disable=protected-access
model._XtX = None # pylint: disable=protected-access
solver = model.solver
for _ in tqdm(range(model.iterations), disable=verbose == 0):
solver(
ui_csr,
user_factors,
item_factors,
model.regularization,
model.num_threads,
)
user_factors[:, :n_user_explicit_factors] = user_explicit_factors
solver(
iu_csr,
item_factors,
user_factors,
model.regularization,
model.num_threads,
)
item_factors[:, n_factors - n_item_explicit_factors :] = item_explicit_factors
model.user_factors = user_factors
model.item_factors = item_factors
def _fit_combined_factors_on_gpu_inplace(
model: GPUAlternatingLeastSquares,
ui_csr: sparse.csr_matrix,
user_factors: np.ndarray,
item_factors: np.ndarray,
n_user_explicit_factors: int,
n_item_explicit_factors: int,
verbose: int,
) -> None: # pragma: no cover
n_factors = user_factors.shape[1]
user_explicit_factors = user_factors[:, :n_user_explicit_factors].copy()
item_explicit_factors = item_factors[:, n_factors - n_item_explicit_factors :].copy()
iu_csr = ui_csr.T.tocsr(copy=False)
iu_csr_cuda = implicit.gpu.CSRMatrix(iu_csr)
ui_csr_cuda = implicit.gpu.CSRMatrix(ui_csr)
X = implicit.gpu.Matrix(user_factors)
Y = implicit.gpu.Matrix(item_factors)
# invalidate cached norms and squared factors
model._item_norms = model._user_norms = None # pylint: disable=protected-access
model._item_norms_host = model._user_norms_host = None # pylint: disable=protected-access
model._YtY = model._XtX = None # pylint: disable=protected-access
_YtY = implicit.gpu.Matrix.zeros(model.factors, model.factors)
_XtX = implicit.gpu.Matrix.zeros(model.factors, model.factors)
for _ in tqdm(range(model.iterations), disable=verbose == 0):
model.solver.calculate_yty(Y, _YtY, model.regularization)
model.solver.least_squares(ui_csr_cuda, X, _YtY, Y, model.cg_steps)
user_factors_np = X.to_numpy()
user_factors_np[:, :n_user_explicit_factors] = user_explicit_factors
X = implicit.gpu.Matrix(user_factors_np)
model.solver.calculate_yty(X, _XtX, model.regularization)
model.solver.least_squares(iu_csr_cuda, Y, _XtX, X, model.cg_steps)
item_factors_np = Y.to_numpy()
item_factors_np[:, n_factors - n_item_explicit_factors :] = item_explicit_factors
Y = implicit.gpu.Matrix(item_factors_np)
model.user_factors = X
model.item_factors = Y