Source code for rectools.dataset.features

#  Copyright 2022-2025 MTS (Mobile Telesystems)
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""Structures to save explicit features."""

import typing as tp
import warnings

import attr
import numpy as np
import pandas as pd
from scipy import sparse

from rectools import InternalIds

from .identifiers import IdMap

DIRECT_FEATURE_VALUE = "__is_direct_feature"


[docs]class UnknownIdError(KeyError): """The error is raised when there are some ids in the dataframe that are not present in the id map"""
[docs]class AbsentIdError(ValueError): """The error is raised when there are some ids in the id map that are not present in the dataframe"""
[docs]@attr.s(slots=True, frozen=True) class DenseFeatures: """ Storage for dense features. Dense features are represented as a dense matrix, where rows correspond to objects, columns - to features. Usually you do not need to create this object directly, use `from_dataframe` class method instead. If you want to use custom logic, use `from_iterables` class method instead of direct creation. Parameters ---------- values : np.ndarray Matrix of feature values in the classic format: rows - objects, columns - features. names : tuple(str) Names of features (number of names must be equal to the number of columns in values). """ values: np.ndarray = attr.ib() names: tp.Tuple[str, ...] = attr.ib() @names.validator def _check_names_length(self, _: str, value: tp.Tuple[str]) -> None: if len(value) != self.values.shape[1]: raise ValueError(f"Number of features is {self.values.shape[1]}, but number of names is {len(value)}")
[docs] @classmethod def from_iterables(cls, values: tp.Iterable[tp.Iterable[float]], names: tp.Iterable[str]) -> "DenseFeatures": """ Create class instance from any iterables of feature values and names. Parameters ---------- values : iterable(iterable(float)) Feature values matrix. E.g. list of lists: [[1, 2, 3], [4, 5, 6]]. names : iterable(str) Feature names. Returns ------- DenseFeatures """ return cls( values=np.asarray(values, dtype=np.float32), names=tuple(names), )
[docs] @classmethod def from_dataframe(cls, df: pd.DataFrame, id_map: IdMap, id_col: str = "id") -> "DenseFeatures": """ Create DenseFeatures object from dataframe. Assume that feature values are values in dataframe, and feature names are column names. Parameters ---------- df : pd.Dataframe Table in classic format: rows corresponds to objects, columns - to features. One special column `id_col` must contain object external ids. id_map : IdMap Mapping between external and internal ids. Sets of ids in `id_map` and in `df` must be equal. id_col : str, default ``id`` Name of column containing object ids. Returns ------- DenseFeatures """ extern_ids = df[id_col] df_ids = set(df[id_col].values) if len(df_ids) != len(df): raise ValueError("Ids in dataframe must be unique") map_ids = set(id_map.external_ids) if df_ids - map_ids: raise UnknownIdError("All ids in `df` must be present in `id_map`") if map_ids - df_ids: raise AbsentIdError("In `df` must be present all ids from `id_map`") features = df.drop(columns=id_col) values = features.values names = features.columns inner_ids = extern_ids.map(id_map.to_internal) sorter = np.argsort(inner_ids) values = values[sorter] return cls.from_iterables(values, names)
[docs] def get_dense(self) -> np.ndarray: """Return values in dense format.""" return self.values
[docs] def get_sparse(self) -> sparse.csr_matrix: """Return values in sparse format.""" return sparse.csr_matrix(self.values)
[docs] def take(self, ids: InternalIds) -> "DenseFeatures": """ Take a subset of features for given subject (user or item) ids. Parameters ---------- ids : array-like Array of internal ids to select features for. Returns ------- DenseFeatures """ return DenseFeatures( values=self.values[ids], names=self.names, )
def __len__(self) -> int: """Return number of objects.""" return self.values.shape[0]
SparseFeatureName = tp.Tuple[str, tp.Any]
[docs]@attr.s(slots=True, frozen=True) class SparseFeatures: """ Storage for sparse features. Sparse features are represented as CSR matrix, where rows correspond to objects, columns - to features. Assume that there are features of two types: direct and categorical. Each direct feature is represented in a single column with its real values. Direct features are numeric. E.g. +---+----+----+ | | f1 | f2 | +---+----+----+ | 1 | 23 | 3 | +---+----+----+ | 2 | 36 | 5 | +---+----+----+ Categorical features are one-hot encoded (https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html), values in matrix are counts in category. If you want to binarize a numeric feature, make it categorical with bin indices as categories. E.g. +---+------+------+------+ | | f1_a | f1_b | f2_1 | +---+------+------+------+ | 1 | 0 | 2 | 1 | +---+------+------+------+ | 2 | 1 | 1 | 0 | +---+------+------+------+ Usually you do not need to create this object directly, use `from_flatten` class method instead. If you want to use custom logic, use `from_iterables` class method instead of direct creation. Parameters ---------- values : csr_matrix CSR matrix containing OHE feature values. names : tuple(tuple(str, any)) Tuple of feature names. Direct features are represented only by names, so for direct features use (``feature name``, `None`). For sparse features use (``feature name``, ``value``), as they are one-hot encoded. E.g. If you have direct feature `age` and cat. feature `sex`, names will be *((age, None), (sex, m), (sex, f))*. Number of names must be equal to the number of columns in values. """ values: sparse.csr_matrix = attr.ib() names: tp.Tuple[SparseFeatureName, ...] = attr.ib() @names.validator def _check_names_length(self, _: str, value: tp.Tuple[str]) -> None: if len(value) != self.values.shape[1]: raise ValueError(f"Number of features is {self.values.shape[1]}, but number of names is {len(value)}")
[docs] @classmethod def from_iterables( cls, values: sparse.csr_matrix, names: tp.Iterable[tp.Tuple[str, tp.Any]], ) -> "SparseFeatures": """ Create class instance from sparse matrix and iterable feature names. Parameters ---------- values : csr_matrix Feature values matrix. names : iterable((str, any)) Feature names in same format as in constructor. Returns ------- SparseFeatures """ return cls( values=values.astype(np.float32), names=tuple(names), )
[docs] @classmethod def from_flatten( cls, df: pd.DataFrame, id_map: IdMap, cat_features: tp.Iterable[tp.Any] = (), id_col: str = "id", feature_col: str = "feature", value_col: str = "value", weight_col: str = "weight", ) -> "SparseFeatures": """ Construct `SparseFeatures` from flatten DataFrame. Flatten DataFrame has 3 obligatory columns: <id of object>, <feature name>, <feature value>, and <feature weight> as the optional fourth. If there is no <feature weight> column, all weights will be assumed to be equal to ``1``. Direct features converted to sparse matrix as is. Its values are multiplied by weights. Values for the same object and same feature are added up. E.g: +----+---------+-------+--------+ | id | feature | value | weight | +----+---------+-------+--------+ | 1 | f1 | 10 | 1 | +----+---------+-------+--------+ | 2 | f1 | 20 | 1.5 | +----+---------+-------+--------+ | 1 | f1 | 15 | 1 | +----+---------+-------+--------+ | 2 | f2 | 3 | 1 | +----+---------+-------+--------+ Out: +---+----+----+ | | f1 | f2 | +---+----+----+ | 1 | 25 | | +---+----+----+ | 2 | 30 | 3 | +---+----+----+ Categorical features are represented as horizontally stacked one-hot vectors. Duplicated values are counted. Final values (counts) are multiplied by weights. E.g: +----+---------+-------+--------+ | id | feature | value | weight | +----+---------+-------+--------+ | 1 | f1 | 10 | 1 | +----+---------+-------+--------+ | 2 | f1 | 20 | 1.5 | +----+---------+-------+--------+ | 1 | f1 | 10 | 1 | +----+---------+-------+--------+ | 2 | f2 | 3 | 1 | +----+---------+-------+--------+ Out: +---+--------+--------+-------+ | | f1__10 | f1__20 | f2__3 | +---+--------+--------+-------+ | 1 | 2 | | | +---+--------+--------+-------+ | 2 | | 1.5 | 1 | +---+--------+--------+-------+ Parameters ---------- df : pd.DataFrame Flatten table with features with columns `id_col`, `feature_col`, `value_col` in format described above. id_map : IdMap Mapping between external and internal ids. cat_features : iterable(str), default ``()`` List of categorical feature names. id_col : str, default ``id`` Name of column with object ids. feature_col : str, default ``feature`` Name of column with feature names. value_col : str, default ``value`` Name of column with feature values. weight_col : str, default ``weight`` Name of column with feature weight. If no such column provided, all weights will be equal to ``1``. Returns ------- SparseFeatures """ required_columns = {id_col, feature_col, value_col} actual_columns = set(df.columns) if not actual_columns >= required_columns: raise KeyError(f"Missed columns {required_columns - actual_columns}") try: ids = id_map.convert_to_internal(df[id_col]) except KeyError: raise UnknownIdError("All ids in `df` must be present in `id_map`") try: weights = df[weight_col].values.astype(float) if weight_col in df else 1 except ValueError: raise TypeError("Weights must be numeric") df = pd.DataFrame({"id": ids, "feature": df[feature_col], "value": df[value_col], "weight": weights}) all_features = df["feature"].unique() direct_features = set(all_features) - set(cat_features) csr_direct, names_direct = cls._make_direct_features(df, direct_features, id_map.internal_ids.size) csrs = [csr_direct] names_all = names_direct for cat_feature in cat_features: csr, names = cls._make_cat_feature(df, cat_feature, id_map.internal_ids.size) csrs.append(csr) names_all.extend(names) csr = sparse.hstack(csrs, format="csr") csr.sum_duplicates() return cls.from_iterables(csr, names_all)
@classmethod def _make_direct_features( cls, df: pd.DataFrame, features: tp.Collection[tp.Any], n_objects: int, ) -> tp.Tuple[sparse.csr_matrix, tp.List[SparseFeatureName]]: df = df.query("feature in @features") features_map = IdMap.from_values(df["feature"].unique()) try: values = df["value"].values.astype(np.float32) except ValueError: raise TypeError("Values of direct features must be numeric") csr = sparse.csr_matrix( ( values * df["weight"], ( df["id"], df["feature"].map(features_map.to_internal).values, ), ), shape=(n_objects, len(features)), ) names = [(feature, DIRECT_FEATURE_VALUE) for feature in features_map.get_external_sorted_by_internal()] return csr, names @classmethod def _make_cat_feature( cls, df: pd.DataFrame, feature: str, n_objects: int, ) -> tp.Tuple[sparse.csr_matrix, tp.List[SparseFeatureName]]: df = df.query("feature == @feature") unq_feature_values = df["value"].unique() n_unq_values = len(unq_feature_values) ids = np.arange(n_unq_values) value_map = pd.Series(ids, index=unq_feature_values) csr = sparse.csr_matrix( (np.ones(len(df)) * df["weight"], (df["id"], df["value"].map(value_map))), shape=(n_objects, n_unq_values), ) names = [(feature, value) for value in value_map.index.values[np.argsort(value_map.values)]] return csr, names
[docs] def get_dense(self) -> np.ndarray: """Return values in dense format.""" warnings.warn("Converting sparse features to dense array may cause MemoryError") return self.values.toarray()
[docs] def get_sparse(self) -> sparse.csr_matrix: """Return values in sparse format.""" return self.values
[docs] def take(self, ids: InternalIds) -> "SparseFeatures": """ Take a subset of features for given subject (user or item) ids. Parameters ---------- ids : array-like Array of internal ids to select features for. Returns ------- SparseFeatures """ return SparseFeatures( values=self.values[ids], names=self.names, )
def __len__(self) -> int: """Return number of objects.""" return self.values.shape[0] @property def cat_col_mask(self) -> np.ndarray: """Mask that identifies category columns in feature values sparse matrix.""" return np.array([feature_name[1] != DIRECT_FEATURE_VALUE for feature_name in self.names]) @property def cat_feature_indices(self) -> np.ndarray: """Category columns indices in feature values sparse matrix.""" return np.arange(len(self.names))[self.cat_col_mask]
[docs] def get_cat_features(self) -> "SparseFeatures": """Return `SparseFeatures` only with categorical features.""" return SparseFeatures( values=self.values[:, self.cat_feature_indices], names=tuple(map(self.names.__getitem__, self.cat_feature_indices)), )
Features = tp.Union[DenseFeatures, SparseFeatures]