Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

etl.py 5.2 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
  1. import os
  2. import csv
  3. import glob
  4. import json
  5. from cql_queries import *
  6. from dataprep import DataPrep
  7. from cassandra.cluster import Cluster
  8. def connect_db():
  9. """Coonect and create a keyspace and cluster in Cassandra
  10. Returns
  11. -------
  12. session
  13. the Cassandra session to execute ETL
  14. cluster
  15. Cassandra node cluster
  16. """
  17. cluster = Cluster()
  18. session = cluster.connect()
  19. session.execute(
  20. """CREATE KEYSPACE IF NOT EXISTS p2_udacity
  21. WITH REPLICATION = {'class': 'SimpleStrategy',
  22. 'replication_factor': 1}"""
  23. )
  24. session.set_keyspace('p2_udacity')
  25. return session, cluster
  26. def execute_query_1(session, file, sessionId=338, itemInSession=4, verbose=True):
  27. """Give the artist, song title and song's length filtered by
  28. sessionId and itemInSession.
  29. Parameters
  30. ----------
  31. session
  32. the Cassandra session to execute ETL
  33. file
  34. path to csv file
  35. sessionId
  36. a filter in the query
  37. itemInSession
  38. a filter in the query
  39. verbose
  40. prints and validate the query
  41. """
  42. session.execute(song_features)
  43. with open(file, encoding = 'utf8') as f:
  44. csvreader = csv.reader(f)
  45. next(csvreader) # skip header
  46. for line in csvreader:
  47. itemInSession_, sessionId_ = int(line[3]), int(line[8])
  48. artist, song, length = str(line[0]), str(line[9]), float(line[5])
  49. session.execute(insert_data_song_features, (itemInSession_, sessionId_, artist, song, length))
  50. if verbose:
  51. print("\nQuery 1: Give me the artist, song title and song's length in the music app\
  52. history that was heard during sessionId=338, and itemInSession=4")
  53. rows = session.execute(query_1, (itemInSession, sessionId))
  54. for row in rows:
  55. print(f'\tartist: {row.artist}, song: {row.song} length: {row.length:.8}')
  56. def execute_query_2(session, file, userId=10, sessionId=182, verbose=True):
  57. """Give the name of artist, song and user (first and last name)
  58. filtered by userid and sessionid
  59. Parameters
  60. ----------
  61. session
  62. the Cassandra session to execute ETL
  63. file
  64. path to csv file
  65. userId
  66. a filter in the query
  67. sessionId
  68. a filter in the query
  69. verbose
  70. prints and validate the query
  71. """
  72. session.execute(artist_song_by_user)
  73. with open(file, encoding = 'utf8') as f:
  74. csvreader = csv.reader(f)
  75. next(csvreader) # skip header
  76. for line in csvreader:
  77. userId_, sessionId_, itemInSession = int(line[10]), int(line[8]), int(line[3])
  78. artist, song, firstName, lastName = str(line[0]), str(line[9]), str(line[1]), str(line[4])
  79. session.execute(insert_data_artist_song_by_user, (userId_, sessionId_, itemInSession, artist, song, firstName, lastName))
  80. if verbose:
  81. print("\nQuery 2: Give me only the following: name of artist, song (sorted by itemInSession)\
  82. and user (first and last name) for userid = 10, sessionid = 182")
  83. rows = session.execute(query_2, (userId, sessionId))
  84. for row in rows:
  85. print(f'\tartist: {row.artist}, song: {row.song}, user first name: {row.firstname}, user last name: {row.lastname}')
  86. def execute_query_3(session, file, song='All Hands Against His Own', verbose=True):
  87. """Give the every user name (first and last) filtered by the song 'All Hands Against His Own
  88. Parameters
  89. ----------
  90. session
  91. the Cassandra session to execute ETL
  92. file
  93. path to csv file
  94. song
  95. a filter in the query
  96. verbose
  97. prints and validate the query
  98. """
  99. session.execute(user_name)
  100. with open(file, encoding = 'utf8') as f:
  101. csvreader = csv.reader(f)
  102. next(csvreader) # skip header
  103. for line in csvreader:
  104. song, userId, = str(line[9]), int(line[10])
  105. firstName, lastName = str(line[1]), str(line[4])
  106. session.execute(insert_data_user_name, (song, userId, firstName, lastName))
  107. if verbose:
  108. print("\nQuery 3: Give me every user name (first and last) in my music app\
  109. history who listened to the song 'All Hands Against His Own'")
  110. rows = session.execute(query_3, ('All Hands Against His Own', ))
  111. for row in rows:
  112. print(f'\tuser first name: {row.firstname:>10}, user last name: {row.lastname}')
  113. def main():
  114. session, cluster = connect_db()
  115. file_name = 'event_data_file_new'
  116. file_path = os.getcwd() + '/'+file_name+'.csv'
  117. data = DataPrep(
  118. filepath_in=os.getcwd() + '/event_data',
  119. filepath_out=file_name
  120. )
  121. data.write_csv()
  122. execute_query_1(session, file_path, sessionId=338, itemInSession=4, verbose=True)
  123. execute_query_2(session, file_path, userId=10, sessionId=182, verbose=True)
  124. execute_query_3(session, file_path, song='All Hands Against His Own', verbose=True)
  125. session.execute("DROP TABLE IF EXISTS song_features")
  126. session.execute("DROP TABLE IF EXISTS artist_song_by_user")
  127. session.execute("DROP TABLE IF EXISTS user_name")
  128. session.shutdown()
  129. cluster.shutdown()
  130. if __name__ == "__main__":
  131. main()
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...