# Source code for rectools.models.popular_in_category

#  Copyright 2022-2024 MTS (Mobile Telesystems)
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""Popular in category model."""

import typing as tp
import warnings
from datetime import datetime, timedelta
from enum import Enum

import numpy as np
import pandas as pd
import typing_extensions as tpe

from rectools import Columns, InternalIds
from rectools.dataset import Dataset, Interactions, features
from rectools.types import InternalIdsArray

from .base import ModelBase, Scores
from .popular import FixedColdRecoModelMixin, PopularModel, PopularModelConfig, PopularModelMixin, PopularityOptions


class MixingStrategy(Enum):
    """Supported ways of mixing recommendations coming from different categories."""

    # Items from different categories take turns in the final recommendations.
    ROTATE = "rotate"
    # Items from each category are kept together as one group.
    GROUP = "group"
class RatioStrategy(Enum):
    """Supported ways of splitting recommendation slots between categories."""

    # Every category gets the same share of recommendations.
    EQUAL = "equal"
    # Category share depends on category popularity.
    PROPORTIONAL = "proportional"
class PopularInCategoryModelConfig(PopularModelConfig):
    """Config for `PopularInCategoryModel`."""

    # Name of the category feature; required, has no default.
    category_feature: str
    # Optional limit on the number of categories; ``None`` by default.
    n_categories: tp.Optional[int] = None
    # How recommendations from different categories are mixed.
    mixing_strategy: MixingStrategy = MixingStrategy.ROTATE
    # How recommendation slots are split between categories.
    ratio_strategy: RatioStrategy = RatioStrategy.PROPORTIONAL
class PopularInCategoryModel(
    FixedColdRecoModelMixin, PopularModelMixin, ModelBase[PopularInCategoryModelConfig]
):  # pylint: disable=too-many-instance-attributes
    """
    Model generating recommendations based on popularity of items.

    A separate `PopularModel` is fitted for every category value that has interactions,
    and recommendations of these per-category models are mixed into the final list.

    Parameters
    ----------
    category_feature : str
        Name of category feature in item features dataframe.
    n_categories : int, optional, default ``None``
        Number of most popular categories to take for recommendations
    mixing_strategy : {"rotate", "group"}, default `"rotate"`
        Method of mixing recommendations from different categories.
        The following methods are available:

        - `rotate` - items from different categories take turns in final recommendations,
          starting from the most popular category
        - `group` - items from each category are grouped together. Categories are sorted by popularity
    ratio_strategy : {"equal", "proportional"}, default `"proportional"`
        Method of defining ratios for categories.
        The following methods are available:

        - `equal` - all categories gain equal ratios in recommendations.
          Exceeding places for items are given to most popular categories
        - `proportional` - categories gain ratios in recommendations based on their popularity.
          Each category gains at least one item in recommendations if number of categories doesn't
          exceed number of recs.
    popularity : {"n_users", "n_interactions", "mean_weight", "sum_weight"}, default `"n_users"`
        Method of calculating item popularity.
        To evaluate `popularity score` the following methods are available:

        - `n_users` - number of unique users that interacted with item;
        - `n_interactions` - number of interactions with item;
        - `mean_weight` - mean item interactions weight;
        - `sum_weight` - total item interactions weight.
    period : timedelta, optional, default ``None``
        Period before last interaction to consider interactions for popularity calculation.
        Either `period` or `begin_from` can be set at once.
        If both are ``None`` all interactions will be used.
    begin_from : datetime, optional, default ``None``
        Exact datetime to consider interactions from for popularity calculation.
        Either `period` or `begin_from` can be set at once.
        If both are ``None`` all interactions will be used.
    add_cold : bool, default ``False``
        If ``True`` cold items will be added to the end of popularity list and can be recommended.
        Item is cold if it's not present in interactions at all (but present in id map)
        or not present in last interactions defined by either `period` or `begin_from` arguments.
        Order of cold items is unpredictable.
        Cold items score will be equal to ``0``.
    inverse : bool, default ``False``
        If ``True`` least popular items will be selected.
    verbose : int, default ``0``
        Degree of verbose output. If ``0``, no output will be provided.

    Raises
    ------
    ValueError
        If `mixing_strategy` or `ratio_strategy` is not one of the supported values,
        or if `n_categories` is given but is not positive.
    """

    recommends_for_warm = False
    recommends_for_cold = True

    config_class = PopularInCategoryModelConfig

    def __init__(
        self,
        category_feature: str,
        n_categories: tp.Optional[int] = None,
        mixing_strategy: tp.Literal["rotate", "group"] = "rotate",
        ratio_strategy: tp.Literal["proportional", "equal"] = "proportional",
        popularity: PopularityOptions = "n_users",
        period: tp.Optional[timedelta] = None,
        begin_from: tp.Optional[datetime] = None,
        add_cold: bool = False,
        inverse: bool = False,
        verbose: int = 0,
    ):
        super().__init__(
            verbose=verbose,
        )

        self.popularity = self._validate_popularity(popularity)
        self._validate_time_attributes(period, begin_from)
        self.period = period
        self.begin_from = begin_from

        self.add_cold = add_cold
        self.inverse = inverse

        self.category_feature = category_feature
        try:
            self.mixing_strategy = MixingStrategy(mixing_strategy)
        except ValueError as err:
            possible_values = {item.value for item in MixingStrategy.__members__.values()}
            # Chain explicitly so the traceback shows the enum lookup failure as the cause
            raise ValueError(f"`mixing_strategy` must be one of the {possible_values}. Got {mixing_strategy}.") from err
        try:
            self.ratio_strategy = RatioStrategy(ratio_strategy)
        except ValueError as err:
            possible_values = {item.value for item in RatioStrategy.__members__.values()}
            raise ValueError(f"`ratio_strategy` must be one of the {possible_values}. Got {ratio_strategy}.") from err

        # Fitted state. It is reset at the beginning of every `_fit` call.
        self.category_columns: tp.List[int] = []
        self.category_interactions: tp.Dict[int, pd.DataFrame] = {}
        self.category_scores: pd.Series
        self.models: tp.Dict[int, PopularModel] = {}
        self.n_effective_categories: int

        if n_categories is None or n_categories > 0:
            self.n_categories = n_categories
        else:
            raise ValueError(f"`n_categories` must be a positive number. Got {n_categories}")

    def _get_config(self) -> PopularInCategoryModelConfig:
        """Build a config object from the current hyperparameters of the model."""
        return PopularInCategoryModelConfig(
            cls=self.__class__,
            category_feature=self.category_feature,
            n_categories=self.n_categories,
            mixing_strategy=self.mixing_strategy,
            ratio_strategy=self.ratio_strategy,
            popularity=self.popularity,
            period=self.period,
            begin_from=self.begin_from,
            add_cold=self.add_cold,
            inverse=self.inverse,
            verbose=self.verbose,
        )

    @classmethod
    def _from_config(cls, config: PopularInCategoryModelConfig) -> tpe.Self:
        """Create a model from `config`, passing enum fields to the constructor as their string values."""
        return cls(
            category_feature=config.category_feature,
            n_categories=config.n_categories,
            mixing_strategy=config.mixing_strategy.value,
            ratio_strategy=config.ratio_strategy.value,
            popularity=config.popularity.value,
            period=config.period,
            begin_from=config.begin_from,
            add_cold=config.add_cold,
            inverse=config.inverse,
            verbose=config.verbose,
        )

    def _check_category_feature(self, dataset: Dataset) -> None:
        """
        Validate item features of `dataset` and collect columns of the category feature.

        Indices of item feature columns whose name equals `category_feature` and whose value
        is not `features.DIRECT_FEATURE_VALUE` are appended to `self.category_columns`.

        Raises
        ------
        ValueError
            If dataset has no item features or no column matches `category_feature`.
        TypeError
            If item features are not `features.SparseFeatures`.
        """
        if not dataset.item_features:
            raise ValueError(
                "Dataset must have `item_features` for PopularInCategoryModel. "
                "Specify `item_features_df` when creating Dataset"
            )
        if not isinstance(dataset.item_features, features.SparseFeatures):
            raise TypeError("Only sparse features are supported for PopularInCategoryModel. ")
        for num_col, (name, value) in enumerate(dataset.item_features.names):
            if name == self.category_feature and value != features.DIRECT_FEATURE_VALUE:
                self.category_columns.append(num_col)
        if not self.category_columns:
            raise ValueError("`category_feature` must be present in `cat_item_features` when creating Dataset")

    def _calc_category_scores(self, dataset: Dataset, interactions: pd.DataFrame) -> None:
        """
        Calculate a popularity score for every category column that has interactions.

        Interactions of each non-empty category are stored in `self.category_interactions`,
        columns without interactions are removed from `self.category_columns` and the scores
        are stored in `self.category_scores` sorted in descending order.
        """
        scores_dict = {}
        empty_columns = []
        # The pair depends only on `self.popularity`, so it is resolved once, not per category.
        # NOTE(review): the helper comes from a parent class and is assumed to be side-effect free.
        col, func = self._get_groupby_col_and_agg_func(self.popularity)
        for column_num in self.category_columns:
            item_idx = dataset.item_features.values.getcol(column_num).nonzero()[0]  # type: ignore
            category_interactions = interactions[interactions[Columns.Item].isin(item_idx)]
            # Category interactions might be empty
            if category_interactions.shape[0] == 0:
                empty_columns.append(column_num)
            else:
                self.category_interactions[column_num] = category_interactions.copy()
                scores_dict[column_num] = self.category_interactions[column_num][col].apply(func)
        self.category_columns = [col for col in self.category_columns if col not in empty_columns]
        self.category_scores = pd.Series(scores_dict).sort_values(ascending=False)

    def _define_categories_for_analysis(self) -> None:
        """
        Set `self.n_effective_categories` and keep only the categories that will be used.

        When `n_categories` is set and enough categories are available, only the first
        `n_categories` entries of the sorted `self.category_scores` are kept.
        If fewer categories are available than requested, a warning is emitted and all of them are used.
        """
        if self.n_categories:
            if len(self.category_columns) >= self.n_categories:
                self.n_effective_categories = self.n_categories
                relevant_categories = self.category_scores.head(self.n_categories).index
                self.category_scores = self.category_scores.loc[relevant_categories]
                self.category_columns = relevant_categories.to_list()
            else:
                self.n_effective_categories = len(self.category_columns)
                warnings.warn(
                    "`n_categories` exceeds number of unique category values. "
                    f"Only {self.n_effective_categories} categories will be analysed"
                )
        else:
            self.n_effective_categories = len(self.category_columns)

    def _fit(self, dataset: Dataset) -> None:  # type: ignore
        """Reset fitted state and fit one `PopularModel` per selected category of `dataset`."""
        self.category_columns = []
        self.category_interactions = {}
        self.models = {}
        # Explicit dtype avoids the pandas warning about the default dtype of an empty Series.
        # This placeholder is replaced in `_calc_category_scores`.
        self.category_scores = pd.Series(dtype="float64")
        self.n_effective_categories = 0

        self._check_category_feature(dataset)
        interactions = self._filter_interactions(dataset.interactions.df, self.period, self.begin_from)
        self._calc_category_scores(dataset, interactions)
        self._define_categories_for_analysis()

        for column_num in self.category_columns:
            # Interactions are already filtered by time, so time arguments are not passed to category models
            category_model = PopularModel(
                popularity=self.popularity.value, add_cold=self.add_cold, inverse=self.inverse
            )
            category_interactions = Interactions(self.category_interactions[column_num])
            category_dataset = Dataset(
                user_id_map=dataset.user_id_map, item_id_map=dataset.item_id_map, interactions=category_interactions
            )
            category_model.fit(category_dataset)
            self.models[column_num] = category_model

    def _get_num_recs_for_each_category(self, k: int) -> pd.Series:
        """
        Distribute `k` recommendation slots between categories according to `ratio_strategy`.

        Returns
        -------
        pd.Series
            Number of main recommendations per category, indexed like `self.category_scores`.
        """
        if self.ratio_strategy == RatioStrategy.PROPORTIONAL:
            sum_scores = self.category_scores.sum()
            num_recs = np.floor(k * self.category_scores / sum_scores).astype("int32")
            # Because of np.floor not all of the required k recommendations were distributed
            exceeding_recs = k - num_recs.sum()
            num_recs.iloc[:exceeding_recs] += 1
            # Now we redistribute some of the recommendations to categories which didn't receive any numbers at all
            zero_mask = num_recs == 0
            may_decrease_mask = num_recs > 1
            num_changing_zeros = min(zero_mask.sum(), may_decrease_mask.sum())
            if num_changing_zeros > 0:
                indexes_to_increase_zeros = np.arange(len(num_recs))[zero_mask][:num_changing_zeros]
                indexes_to_decrease_others = np.arange(len(num_recs))[may_decrease_mask][-num_changing_zeros:]
                num_recs.iloc[indexes_to_increase_zeros] = 1
                num_recs.iloc[indexes_to_decrease_others] -= 1
        elif self.ratio_strategy == RatioStrategy.EQUAL:
            num_recs = pd.Series({num_col: k // self.n_effective_categories for num_col in self.category_scores.index})
            # Slots left after integer division go to the first categories in score order
            exceeding_recs = k - num_recs.sum()
            num_recs.iloc[:exceeding_recs] += 1
        return num_recs

    def _get_full_recs_from_main_and_fallback(
        self,
        main_recs: tp.List[pd.DataFrame],
        fallback_recs: tp.List[pd.DataFrame],
        k: int,
        user_ids: InternalIdsArray,
    ) -> pd.DataFrame:
        """
        Combine main per-category recommendations with fallback ones for users lacking recommendations.

        Users having at least `k` unique main recommendations keep them untouched.
        Other users (including those absent from main recommendations) get their main
        recommendations completed with fallback ones, limited to `k` rows per user.
        """
        cat_recs = pd.concat(main_recs, sort=False)
        cat_recs.drop_duplicates(subset=[Columns.User, Columns.Item], inplace=True)

        num_recs_per_user = cat_recs[Columns.User].value_counts()
        user_w_insufficient_recs = num_recs_per_user[num_recs_per_user < k].index
        # Some users were not present in main_recs, but could be present in fallback_recs
        # within cold categories (categories with num_recs = 0). They need to be added
        # explicitly to receive recommendations
        user_w_no_recs = np.setdiff1d(user_ids, num_recs_per_user.index)
        user_w_insufficient_recs = np.union1d(user_w_insufficient_recs, user_w_no_recs)

        sufficient_mask = ~cat_recs[Columns.User].isin(user_w_insufficient_recs)
        sufficient_recs = cat_recs[sufficient_mask]
        insufficient_recs = cat_recs[~sufficient_mask].copy()
        insufficient_recs["is_main_rec"] = True

        extra_recs = pd.concat(fallback_recs, sort=False)
        extra_recs = extra_recs[extra_recs[Columns.User].isin(user_w_insufficient_recs)].copy()
        extra_recs["is_main_rec"] = False

        insufficient_recs = pd.concat([insufficient_recs, extra_recs], sort=False)
        insufficient_recs.drop_duplicates(subset=[Columns.User, Columns.Item], inplace=True)
        # Extra recommendations are given in a specific logic to guarantee that fallback recommendations
        # never replace main recommendations in final result. And popular category doesn't dominate
        # over other categories in fallback recs. Thus `rotate` mixing strategy is applied before getting
        # k recs for each user.
        insufficient_recs.sort_values(
            by=[Columns.User, "is_main_rec", "category_rank", "category_priority"],
            ascending=[True, False, True, True],
            inplace=True,
        )
        insufficient_recs = insufficient_recs.groupby(Columns.User).head(k)
        full_recs = pd.concat([sufficient_recs, insufficient_recs], sort=False)
        return full_recs

    def _recommend_u2i(
        self,
        user_ids: InternalIdsArray,
        dataset: Dataset,
        k: int,
        filter_viewed: bool,
        sorted_item_ids_to_recommend: tp.Optional[InternalIdsArray],
    ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
        """
        Recommend items to users by mixing recommendations of per-category models.

        Every category model is asked for `k` recommendations. The first `num_recs` rows of each
        category are main recommendations, the rest are fallback. The result is ordered
        according to `mixing_strategy`.
        """
        num_recs = self._get_num_recs_for_each_category(k)
        main_recs = []
        fallback_recs = []

        # `num_recs` follows the order of `category_scores`, so `priority` is the position in that order
        for priority, num_col in enumerate(num_recs.index):
            model = self.models[num_col]
            all_user_ids, all_reco_ids, all_scores = model._recommend_u2i(  # pylint: disable=protected-access
                user_ids=user_ids,
                dataset=dataset,
                k=k,
                filter_viewed=filter_viewed,
                sorted_item_ids_to_recommend=sorted_item_ids_to_recommend,
            )
            reco_df = pd.DataFrame(
                {
                    Columns.User: all_user_ids,
                    Columns.Item: all_reco_ids,
                    Columns.Score: all_scores,
                    "category_priority": priority,
                }
            )
            reco_df["category_rank"] = reco_df.groupby([Columns.User], sort=False).cumcount()
            main_mask = reco_df["category_rank"] < num_recs.loc[num_col]
            main_recs.append(reco_df[main_mask])
            fallback_recs.append(reco_df[~main_mask])

        full_recs = self._get_full_recs_from_main_and_fallback(main_recs, fallback_recs, k, user_ids)

        if self.mixing_strategy == MixingStrategy.GROUP:
            full_recs.sort_values(by=[Columns.User, "category_priority", "category_rank"], inplace=True)
        elif self.mixing_strategy == MixingStrategy.ROTATE:
            # Ranks are recalculated because fallback rows may have been mixed into a category
            full_recs["category_rank"] = full_recs.groupby([Columns.User, "category_priority"], sort=False).cumcount()
            full_recs.sort_values(by=[Columns.User, "category_rank", "category_priority"], inplace=True)

        return full_recs[Columns.User].values, full_recs[Columns.Item].values, full_recs[Columns.Score].values

    def _recommend_i2i(
        self,
        target_ids: InternalIdsArray,
        dataset: Dataset,
        k: int,
        sorted_item_ids_to_recommend: tp.Optional[InternalIdsArray],
    ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
        """Give every target item the same list of recommendations, taken from `_get_cold_reco`."""
        single_reco, single_scores = self._get_cold_reco(dataset, k, sorted_item_ids_to_recommend)

        n_targets = len(target_ids)
        n_reco_per_target = len(single_reco)

        all_target_ids = np.repeat(target_ids, n_reco_per_target)
        all_reco_ids = np.tile(single_reco, n_targets)
        all_scores = np.tile(single_scores, n_targets)

        return all_target_ids, all_reco_ids, all_scores

    def _get_cold_reco(
        self, dataset: Dataset, k: int, sorted_item_ids_to_recommend: tp.Optional[InternalIdsArray]
    ) -> tp.Tuple[InternalIds, Scores]:
        """
        Build one user-independent list of recommendations by mixing per-category cold recommendations.

        Main recommendations are completed with fallback ones only when there are fewer than `k`
        unique main items. The result is ordered according to `mixing_strategy`.
        """
        num_recs = self._get_num_recs_for_each_category(k)
        main_recs = []
        fallback_recs = []

        for priority, num_col in enumerate(num_recs.index):
            model = self.models[num_col]
            reco_ids, reco_scores = model._get_cold_reco(  # pylint: disable=protected-access
                dataset, k, sorted_item_ids_to_recommend
            )
            reco_df = pd.DataFrame(
                {
                    Columns.Item: reco_ids,
                    Columns.Score: reco_scores,
                    "category_priority": priority,
                }
            )
            reco_df["category_rank"] = range(len(reco_df))
            main_mask = reco_df["category_rank"] < num_recs.loc[num_col]
            main_recs.append(reco_df[main_mask])
            fallback_recs.append(reco_df[~main_mask])

        cat_recs = pd.concat(main_recs, sort=False)
        cat_recs.drop_duplicates(subset=[Columns.Item], inplace=True)

        if len(cat_recs) < k:
            cat_recs["is_main_rec"] = True
            extra_recs = pd.concat(fallback_recs, sort=False)
            extra_recs["is_main_rec"] = False
            full_recs = pd.concat([cat_recs, extra_recs], sort=False)
            full_recs.drop_duplicates(subset=[Columns.Item], inplace=True)
            # Extra recommendations are given in a specific logic to guarantee that fallback recommendations
            # never replace main recommendations in final result. And popular category doesn't dominate
            # over other categories in fallback recs. Thus `rotate` mixing strategy is applied before getting
            # k recs for each user.
            full_recs.sort_values(
                by=["is_main_rec", "category_rank", "category_priority"],
                ascending=[False, True, True],
                inplace=True,
            )
            full_recs = full_recs.head(k)
        else:
            full_recs = cat_recs

        if self.mixing_strategy == MixingStrategy.GROUP:
            full_recs.sort_values(by=["category_priority", "category_rank"], inplace=True)
        elif self.mixing_strategy == MixingStrategy.ROTATE:
            full_recs["category_rank"] = full_recs.groupby(["category_priority"], sort=False).cumcount()
            full_recs.sort_values(by=["category_rank", "category_priority"], inplace=True)

        return full_recs[Columns.Item].values, full_recs[Columns.Score].values