# Source code for dhg.metrics.retrieval

from typing import Optional, Union, Tuple

import torch
import numpy as np


# Public names exported by a star-import of this module.
# NOTE: every entry needs its own comma. Adjacent string literals are silently
# concatenated by Python ("ap" "map" becomes "apmap"), which would drop both
# names from the export list.
__all__ = [
    "available_retrieval_metrics",
    "precision",
    "recall",
    "ap",
    "map",
    "ndcg",
    "rr",
    "mrr",
    "pr_curve",
]


def available_retrieval_metrics():
    r"""List the names of the metrics provided for the retrieval task.

    Returns:
        ``tuple``: The metric names, in order: ``precision``, ``recall``, ``ap``,
        ``map``, ``ndcg``, ``rr``, ``mrr``, ``pr_curve``.
    """
    metric_names = (
        "precision",
        "recall",
        "ap",
        "map",
        "ndcg",
        "rr",
        "mrr",
        "pr_curve",
    )
    return metric_names
def _format_inputs( y_true: torch.Tensor, y_pred: torch.Tensor, k: Optional[int] = None ) -> Tuple[torch.Tensor, torch.Tensor, int]: r"""Format the inputs Args: ``y_true`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`. ``y_pred`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`. ``k`` (``int``, optional): The specified top-k value. Default to :math:`N_{target}`. """ assert ( y_true.shape == y_pred.shape ), "The shape of y_true and y_pred must be the same." assert y_true.dim() in (1, 2), "The input y_true must be 1-D or 2-D." assert y_pred.dim() in (1, 2), "The input y_pred must be 1-D or 2-D." if y_true.dim() == 1: y_true = y_true.unsqueeze(0) if y_pred.dim() == 1: y_pred = y_pred.unsqueeze(0) y_true, y_pred = y_true.detach().float(), y_pred.detach().float() max_k = y_true.shape[1] k = min(k, max_k) if k is not None else max_k return y_true, y_pred, k
def precision(
    y_true: torch.Tensor,
    y_pred: torch.Tensor,
    k: Optional[int] = None,
    ret_batch: bool = False,
) -> Union[float, torch.Tensor]:
    r"""Calculate the Precision score for the retrieval task.

    Targets are ranked by descending ``y_pred``; the score of each sample is the
    number of relevant targets among the first ``k`` ranked targets divided by ``k``.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`. Every entry must be ``0`` or ``1``.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``k`` (``int``, optional): The specified top-k value. Defaults to :math:`N_{target}`.
        ``ret_batch`` (``bool``): Whether to return the raw per-sample scores (a 1-D tensor) instead of their mean. Defaults to ``False``.

    Returns:
        ``float`` or ``torch.Tensor``: The mean precision over the samples, or the
        per-sample precision tensor when ``ret_batch`` is ``True``.

    Raises:
        ``AssertionError``: If the inputs have mismatching or unsupported shapes, or if ``y_true`` is not binary.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([0, 1, 0, 0, 1, 1])
        >>> y_pred = torch.tensor([0.8, 0.9, 0.6, 0.7, 0.4, 0.5])
        >>> dm.retrieval.precision(y_true, y_pred, k=2)
        0.5
    """
    y_true, y_pred, k = _format_inputs(y_true, y_pred, k)
    # Check every entry rather than only the maximum: an all-zero ground truth
    # is valid binary input, while e.g. 0.5 is not.
    assert bool(
        ((y_true == 0) | (y_true == 1)).all()
    ), "The input y_true must be binary."
    ranking = torch.argsort(y_pred, dim=-1, descending=True)
    top_k_hits = y_true.gather(1, ranking)[:, :k]
    res_list = top_k_hits.sum(dim=1) / k
    if ret_batch:
        return res_list
    return res_list.mean().item()
def recall(
    y_true: torch.Tensor,
    y_pred: torch.Tensor,
    k: Optional[int] = None,
    ret_batch: bool = False,
) -> Union[float, torch.Tensor]:
    r"""Calculate the Recall score for the retrieval task.

    Targets are ranked by descending ``y_pred``; the score of each sample is the
    number of relevant targets among the first ``k`` ranked targets divided by the
    total number of relevant targets of that sample. A sample without any relevant
    target scores ``0``.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`. Every entry must be ``0`` or ``1``.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``k`` (``int``, optional): The specified top-k value. Defaults to :math:`N_{target}`.
        ``ret_batch`` (``bool``): Whether to return the raw per-sample scores (a 1-D tensor) instead of their mean. Defaults to ``False``.

    Returns:
        ``float`` or ``torch.Tensor``: The mean recall over the samples, or the
        per-sample recall tensor when ``ret_batch`` is ``True``.

    Raises:
        ``AssertionError``: If the inputs have mismatching or unsupported shapes, or if ``y_true`` is not binary.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([0, 1, 0, 0, 1, 1])
        >>> y_pred = torch.tensor([0.8, 0.9, 0.6, 0.7, 0.4, 0.5])
        >>> dm.retrieval.recall(y_true, y_pred, k=5)
        0.6666666865348816
    """
    y_true, y_pred, k = _format_inputs(y_true, y_pred, k)
    # Check every entry rather than only the maximum: an all-zero ground truth
    # is valid binary input, while e.g. 0.5 is not.
    assert bool(
        ((y_true == 0) | (y_true == 1)).all()
    ), "The input y_true must be binary."
    ranking = torch.argsort(y_pred, dim=-1, descending=True)
    top_k_hits = y_true.gather(1, ranking)[:, :k]
    num_true = y_true.sum(dim=1)
    res_list = top_k_hits.sum(dim=1) / num_true
    # Rows without relevant targets produce 0/0 = NaN; score them 0 so that a
    # single such row does not turn the batch mean into NaN.
    res_list[num_true == 0] = 0
    if ret_batch:
        return res_list
    return res_list.mean().item()
def ap(
    y_true: torch.Tensor,
    y_pred: torch.Tensor,
    k: Optional[int] = None,
    method: str = "pascal_voc",
) -> float:
    r"""Calculate the Average Precision (AP) for the retrieval task.

    Targets are ranked by descending ``y_pred`` and cut off after the first ``k``
    entries. For every relevant target (``y_true > 0``) inside that cut-off the
    precision at its rank is computed, and the mean of those precisions is returned.
    With ``method="pascal_voc"`` each precision is first replaced by the maximum
    precision found at its own or any later relevant rank.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor. Size :math:`(N_{target},)`.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor. Size :math:`(N_{target},)`.
        ``k`` (``int``, optional): The specified top-k value. Defaults to :math:`N_{target}`.
        ``method`` (``str``): The method to compute the AP can be ``legacy`` or ``pascal_voc``. Defaults to ``pascal_voc``.

    Returns:
        ``float``: The average precision, or ``0.0`` when no relevant target appears in the top-k ranking.

    Raises:
        ``AssertionError``: If ``method`` is unknown, or the inputs are not 1-D tensors of the same shape.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([True, False, True])
        >>> y_pred = torch.tensor([0.2, 0.3, 0.5])
        >>> dm.retrieval.ap(y_true, y_pred, method="legacy")
        0.8333333730697632
    """
    assert method in (
        "legacy",
        "pascal_voc",
    ), "The method must be either legacy or pascal_voc."
    assert (
        y_true.shape == y_pred.shape
    ), "The shape of y_true and y_pred must be the same."
    assert y_true.dim() == 1, "The input y_true must be 1-D."
    assert y_pred.dim() == 1, "The input y_pred must be 1-D."

    y_true, y_pred = y_true.detach().float(), y_pred.detach().float()
    max_k = y_true.shape[0]
    k = min(k, max_k) if k is not None else max_k

    # Ground truth reordered by descending prediction, restricted to the top-k.
    ranked = y_true[torch.argsort(y_pred, descending=True)][:k]
    # 1-based ranks at which a relevant target was retrieved.
    hit_ranks = torch.arange(1, ranked.shape[0] + 1, device=y_true.device)[ranked > 0]
    n_hits = hit_ranks.numel()
    if n_hits == 0:
        # The mean of an empty tensor would be NaN; report zero precision instead.
        return 0.0
    # Number of relevant targets retrieved up to (and including) each hit.
    hit_counts = torch.arange(1, n_hits + 1, device=y_true.device)
    res = hit_counts / hit_ranks
    if method == "pascal_voc":
        # Running maximum from the last hit backwards. The order is not restored
        # because only the mean is needed.
        res = torch.flip(res, dims=(0,))
        res = torch.cummax(res, dim=0)[0]
    return res.mean().item()
def map(
    y_true: torch.Tensor,
    y_pred: torch.Tensor,
    k: Optional[int] = None,
    method: str = "pascal_voc",
    ret_batch: bool = False,
) -> Union[float, list]:
    r"""Calculate the mean Average Precision (mAP) for the retrieval task.

    The Average Precision is computed by ``ap`` for every sample (row) with the
    given ``k`` and ``method``, and the per-sample values are averaged.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``k`` (``int``, optional): The specified top-k value. Default to :math:`N_{target}`.
        ``method`` (``str``): The specified method: ``legacy`` or ``pascal_voc``. Defaults to ``pascal_voc``.
        ``ret_batch`` (``bool``): Whether to return the raw score list. Defaults to ``False``.

    Returns:
        ``float`` or ``list``: The mean of the per-sample AP values, or the list of
        per-sample AP values when ``ret_batch`` is ``True``.

    Raises:
        ``AssertionError``: If ``method`` is unknown or the inputs have mismatching or unsupported shapes.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([
                [True, False, True, False, True],
                [False, False, False, True, True],
                [True, True, False, True, False],
            ])
        >>> y_pred = torch.tensor([
                [0.2, 0.3, 0.5, 0.4, 0.3],
                [0.8, 0.2, 0.3, 0.5, 0.4],
                [0.2, 0.4, 0.5, 0.2, 0.8],
            ])
        >>> dm.retrieval.map(y_true, y_pred, method="legacy")
        0.587037056684494
    """
    assert method in (
        "legacy",
        "pascal_voc",
    ), "The method must be either legacy or pascal_voc."
    y_true, y_pred, k = _format_inputs(y_true, y_pred, k)
    # ``method`` must be forwarded explicitly; otherwise ``ap`` falls back to its
    # own default and the caller's choice is silently ignored.
    res_list = [
        ap(y_true[i, :], y_pred[i, :], k, method) for i in range(y_true.shape[0])
    ]
    if ret_batch:
        return res_list
    return np.mean(res_list)
def _dcg(matrix: torch.Tensor) -> torch.Tensor: r"""Calculate the Discounted Cumulative Gain (DCG). Args: ``sequence`` (``torch.Tensor``): A 2-D tensor. Size :math:`(N, K)` """ assert matrix.dim() == 2, "The input must be a 2-D tensor." n, k = matrix.shape denom = ( torch.log2(torch.arange(k, device=matrix.device) + 2.0).view(1, -1).repeat(n, 1) ) return (matrix / denom).sum(dim=-1)
def ndcg(
    y_true: torch.Tensor,
    y_pred: torch.Tensor,
    k: Optional[int] = None,
    ret_batch: bool = False,
) -> Union[float, torch.Tensor]:
    r"""Calculate the Normalized Discounted Cumulative Gain (NDCG) for the retrieval task.

    For each sample the DCG of the ground truth ranked by descending ``y_pred`` is
    divided by the DCG of the ground truth sorted in descending order, both cut off
    at ``k``. Samples whose ratio is not finite (e.g. all-zero ground truth, where
    the ratio is 0/0) score ``0``.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``k`` (``int``, optional): The specified top-k value. Default to :math:`N_{target}`.
        ``ret_batch`` (``bool``): Whether to return the raw per-sample scores (a 1-D tensor) instead of their mean. Defaults to ``False``.

    Returns:
        ``float`` or ``torch.Tensor``: The mean NDCG over the samples, or the
        per-sample NDCG tensor when ``ret_batch`` is ``True``.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([10, 0, 0, 1, 5])
        >>> y_pred = torch.tensor([.1, .2, .3, 4, 70])
        >>> dm.retrieval.ndcg(y_true, y_pred)
        0.695694088935852
        >>> dm.retrieval.ndcg(y_true, y_pred, k=3)
        0.4123818874359131
    """
    y_true, y_pred, k = _format_inputs(y_true, y_pred, k)
    ranking = torch.argsort(y_pred, dim=-1, descending=True)
    pred_seq = y_true.gather(1, ranking)[:, :k]
    ideal_seq = torch.sort(y_true, dim=-1, descending=True)[0][:, :k]

    pred_dcg = _dcg(pred_seq)
    ideal_dcg = _dcg(ideal_seq)

    res_list = pred_dcg / ideal_dcg
    # A zero ideal DCG yields NaN (0/0) or +/-inf; ``isinf`` alone misses the NaN
    # case, so mask everything that is not finite.
    res_list[~torch.isfinite(res_list)] = 0
    if ret_batch:
        return res_list
    return res_list.mean().item()
def rr(y_true: torch.Tensor, y_pred: torch.Tensor, k: Optional[int] = None) -> float:
    r"""Calculate the Reciprocal Rank (RR) for the retrieval task.

    Targets are ranked by descending ``y_pred`` and cut off after the first ``k``
    entries. The returned value is the mean of :math:`1 / rank` over **all**
    relevant (non-zero ``y_true``) targets inside that cut-off.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor. Size :math:`(N_{target},)`.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor. Size :math:`(N_{target},)`.
        ``k`` (``int``, optional): The specified top-k value. Default to :math:`N_{target}`.

    Returns:
        ``float``: The mean reciprocal rank of the relevant targets, or ``0.0`` when no relevant target appears in the top-k ranking.

    Raises:
        ``AssertionError``: If the inputs are not 1-D tensors of the same shape.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([False, True, False, True])
        >>> y_pred = torch.tensor([0.2, 0.3, 0.5, 0.1])
        >>> dm.retrieval.rr(y_true, y_pred)
        0.375
        >>> dm.retrieval.rr(y_true, y_pred, k=2)
        0.5
    """
    assert y_true.shape == y_pred.shape
    assert y_true.dim() == 1, "The input y_true must be a 1-D tensor."
    assert y_pred.dim() == 1, "The input y_pred must be a 1-D tensor."

    y_true, y_pred = y_true.detach().float(), y_pred.detach().float()
    max_k = y_true.shape[0]
    k = min(k, max_k) if k is not None else max_k

    ranked = y_true[torch.argsort(y_pred, dim=-1, descending=True)][:k]
    # 0-based positions of the relevant targets in the ranking.
    hit_positions = torch.nonzero(ranked).view(-1)
    if hit_positions.numel() == 0:
        # The mean of an empty tensor would be NaN; no hit means a score of zero.
        return 0.0
    return (1 / (hit_positions + 1)).mean().item()
def mrr(
    y_true: torch.Tensor,
    y_pred: torch.Tensor,
    k: Optional[int] = None,
    ret_batch: bool = False,
) -> Union[float, list]:
    r"""Calculate the mean Reciprocal Rank (MRR) for the retrieval task.

    The Reciprocal Rank is computed by ``rr`` for every sample (row) and the
    per-sample values are averaged.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``k`` (``int``, optional): The specified top-k value. Default to :math:`N_{target}`.
        ``ret_batch`` (``bool``): Whether to return the raw score list. Defaults to ``False``.

    Returns:
        ``float`` or ``list``: The mean of the per-sample RR values, or the list of
        per-sample RR values when ``ret_batch`` is ``True``.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([False, True, False, True])
        >>> y_pred = torch.tensor([0.2, 0.3, 0.5, 0.2])
        >>> dm.retrieval.mrr(y_true, y_pred)
        0.375
        >>> dm.retrieval.mrr(y_true, y_pred, k=2)
        0.5
    """
    y_true, y_pred, k = _format_inputs(y_true, y_pred, k)
    per_sample = []
    for row in range(y_true.shape[0]):
        per_sample.append(rr(y_true[row, :], y_pred[row, :], k))
    if not ret_batch:
        return np.mean(per_sample)
    return per_sample
def _pr_curve( y_true: torch.Tensor, y_pred: torch.Tensor, k: Optional[int] = None, method: str = "pascal_voc", n_points: int = 11, ) -> tuple: r"""Calculate the Precision-Recall Curve for the retrieval task. Args: ``y_true`` (``torch.Tensor``): A 1-D tensor. Size :math:`(N_{target},)`. ``y_pred`` (``torch.Tensor``): A 1-D tensor. Size :math:`(N_{target},)`. ``k`` (``int``, optional): The specified top-k value. Default to :math:`N_{target}`. ``method`` (``str``, optional): The method to compute the PR curve can be "legacy" or "pascal_voc". Default to "pascal_voc". ``n_points`` (``int``): The number of points to compute the PR curve. Default to ``11``. Examples: >>> import torch >>> import dhg.metrics as dm >>> y_true = torch.tensor([0, 1, 0, 1, 0, 0, 1, 0, 1, 0]) >>> y_pred = torch.tensor([0.23, 0.76, 0.01, 0.91, 0.13, 0.45, 0.12, 0.03, 0.38, 0.11]) >>> precision_coor, recall_coor = dm.retrieval.pr_curve(y_true, y_pred) >>> precision_coor [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.75, 0.75, 0.75, 0.5714285969734192] >>> recall_coor [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] """ assert method in ( "legacy", "pascal_voc", ), "The method must be either legacy or pascal_voc." assert ( y_true.shape == y_pred.shape ), "The shape of y_true and y_pred must be the same." assert y_true.dim() == 1, "The input y_true must be 1-D." assert y_pred.dim() == 1, "The input y_pred must be 1-D." 
y_true, y_pred = y_true.detach().float(), y_pred.detach().float() max_k = y_true.shape[0] k = min(k, max_k) if k is not None else max_k pred_seq = y_true[torch.argsort(y_pred, descending=True)] pred_index = torch.arange(1, len(y_true) + 1, device=y_true.device)[pred_seq > 0] recall_seq = torch.arange(1, len(pred_index) + 1, device=y_true.device) res = recall_seq / pred_index if method == "pascal_voc": res = torch.flip(res, dims=(0,)) res = torch.cummax(res, dim=0)[0] res = torch.flip(res, dims=(0,)) res = res.cpu().numpy() recall_coor = np.linspace(0, 1, n_points) recall_index = (recall_coor * (torch.sum(y_true).item() - 1)).astype(int) precision_coor = res[recall_index] return precision_coor.tolist(), recall_coor.tolist()
def pr_curve(
    y_true: torch.Tensor,
    y_pred: torch.Tensor,
    k: Optional[int] = None,
    method: str = "pascal_voc",
    n_points: int = 11,
    ret_batch: bool = False,
) -> tuple:
    r"""Calculate the Precision-Recall Curve for the retrieval task.

    The curve of every sample (row) is computed by ``_pr_curve``; the per-sample
    curves are either returned as they are or averaged coordinate-wise.

    Args:
        ``y_true`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``y_pred`` (``torch.Tensor``): A 1-D tensor or 2-D tensor. Size :math:`(N_{target},)` or :math:`(N_{samples}, N_{target})`.
        ``k`` (``int``, optional): The specified top-k value. Default to :math:`N_{target}`.
        ``method`` (``str``, optional): The method to compute the PR curve can be "legacy" or "pascal_voc". Default to "pascal_voc".
        ``n_points`` (``int``): The number of points to compute the PR curve. Default to ``11``.
        ``ret_batch`` (``bool``): Whether to return the raw score list. Defaults to ``False``.

    Returns:
        ``tuple``: ``(precision_coor, recall_coor)``. Each element is a list of
        ``n_points`` values averaged over the samples, or a list of per-sample lists
        when ``ret_batch`` is ``True``.

    Examples:
        >>> import torch
        >>> import dhg.metrics as dm
        >>> y_true = torch.tensor([0, 1, 0, 1, 0, 0, 1, 0, 1, 0])
        >>> y_pred = torch.tensor([0.23, 0.76, 0.01, 0.91, 0.13, 0.45, 0.12, 0.03, 0.38, 0.11])
        >>> precision_coor, recall_coor = dm.retrieval.pr_curve(y_true, y_pred)
        >>> precision_coor
        [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.75, 0.75, 0.75, 0.5714285969734192]
        >>> recall_coor
        [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
    """
    assert method in (
        "legacy",
        "pascal_voc",
    ), "The method must be either legacy or pascal_voc."
    y_true, y_pred, k = _format_inputs(y_true, y_pred, k)

    # One (precision, recall) pair of coordinate lists per sample.
    curves = [
        _pr_curve(y_true[row, :], y_pred[row, :], k, method, n_points)
        for row in range(y_true.shape[0])
    ]
    precision_coor_list = [precision for precision, _ in curves]
    recall_coor_list = [recall for _, recall in curves]

    if ret_batch:
        return precision_coor_list, recall_coor_list
    mean_precision = np.mean(precision_coor_list, axis=0)
    mean_recall = np.mean(recall_coor_list, axis=0)
    return mean_precision.tolist(), mean_recall.tolist()