欢迎来到代码驿站!

Python代码

当前位置:首页 > 软件编程 > Python代码

Python自定义指标聚类实例代码

时间:2022-07-02 10:31:30|栏目:Python代码|点击:

前言

最近在研究 Yolov2 论文的时候,发现作者在做先验框聚类使用的指标并非欧式距离,而是IOU。在找了很多资料之后,基本确定 Python 没有自定义指标聚类的函数,所以打算自己做一个

设训练集的 shape 是 [n_sample, n_feature],基本思路是:

  • 簇中心初始化:第 1 个簇中心取样本的特征均值,shape = [n_feature, ];从第 2 个簇中心开始,用距离函数 (自定义) 计算每个样本到最近中心点的距离,归一化后作为选取下一个簇中心的概率 —— 迭代到选取到足够的簇中心为止
  • 簇中心调整:训练多轮,每一轮以样本点到最近中心点的距离之和作为 loss,梯度下降法 + Adam 优化器逼近最优解,在 loss 浮动值小于阈值的次数达到一定值时停止训练

因为设计之初就打算使用自定义距离函数,所以求导是很大的难题。笔者不才,最终决定借助 PyTorch 自动求导的天然优势

先给出欧式距离的计算函数

def Eu_dist(data, center):
    """ 以 欧氏距离 为聚类准则的距离计算函数
        data: 形如 [n_sample, n_feature] 的 tensor
        center: 形如 [n_cluster, n_feature] 的 tensor"""
    data = data.unsqueeze(1)
    center = center.unsqueeze(0)
    dist = ((data - center) ** 2).sum(dim=2)
    return dist

然后就是聚类器的代码:使用时只需关注 __init__、fit、classify 函数

import torch
import numpy as np
import matplotlib.pyplot as plt
Adam = torch.optim.Adam
 
def get_progress(current, target, bar_len=30):
    """ current: 当前完成任务数
        target: 任务总数
        bar_len: 进度条长度
        return: 进度条字符串"""
    assert current <= target
    percent = round(current / target * 100, 1)
    unit = 100 / bar_len
    solid = int(percent / unit)
    hollow = bar_len - solid
    return "■" * solid + "□" * hollow + f" {current}/{target}({percent}%)"
 
 
class Cluster:
    """ 聚类器
        n_cluster: 簇中心数
        dist_fun: 距离计算函数
            kwargs:
                data: 形如 [n_sample, n_feather] 的 tensor
                center: 形如 [n_cluster, n_feature] 的 tensor
            return: 形如 [n_sample, n_cluster] 的 tensor
        init: 初始簇中心
        max_iter: 最大迭代轮数
        lr: 中心点坐标学习率
        stop_thresh: 停止训练的loss浮动阈值
        cluster_centers_: 聚类中心
        labels_: 聚类结果"""
 
    def __init__(self, n_cluster, dist_fun, init=None, max_iter=300, lr=0.08, stop_thresh=1e-4):
        self._n_cluster = n_cluster
        self._dist_fun = dist_fun
        self._max_iter = max_iter
        self._lr = lr
        self._stop_thresh = stop_thresh
        # 初始化参数
        self.cluster_centers_ = None if init is None else torch.FloatTensor(init)
        self.labels_ = None
        self._bar_len = 20
 
    def fit(self, data):
        """ data: 形如 [n_sample, n_feature] 的 tensor
            return: loss浮动日志"""
        if self.cluster_centers_ is None:
            self._init_cluster(data, self._max_iter // 5)
        log = self._train(data, self._max_iter, self._lr)
        # 开始若干轮次的训练,得到loss浮动日志
        return log
 
    def classify(self, data, show=False):
        """ data: 形如 [n_sample, n_feature] 的 tensor
            show: 绘制分类结果
            return: 分类标签"""
        dist = self._dist_fun(data, self.cluster_centers_)
        self.labels_ = dist.argmin(axis=1)
        # 将标签加载到实例属性
        if show:
            for idx in range(self._n_cluster):
                container = data[self.labels_ == idx]
                plt.scatter(container[:, 0], container[:, 1], alpha=0.7)
            plt.scatter(self.cluster_centers_[:, 0], self.cluster_centers_[:, 1], c="gold", marker="p", s=50)
            plt.show()
        return self.labels_
 
    def _init_cluster(self, data, epochs):
        self.cluster_centers_ = data.mean(dim=0).reshape(1, -1)
        for idx in range(1, self._n_cluster):
            dist = np.array(self._dist_fun(data, self.cluster_centers_).min(dim=1)[0])
            new_cluster = data[np.random.choice(range(data.shape[0]), p=dist / dist.sum())].reshape(1, -1)
            # 取新的中心点
            self.cluster_centers_ = torch.cat([self.cluster_centers_, new_cluster], dim=0)
            progress = get_progress(idx, self._n_cluster, bar_len=self._n_cluster if self._n_cluster <= self._bar_len else self._bar_len)
            print(f"\rCluster Init: {progress}", end="")
            self._train(data, epochs, self._lr * 2.5, init=True)
            # 初始化簇中心时使用较大的lr
 
    def _train(self, data, epochs, lr, init=False):
        center = self.cluster_centers_.cuda()
        center.requires_grad = True
        data = data.cuda()
        optimizer = Adam([center], lr=lr)
        # 将中心数据加载到 GPU 上
        init_patience = int(epochs ** 0.5)
        patience = init_patience
        update_log = []
        min_loss = np.inf
        for epoch in range(epochs):
            # 对样本分类并更新中心点
            sample_dist = self._dist_fun(data, center).min(dim=1)
            self.labels_ = sample_dist[1]
            loss = sum([sample_dist[0][self.labels_ == idx].mean() for idx in range(len(center))])
            # loss 函数: 所有样本到中心点的最小距离和 - 中心点间的最小间隔
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            # 反向传播梯度更新中心点
            loss = loss.item()
            progress = min_loss - loss
            update_log.append(progress)
            if progress > 0:
                self.cluster_centers_ = center.cpu().detach()
                min_loss = loss
                # 脱离计算图后记录中心点
            if progress < self._stop_thresh:
                patience -= 1
                # 耐心值减少
                if patience < 0:
                    break
                    # 耐心值归零时退出
            else:
                patience = init_patience
                # 恢复耐心值
            progress = get_progress(init_patience - patience, init_patience, bar_len=self._bar_len)
            if not init:
                print(f"\rCluster: {progress}\titer: {epoch + 1}", end="")
        if not init:
            print("")
        return torch.FloatTensor(update_log)

与KMeans++比较

KMeans++ 是以欧式距离为聚类准则的经典聚类算法。在 iris 数据集上,KMeans++ 远远快于我的聚类器。但在我反复对比测试的几轮里,我的聚类器精度也是不差的 —— 可以看到下图里的聚类结果完全一致

  KMeans++ My Cluster
Cost 145 ms 1597 ms
Center

[[5.9016, 2.7484, 4.3935, 1.4339],

[5.0060, 3.4280, 1.4620, 0.2460],
[6.8500, 3.0737, 5.7421, 2.0711]]

[[5.9016, 2.7485, 4.3934, 1.4338],
[5.0063, 3.4284, 1.4617, 0.2463],
[6.8500, 3.0741, 5.7420, 2.0714]]

虽然速度方面与老牌算法对比的确不行,但是我的这个聚类器最大的亮点还是自定义距离函数

Yolo 检测框聚类

本来想用 Yolov4 检测框聚类引入的 CIoU 做聚类,但是没法解决梯度弥散的问题,所以退其次用了 DIoU

def DIoU_dist(boxes, anchor):
    """ 以 DIoU 为聚类准则的距离计算函数
        boxes: 形如 [n_sample, 2] 的 tensor
        anchor: 形如 [n_cluster, 2] 的 tensor"""
    n_sample = boxes.shape[0]
    n_cluster = anchor.shape[0]
    dist = Eu_dist(boxes, anchor)
    # 计算欧式距离
    union_inter = torch.prod(boxes, dim=1).reshape(-1, 1) + torch.prod(anchor, dim=1).reshape(1, -1)
    boxes = boxes.unsqueeze(1).repeat(1, n_cluster, 1)
    anchor = anchor.unsqueeze(0).repeat(n_sample, 1, 1)
    compare = torch.stack([boxes, anchor], dim=2)
    # 组合检测框与 anchor 的信息
    diag = torch.sum(compare.max(dim=2)[0] ** 2, dim=2)
    dist /= diag
    # 计算外接矩形的对角线长度
    inter = torch.prod(compare.min(dim=2)[0], dim=2)
    iou = inter / (union_inter - inter)
    # 计算 IoU
    dist += 1 - iou
    return dist

我提取了 DroneVehicle 数据集的 650156 个预测框的尺寸做聚类,在这个过程中发现因为小尺寸的预测框过多,导致聚类中心聚集在原点附近。所以对 loss 函数做了改进:先分类,再计算每个分类下的最大距离之和

横轴表示检测框的宽度,纵轴表示检测框的高度,其数值都是相对于原图尺寸的比例。若原图尺寸为 608 * 608,则得到的 9 个先验框为:

[ 2,  3 ] [ 9,  13 ] [ 19,  35 ]
[ 10,  76 ] [ 60,  14 ] [ 25,  134 ]
[ 167,  25 ] [ 115,  54 ] [ 70, 176 ]

总结

上一篇:python cx_Oracle模块的安装和使用详细介绍

栏    目:Python代码

下一篇:python批处理将图片进行放大实例代码

本文标题:Python自定义指标聚类实例代码

本文地址:http://www.codeinn.net/misctech/206591.html

推荐教程

广告投放 | 联系我们 | 版权申明

重要申明:本站所有的文章、图片、评论等,均由网友发表或上传并维护或收集自网络,属个人行为,与本站立场无关。

如果侵犯了您的权利,请与我们联系,我们将在24小时内进行处理、任何非本站因素导致的法律后果,本站均不负任何责任。

联系QQ:914707363 | 邮箱:codeinn#126.com(#换成@)

Copyright © 2020 代码驿站 版权所有