爱气象,爱气象家园! 

气象家园

 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 91|回复: 3

尝试实现用典型相关分析CCA的典型因子做回归

[复制链接]
发表于 前天 22:07 | 显示全部楼层 |阅读模式

登录后查看更多精彩内容~

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
在工具的帮助下,实现奇异值分解SVD方法做CCA分析,包括典型相关系数显著行检验、回归预报等,用黄嘉佑老师的教材数据做了测试。工具给出的代码是基于python ,在MI环境下,有部分代码要修改:@ 是矩阵乘法,改成dot;eigvals = np.maximum(eigvals, 1e-12),编译不过,改为eigvals = np.array([max(v, 1e-12) for v in eigvals);coef_use = self.coef_[np.ix_(components, components)],np.ix_ 不可用,也进行了相应修改。另外要注意原始数据的标准化。代码如下:class CCA:
    '''
    基于SVD的典型相关分析(CCA)
    参数:
        n_components : int, 要保留的典型变量对数(默认为None,保留所有)
    '''
    def __init__(self, n_components=None):
        self.selected_components_ = None  # 新增,用于存储用户指定的默认索引
        self.n_components = n_components
        self.x_mean_ = None
        self.y_mean_ = None
        self.A_ = None          # X的典型系数 (p, n_components)
        self.B_ = None          # Y的典型系数 (q, n_components)
        self.cancorr_ = None    # 典型相关系数
        self.coef_ = None       # 回归系数,用于从U预测V
        self.n_samples_ = None  # 样本数
        self.p_ = None          # X变量数
        self.q_ = None          # Y变量数

    def fit(self, X, Y):
        '''
        拟合CCA模型
        X : array-like, shape (n_samples, p)
        Y : array-like, shape (n_samples, q)
        '''
        #X = np.asarray(X, dtype=float)
        #Y = np.asarray(Y, dtype=float)
        n, p = X.shape
        q = Y.shape[1
        self.n_samples_ = n
        self.p_ = p
        self.q_ = q
        
        # 中心化
        self.x_mean_ = X.mean(axis=0)
        self.y_mean_ = Y.mean(axis=0)
        Xc = X - self.x_mean_
        Yc = Y - self.y_mean_

        # 计算协方差矩阵
        #Cxx = Xc.T @ Xc / (n - 1)
        #Cyy = Yc.T @ Yc / (n - 1)
        #Cxy = Xc.T @ Yc / (n - 1)
        Cxx = dot(Xc.T,Xc)/ (n - 1)
        Cyy = dot(Yc.T,Yc)/ (n - 1)
        Cxy = dot(Xc.T,Yc)/ (n - 1)
        # 计算逆平方根(使用特征值分解以保证对称半正定)
        #print Cxx
        #print Cyy
        Cxx = Cxx*(n-1)/ (n)
        Cyy = Cyy*(n-1)/ (n)
        Cxy = Cxy*(n-1)/ (n)
        
        def inv_sqrtm(mat):
            eigvals, eigvecs = linalg.eig(mat)
            #print eigvals
            #eigvals = np.maximum(eigvals, 1e-12)  # 防止除零
            eigvals = np.where(eigvals < 1e-12, 1e-12, eigvals)
            return dot(dot(eigvecs,np.diag(1.0 / np.sqrt(eigvals))), eigvecs.T)

        Cxx_inv_sqrt = inv_sqrtm(Cxx)
        Cyy_inv_sqrt = inv_sqrtm(Cyy)

        # 构造K矩阵
        K = dot(dot(Cxx_inv_sqrt, Cxy) , Cyy_inv_sqrt)

        # 奇异值分解
        U, s, Vt = np.linalg.svd(K, full_matrices=False)
        V = Vt.T

        # 保留前n_components个
        if self.n_components is not None:
            U = U[:, :self.n_components
            s = s[:self.n_components
            V = V[:, :self.n_components

        self.cancorr_ = s
        self.A_ = dot(Cxx_inv_sqrt , U)
        self.B_ = dot(Cyy_inv_sqrt , V)

        # 计算典型变量
        U_scores = dot(Xc , self.A_)
        V_scores = dot(Yc , self.B_)

        # 回归系数:用U预测V(最小二乘)
        A = np.dot(U_scores.T, U_scores)   # 等价于 U_scores.T @ U_scores
        B = np.dot(U_scores.T, V_scores)   # 等价于 U_scores.T @ V_scores
        self.coef_ = np.dot(np.linalg.pinv(A), B)
        #self.coef_ = np.linalg.pinv(U_scores.T @ U_scores) @ U_scores.T @ V_scores
        
        return self

    def set_components(self, components):
        """
        设置要使用的典型变量索引(默认在 predict 中使用)
        components : list of int, 要保留的典型变量索引(0-based)
        """
        if components is not None:
            components = np.asarray(components)
            if components.max() >= len(self.cancorr_) or components.min() < 0:
                raise ValueError("索引超出范围")
        self.selected_components_ = components

    def transform(self, X, Y=None):
        '''
        将数据转换为典型变量空间
        如果Y提供,返回X和Y的典型变量;否则只返回X的典型变量
        '''
        X = np.asarray(X)
        Xc = X - self.x_mean_
        U = dot(Xc , self.A_)
        if Y is not None:
            Y = np.asarray(Y)
            Yc = Y - self.y_mean_
            V = dot(Yc , self.B_)
            return U, V
        
        return U

   
    def predict(self,X,Syy,components=None):
        """
        基于典型变量回归预测 Y
        参数:
        X : array-like, 预测因子
        components : list or None, 要使用的典型变量索引(0-based)
                    若为 None,则使用 self.selected_components_;若仍为 None,则使用所有
        返回:
        Y_pred : 预测的 Y
        """
        if components is None:
            components = self.selected_components_
        U = self.transform(X)
        
        if components is None:
            # 使用所有典型变量
            U_use = U
            coef_use = self.coef_
            B_use = self.B_
        else:
            # 确保 components 是列表(兼容单个整数输入)
            if isinstance(components, (int)):
                components = [components
            else:
                components = list(components)  # 确保可迭代
            print components
            U_use = U[:, components
            # 提取子矩阵:coef_ 的行和列均按 components 选取
            #print dot(self.coef_,self.coef_)
            coef_use = self.coef_[components[:, components  # 先选行,再选列
            #print coef_use
            A_use = self.A_[:, components
            B_use = self.B_[:, components
            
        B1 = dot(A_use,B_use.T)
        B2 = coef_use*dot(B1,Syy)

        Y_pred = dot(B2.T,X.T)
        #V_pred = dot(U_use , coef_use)
        #Y_pred = dot(V_pred , B_use.T)  + self.y_mean_

        return Y_pred


    def score(self, X, Y):
        """
        计算预测Y与实际Y的相关系数(按变量平均)
        """
        Y_pred = self.predict(X)
        corrs = [np.corrcoef(Y[:, i, Y_pred[:, i)[0, 1 for i in range(Y.shape[1)
        return np.mean(corrs)
        
    def significance_test(self):
        """
        Bartlett's 卡方检验,检验每个典型相关系数的显著性
        返回:
            p_values : array, 每个典型相关系数对应的p值
            chi2_stats : array, 卡方统计量
            df : array, 自由度
        """
        if self.cancorr_ is None:
            raise ValueError(u"模型尚未拟合,请先调用fit方法。")

        r = len(self.cancorr_)        
        n = self.n_samples_
        p = self.p_
        q = self.q_
        cancors = self.cancorr_
        #print cancors[0:]
        #print cancors[1:]
        p_values = zeros(r)
        chi2_stats = zeros(r)
        dfs = zeros(r, dtype='int')

        # 从第k个开始累积乘积
        for k in range(r):
            # 计算 Wilks' Lambda: 从第k个到最后一个的 (1 - rho_i^2) 乘积
            prod = np.prod(1 - cancors[k:**2)
            #print 1-cancors[k:]**2
            print prod
            # 防止log(0)
            if prod <= 0:
                prod = 1e-12
            # Bartlett's 卡方统计量
            
            chi2 = - (n - k-1 - 0.5*(p + q + 1)) * np.log(prod)
            #print chi2
            # 自由度
            df = (p - k) * (q - k)
            # 计算p值
            p_val = 1 - stats.chi2.cdf(chi2, df)

            chi2_stats[k = chi2
            dfs[k = df
            p_values[k = p_val

        return p_values, chi2_stats, dfs

# 准备数据# 准备数据       fn = 'temp_bj.csv'
table = readtable(fn, delimiter=',', format='%i%6f')
data = table['x1'
X = zeros([26,3)
X[:,0 = data
data = table['x2'
X[:,1 = data
data = table['x3'
X[:,2 = data

Y = zeros([26,3)
data = table['temp12'
Y[:,0 = data
data = table['temp1'
Y[:,1 = data
data = table['temp2'
Y[:,2 = data

Xc = X-X.mean(axis=0)
Yc = Y-Y.mean(axis=0)
Cxx = dot(Xc.T,Xc)/n
Cyy = dot(Yz.T,Yz)/n
cca.fit(Xc,Yc)
# 进行显著性检验
p_vals, _, _ = cca.significance_test()
print(u"p值:", p_vals)

# 选择显著水平下的典型变量(如 p < 0.05)
significant = np.where(p_vals < 0.05)[0
print(u"显著的典型变量索引:", significant)

# 可以在 predict 中临时指定
Cyy = dot(Yc.T,Yc)/n
Y_pred1 = cca.predict(Xz, Cyy,components=[0)  # 只使用前一个
print Y_pred1



密码修改失败请联系微信:mofangbao
发表于 昨天 07:14 | 显示全部楼层
密码修改失败请联系微信:mofangbao
回复

使用道具 举报

发表于 昨天 09:43 | 显示全部楼层
非常好的尝试, 能不能把用到的数据文件传上来,方便测试
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

 楼主| 发表于 昨天 22:11 | 显示全部楼层
教材的数据。

temp_bj.csv

768 Bytes, 下载次数: 0, 下载积分: 金钱 -5

密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Copyright ©2011-2014 bbs.06climate.com All Rights Reserved.  Powered by Discuz! (京ICP-10201084)

本站信息均由会员发表,不代表气象家园立场,禁止在本站发表与国家法律相抵触言论

快速回复 返回顶部 返回列表