preprocess.py

import pandas as pd
import gcsfs
import os
from sklearn.model_selection import train_test_split

PROJECT_NAME = "talos-project"
GCLOUD_CRED_ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS"
CHUNK_SIZE = 5000
TARGET_LABEL = "is_top_decile"
raw_df_path = "rML-raw-data.csv"
train_df_path = "rML-train.csv"
test_df_path = "rML-test.csv"


def get_remote_gs_wfs():
    """Return the GCS location of the DVC remote configured for this repo."""
    print("Retrieving location of remote working file system...")
    stream = os.popen("dvc remote list --local")
    output = stream.read()
    remote_wfs_loc = output.split("\t")[1].split("\n")[0]
    return remote_wfs_loc


def load_and_process_data(remote_wfs, random_state=42):
    """Read the raw CSV in chunks, process each chunk, and append the
    train/test splits to their respective files on the remote."""
    fs = gcsfs.GCSFileSystem(
        project=PROJECT_NAME, token=os.environ[GCLOUD_CRED_ENV_VAR]
    )
    with fs.open(os.path.join(remote_wfs, train_df_path), "a") as train_f, fs.open(
        os.path.join(remote_wfs, test_df_path), "a"
    ) as test_f:
        print("Loading data in chunks...")
        for i, chunk in enumerate(
            pd.read_csv(os.path.join(remote_wfs, raw_df_path), chunksize=CHUNK_SIZE)
        ):
            print(f"Processing chunk {i + 1}...")
            processed_data = process(chunk)
            print("Splitting into train and test data...")
            train_chunk, test_chunk = train_test_split(
                processed_data,
                random_state=random_state,
                stratify=processed_data[TARGET_LABEL],
            )
            print("Saving to cloud...")
            save_data(train_chunk, train_f, test_chunk, test_f, i)


def process(chunk):
    """Clean a chunk of raw posts and engineer the model features."""
    df = chunk.copy()
    df = df.drop(columns=["id", "author"])
    df = df.rename(columns={"selftext": "body", "link_flair_text": "flair"})
    df["title_len"] = df.title.str.len()
    df["body_len"] = df.body.str.len()
    # Posts whose thumbnail is "self" or "default" have no real thumbnail image
    df["has_thumbnail"] = [
        0 if (x == "self" or x == "default") else 1 for x in df["thumbnail"]
    ]
    df = df.fillna({"body": "", "flair": "None", "body_len": 0})
    # Normalize the misspelled "Discusssion" flair found in the raw data
    df["flair"] = ["Discussion" if (x == "Discusssion") else x for x in df["flair"]]
    # One-hot encode the flair column
    df = pd.concat([df, pd.get_dummies(df["flair"], prefix="flair")], axis=1).drop(
        ["flair"], axis=1
    )
    df["title_and_body"] = df["title"] + " " + df["body"]
    return df


def save_data(train_chunk, train_f, test_chunk, test_f, i):
    # We want to write the headers only once
    header = i == 0
    train_chunk.to_csv(train_f, header=header, mode="a")
    test_chunk.to_csv(test_f, header=header, mode="a")


if __name__ == "__main__":
    remote_wfs = get_remote_gs_wfs()
    load_and_process_data(remote_wfs)
    print("Loading and processing done!")