make_dataset.py
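This script builds the raw dataset for the project. It stages a year of r/MachineLearning submissions from the public fh-bigquery.reddit_posts tables into a temporary BigQuery table, exports that table to the remote working file system on Google Cloud Storage, and finally deletes the temporary dataset.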

from google.cloud import bigquery

import reddit_utils


def make_dataset(remote_wfs):
    # TODO: To simplify this project, the automated extraction of data has been removed,
    # because the BigQuery Python SDK does not support downloading data directly.
    # It might be possible to add this step back using a different Reddit API in the future.
    client = bigquery.Client()
    temp_dataset_name = "reddit_dataset"
    temp_table_name = "posts"

    # Create a temporary BigQuery dataset
    dataset_id = "{}.{}".format(reddit_utils.BIGQUERY_PROJECT, temp_dataset_name)
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    dataset = client.create_dataset(dataset)
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

    # Set table_id to the ID of the destination table.
    temp_table_id = "{}.{}.{}".format(
        reddit_utils.BIGQUERY_PROJECT, temp_dataset_name, temp_table_name
    )
    job_config = bigquery.QueryJobConfig(destination=temp_table_id)

    sql = """
        SELECT id, title, selftext, link_flair_text, is_self AS self_post, thumbnail, author,
            CAST(FORMAT_TIMESTAMP('%H', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS hour,
            CAST(FORMAT_TIMESTAMP('%M', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS minute,
            CAST(FORMAT_TIMESTAMP('%w', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS dayofweek,
            CAST(FORMAT_TIMESTAMP('%j', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS dayofyear,
            gilded, score,
            IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.50, 1, 0) AS is_top_median,
            IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.90, 1, 0) AS is_top_decile,
            IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.99, 1, 0) AS is_top_percent
        FROM `fh-bigquery.reddit_posts.*`
        WHERE (_TABLE_SUFFIX BETWEEN '2018_08' AND '2019_08')
            AND subreddit = 'MachineLearning'
    """

    # Start the query, passing in the extra configuration.
    query_job = client.query(sql, job_config=job_config)  # Make an API request.
    query_job.result()  # Wait for the job to complete.
    print("Query results loaded to the temporary table {}".format(temp_table_name))

    # Export the temporary table to GCS
    destination_uri = "{}/{}".format(remote_wfs, reddit_utils.RAW_DF_PATH)
    dataset_ref = bigquery.DatasetReference(
        reddit_utils.BIGQUERY_PROJECT, temp_dataset_name
    )
    table_ref = dataset_ref.table(temp_table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # API request
    extract_job.result()  # Wait for the job to complete.
    print(
        "Exported {}:{}.{} to {}".format(
            reddit_utils.BIGQUERY_PROJECT,
            temp_dataset_name,
            temp_table_name,
            destination_uri,
        )
    )

    # Remove the temporary BigQuery dataset (delete_dataset also drops its tables)
    client.delete_dataset(
        dataset_id, delete_contents=True, not_found_ok=True
    )  # Make an API request.
    print("Deleted dataset '{}'.".format(dataset_id))


if __name__ == "__main__":
    remote_wfs = reddit_utils.get_remote_gs_wfs()
    make_dataset(remote_wfs)
    print("Created raw data file in remote working file system!")