1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
- '''Shared transforms between different interpretable models
- '''
- import numpy as np
- from sklearn.base import BaseEstimator, TransformerMixin
- import pandas as pd
- class Winsorizer():
- """Performs Winsorization 1->1*
- Warning: this class should not be used directly.
- """
- def __init__(self, trim_quantile=0.0):
- self.trim_quantile = trim_quantile
- self.winsor_lims = None
- def train(self, X):
- # get winsor limits
- self.winsor_lims = np.ones([2, X.shape[1]]) * np.inf
- self.winsor_lims[0, :] = -np.inf
- if self.trim_quantile > 0:
- for i_col in np.arange(X.shape[1]):
- lower = np.percentile(X[:, i_col], self.trim_quantile * 100)
- upper = np.percentile(
- X[:, i_col], 100 - self.trim_quantile * 100)
- self.winsor_lims[:, i_col] = [lower, upper]
- def trim(self, X):
- X_ = X.copy()
- X_ = np.where(X > self.winsor_lims[1, :], np.tile(self.winsor_lims[1, :], [X.shape[0], 1]),
- np.where(X < self.winsor_lims[0, :], np.tile(self.winsor_lims[0, :], [X.shape[0], 1]), X))
- return X_
- class FriedScale():
- """Performs scaling of linear variables according to Friedman et alpha_l. 2005 Sec 5
- Each variable is first Winsorized l->l*, then standardised as 0.4 x l* / std(l*)
- Warning: this class should not be used directly.
- """
- def __init__(self, winsorizer=None):
- self.scale_multipliers = None
- self.winsorizer = winsorizer
- def train(self, X):
- # get multipliers
- if self.winsorizer != None:
- X_trimmed = self.winsorizer.trim(X)
- else:
- X_trimmed = X
- scale_multipliers = np.ones(X.shape[1])
- for i_col in np.arange(X.shape[1]):
- num_uniq_vals = len(np.unique(X[:, i_col]))
- if num_uniq_vals > 2: # don't scale binary variables which are effectively already rules
- scale_multipliers[i_col] = 0.4 / \
- (1.0e-12 + np.std(X_trimmed[:, i_col]))
- self.scale_multipliers = scale_multipliers
- def scale(self, X):
- if self.winsorizer != None:
- return self.winsorizer.trim(X) * self.scale_multipliers
- else:
- return X * self.scale_multipliers
- class CorrelationScreenTransformer(BaseEstimator, TransformerMixin):
- '''Finds correlated features above a magnitude threshold
- and zeros out all but the first of them
- '''
- def __init__(self, threshold=1.0):
- # Initialize with a correlation threshold
- self.threshold = threshold
- self.correlated_feature_sets = []
- def fit(self, X, y=None):
- # Check if X is a pandas DataFrame; if not, convert it to DataFrame
- if not isinstance(X, pd.DataFrame):
- X = pd.DataFrame(X)
- # Calculate the correlation matrix
- corr_matrix = X.corr().abs()
- # Identify the features that are correlated based on the threshold
- for i in range(len(corr_matrix.columns)):
- for j in range(i):
- if corr_matrix.iloc[i, j] >= self.threshold or corr_matrix.iloc[i, j] <= -self.threshold:
- # Find the set this feature belongs to
- found_set = False
- for feature_set in self.correlated_feature_sets:
- if i in feature_set or j in feature_set:
- feature_set.update([i, j])
- found_set = True
- break
- if not found_set:
- self.correlated_feature_sets.append(set([i, j]))
- # Convert the sets to list of lists where each sublist has indexes to keep and to remove
- self.to_keep_remove = []
- for feature_set in self.correlated_feature_sets:
- feature_list = list(feature_set)
- # keep the first, remove the rest
- self.to_keep_remove.append((feature_list[0], feature_list[1:]))
- return self
- def transform(self, X):
- # Again, check if X is a pandas DataFrame; if not, convert it
- input_type = type(X)
- if not isinstance(X, pd.DataFrame):
- X = pd.DataFrame(X)
- # Set the identified correlated features (except the first) to 0
- X_transformed = X.copy()
- for keep, to_remove in self.to_keep_remove:
- X_transformed.iloc[:, to_remove] = 0
- if input_type == np.ndarray:
- X_transformed == X_transformed.values
- return X_transformed
- if __name__ == '__main__':
- X = np.random.randn(5, 5)
- X[:, 0] = [1, 1, 0, 1, 1]
- X[:, 1] = X[:, 0]
- transformer = CorrelationScreenTransformer()
- print(X)
- X_transformed = transformer.fit_transform(X)
- print(X_transformed)
|