Source code for hi_lasso_spark.Hi_LASSO_spark

# Author: Seungha Jeong <jsh29368602@gmail.com>
# License: MIT
# Date: 30, Jun 2020

# Import package
import numpy as np
import math
from scipy.stats import binom, norm
import glmnet
import os
import binascii

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext


"""
    • SparkConf() : Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
                    Most of the time, you would create a SparkConf object with new SparkConf(), which will load values from any spark.
                    For example, you can write new SparkConf().setMaster("local").setAppName("My app").
    • getOrCreate() : This function may be used to get or instantiate a SparkContext and register it as a singleton object.
"""
conf = SparkConf().setMaster("local[*]").setAppName("Hi_LASSO_Spark")
sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)


[docs]class HiLASSO_Spark(): """ Hi-LASSO_Spark(High-Demensinal LASSO Spark) is to improve the LASSO solutions for extremely high-dimensional data using pyspark. PySpark is the Python API written in python to support Apache Spark. Apache Spark is a distributed framework that can handle Big Data analysis. Spark is basically a computational engine, that works with huge sets of data by processing them in parallel and batch systems. The main contributions of Hi-LASSO are as following: • Rectifying systematic bias introduced by bootstrapping. • Refining the computation for importance scores. • Providing a statistical strategy to determine the number of bootstrapping. • Taking advantage of global oracle property. • Allowing tests of significance for feature selection with appropriate distribution. Parameters ---------- X: array-like of shape (n_sample, n_feature) predictor variables y: array-like of shape (n_sample,) response variables q1: 'auto' or int, optional [default = 'auto'] The number of predictors to randomly selecting in Procedure 1. When to set 'auto', use q1 as number of samples. q2: 'auto' or int, optional [default = 'auto'] The number of predictors to randomly selecting in Procedure 2. When to set 'auto', use q2 as number of samples. L: int [default=30] The expected value at least how many times a predictor is selected in a bootstrapping. alpha: float [default=0.05] significance level used for significance test for feature selection node: Node refers to node which runs the application code in the cluster. If you do not specify the number of nodes, the 8 nodes are automatically set to the default node. Attributes ---------- n : int number of samples. p : int number of features. Examples -------- >>> from Hi_LASSO_spark_fin import HiLASSO_Spark >>> model = HiLASSO_Spark(X, y) >>> model.fit() >>> model.coef_ >>> model.p_values_ >>> model.selected_var_ """ def __init__(self, X, y, X_test = 'auto', y_test = 'auto', alpha = 0.05, q1 = 'auto', q2 = 'auto', L = 30, cv = 5, node = 'auto', logistic = False): self.X = np.array(X) self.y = np.array(y).flatten() self.X_test = np.array(X_test) self.y_test = np.array(y_test).flatten() self.n_sample, self.n_feature = X.shape self.q1 = self.X.shape[0] if q1 == 'auto' else q1 self.q2 = self.X.shape[0] if q2 == 'auto' else q2 self.alpha = alpha self.cv = cv self.L = L self.node = 8 if node == 'auto' else node self.logistic = logistic
[docs] def standardization(self, X, y): """ The response is mean-corrected and the predictors are standardized Parameters --------- X: array-like of shape (n_sample, n_feature) predictor y: array-like of shape (n_sample,) response Returns ------- np.ndarray scaled_X, scaled_y, std """ mean_x = X - X.mean() std = np.sqrt((mean_x**2).sum(axis=0)) X_sc = mean_x / std y_sc = y - y.mean() return X_sc, y_sc, std
[docs] def fit(self): """Fit the model with Procedure 1 and Procedure 2. Procedure 1: Compute importance scores for predictors. Procedure 2: Compute coefficients and Select variables. Parallel processing of Spark One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Attributes ---------- sc.parallelize(): method The sc.parallelize() method is the SparkContext's parallelize method to create a parallelized collection. This allows Spark to distribute the data across multiple nodes, instead of depending on a single node to process the data. map(): method A map is a transformation operation in Apache Spark. It applies to each element of RDD and it returns the result as new RDD. In the Map, operation developer can define his own custom business logic. The same logic will be applied to all the elements of RDD. Procedure_1_coef_ : array Estimated coefficients by Elastic_net. Procedure_2_coef_ : array Estimated coefficients by Adaptive_LASSO. coef_ : array Estimated coefficients by Hi-LASSO. p_values_ : array P-values of each coefficients. selected_var_: array Selected variables by significance test. Returns ------- self : object """ # Procedure 1: Compute coefficients and importance scores for predictors. # Perform parallel processing by mapping the number of bootstraps. self.B = math.floor(self.L * self.n_feature / self.q1) Elastic_Estimate = self.Estimate_coefficient_Elastic Procedure_1_coef = sc.parallelize(range(self.B), self.node).map(lambda x: Elastic_Estimate(x)).collect() Procedure_1_coef_ = np.array(list(Procedure_1_coef)).T # Compute the importance scores Importance_score = np.nanmean(np.abs(Procedure_1_coef_), axis=1) Importance_score_2 = np.where(Importance_score == 0, 1e-10, Importance_score) self.Importance_score_fin = Importance_score_2 / Importance_score_2.sum() print('Procedure_1') # Procedure 2: Compute coefficients and Select variables. # Perform parallel processing by mapping the number of bootstraps. if self.logistic: self.B = math.floor(self.L * self.n_feature / self.q2) Adaptive_Estimate = self.Estimate_coefficient_Adaptive_logistic Procedure_2_coef = sc.parallelize(range(self.B), self.node).map(lambda x: Adaptive_Estimate(x)).collect() Procedure_2_coef_ = np.array(list(Procedure_2_coef)).T #Estimate Final Coefficient and Select variables. coef = np.nanmean(Procedure_2_coef_, axis=1) self.aaa = coef y = np.zeros_like(coef) y[coef>0.5] = 1 self.finn = y print('Procedure_2') else: self.B = math.floor(self.L * self.n_feature / self.q2) Adaptive_Estimate = self.Estimate_coefficient_Adaptive Procedure_2_coef = sc.parallelize(range(self.B), self.node).map(lambda x: Adaptive_Estimate(x)).collect() Procedure_2_coef_ = np.array(list(Procedure_2_coef)).T #Estimate Final Coefficient and Select variables. coef = np.nanmean(Procedure_2_coef_, axis=1) self.coef_ = coef self.p_values = self.Calculate_p_value(Procedure_2_coef_) self.selected_var = np.where(self.p_values < self.alpha / self.n_feature, np.nanmean(Procedure_2_coef_, axis=1), 0) print('Procedure_2') return self
[docs] def Estimate_coefficient_Elastic(self, value): """ Estimation of coefficients for each bootstrap sample using Elastic_net Returns ------- coef_result : coefficient for Elastic_Net """ # Set random seed as each bootstrap_number. seed = None seed = (seed if seed is not None else int(binascii.hexlify(os.urandom(4)), 16)) rs = np.random.RandomState(seed) # Randomly select q1 on each bootstrap sample # Generate bootstrap index of sample and predictor q = self.q1 select_prop = None coef_result = np.zeros(self.n_feature) Selected_q = rs.choice(np.arange(self.n_feature), size = self.q1, replace=False, p = select_prop) Bootstrap_Index = rs.choice(np.arange(self.n_sample), size = self.n_sample, replace=True, p = None) X_train_Data = self.X[Bootstrap_Index, :][:, Selected_q] y_train_Data = self.y[Bootstrap_Index] # Search for otpimal alpha mses = np.array([]) accuracys = np.array([]) alphas = np.arange(0, 1.1, 0.1) # Estimate coefficients if self.logistic: for j in alphas: cv_enet = glmnet.LogitNet(standardize=True, fit_intercept=True, n_splits=self.cv, scoring='accuracy', alpha=j).fit(X_train_Data, y_train_Data) accuracys = np.append(accuracys, cv_enet.cv_mean_score_.max()) opt_alpha = alphas[accuracys.argmax()] enet_fin = glmnet.LogitNet(standardize=True, fit_intercept=True, n_splits=self.cv, scoring='accuracy', alpha=opt_alpha) coef_result[Selected_q] = (enet_fin.fit(X_train_Data, y_train_Data).coef_) else: # Standardization X_train_Data, y_train_Data, std = self.standardization(X_train_Data, y_train_Data) for j in alphas: cv_enet = glmnet.ElasticNet(standardize=False, fit_intercept=False, n_splits=self.cv, scoring='mean_squared_error', alpha=j).fit(X_train_Data, y_train_Data) mses = np.append(mses, cv_enet.cv_mean_score_.max()) opt_alpha = alphas[mses.argmax()] enet_fin = glmnet.ElasticNet(standardize=False, fit_intercept=False, n_splits=self.cv, scoring='mean_squared_error', alpha=opt_alpha) coef_result[Selected_q] = (enet_fin.fit(X_train_Data, y_train_Data).coef_) / std return coef_result
[docs] def Estimate_coefficient_Adaptive(self, value): """ Estimation of coefficients for each bootstrap sample using Adaptive_LASSO Returns ------- coef_result : coefficient for Adaprive_LASSO """ # Set random seed as each bootstrap_number. seed = None seed = (seed if seed is not None else int(binascii.hexlify(os.urandom(4)), 16)) rs = np.random.RandomState(seed) # Randomly select q2 on each bootstrap sample. # Generate bootstrap index of sample and predictor. q = self.q2 select_prop = self.Importance_score_fin coef_result = np.zeros(self.n_feature) Selected_q = rs.choice(np.arange(self.n_feature), size = self.q1, replace=False, p = select_prop) Bootstrap_Index = rs.choice(np.arange(self.n_sample), size = self.n_sample, replace=True, p = None) X_train_Data = self.X[Bootstrap_Index, :][:, Selected_q] y_train_Data = self.y[Bootstrap_Index] # Standardization X_train_Data, y_train_Data, std = self.standardization(X_train_Data, y_train_Data) ad_fin = glmnet.ElasticNet(fit_intercept = False, standardize = False, n_splits = self.cv, scoring='mean_squared_error', alpha = 1) coef_result[Selected_q] = (ad_fin.fit(X_train_Data, y_train_Data, relative_penalties = 1 / (select_prop[Selected_q] * 100)).coef_) / std return coef_result
[docs] def Estimate_coefficient_Adaptive_logistic(self, value): """ Estimation of coefficients for each bootstrap sample using Adaptive_LASSO Returns ------- coef_result : coefficient for Adaprive_LASSO """ # Set random seed as each bootstrap_number. seed = None seed = (seed if seed is not None else int(binascii.hexlify(os.urandom(4)), 16)) rs = np.random.RandomState(seed) # Randomly select q2 on each bootstrap sample. # Generate bootstrap index of sample and predictor. q = self.q2 select_prop = self.Importance_score_fin coef_result = np.zeros(self.n_feature) Selected_q = rs.choice(np.arange(self.n_feature), size = self.q2, replace=False, p = select_prop) Bootstrap_Index = rs.choice(np.arange(self.n_sample), size = self.n_sample, replace=True, p = None) X_train_Data = self.X[Bootstrap_Index, :][:, Selected_q] y_train_Data = self.y[Bootstrap_Index] X_test = self.X_test[: , :][:, Selected_q] ad_fin = glmnet.LogitNet(standardize=True, fit_intercept=True, n_splits = self.cv, alpha = 1) coef_result = (ad_fin.fit(X_train_Data, y_train_Data, relative_penalties = 1 / (select_prop[Selected_q] * 100)).predict(X_test)) return coef_result
[docs] def Calculate_p_value(self, coef_result): """ Compute p-values of each predictor for Statistical Test of Variable Selection. """ # Calculate_Boolean: non-zero and notnull of coef_result not_null_value = np.isfinite(coef_result) Calculate_Boolean = np.logical_and(not_null_value, coef_result != 0).sum(axis=1) # pi: the average of the selcetion ratio of all feature variables in B boostrap samples. pi = Calculate_Boolean.sum() / not_null_value.sum().sum() return binom.sf(Calculate_Boolean - 1, n = self.B, p = pi)