Source code for rectools.models.implicit_als

#  Copyright 2022 MTS (Mobile Telesystems)
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import typing as tp
import warnings
from copy import deepcopy

import numpy as np
from implicit.als import AlternatingLeastSquares
from implicit.utils import check_random_state
from scipy import sparse
from tqdm.auto import tqdm

from rectools.dataset import Dataset, Features
from rectools.exceptions import NotFittedError

from .vector import Distance, Factors, VectorModel

MAX_GPU_FACTORS = 1024
AVAILABLE_RECOMMEND_METHODS = ("loop",)


[docs]class ImplicitALSWrapperModel(VectorModel): """ Wrapper for `implicit.als.AlternatingLeastSquares` with possibility to use explicit features and GPU support. See https://implicit.readthedocs.io/en/latest/als.html for details of base model. Parameters ---------- model : AlternatingLeastSquares Base model that will be used. verbose : int, default 0 Degree of verbose output. If 0, no output will be provided. fit_features_together: bool, default False Whether fit explicit features together with latent features or not. Used only if explicit features are present in dataset. See documentations linked above for details. """ u2i_dist = Distance.DOT i2i_dist = Distance.COSINE def __init__(self, model: AlternatingLeastSquares, verbose: int = 0, fit_features_together: bool = False): super().__init__(verbose=verbose) if model.use_gpu and model.factors > MAX_GPU_FACTORS: # pragma: no cover raise ValueError(f"When using GPU max number of factors is {MAX_GPU_FACTORS}") self.model: AlternatingLeastSquares self._model = model # for refit; TODO: try to do it better self.fit_features_together = fit_features_together def _fit(self, dataset: Dataset) -> None: # type: ignore self.model = deepcopy(self._model) ui_csr = dataset.get_user_item_matrix(include_weights=True) if self.fit_features_together: user_factors, item_factors = fit_als_with_features_together( self.model, ui_csr, dataset.user_features, dataset.item_features, self.verbose, ) else: user_factors, item_factors = fit_als_with_features_separately( self.model, ui_csr, dataset.user_features, dataset.item_features, self.verbose, ) self.model.user_factors = user_factors self.model.item_factors = item_factors def _get_users_factors(self, dataset: Dataset) -> Factors: return Factors(self.model.user_factors) def _get_items_factors(self, dataset: Dataset) -> Factors: return Factors(self.model.item_factors)
[docs] def get_vectors(self) -> tp.Tuple[np.ndarray, np.ndarray]: """ Return user and item vector representations from fitted model. Returns ------- (np.ndarray, np.ndarray) User and item embeddings. Shapes are (n_users, n_factors) and (n_items, n_factors). """ if not self.is_fitted: raise NotFittedError(self.__class__.__name__) return self.model.user_factors, self.model.item_factors
[docs]def fit_als_with_features_separately( model: AlternatingLeastSquares, ui_csr: sparse.csr_matrix, user_features: tp.Optional[Features], item_features: tp.Optional[Features], verbose: int = 0, ) -> tp.Tuple[np.ndarray, np.ndarray]: """ Fit ALS model with explicit features, explicit features fit separately from latent. Parameters ---------- model: AlternatingLeastSquares Base model to fit. ui_csr : sparse.csr_matrix Matrix of interactions. user_features : (SparseFeatures | DenseFeatures), optional Explicit user features. item_features : (SparseFeatures | DenseFeatures), optional Explicit item features. verbose : int Whether to print output. Returns ------- user_factors : np.ndarray Combined latent and explicit user factors. item_factors : np.ndarray Combined latent and explicit user factors. """ iu_csr = ui_csr.T.tocsr(copy=False) model.fit(iu_csr, show_progress=verbose > 0) user_factors_chunks = [model.user_factors] item_factors_chunks = [model.item_factors] if user_features is not None: user_feature_factors = user_features.get_dense() item_factors_paired_to_user_features = _fit_paired_factors(model, iu_csr, user_feature_factors) user_factors_chunks.append(user_feature_factors) item_factors_chunks.append(item_factors_paired_to_user_features) if item_features is not None: item_feature_factors = item_features.get_dense() user_factors_paired_to_item_features = _fit_paired_factors(model, ui_csr, item_feature_factors) item_factors_chunks.append(item_feature_factors) user_factors_chunks.append(user_factors_paired_to_item_features) user_factors = np.hstack(user_factors_chunks) item_factors = np.hstack(item_factors_chunks) return user_factors, item_factors
def _fit_paired_factors(model: AlternatingLeastSquares, xy_csr: sparse.csr_matrix, y_factors: np.ndarray) -> np.ndarray: if model.use_gpu: # pragma: no cover paired_factors = _fit_paired_factors_on_gpu(model, xy_csr, y_factors) else: paired_factors = _fit_paired_factors_on_cpu(model, xy_csr, y_factors) return paired_factors def _fit_paired_factors_on_cpu( model: AlternatingLeastSquares, xy_csr: sparse.csr_matrix, y_factors: np.ndarray, ) -> np.ndarray: x_factors = np.zeros(shape=(xy_csr.shape[0], y_factors.shape[1]), dtype=y_factors.dtype) model.solver( xy_csr, x_factors, y_factors, model.regularization, model.num_threads, ) return x_factors def _fit_paired_factors_on_gpu( model: AlternatingLeastSquares, xy_csr: sparse.csr_matrix, y_factors: np.ndarray, ) -> np.ndarray: # pragma: no cover try: from implicit.cuda import ( # pylint: disable=import-outside-toplevel CuCSRMatrix, CuDenseMatrix, CuLeastSquaresSolver, ) except ImportError: raise RuntimeError("implicit.cuda is not available") n_factors = y_factors.shape[1] if n_factors > MAX_GPU_FACTORS: raise ValueError(f"When using GPU max number of factors is {MAX_GPU_FACTORS}, here is {n_factors} factors") x_factors = np.zeros(shape=(xy_csr.shape[0], n_factors), dtype=y_factors.dtype) x_cuda = CuDenseMatrix(x_factors) y_cuda = CuDenseMatrix(y_factors) xy_csr_cuda = CuCSRMatrix(xy_csr) solver = CuLeastSquaresSolver(n_factors) solver.least_squares(xy_csr_cuda, x_cuda, y_cuda, model.regularization, model.cg_steps) x_cuda.to_host(x_factors) return x_factors
[docs]def fit_als_with_features_together( model: AlternatingLeastSquares, ui_csr: sparse.csr_matrix, user_features: tp.Optional[Features], item_features: tp.Optional[Features], verbose: int = 0, ) -> tp.Tuple[np.ndarray, np.ndarray]: """ Fit ALS model with explicit features, explicit features fit together with latent. Parameters ---------- model: AlternatingLeastSquares Base model to fit. ui_csr : sparse.csr_matrix Matrix of interactions. user_features : (SparseFeatures | DenseFeatures), optional Explicit user features. item_features : (SparseFeatures | DenseFeatures), optional Explicit item features. verbose : int Whether to print output. Returns ------- user_factors : np.ndarray Combined latent and explicit user factors. item_factors : np.ndarray Combined latent and explicit user factors. """ n_users, n_items = ui_csr.shape # Prepare explicit factors user_explicit_factors: np.ndarray if user_features is None: user_explicit_factors = np.array([]).reshape((n_users, 0)) else: user_explicit_factors = user_features.get_dense() n_user_explicit_factors = user_explicit_factors.shape[1] item_explicit_factors: np.ndarray if item_features is None: item_explicit_factors = np.array([]).reshape((n_items, 0)) else: item_explicit_factors = item_features.get_dense() n_item_explicit_factors = item_explicit_factors.shape[1] # Fix number of factors n_factors_all = model.factors + n_user_explicit_factors + n_item_explicit_factors if model.use_gpu and n_factors_all % 32: # pragma: no cover padding = 32 - n_factors_all % 32 warnings.warn( "GPU training requires number of factors to be a multiple of 32." f" Increasing factors from {n_factors_all} to {n_factors_all + padding}" f" (increasing latent factors from {model.factors} to {model.factors + padding})" ) n_latent_factors = model.factors + padding else: n_latent_factors = model.factors n_factors_all = n_latent_factors + n_user_explicit_factors + n_item_explicit_factors model.factors = n_factors_all # Prepare latent factors random_state = check_random_state(model.random_state) user_latent_factors = random_state.rand(n_users, n_latent_factors) * 0.01 item_latent_factors = random_state.rand(n_items, n_latent_factors) * 0.01 # Prepare paired factors user_factors_paired_to_items = np.zeros((n_users, n_item_explicit_factors)) item_factors_paired_to_users = np.zeros((n_items, n_user_explicit_factors)) # Make full factors user_factors = np.hstack( ( user_explicit_factors, user_latent_factors, user_factors_paired_to_items, ) ).astype(model.dtype) item_factors = np.hstack( ( item_factors_paired_to_users, item_latent_factors, item_explicit_factors, ) ).astype(model.dtype) ui_csr = ui_csr.astype(np.float32) if model.use_gpu: # pragma: no cover _fit_combined_factors_on_gpu_inplace( model, ui_csr, user_factors, item_factors, n_user_explicit_factors, n_item_explicit_factors, verbose, ) else: _fit_combined_factors_on_cpu_inplace( model, ui_csr, user_factors, item_factors, n_user_explicit_factors, n_item_explicit_factors, verbose, ) return user_factors, item_factors
def _fit_combined_factors_on_cpu_inplace( model: AlternatingLeastSquares, ui_csr: sparse.csr_matrix, user_factors: np.ndarray, item_factors: np.ndarray, n_user_explicit_factors: int, n_item_explicit_factors: int, verbose: int, ) -> None: n_factors = user_factors.shape[1] user_explicit_factors = user_factors[:, :n_user_explicit_factors].copy() item_explicit_factors = item_factors[:, n_factors - n_item_explicit_factors :].copy() iu_csr = ui_csr.T.tocsr(copy=False) for _ in tqdm(range(model.iterations), disable=verbose == 0): model.solver( ui_csr, user_factors, item_factors, model.regularization, model.num_threads, ) user_factors[:, :n_user_explicit_factors] = user_explicit_factors model.solver( iu_csr, item_factors, user_factors, model.regularization, model.num_threads, ) item_factors[:, n_factors - n_item_explicit_factors :] = item_explicit_factors def _fit_combined_factors_on_gpu_inplace( model: AlternatingLeastSquares, ui_csr: sparse.csr_matrix, user_factors: np.ndarray, item_factors: np.ndarray, n_user_explicit_factors: int, n_item_explicit_factors: int, verbose: int, ) -> None: # pragma: no cover try: from implicit.cuda import ( # pylint: disable=import-outside-toplevel CuCSRMatrix, CuDenseMatrix, CuLeastSquaresSolver, ) except ImportError: raise RuntimeError("implicit.cuda is not available") n_factors = user_factors.shape[1] if n_factors > MAX_GPU_FACTORS: raise ValueError(f"When using GPU max number of factors is {MAX_GPU_FACTORS}, here is {n_factors} factors") user_explicit_factors = user_factors[:, :n_user_explicit_factors].copy() item_explicit_factors = item_factors[:, n_factors - n_item_explicit_factors :].copy() iu_csr = ui_csr.T.tocsr(copy=False) iu_csr_cuda = CuCSRMatrix(iu_csr) ui_csr_cuda = CuCSRMatrix(ui_csr) user_factors_cuda = CuDenseMatrix(user_factors) item_factors_cuda = CuDenseMatrix(item_factors) solver = CuLeastSquaresSolver(n_factors) for _ in tqdm(range(model.iterations), disable=verbose == 0): solver.least_squares( ui_csr_cuda, user_factors_cuda, item_factors_cuda, model.regularization, model.cg_steps, ) user_factors_cuda.to_host(user_factors) user_factors[:, :n_user_explicit_factors] = user_explicit_factors user_factors_cuda = CuDenseMatrix(user_factors) solver.least_squares( iu_csr_cuda, item_factors_cuda, user_factors_cuda, model.regularization, model.cg_steps, ) item_factors_cuda.to_host(item_factors) item_factors[:, n_factors - n_item_explicit_factors :] = item_explicit_factors item_factors_cuda = CuDenseMatrix(item_factors) user_factors_cuda.to_host(user_factors) item_factors_cuda.to_host(item_factors)