make_dataset.py
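This script builds the project's raw data file: it queries one year of r/MachineLearning posts (table suffixes 2018_08 through 2019_08) from the public fh-bigquery Reddit dump into a temporary BigQuery table, exports that table to the remote working file system on Google Cloud Storage, and then deletes the temporary dataset.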

"""Build the raw Reddit posts dataset in BigQuery and export it to GCS."""

from google.cloud import bigquery

import reddit_utils


def make_dataset(remote_wfs):
    client = bigquery.Client()
    temp_dataset_name = "reddit_dataset"
    temp_table_name = "posts"

    # Create a temporary BigQuery dataset to hold the query results.
    dataset_id = "{}.{}".format(reddit_utils.BIGQUERY_PROJECT, temp_dataset_name)
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    dataset = client.create_dataset(dataset)
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

    # Set table_id to the ID of the destination table.
    temp_table_id = "{}.{}.{}".format(
        reddit_utils.BIGQUERY_PROJECT, temp_dataset_name, temp_table_name
    )
    job_config = bigquery.QueryJobConfig(destination=temp_table_id)

    # Select one year of r/MachineLearning posts, derive local-time features
    # from the UTC creation timestamp, and label each post by score percentile.
    sql = """
        SELECT id, title, selftext, link_flair_text, is_self AS self_post, thumbnail, author,
            CAST(FORMAT_TIMESTAMP('%H', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS hour,
            CAST(FORMAT_TIMESTAMP('%M', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS minute,
            CAST(FORMAT_TIMESTAMP('%w', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS dayofweek,
            CAST(FORMAT_TIMESTAMP('%j', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS dayofyear,
            gilded, score,
            IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.50, 1, 0) AS is_top_median,
            IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.90, 1, 0) AS is_top_decile,
            IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.99, 1, 0) AS is_top_percent
        FROM `fh-bigquery.reddit_posts.*`
        WHERE (_TABLE_SUFFIX BETWEEN '2018_08' AND '2019_08')
            AND subreddit = 'MachineLearning'
    """

    # Start the query, passing in the extra configuration.
    query_job = client.query(sql, job_config=job_config)  # Make an API request.
    query_job.result()  # Wait for the job to complete.
    print("Query results loaded to the temporary table {}".format(temp_table_name))

    # Export the temporary table to GCS.
    destination_uri = "{}/{}".format(remote_wfs, reddit_utils.RAW_DF_PATH)
    dataset_ref = bigquery.DatasetReference(
        reddit_utils.BIGQUERY_PROJECT, temp_dataset_name
    )
    table_ref = dataset_ref.table(temp_table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # Make an API request.
    extract_job.result()  # Wait for the job to complete.
    print(
        "Exported {}:{}.{} to {}".format(
            reddit_utils.BIGQUERY_PROJECT,
            temp_dataset_name,
            temp_table_name,
            destination_uri,
        )
    )

    # Remove the temporary BigQuery dataset, including its table.
    client.delete_dataset(
        dataset_id, delete_contents=True, not_found_ok=True
    )  # Make an API request.
    print("Deleted dataset '{}'.".format(dataset_id))


if __name__ == "__main__":
    remote_wfs = reddit_utils.get_remote_gs_wfs()
    make_dataset(remote_wfs)
    print("Created raw data file in remote working file system!")
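The script depends on a small reddit_utils helper module that isn't shown on this page. Based on the names used above, a minimal sketch of what it might contain could look like the following; the project ID, object path, and bucket URI are hypothetical placeholders, not the project's actual settings:

# reddit_utils.py -- hypothetical sketch; the real module is not shown here.
# Every concrete value below is a placeholder assumption.

BIGQUERY_PROJECT = "my-gcp-project"        # GCP project that owns the temp dataset
RAW_DF_PATH = "data/raw/reddit_posts.csv"  # object path for the exported table


def get_remote_gs_wfs():
    # Return the GCS URI used as the remote working file system.
    return "gs://my-reddit-bucket"

With Google Cloud credentials configured (e.g. via gcloud auth application-default login), running python make_dataset.py should create the temporary dataset, materialize the query into it, export the table to the GCS destination, and clean up after itself.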