Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

tracking.py 3.8 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
  1. """
  2. Code for supporting import data tracking and relationships.
  3. """
  4. import hashlib
  5. import logging
  6. from pathlib import Path
  7. _log = logging.getLogger(__name__)
  8. def hash_and_record_file(cur, path, stage=None):
  9. """
  10. Compute the checksum of a file and record it in the database.
  11. """
  12. h = hashlib.md5()
  13. with open(path, 'rb') as f:
  14. data = f.read(8192 * 4)
  15. while data:
  16. h.update(data)
  17. data = f.read(8192 * 4)
  18. hash = h.hexdigest()
  19. path = Path(path).as_posix()
  20. _log.info('recording file %s with hash %s', path, hash)
  21. record_file(cur, path, hash, stage)
  22. return hash
  23. def begin_stage(cur, stage):
  24. """
  25. Record that a stage is beginning.
  26. """
  27. if hasattr(cur, 'cursor'):
  28. # this is a connection
  29. with cur, cur.cursor() as c:
  30. return begin_stage(c, stage)
  31. _log.info('starting or resetting stage %s', stage)
  32. cur.execute('''
  33. INSERT INTO stage_status (stage_name)
  34. VALUES (%s)
  35. ON CONFLICT (stage_name)
  36. DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL
  37. ''', [stage])
  38. cur.execute('DELETE FROM stage_file WHERE stage_name = %s', [stage])
  39. cur.execute('DELETE FROM stage_dep WHERE stage_name = %s', [stage])
  40. cur.execute('DELETE FROM stage_table WHERE stage_name = %s', [stage])
  41. def record_dep(cur, stage, dep):
  42. """
  43. Record a dependency for a stage.
  44. """
  45. if hasattr(cur, 'cursor'):
  46. # this is a connection
  47. with cur, cur.cursor() as c:
  48. return record_dep(c, stage, dep)
  49. _log.info('recording dep %s -> %s', stage, dep);
  50. cur.execute('''
  51. INSERT INTO stage_dep (stage_name, dep_name, dep_key)
  52. SELECT %s, stage_name, stage_key
  53. FROM stage_status WHERE stage_name = %s
  54. RETURNING dep_name, dep_key
  55. ''', [stage, dep])
  56. return cur.fetchall()
  57. def record_tbl(cur, stage, ns, tbl):
  58. """
  59. Record a table associated with a stage.
  60. """
  61. if hasattr(cur, 'cursor'):
  62. # this is a connection
  63. with cur, cur.cursor() as c:
  64. return record_tbl(c, stage, ns, tbl)
  65. _log.info('recording table %s -> %s.%s', stage, ns, tbl);
  66. cur.execute('''
  67. INSERT INTO stage_table (stage_name, st_ns, st_name)
  68. VALUES (%s, %s, %s)
  69. ''', [stage, ns, tbl])
  70. cur.execute('''
  71. SELECT oid, kind FROM stage_table_oids WHERE stage_name = %s AND st_ns = %s AND st_name = %s
  72. ''', [stage, ns, tbl])
  73. return cur.fetchone()
  74. def record_file(cur, file, hash, stage=None):
  75. """
  76. Record a file and optionally associate it with a stage.
  77. """
  78. if hasattr(cur, 'cursor'):
  79. # this is a connection
  80. with cur, cur.cursor() as c:
  81. return record_file(c, stage)
  82. _log.info('recording checksum %s for file %s', hash, file)
  83. cur.execute("""
  84. INSERT INTO source_file (filename, checksum)
  85. VALUES (%(file)s, %(hash)s)
  86. ON CONFLICT (filename)
  87. DO UPDATE SET checksum = %(hash)s, reg_time = NOW()
  88. """, {'file': file, 'hash': hash})
  89. if stage is not None:
  90. cur.execute("INSERT INTO stage_file (stage_name, filename) VALUES (%s, %s)", [stage, file])
  91. def end_stage(cur, stage, key=None):
  92. """
  93. Record that an import stage has finished.
  94. Args:
  95. cur(psycopg2.connection or psycopg2.cursor): the database connection to use.
  96. stage(string): the name of the stage.
  97. key(string or None): the key (checksum or other key) to record.
  98. """
  99. if hasattr(cur, 'cursor'):
  100. # this is a connection
  101. with cur, cur.cursor() as c:
  102. return end_stage(c, stage, key)
  103. _log.info('finishing stage %s', stage)
  104. cur.execute('''
  105. UPDATE stage_status
  106. SET finished_at = NOW(), stage_key = coalesce(%(key)s, stage_key)
  107. WHERE stage_name = %(stage)s
  108. ''', {'stage': stage, 'key': key})
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...