Source code for dhg.utils.dataset_split

import random
from typing import Union, Optional

import torch
import numpy as np

from dhg.structure.graphs import BiGraph


def _idx2mask(num_v: int, train_idx: list, test_idx: list, val_idx: Optional[list] = None):
    train_mask, test_mask = torch.zeros(num_v, dtype=torch.bool), torch.zeros(num_v, dtype=torch.bool)
    train_mask[train_idx] = True
    test_mask[test_idx] = True
    if val_idx is not None:
        val_mask = torch.zeros(num_v, dtype=torch.bool)
        val_mask[val_idx] = True
        return train_mask, val_mask, test_mask
    else:
        return train_mask, test_mask


[docs]def split_by_num( num_v: int, v_label: Union[list, torch.Tensor, np.ndarray], train_num: int, val_num: Optional[int] = None, test_num: Optional[int] = None, ): r"""Split the dataset by the number of vertices in each category, and return the masks of [``train_mask`` and ``test_mask``] or [``train_mask``, ``val_mask`` and ``test_mask``]. Args: num_v (``int``): The number of vertices. v_label (``Union[list, torch.Tensor, np.ndarray]``): The vertex labels. train_num (``int``): The number of vertices in the training set for each category. val_num (``Optional[int]``, optional): The number of vertices in the validation set for each category. If set to ``None``, this function will only return the masks of ``train_mask`` and ``test_mask``. Defaults to ``None``. test_num (``Optional[int]``, optional): The number of vertices in the test set for each category. If set to ``None``, except for the training and validation sets, the remaining all vertices will be used for testing. Defaults to ``None``. Examples: >>> import numpy as np >>> from dhg.utils import split_by_num >>> num_v = 100 >>> v_label = np.random.randint(0, 3, num_v) # 3 categories >>> train_num, val_num, test_num = 10, 2, 5 >>> train_mask, val_mask, test_mask = split_by_num(num_v, v_label, train_num, val_num, test_num) >>> train_mask.sum(), val_mask.sum(), test_mask.sum() (tensor(30), tensor(6), tensor(15)) >>> train_mask, val_mask, test_mask = split_by_num(num_v, v_label, train_num, val_num) >>> train_mask.sum(), val_mask.sum(), test_mask.sum() (tensor(30), tensor(6), tensor(64)) """ if isinstance(v_label, list): v_label = np.array(v_label) if isinstance(v_label, torch.Tensor): v_label = v_label.detach().cpu().numpy() assert isinstance(v_label, np.ndarray) v_label = v_label.squeeze().astype(int) assert v_label.ndim == 1 if v_label.min() == 1: v_label -= 1 num_classes = np.unique(v_label).shape[0] train_idx, test_idx = [], [] val_num = val_num if val_num is not None else 0 if val_num != 0: val_idx = [] else: val_idx = None for lbl_idx in range(num_classes): lbl_v_idx = np.where(v_label == lbl_idx)[0] random.shuffle(lbl_v_idx) train_idx.extend(lbl_v_idx[:train_num]) if val_num != 0: val_idx.extend(lbl_v_idx[train_num : train_num + val_num]) if test_num is not None: test_idx.extend(lbl_v_idx[train_num + val_num : train_num + val_num + test_num]) else: test_idx.extend(lbl_v_idx[train_num + val_num :]) return _idx2mask(num_v, train_idx, test_idx, val_idx)
[docs]def split_by_ratio( num_v: int, v_label: Union[list, torch.Tensor, np.ndarray], train_ratio: float, val_ratio: Optional[float] = None, test_ratio: Optional[float] = None, ): r"""Split the dataset by the ratio of vertices in each category, and return the masks of [``train_mask`` and ``test_mask``] or [``train_mask``, ``val_mask`` and ``test_mask``]. Args: num_v (``int``): The number of vertices. v_label (``Union[list, torch.Tensor, np.ndarray]``): The vertex labels. train_ratio (``float``): The ratio of vertices in the training set for each category. val_ratio (``Optional[float]``, optional): The ratio of vertices in the validation set for each category. If set to ``None``, this function will only return the masks of ``train_mask`` and ``test_mask``. Defaults to ``None``. test_ratio (``Optional[float]``, optional): The ratio of vertices in the test set for each category. If set to ``None``, except for the training and validation sets, the remaining all vertices will be used for testing. Defaults to ``None``. Examples: >>> import numpy as np >>> from dhg.utils import split_by_ratio >>> num_v = 100 >>> v_label = np.random.randint(0, 3, num_v) # 3 categories >>> train_ratio, val_ratio, test_ratio = 0.6, 0.1, 0.2 >>> train_mask, val_mask, test_mask = split_by_ratio(num_v, v_label, train_ratio, val_ratio, test_ratio) >>> train_mask.sum(), val_mask.sum(), test_mask.sum() (tensor(59), tensor(9), tensor(18)) >>> train_mask, val_mask, test_mask = split_by_ratio(num_v, v_label, train_ratio, val_ratio) >>> train_mask.sum(), val_mask.sum(), test_mask.sum() (tensor(59), tensor(9), tensor(32)) """ if isinstance(v_label, list): v_label = np.array(v_label) if isinstance(v_label, torch.Tensor): v_label = v_label.detach().cpu().numpy() assert isinstance(v_label, np.ndarray) v_label = v_label.squeeze().astype(int) assert v_label.ndim == 1 if v_label.min() == 1: v_label -= 1 num_classes = np.unique(v_label).shape[0] val_ratio = val_ratio if val_ratio is not None else 0 if test_ratio is not None: assert train_ratio + val_ratio + test_ratio <= 1 else: assert train_ratio + val_ratio < 1 train_idx, test_idx = [], [] if val_ratio != 0: val_idx = [] else: val_idx = None for lbl_idx in range(num_classes): lbl_v_idx = np.where(v_label == lbl_idx)[0] _num = lbl_v_idx.shape[0] random.shuffle(lbl_v_idx) train_num = int(_num * train_ratio) val_num = int(_num * val_ratio) train_idx.extend(lbl_v_idx[:train_num]) if val_ratio != 0: val_idx.extend(lbl_v_idx[train_num : train_num + val_num]) if test_ratio is not None: test_num = int(_num * test_ratio) test_idx.extend(lbl_v_idx[train_num + val_num : train_num + val_num + test_num]) else: test_idx.extend(lbl_v_idx[train_num + val_num :]) return _idx2mask(num_v, train_idx, test_idx, val_idx)
[docs]def split_by_num_for_UI_bigraph(g: BiGraph, train_num: int): r"""Split the User-Item bipartite graph by the number of the items connected to each user. This function will return two adjacency matrices for training and testing, respectively. Args: g (``BiGraph``): The User-Item bipartite graph. train_num (``int``): The number of items for the training set for each user. Examples: >>> import dhg >>> from dhg.utils import edge_list_to_adj_list, split_by_num_for_UI_bigraph >>> g = dhg.random.bigraph_Gnm(5, 8, 20) >>> edge_list_to_adj_list(g.e[0]) [[3, 4, 0, 6, 5], [0, 5, 1, 4, 3, 6], [2, 2, 5, 1], [1, 0, 6, 5, 1, 4, 7], [4, 5, 7]] >>> train_num = 3 >>> train_adj, test_adj = split_by_num_for_UI_bigraph(g, train_num) >>> train_adj [[0, 1, 3, 4], [1, 6, 0, 5], [2, 1, 2, 5], [3, 6, 4, 5], [4, 5, 7]] >>> test_adj [[0, 5, 6], [1, 1, 4, 7], [3, 0]] """ train_adj_list, test_adj_list = [], [] for u_idx in range(g.num_u): cur_train, cur_test = [u_idx], [u_idx] nbr_v_list = g.nbr_v(u_idx) random.shuffle(nbr_v_list) _num = len(nbr_v_list) if _num == 0: continue if _num <= train_num: cur_train.extend(nbr_v_list) train_adj_list.append(cur_train) else: cur_train.extend(nbr_v_list[:train_num]) cur_test.extend(nbr_v_list[train_num:]) train_adj_list.append(cur_train) test_adj_list.append(cur_test) return train_adj_list, test_adj_list
[docs]def split_by_ratio_for_UI_bigraph(g: BiGraph, train_ratio: float): r"""Split the User-Item bipartite graph by ratio of the items connected to each user. This function will return two adjacency matrices for training and testing, respectively. Args: g (``BiGraph``): The User-Item bipartite graph. train_ratio (``float``): The ratio of items for the training set for each user. Examples: >>> import dhg >>> from dhg.utils import edge_list_to_adj_list, split_by_ratio_for_UI_bigraph >>> g = dhg.random.bigraph_Gnm(5, 8, 20) >>> edge_list_to_adj_list(g.e[0]) [[4, 0, 6, 5, 4], [3, 4, 7, 0, 3, 6, 2], [2, 2, 5, 0, 6], [1, 0, 3, 1, 7], [0, 3, 6]] >>> train_ratio = 0.8 >>> train_adj, test_adj = split_by_ratio_for_UI_bigraph(g, train_ratio) >>> train_adj [[0, 6], [1, 3, 0, 1], [2, 2, 6, 5], [3, 0, 4, 3, 6], [4, 0, 4, 6]] >>> test_adj [[0, 3], [1, 7], [2, 0], [3, 2, 7], [4, 5]] """ train_adj_list, test_adj_list = [], [] for u_idx in range(g.num_u): cur_train, cur_test = [u_idx], [u_idx] nbr_v_list = g.nbr_v(u_idx) random.shuffle(nbr_v_list) _num = len(nbr_v_list) if _num == 0: continue train_num = int(_num * train_ratio) cur_train.extend(nbr_v_list[:train_num]) cur_test.extend(nbr_v_list[train_num:]) train_adj_list.append(cur_train) test_adj_list.append(cur_test) return train_adj_list, test_adj_list