Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

sql-script.py 2.1 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
  1. """
  2. Usage:
  3. sql-script.py [options] SCRIPT
  4. Options:
  5. -T, --transcript FILE
  6. Write the execution transcript to FILE.
  7. -s, --stage-name NAME
  8. Record as stage NAME.
  9. --dry-run
  10. Print the script's information without actually running it.
  11. --verbose
  12. Verbose logging information.
  13. SCRIPT
  14. The script to run.
  15. """
  16. import os
  17. import sys
  18. import re
  19. import time
  20. import hashlib
  21. from pathlib import Path
  22. from datetime import timedelta
  23. from typing import NamedTuple, List
  24. from docopt import docopt
  25. import psycopg2, psycopg2.extensions, psycopg2.extras
  26. from more_itertools import peekable
  27. import sqlparse
  28. from bookdata import script_log
  29. from bookdata import db, tracking
  30. opts = docopt(__doc__)
  31. _log = script_log(__name__, opts.get('--verbose'))
  32. psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
  33. script_file = Path(opts.get('SCRIPT'))
  34. tfile = opts.get('-T', None)
  35. if tfile:
  36. tfile = Path(tfile)
  37. else:
  38. tfile = script_file.with_suffix('.transcript')
  39. stage = opts.get('-s', None)
  40. if not stage:
  41. stage = script_file.stem
  42. _log.info('reading %s', script_file)
  43. script = db.SqlScript(script_file)
  44. _log.info('%s has %d chunks', script_file, len(script.chunks))
  45. if opts.get('--dry-run'):
  46. script.describe()
  47. else:
  48. with tfile.open('w') as txf, db.connect() as dbc:
  49. key = hashlib.md5()
  50. with dbc, dbc.cursor() as cur:
  51. tracking.begin_stage(cur, stage)
  52. for dep in script.deps:
  53. dhs = tracking.record_dep(cur, stage, dep)
  54. # hash the dependency hashes
  55. for d, h in dhs: key.update(h.encode('utf-8'))
  56. h = tracking.hash_and_record_file(cur, script_file, stage)
  57. # hash the source file
  58. key.update(h.encode('utf-8'))
  59. script.execute(dbc, transcript=txf)
  60. with dbc, dbc.cursor() as cur:
  61. for ns, tbl in script.tables:
  62. oid, kind = tracking.record_tbl(cur, stage, ns, tbl)
  63. key.update(f'{ns}.{tbl}:{oid}:{kind}'.encode('utf-8'))
  64. tracking.end_stage(cur, stage, key=key.hexdigest())
Tip!

Press p or ← to see the previous file, or n or → to see the next file.

Comments

Loading...