1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
- """
- Transform dataset to feature set.
- Routine Listings
- ----------------
- get_params()
- Get the DVC stage parameters.
- featurize(train_input, test_input, train_output, test_output)
- Transform data to features.
- """
- import sys
- import dask
- import dask.distributed
- import numpy as np
- import pandas as pd
- import scipy.sparse as sparse
- from sklearn.feature_extraction.text import CountVectorizer
- from sklearn.feature_extraction.text import TfidfTransformer
- import pickle
- import conf
def get_params():
    """Return the DVC stage parameters.

    Returns
    -------
    dict
        A newly built mapping with the single key ``max_features``.
    """
    stage_params = dict(max_features=5000)
    return stage_params
@dask.delayed
def featurize(train_input, test_input, train_output, test_output,
              max_features):
    """Transform the train and test datasets into TF-IDF feature matrices.

    Both inputs are read as header-less, tab-separated files with the
    columns ``id``, ``label`` and ``text``.  The vocabulary and the IDF
    weights are learned from the training text only and are then applied
    unchanged to the test text.  Each output file is a pickled SciPy CSR
    matrix whose first column holds the ids, whose second column holds the
    labels and whose remaining columns hold the TF-IDF features.

    The function is wrapped in ``dask.delayed``: calling it only builds a
    lazy task, and the work runs when ``.compute()`` is called on the result.

    Parameters
    ----------
    train_input, test_input
        Locations of the train and test TSV files (handed to
        ``pandas.read_csv``).
    train_output, test_output
        Destinations of the pickled feature matrices (handed to ``open``).
    max_features
        Vocabulary size limit forwarded to ``CountVectorizer``.
    """
    def get_df(path):
        """Load a dataset from a tab-separated file and log its shape."""
        df = pd.read_csv(
            path,
            encoding='utf-8',
            header=None,
            delimiter='\t',
            names=['id', 'label', 'text']
        )
        sys.stderr.write('The input data frame {} size is {}\n'.format(
            path, df.shape))
        return df

    def get_words(df):
        """Return the lower-cased text column as a NumPy unicode array."""
        # NOTE(review): astype('U') stringifies missing text as 'nan';
        # kept as-is so the produced features do not change.
        return np.array(df['text'].str.lower().values.astype('U'))

    def save_matrix(df, matrix, path):
        """Prepend id and label columns to ``matrix`` and pickle the result."""
        # A Series becomes a 1 x n sparse row; transpose it into a column.
        id_matrix = sparse.csr_matrix(df['id'].astype(np.int64)).T
        label_matrix = sparse.csr_matrix(df['label'].astype(np.int64)).T

        result = sparse.hstack([id_matrix, label_matrix, matrix], format='csr')

        msg = 'The output matrix {} size is {} and data type is {}\n'
        sys.stderr.write(msg.format(path, result.shape, result.dtype))

        with open(path, 'wb') as fd:
            pickle.dump(result, fd, pickle.HIGHEST_PROTOCOL)

    # --- Training set: learn the vocabulary and the IDF weights. ---
    df_train = get_df(train_input)
    train_words = get_words(df_train)

    bag_of_words = CountVectorizer(
        stop_words='english', max_features=max_features)
    # fit_transform yields the same matrix as fit() followed by transform()
    # but analyses the training corpus only once.
    train_counts = bag_of_words.fit_transform(train_words)

    tfidf = TfidfTransformer(smooth_idf=False)
    train_tfidf = tfidf.fit_transform(train_counts)

    save_matrix(df_train, train_tfidf, train_output)

    # Drop the training frame before the test frame is loaded.
    del df_train

    # --- Test set: reuse the fitted vectorizer and transformer. ---
    df_test = get_df(test_input)
    test_words = get_words(df_test)
    test_counts = bag_of_words.transform(test_words)
    test_tfidf = tfidf.transform(test_counts)

    save_matrix(df_test, test_tfidf, test_output)
if __name__ == '__main__':
    # Print floats without scientific notation.
    np.set_printoptions(suppress=True)

    INPUT_TRAIN_TSV_PATH = conf.data_dir/'Posts-train.tsv'
    INPUT_TEST_TSV_PATH = conf.data_dir/'Posts-test.tsv'
    OUTPUT_TRAIN_MATRIX_PATH = conf.data_dir/'matrix-train.p'
    OUTPUT_TEST_MATRIX_PATH = conf.data_dir/'matrix-test.p'

    config = get_params()
    MAX_FEATURES = config['max_features']

    # Connect to the dask scheduler at localhost:8786 for the duration of
    # the computation; the context manager closes the client afterwards
    # instead of leaving the connection open until interpreter exit.
    with dask.distributed.Client('localhost:8786'):
        featurize(
            INPUT_TRAIN_TSV_PATH, INPUT_TEST_TSV_PATH,
            OUTPUT_TRAIN_MATRIX_PATH, OUTPUT_TEST_MATRIX_PATH,
            MAX_FEATURES).compute()
|