```python
from google.cloud import bigquery

import reddit_utils


def make_dataset(remote_wfs):
    # TODO: To keep this project simple, the automated extraction of data was removed,
    # since the BigQuery Python SDK does not support downloading data directly.
    # It might be possible to add this step back using a different Reddit API in the future.
    client = bigquery.Client()
    temp_dataset_name = "reddit_dataset"
    temp_table_name = "posts"

    # Create a temporary BigQuery dataset to hold the query results.
    dataset_id = "{}.{}".format(reddit_utils.BIGQUERY_PROJECT, temp_dataset_name)
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    dataset = client.create_dataset(dataset)
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

    # Set table_id to the ID of the destination table.
    temp_table_id = "{}.{}.{}".format(
        reddit_utils.BIGQUERY_PROJECT, temp_dataset_name, temp_table_name
    )
    job_config = bigquery.QueryJobConfig(destination=temp_table_id)

    sql = """
        SELECT id, title, selftext, link_flair_text, is_self AS self_post, thumbnail, author,
          CAST(FORMAT_TIMESTAMP('%H', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS hour,
          CAST(FORMAT_TIMESTAMP('%M', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS minute,
          CAST(FORMAT_TIMESTAMP('%w', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS dayofweek,
          CAST(FORMAT_TIMESTAMP('%j', TIMESTAMP_SECONDS(created_utc), 'America/New_York') AS INT64) AS dayofyear,
          gilded, score,
          IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.50, 1, 0) AS is_top_median,
          IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.90, 1, 0) AS is_top_decile,
          IF(PERCENT_RANK() OVER (ORDER BY score ASC) >= 0.99, 1, 0) AS is_top_percent
        FROM `fh-bigquery.reddit_posts.*`
        WHERE (_TABLE_SUFFIX BETWEEN '2018_08' AND '2019_08')
          AND subreddit = 'MachineLearning'
    """

    # Start the query, passing in the extra configuration.
    query_job = client.query(sql, job_config=job_config)  # Make an API request.
    query_job.result()  # Wait for the job to complete.
    print("Query results loaded to the temporary table {}".format(temp_table_name))

    # Export the temporary table to GCS.
    destination_uri = "{}/{}".format(remote_wfs, reddit_utils.RAW_DF_PATH)
    dataset_ref = bigquery.DatasetReference(
        reddit_utils.BIGQUERY_PROJECT, temp_dataset_name
    )
    table_ref = dataset_ref.table(temp_table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # Make an API request.
    extract_job.result()  # Wait for the job to complete.
    print(
        "Exported {}:{}.{} to {}".format(
            reddit_utils.BIGQUERY_PROJECT,
            temp_dataset_name,
            temp_table_name,
            destination_uri,
        )
    )

    # Remove the temporary BigQuery dataset and the table it contains.
    client.delete_dataset(
        dataset_id, delete_contents=True, not_found_ok=True
    )  # Make an API request.
    print("Deleted dataset '{}'.".format(dataset_id))


if __name__ == "__main__":
    remote_wfs = reddit_utils.get_remote_gs_wfs()
    make_dataset(remote_wfs)
    print("Created raw data file in remote working file system!")
```