Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

ratings.py 2.3 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
  1. import logging
  2. from io import StringIO
  3. import csv
  4. import subprocess as sp
  5. import numpy as np
  6. from tqdm import tqdm
  7. import psycopg2
  8. from invoke import task
  9. import support as s
  10. _log = logging.getLogger(__name__)
  11. @task(s.init)
  12. def import_bx(c, force=False):
  13. "Import BookCrossing ratings"
  14. s.start('bx-ratings', force=force)
  15. _log.info("initializing BX schema")
  16. s.psql(c, 'bx-schema.sql')
  17. _log.info("cleaning BX rating data")
  18. with open('data/BX-Book-Ratings.csv', 'rb') as bf:
  19. data = bf.read()
  20. barr = np.frombuffer(data, dtype='u1')
  21. # delete bytes that are too big
  22. barr = barr[barr < 128]
  23. # convert to LF
  24. barr = barr[barr != ord('\r')]
  25. # change delimiter to comma
  26. barr[barr == ord(';')] = ord(',')
  27. # write
  28. _log.info('importing BX to database')
  29. data = bytes(barr)
  30. rd = StringIO(data.decode('utf8'))
  31. with s.database() as dbc:
  32. # with dbc encapsulates a transaction
  33. with dbc, dbc.cursor() as cur:
  34. for row in tqdm(csv.DictReader(rd)):
  35. cur.execute('INSERT INTO bx.raw_ratings (user_id, isbn, rating) VALUES (%s, %s, %s)',
  36. (row['User-ID'], row['ISBN'], row['Book-Rating']))
  37. s.finish('bx-ratings', dbc)
  38. @task(s.init, s.build)
  39. def import_az(c, force=False):
  40. "Import Amazon ratings"
  41. s.start('az-ratings', force=force)
  42. _log.info('Resetting Amazon schema')
  43. s.psql(c, 'az-schema.sql')
  44. _log.info('Importing Amazon ratings')
  45. s.pipeline([
  46. [s.bin_dir / 'pcat', s.data_dir / 'ratings_Books.csv'],
  47. ['psql', '-c', '\\copy az.raw_ratings FROM STDIN (FORMAT CSV)']
  48. ])
  49. s.finish('az-ratings')
  50. @task(s.init)
  51. def index_az(c, force=False):
  52. "Index Amazon rating data"
  53. s.check_prereq('az-ratings')
  54. s.check_prereq('cluster')
  55. s.start('az-index', force=force)
  56. _log.info('building Amazon indexes')
  57. s.psql(c, 'az-index.sql')
  58. s.finish('az-index')
  59. @task(s.init)
  60. def index_bx(c, force=False):
  61. "Index BookCrossing rating data"
  62. s.check_prereq('bx-ratings')
  63. s.check_prereq('cluster')
  64. s.start('bx-index', force=force)
  65. _log.info('building BX indexes')
  66. s.psql(c, 'bx-index.sql')
  67. s.finish('bx-index')
@task(s.init, index_az, index_bx)
def index(c):
    "Index all rating data"
    # Aggregator task: the real work happens in the prerequisite tasks
    # (index_az, index_bx) declared on the decorator above.
    _log.info('done')
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...