Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

tasks.py 2.6 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
  1. import sys
  2. from pathlib import Path
  3. import subprocess as sp
  4. import os
  5. from invoke import task
# Directory holding the downloaded dump files read by the import tasks.
data_dir = Path('data')
# Cargo's build output directory.
tgt_dir = Path('target')
# Directory the helper executables are run from; defaults to the release
# profile and is rebound to the debug profile by `build` when debug=True.
bin_dir = tgt_dir / 'release'
  9. from ratings import *
  10. def pipeline(steps, outfile=None):
  11. last = sp.DEVNULL
  12. if outfile is not None:
  13. outfd = os.open(outfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o666)
  14. else:
  15. outfd = None
  16. procs = []
  17. for step in steps[:-1]:
  18. proc = sp.Popen(step, stdin=last, stdout=sp.PIPE)
  19. last = proc.stdout
  20. procs.append(proc)
  21. proc = sp.Popen(steps[-1], stdin=last, stdout=outfd)
  22. procs.append(proc)
  23. for p, s in zip(procs, steps):
  24. rc = p.wait()
  25. if rc != 0:
  26. print(f'{s[0]} exited with code {rc}', file=sys.stderr)
  27. raise RuntimeError('subprocess failed')
  28. @task
  29. def build(c, debug=False):
  30. "Compile the Rust support executables"
  31. global bin_dir
  32. if debug:
  33. c.run('cargo build')
  34. bin_dir = tgt_dir / 'debug'
  35. else:
  36. c.run('cargo build --release')
  37. @task
  38. def init_viaf(c):
  39. "Initialize the VIAF schema"
  40. print('initializing VIAF schema')
  41. c.run('psql -f viaf-schema.sql')
  42. @task(build, init_viaf)
  43. def import_viaf(c, date='20181104', progress=True):
  44. "Import VIAF data"
  45. infile = data_dir / f'viaf-{date}-clusters-marc21.xml.gz'
  46. print('importing VIAF data from %s', infile)
  47. pipeline([
  48. [bin_dir / 'parse-marc', infile],
  49. ['psql', '-c', '\\copy viaf_marc_field FROM STDIN']
  50. ])
  51. @task
  52. def init_ol(c):
  53. "Initialize the OpenLibrary schema"
  54. print('initializing OpenLibrary schema')
  55. c.run('psql -f ol-schema.sql')
  56. @task(build)
  57. def import_ol_authors(c, date='2018-10-31', progress=True):
  58. infile = data_dir / f'ol_dump_authors_{date}.txt.gz'
  59. pipeline([
  60. [bin_dir / 'clean-openlib', infile],
  61. ['psql', '-c', '\\copy ol_author (author_key, author_data) FROM STDIN']
  62. ])
  63. @task(build)
  64. def import_ol_editions(c, date='2018-10-31', progress=True):
  65. infile = data_dir / f'ol_dump_editions_{date}.txt.gz'
  66. pipeline([
  67. [bin_dir / 'clean-openlib', infile],
  68. ['psql', '-c', '\\copy ol_edition (edition_key, edition_data) FROM STDIN']
  69. ])
  70. @task(build)
  71. def import_ol_works(c, date='2018-10-31', progress=True):
  72. infile = data_dir / f'ol_dump_works_{date}.txt.gz'
  73. pipeline([
  74. [bin_dir / 'clean-openlib', infile],
  75. ['psql', '-c', '\\copy ol_work (work_key, work_data) FROM STDIN']
  76. ])
  77. if __name__ == '__main__':
  78. import invoke.program
  79. program = invoke.program.Program()
  80. program.run()
Tip!

Press p to see the previous file, or n to see the next file.

Comments

Loading...