Source code for skcmeans.algorithms

"""Implementations of a number of C-means algorithms.

References
----------
.. [1] J. C. Bezdek, J. Keller, R. Krisnapuram, and N. R. Pal, Fuzzy models
   and algorithms for pattern recognition and image processing. Kluwer Academic
   Publishers, 2005.

"""
import numpy as np
from scipy.spatial.distance import cdist

from .initialization import initialize_random, initialize_probabilistic


class CMeans:
    """Base class for C-means algorithms.

    Parameters
    ----------
    n_clusters : int, optional
        The number of clusters to find.
    n_init : int, optional
        The number of times to attempt convergence with new initial centroids.
    max_iter : int, optional
        The number of cycles of the alternating optimization routine to run
        for *each* convergence.
    tol : float, optional
        The stopping condition. Convergence is considered to have been reached
        when the objective function changes by less than `tol`.
    verbosity : int, optional
        The verbosity of the instance. May be 0, 1, or 2.

        .. note:: Very much not yet implemented.

    random_state : :obj:`int` or :obj:`np.random.RandomState`, optional
        The generator used for initialization. Using an integer fixes the
        seed.
    eps : float, optional
        To avoid numerical errors, zeros are sometimes replaced with a very
        small number, specified here.

    Attributes
    ----------
    metric : :obj:`string` or :obj:`function`
        The distance metric used. May be any of the strings specified for
        :obj:`cdist`, or a user-specified function.
    initialization : function
        The method used to initialize the cluster centers.
    centers : :obj:`np.ndarray` (n_clusters, n_features)
        The derived or supplied cluster centers.
    memberships : :obj:`np.ndarray` (n_samples, n_clusters)
        The derived or supplied cluster memberships.

    """

    metric = 'euclidean'
    initialization = staticmethod(initialize_random)

    def __init__(self, n_clusters=2, n_init=10, max_iter=300, tol=1e-4,
                 verbosity=0, random_state=None, eps=1e-18, **kwargs):
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.max_iter = max_iter
        self.tol = tol
        self.verbosity = verbosity
        self.random_state = random_state
        self.eps = eps
        self.params = kwargs
        self.centers = None
        self.memberships = None

    def distances(self, x):
        """Calculates the distance between data x and the centers.

        The distance, by default, is calculated according to `metric`, but
        this method should be overridden by subclasses if required.

        Parameters
        ----------
        x : :obj:`np.ndarray` (n_samples, n_features)
            The original data.

        Returns
        -------
        :obj:`np.ndarray` (n_samples, n_clusters)
            Each entry (i, j) is the distance between sample i and cluster
            center j.

        """
        return cdist(x, self.centers, metric=self.metric)

    def calculate_memberships(self, x):
        raise NotImplementedError(
            "`calculate_memberships` should be implemented by subclasses.")

    def calculate_centers(self, x):
        raise NotImplementedError(
            "`calculate_centers` should be implemented by subclasses.")

    def objective(self, x):
        raise NotImplementedError(
            "`objective` should be implemented by subclasses.")

    def fit(self, x):
        """Optimizes cluster centers by restarting convergence several times.

        Parameters
        ----------
        x : :obj:`np.ndarray` (n_samples, n_features)
            The original data.

        Returns
        -------
        list of float
            The objective value reached by each of the `n_init` convergence
            attempts.

        """
        objective_best = np.inf
        memberships_best = None
        centers_best = None
        j_list = []
        for i in range(self.n_init):
            self.centers = None
            self.memberships = None
            self.converge(x)
            objective = self.objective(x)
            j_list.append(objective)
            if objective < objective_best:
                memberships_best = self.memberships.copy()
                centers_best = self.centers.copy()
                objective_best = objective
        self.memberships = memberships_best
        self.centers = centers_best
        return j_list

    def converge(self, x):
        """Finds cluster centers through an alternating optimization routine.

        Terminates when either the number of cycles reaches `max_iter` or the
        objective function changes by less than `tol`.

        Parameters
        ----------
        x : :obj:`np.ndarray` (n_samples, n_features)
            The original data.

        Returns
        -------
        :obj:`np.ndarray` (n_iterations, n_clusters, n_features)
            The cluster centers after each cycle.

        """
        centers = []
        j_new = np.inf
        for i in range(self.max_iter):
            j_old = j_new
            self.update(x)
            centers.append(self.centers)
            j_new = self.objective(x)
            if np.abs(j_old - j_new) < self.tol:
                break
        return np.array(centers)

    def update(self, x):
        """Updates cluster memberships and centers in a single cycle.

        If the cluster centers have not already been initialized, they are
        chosen according to `initialization`.

        Parameters
        ----------
        x : :obj:`np.ndarray` (n_samples, n_features)
            The original data.

        """
        self.initialize(x)
        self.memberships = self.calculate_memberships(x)
        self.centers = self.calculate_centers(x)

    def initialize(self, x):
        if self.centers is None and self.memberships is None:
            self.memberships, self.centers = \
                self.initialization(x, self.n_clusters, self.random_state)
        elif self.memberships is None:
            self.memberships = \
                self.initialization(x, self.n_clusters, self.random_state)[0]
        elif self.centers is None:
            self.centers = \
                self.initialization(x, self.n_clusters, self.random_state)[1]
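

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library): the three hooks above are
# all a concrete subclass needs to supply. The toy `InverseDistance` variant
# below is a hypothetical example invented for this listing (memberships
# proportional to inverse distance); it is not one of the published C-means
# algorithms implemented further down.
class InverseDistance(CMeans):
    """Toy subclass used only to illustrate the CMeans extension points."""

    def calculate_memberships(self, x):
        # Inverse-distance weights, normalized so each sample's memberships
        # sum to one; `eps` guards against division by zero.
        u = 1. / (self.distances(x) + self.eps)
        return u / u.sum(axis=1, keepdims=True)

    def calculate_centers(self, x):
        # Membership-weighted mean of the samples.
        return np.dot(self.memberships.T, x) / \
            np.sum(self.memberships, axis=0)[..., np.newaxis]

    def objective(self, x):
        if self.memberships is None or self.centers is None:
            return np.inf
        return np.sum(self.memberships * self.distances(x))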


class Hard(CMeans):
    """Hard C-means, equivalent to K-means clustering.

    Methods
    -------
    calculate_memberships(x)
        The membership of a sample is 1 for the closest cluster and 0
        otherwise.
    calculate_centers(x)
        New centers are calculated as the mean of the points closest to them.
    objective(x)
        Interpretable as the data's rotational inertia about the cluster
        centers. To be minimised.

    """

    def calculate_memberships(self, x):
        distances = self.distances(x)
        # One-hot (boolean) membership of each sample in its nearest cluster.
        return (np.arange(distances.shape[1])[:, np.newaxis]
                == np.argmin(distances, axis=1)).T

    def calculate_centers(self, x):
        return np.dot(self.memberships.T, x) / \
            np.sum(self.memberships, axis=0)[..., np.newaxis]

    def objective(self, x):
        if self.memberships is None or self.centers is None:
            return np.inf
        distances = self.distances(x)
        return np.sum(self.memberships * distances)
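

# Illustrative usage sketch (not part of the library). The two-blob toy data
# and the names `rng`, `toy` and `hard` are invented for this example.
def _demo_hard_cmeans():
    rng = np.random.RandomState(0)
    toy = np.vstack([rng.normal(loc, 0.1, size=(50, 2))
                     for loc in ((0., 0.), (3., 3.))])
    hard = Hard(n_clusters=2, random_state=0)
    hard.fit(toy)
    # Hard memberships are one-hot: every sample belongs to exactly one
    # cluster.
    assert np.all(hard.memberships.sum(axis=1) == 1)
    return hard.centers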


class Fuzzy(CMeans):
    """Base class for fuzzy C-means clusters.

    Attributes
    ----------
    m : float
        Fuzziness parameter. Higher values reduce the rate of drop-off from
        full membership to zero membership.

    Methods
    -------
    fuzzifier(memberships)
        Fuzzification operator. By default, for memberships :math:`u` this is
        :math:`u^m`.
    objective(x)
        Interpretable as the data's weighted rotational inertia about the
        cluster centers. To be minimised.

    """

    m = 2

    def fuzzifier(self, memberships):
        return np.power(memberships, self.m)

    def objective(self, x):
        if self.memberships is None or self.centers is None:
            return np.inf
        distances = self.distances(x)
        return np.sum(self.fuzzifier(self.memberships) * distances)
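

# Short sketch (not part of the library) of how the fuzziness parameter `m`
# shapes the fuzzifier used to weight samples in the center updates. The
# membership matrix and the names below are invented for this example.
def _demo_fuzzifier():
    u = np.array([[0.9, 0.1],
                  [0.5, 0.5]])
    default = Fuzzy()    # class default m = 2, so the fuzzifier returns u ** 2
    sharper = Fuzzy()
    sharper.m = 3        # larger m discounts partial memberships more heavily
    return default.fuzzifier(u), sharper.fuzzifier(u)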


class Probabilistic(Fuzzy):
    r"""Probabilistic C-means.

    In the probabilistic algorithm, sample points have a total membership of
    unity, shared among the centers according to their relative proximity.
    This tends to push cluster centers away from each other.

    Methods
    -------
    calculate_memberships(x)
        Memberships are calculated from the distance :math:`d_{ik}` between
        the sample :math:`k` and the cluster center :math:`i`.

        .. math::

            u_{ik} = \left(\sum_j \left(\frac{d_{ik}}{d_{jk}}\right)^{\frac{2}{m - 1}}\right)^{-1}

    calculate_centers(x)
        New centers are calculated as the mean of all points, weighted by the
        fuzzified memberships.

        .. math::

            c_i = \left. \sum_k u_{ik}^m x_k \middle/ \sum_k u_{ik}^m \right.

    """

    def calculate_memberships(self, x):
        distances = self.distances(x)
        # Replace zero distances so that the ratios below are well defined.
        distances[distances == 0.] = self.eps
        return np.sum(np.power(
            np.divide(distances[:, :, np.newaxis],
                      distances[:, np.newaxis, :]),
            2 / (self.m - 1)), axis=2) ** -1

    def calculate_centers(self, x):
        return np.dot(self.fuzzifier(self.memberships).T, x) / \
            np.sum(self.fuzzifier(self.memberships).T, axis=1)[..., np.newaxis]
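

# Illustrative check (not part of the library): probabilistic memberships of
# each sample sum to one across clusters, as the formula above requires. The
# toy data and the names below are invented for this sketch.
def _demo_probabilistic():
    rng = np.random.RandomState(1)
    toy = np.vstack([rng.normal(loc, 0.2, size=(40, 2))
                     for loc in ((0., 0.), (4., 0.), (0., 4.))])
    prob = Probabilistic(n_clusters=3, random_state=1)
    prob.fit(toy)
    # Every row of the membership matrix forms a partition of unity.
    assert np.allclose(prob.memberships.sum(axis=1), 1.)
    return prob.centers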


class Possibilistic(Fuzzy):
    r"""Possibilistic C-means.

    In the possibilistic algorithm, sample points are assigned memberships
    according to their relative proximity to the centers. This is controlled
    through a weighting assigned to each cluster center, approximately the
    variance of that cluster.

    Methods
    -------
    calculate_memberships(x)
        Memberships are calculated from the distance :math:`d_{ik}` between
        the sample :math:`k` and the cluster center :math:`i`, and the
        weighting :math:`w_i` of each center.

        .. math::

            u_{ik} = \left(1 + \left(\frac{d_{ik}}{w_i}\right)^{\frac{1}{m - 1}}\right)^{-1}

    calculate_centers(x)
        New centers are calculated as the mean of all points, weighted by the
        fuzzified memberships.

        .. math::

            c_i = \left. \sum_k u_{ik}^m x_k \middle/ \sum_k u_{ik}^m \right.

    """

    initialization = staticmethod(initialize_probabilistic)
    _weights = None

    def weights(self, x):
        if self._weights is None:
            distances = self.distances(x)
            memberships = self.memberships
            self._weights = (
                np.sum(self.fuzzifier(memberships) * distances, axis=0) /
                np.sum(self.fuzzifier(memberships), axis=0))
        return self._weights

    def calculate_memberships(self, x):
        distances = self.distances(x)
        return (1. + (distances / self.weights(x)) **
                (1. / (self.m - 1))) ** -1.

    def calculate_centers(self, x):
        return np.divide(
            np.dot(self.fuzzifier(self.memberships).T, x),
            np.sum(self.fuzzifier(self.memberships), axis=0)[..., np.newaxis])
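

# Illustrative sketch (not part of the library): possibilistic memberships
# are typicalities rather than a partition, so a sample's memberships need
# not sum to one and an outlier scores low in every cluster. The toy data,
# the outlier point and the names below are invented for this example.
def _demo_possibilistic():
    rng = np.random.RandomState(2)
    toy = np.vstack([rng.normal(loc, 0.2, size=(40, 2))
                     for loc in ((0., 0.), (4., 4.))])
    poss = Possibilistic(n_clusters=2, random_state=2)
    poss.fit(toy)
    # The outlier receives near-zero membership in both clusters instead of
    # being forced to share a total membership of one.
    outlier = np.array([[25., -25.]])
    outlier_memberships = poss.calculate_memberships(outlier)
    return poss.memberships.sum(axis=1), outlier_memberships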


class GustafsonKesselMixin(Fuzzy):
    """Gives clusters ellipsoidal character.

    The Gustafson-Kessel algorithm redefines the distance measurement such
    that clusters may adopt ellipsoidal shapes. This is achieved through
    updates to a covariance matrix assigned to each cluster center.

    Examples
    --------
    Create an algorithm for probabilistic clustering with ellipsoidal
    clusters:

    >>> class ProbabilisticGustafsonKessel(GustafsonKesselMixin, Probabilistic):
    ...     pass
    >>> pgk = ProbabilisticGustafsonKessel()
    >>> pgk.fit(x)

    """

    covariance = None

    def fit(self, x):
        """Optimizes cluster centers by restarting convergence several times.

        Extends the default behaviour by recalculating the covariance matrix
        with resultant memberships and centers.

        Parameters
        ----------
        x : :obj:`np.ndarray` (n_samples, n_features)
            The original data.

        """
        j_list = super(GustafsonKesselMixin, self).fit(x)
        self.covariance = self.calculate_covariance(x)
        return j_list

    def update(self, x):
        """Single update of the cluster algorithm.

        Extends the default behaviour by including a covariance calculation
        after updating the centers.

        Parameters
        ----------
        x : :obj:`np.ndarray` (n_samples, n_features)
            The original data.

        """
        self.initialize(x)
        self.centers = self.calculate_centers(x)
        self.covariance = self.calculate_covariance(x)
        self.memberships = self.calculate_memberships(x)

    def distances(self, x):
        covariance = self.covariance if self.covariance is not None \
            else self.calculate_covariance(x)
        d = x - self.centers[:, np.newaxis]
        # Mahalanobis-type squared distance of each sample from each center,
        # using that cluster's covariance matrix.
        left_multiplier = \
            np.einsum('...ij,...jk', d, np.linalg.inv(covariance))
        return np.sum(left_multiplier * d, axis=2).T

    def calculate_covariance(self, x):
        """Calculates the covariance of the data `x` about cluster centers `v`.

        Parameters
        ----------
        x : :obj:`np.ndarray` (n_samples, n_features)
            The original data.

        Returns
        -------
        :obj:`np.ndarray` (n_clusters, n_features, n_features)
            The covariance matrix of each cluster.

        """
        v = self.centers
        if v is None:
            return None
        q, p = v.shape
        if self.memberships is None:
            # If no memberships have been calculated, assume n-spherical
            # clusters.
            return (np.eye(p)[..., np.newaxis] * np.ones((p, q))).T
        vector_difference = x - v[:, np.newaxis]
        fuzzy_memberships = self.fuzzifier(self.memberships)
        right_multiplier = \
            np.einsum('...i,...j->...ij', vector_difference,
                      vector_difference)
        einstein_sum = \
            np.einsum('i...,...ijk', fuzzy_memberships, right_multiplier) / \
            np.sum(fuzzy_memberships, axis=0)[..., np.newaxis, np.newaxis]
        return np.nan_to_num(
            einstein_sum / (np.linalg.det(einstein_sum) ** (1 / q))[
                ..., np.newaxis, np.newaxis])
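

# Illustrative usage sketch (not part of the library), following the example
# in the GustafsonKesselMixin docstring. The anisotropic toy data and the
# names below are invented for this example.
class ProbabilisticGustafsonKessel(GustafsonKesselMixin, Probabilistic):
    pass


def _demo_gustafson_kessel():
    rng = np.random.RandomState(3)
    # Two elongated blobs, stretched along the x-axis and separated in y.
    stretch = np.array([[3., 0.], [0., 0.2]])
    toy = np.vstack([rng.normal(0., 1., size=(60, 2)).dot(stretch) + offset
                     for offset in ((0., 0.), (0., 4.))])
    pgk = ProbabilisticGustafsonKessel(n_clusters=2, random_state=3)
    pgk.fit(toy)
    # One covariance matrix per cluster describes its ellipsoidal shape.
    assert pgk.covariance.shape == (2, 2, 2)
    return pgk.centers, pgk.covariance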