1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
- import os
- import json
- import yaml
- import time
- import argparse
- import numpy as np
- import pandas as pd
- from typing import Tuple, Optional
- from skbio import Sequence
- import dask.dataframe as dd
- from sklearn.model_selection import StratifiedKFold
def get_kmer_string(sequence, n_kmers, sep=' ', overlap=True):
    """
    Build a single string listing every k-mer found in a DNA sequence.

    :param sequence: DNA sequence string
    :param n_kmers: k-mer length (order) to extract from the sequence
    :param sep: Separator placed between consecutive k-mers; ' ' usually works
        well with text vectorizers
    :param overlap: Whether consecutive k-mers may overlap or not
    :return: Separator-joined string of the k-mers in order of appearance
    """
    kmer_iter = Sequence(sequence=sequence).iter_kmers(n_kmers, overlap=overlap)
    return sep.join(str(kmer) for kmer in kmer_iter)
def compute_kmer_string_parallel(X_ddf_: dd.Series, kmer_order: int):
    """
    Compute the overlapping k-mer string for every sequence, in parallel.

    Maps ``get_kmer_string`` over a dask series using the multiprocessing
    scheduler, then names the resulting pandas series after the k-mer order.

    :param X_ddf_: Dask series of DNA sequence strings
    :param kmer_order: k-mer length to extract from each sequence
    :return: pandas Series named ``overlapping_<kmer_order>mers``
    """
    print(f'------------------Generate strings of order {kmer_order}------------------')
    t0 = time.time()
    kmer_strings = X_ddf_.apply(
        lambda dna_seq: get_kmer_string(dna_seq, n_kmers=kmer_order),
        meta=('string', 'string'),
    ).compute(scheduler='processes')
    kmer_strings.rename(f'overlapping_{kmer_order}mers', inplace=True)
    elapsed = time.time() - t0
    print(f'total time to generate strings of {kmer_order}mers is: {elapsed} \n\n')
    return kmer_strings
def splice_and_save_folds(X_: pd.Series, y_: Optional[pd.Series], folds_idx_list: tuple,
                          folds_data_dir_path_: str, name_suffix: str):
    """
    Split a feature series (and optional labels) into CV folds and persist
    each train/holdout part as a one-column parquet file.

    Output layout: ``<root>/fold_<i>/train/X_train<suffix>.parquet`` (plus the
    ``y_train`` / ``X_holdout`` / ``y_holdout`` counterparts).

    :param X_: Feature series to split, indexed like the original training data
    :param y_: Optional label series aligned with ``X_``; label files are
        skipped when None (used for the derived k-mer string features)
    :param folds_idx_list: Iterable of ``(train_idx, holdout_idx)`` positional
        index pairs, e.g. the output of ``StratifiedKFold.split``
    :param folds_data_dir_path_: Root directory for the fold subdirectories
    :param name_suffix: Suffix appended to output file names (e.g. the k-mer
        column name); empty string for the raw sequences/labels
    :return: None; files are written as a side effect
    """
    print(f'-----------------Split {name_suffix} into folds and save them-----------------')

    def _save_series(series: pd.Series, dir_path: str, file_stem: str):
        # Persist one series as a single-column parquet file; the index is kept
        # so folds can be re-aligned with the original data later.
        os.makedirs(dir_path, exist_ok=True)
        pd.DataFrame(series).to_parquet(
            os.path.join(dir_path, f'{file_stem}{name_suffix}.parquet'),
            engine='pyarrow', compression='snappy', index=True)

    for i, (train_fold_i_idx, holdout_i_idx) in enumerate(folds_idx_list):
        fold_n = f'fold_{i + 1}'
        train_dir = os.path.join(folds_data_dir_path_, fold_n, 'train')
        holdout_dir = os.path.join(folds_data_dir_path_, fold_n, 'holdout')
        _save_series(X_.iloc[train_fold_i_idx], train_dir, 'X_train')
        _save_series(X_.iloc[holdout_i_idx], holdout_dir, 'X_holdout')
        if y_ is not None:
            _save_series(y_.iloc[train_fold_i_idx], train_dir, 'y_train')
            _save_series(y_.iloc[holdout_i_idx], holdout_dir, 'y_holdout')
def create_folds(train_data_path: str, folds_data_dir_path_: str, n_folds: int = 5,
                 kmer_orders: Tuple[int] = (1, 2, 3, 4, 5, 6)):
    """
    Create stratified CV splits and save them under ``folds_data_dir_path_``.

    First saves the raw sequences and labels per fold, then derives and saves
    the k-mer string representation for each requested k-mer order using the
    same fold indices.

    :param train_data_path: Path to the csv file with the training data
        (expects ``sequences`` and ``labels`` columns)
    :param folds_data_dir_path_: Directory where the fold data is stored
    :param n_folds: Number of folds to create. Usually 5
    :param kmer_orders: k-mer orders to generate string features for
    :return: None; fold files are written as a side effect
    """
    df_train = pd.read_csv(train_data_path, dtype={'sequences': pd.StringDtype(), 'labels': np.int8})
    df_train.index.rename('index', inplace=True)
    sequences = df_train['sequences'].copy()
    labels = df_train['labels'].copy()

    # Fold indices are materialized once so raw data and every derived k-mer
    # feature use identical splits.
    splitter = StratifiedKFold(n_splits=n_folds)
    fold_indices = tuple(splitter.split(sequences, labels))

    splice_and_save_folds(X_=sequences, y_=labels, folds_idx_list=fold_indices,
                          folds_data_dir_path_=folds_data_dir_path_, name_suffix='')

    sequences_ddf = dd.from_pandas(sequences, npartitions=50)
    for order in kmer_orders:
        kmer_strings = compute_kmer_string_parallel(X_ddf_=sequences_ddf, kmer_order=order)
        splice_and_save_folds(X_=kmer_strings, y_=None, folds_idx_list=fold_indices,
                              folds_data_dir_path_=folds_data_dir_path_,
                              name_suffix='_' + kmer_strings.name)
def parse_arguments(argv=None):
    """
    Parse command-line arguments for the fold-creation script.

    :param argv: Optional list of argument strings; when None, argparse falls
        back to ``sys.argv[1:]`` (its standard behavior), so existing callers
        are unaffected. Passing an explicit list makes the parser testable.
    :return: ``argparse.Namespace`` with the parsed ``config_file_path``
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--config_file_path', type=str, default='config.yml',
                        help='Config file with paths to input data')
    args_ = parser.parse_args(argv)
    return args_
if __name__ == '__main__':
    cli_args = parse_arguments()

    # Load the run configuration (data locations, fold count, k-mer orders).
    with open(cli_args.config_file_path) as yaml_file:
        config = yaml.load(yaml_file, Loader=yaml.FullLoader)
    input_data_params = config['input_data_params']
    training_params = config['training_params']
    # NOTE(review): 'preprocesing_params' is misspelled in the config schema;
    # kept as-is because the YAML files use this exact key.
    preprocess_params = training_params['preprocesing_params']

    # Fail fast on a malformed config (asserts are stripped under -O).
    assert isinstance(input_data_params, dict)
    assert isinstance(training_params, dict)
    assert isinstance(preprocess_params, dict)

    t_start = time.time()
    print('Create and save folds')
    create_folds(
        train_data_path=os.path.join(input_data_params['input_data_dir_path'],
                                     input_data_params['human_train']),
        folds_data_dir_path_=preprocess_params['input_training_data_dir_path'],
        n_folds=training_params['n_folds'],
        kmer_orders=preprocess_params['kmer_orders'],
    )
    t_end = time.time()

    # Persist wall-clock timing so the pipeline can report fold-creation cost.
    with open('create_folds_metrics.json', 'w') as metrics_file:
        json.dump({'create_cv_folds_time': t_end - t_start}, metrics_file)
    print("done")
|