Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

tracking.py 6.2 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
  1. """
  2. Code for supporting import data tracking and relationships.
  3. """
  import hashlib
  import logging
  import sys
  from io import StringIO
  from pathlib import Path

  from . import db
  # Module-level logger named after this module, so its output can be
  # configured through the standard logging hierarchy.
  _log = logging.getLogger(name=__name__)
  def _init_db(dbc):
      """
      Run the tracking metadata schema (``db.meta_schema``) on a connection.

      Called before querying, in case nothing has been run yet against this
      database.  The connection is entered as a context manager around the
      statement.  NOTE(review): this runs on every status query, so the
      schema SQL is presumably idempotent (e.g. ``IF NOT EXISTS``) — confirm
      in the ``db`` module.

      Args:
          dbc: an open database connection.
      """
      with dbc:
          with dbc.cursor() as schema_cur:
              schema_cur.execute(db.meta_schema)
  def hash_and_record_file(cur, path, stage=None):
      """
      Compute the MD5 checksum of a file and record it in the database.

      The file is read in 32 KiB chunks.  The path is normalized to POSIX
      form before being recorded via ``record_file``.

      Args:
          cur: the database connection or cursor to record with.
          path(str or path-like): the file to hash.
          stage(string or None): the stage to associate the file with, if any.

      Returns:
          string: the hexadecimal MD5 digest of the file's contents.
      """
      chunk_size = 8192 * 4
      digest = hashlib.md5()
      with open(path, 'rb') as fh:
          # iter() with a sentinel stops at the first empty read (EOF)
          for chunk in iter(lambda: fh.read(chunk_size), b''):
              digest.update(chunk)
      checksum = digest.hexdigest()
      posix_path = Path(path).as_posix()
      _log.info('recording file %s with hash %s', posix_path, checksum)
      record_file(cur, posix_path, checksum, stage)
      return checksum
  def begin_stage(cur, stage):
      """
      Mark a stage as (re)started and discard what its previous run recorded.

      The stage's ``stage_status`` row is inserted if missing.  If it already
      exists, ``started_at`` is reset to ``now()`` and both ``finished_at`` and
      ``stage_key`` are cleared.  All ``stage_file``, ``stage_dep`` and
      ``stage_table`` rows for the stage are then deleted.

      Args:
          cur(psycopg2.connection or psycopg2.cursor): the database handle.
              If it has a ``cursor`` attribute, it is treated as a connection:
              it is entered as a context manager and a new cursor is opened
              on it.
          stage(string): the name of the stage.
      """
      is_connection = hasattr(cur, 'cursor')
      if is_connection:
          with cur, cur.cursor() as stage_cur:
              return begin_stage(stage_cur, stage)

      _log.info('starting or resetting stage %s', stage)
      cur.execute('''
  INSERT INTO stage_status (stage_name)
  VALUES (%s)
  ON CONFLICT (stage_name)
  DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL
  ''', [stage])
      # forget the previous run's files, dependencies and tables
      cleanup_statements = (
          'DELETE FROM stage_file WHERE stage_name = %s',
          'DELETE FROM stage_dep WHERE stage_name = %s',
          'DELETE FROM stage_table WHERE stage_name = %s',
      )
      for statement in cleanup_statements:
          cur.execute(statement, [stage])
  def record_dep(cur, stage, dep):
      """
      Record that ``stage`` depends on the stage ``dep``.

      The dependency row is built from ``dep``'s ``stage_status`` row, so
      ``dep``'s current ``stage_key`` is captured as ``dep_key``.  If ``dep``
      has no ``stage_status`` row, the INSERT ... SELECT inserts nothing.

      Args:
          cur(psycopg2.connection or psycopg2.cursor): the database handle.
              A connection (anything with a ``cursor`` attribute) is entered
              as a context manager and a new cursor is opened on it.
          stage(string): the dependent stage.
          dep(string): the stage it depends on.

      Returns:
          the ``(dep_name, dep_key)`` rows inserted, as returned by
          ``fetchall()`` (empty when ``dep`` has no status row).
      """
      if not hasattr(cur, 'cursor'):
          _log.info('recording dep %s -> %s', stage, dep)
          cur.execute('''
  INSERT INTO stage_dep (stage_name, dep_name, dep_key)
  SELECT %s, stage_name, stage_key
  FROM stage_status WHERE stage_name = %s
  RETURNING dep_name, dep_key
  ''', [stage, dep])
          return cur.fetchall()

      # a connection was passed: open a cursor on it and retry
      with cur, cur.cursor() as dep_cur:
          return record_dep(dep_cur, stage, dep)
  def record_tbl(cur, stage, ns, tbl):
      """
      Record that a table ``ns.tbl`` is associated with a stage.

      After inserting the ``stage_table`` row, the matching row is read back
      from ``stage_table_oids``.  NOTE(review): that relation is presumably a
      view joining ``stage_table`` to the catalog — confirm in the schema.

      Args:
          cur(psycopg2.connection or psycopg2.cursor): the database handle.
              A connection (anything with a ``cursor`` attribute) is entered
              as a context manager and a new cursor is opened on it.
          stage(string): the stage name.
          ns(string): the table's namespace (schema).
          tbl(string): the table name.

      Returns:
          the ``(oid, kind)`` row from ``fetchone()``, or None if
          ``stage_table_oids`` has no matching row.
      """
      if not hasattr(cur, 'cursor'):
          _log.info('recording table %s -> %s.%s', stage, ns, tbl)
          key_params = [stage, ns, tbl]
          cur.execute('''
  INSERT INTO stage_table (stage_name, st_ns, st_name)
  VALUES (%s, %s, %s)
  ''', key_params)
          cur.execute('''
  SELECT oid, kind FROM stage_table_oids WHERE stage_name = %s AND st_ns = %s AND st_name = %s
  ''', [stage, ns, tbl])
          return cur.fetchone()

      # a connection was passed: open a cursor on it and retry
      with cur, cur.cursor() as tbl_cur:
          return record_tbl(tbl_cur, stage, ns, tbl)
  def record_file(cur, file, hash, stage=None):
      """
      Record a file's checksum and optionally associate it with a stage.

      The ``source_file`` row is upserted.  A new filename is inserted; an
      existing one has its checksum replaced and ``reg_time`` reset to
      ``NOW()``.  When ``stage`` is not None, a ``stage_file`` row linking the
      stage to the file is also inserted.

      Args:
          cur(psycopg2.connection or psycopg2.cursor): the database handle.
              A connection (anything with a ``cursor`` attribute) is entered
              as a context manager and a new cursor is opened on it.
          file(string): the file name to record.
          hash(string): the file's checksum.
          stage(string or None): the stage to link the file to, if any.
      """
      if hasattr(cur, 'cursor'):
          # this is a connection; forward every argument unchanged to the
          # cursor-based call (dropping file/hash here raises TypeError)
          with cur, cur.cursor() as c:
              return record_file(c, file, hash, stage)
      _log.info('recording checksum %s for file %s', hash, file)
      cur.execute("""
  INSERT INTO source_file (filename, checksum)
  VALUES (%(file)s, %(hash)s)
  ON CONFLICT (filename)
  DO UPDATE SET checksum = %(hash)s, reg_time = NOW()
  """, {'file': file, 'hash': hash})
      if stage is not None:
          cur.execute("INSERT INTO stage_file (stage_name, filename) VALUES (%s, %s)", [stage, file])
  def end_stage(cur, stage, key=None):
      """
      Record that an import stage has finished.

      Sets the stage's ``finished_at`` to ``NOW()``.  ``stage_key`` is replaced
      by ``key`` only when ``key`` is not None; otherwise it is left as is.

      Args:
          cur(psycopg2.connection or psycopg2.cursor): the database connection to use.
          stage(string): the name of the stage.
          key(string or None): the key (checksum or other key) to record.
      """
      if hasattr(cur, 'cursor'):
          # a connection was passed: open a cursor on it and retry
          with cur, cur.cursor() as end_cur:
              return end_stage(end_cur, stage, key)

      _log.info('finishing stage %s', stage)
      update_params = {'stage': stage, 'key': key}
      cur.execute('''
  UPDATE stage_status
  SET finished_at = NOW(), stage_key = coalesce(%(key)s, stage_key)
  WHERE stage_name = %(stage)s
  ''', update_params)
  def stage_exists(stage):
      """
      Query whether we have data for a stage.

      Opens its own connection via ``db.connect()`` and ensures the metadata
      schema exists first.

      Args:
          stage(string): the stage name.

      Returns:
          int: the number of ``stage_status`` rows for ``stage``.  This is
          truthy exactly when the stage has been started.  It is presumably
          at most 1, since ``begin_stage`` upserts on ``stage_name``.
      """
      with db.connect() as dbc:
          with dbc.cursor() as cur:
              _init_db(dbc)
              cur.execute('SELECT COUNT(*) FROM stage_status WHERE stage_name = %s', [stage])
              (count,) = cur.fetchone()
              _log.debug('have %d records for stage %s', count, stage)
              return count
  def stage_status(stage, file=None, *, timestamps=False):
      """
      Write a line-oriented status report for a stage.

      The report contains, in order:

      - ``STAGE <name>``
      - ``START <started_at>`` (only when ``timestamps`` is true)
      - ``DEP <dep_name> <dep_key>`` per recorded dependency, sorted by name
      - ``SOURCE <filename> <checksum>`` per linked source file, sorted by
        filename; the checksum is the ``stage_file`` link's checksum when set,
        otherwise the one in ``source_file``
      - ``TABLE <ns>.<name> OID <oid> KIND <kind>`` per recorded table, sorted
      - ``FINISH <finished_at>`` (only when ``timestamps`` is true)
      - ``KEY <stage_key>`` (only when the stage key is non-empty)

      Args:
          stage(string): the stage name.
          file(file-like or None): where to write the report.  If None, the
              report is built in memory and returned.
          timestamps(bool): whether to include the START and FINISH lines.

      Returns:
          string or None: the report text when ``file`` is None, else None.

      If the stage has no ``stage_status`` row, an error is logged and the
      process exits with status 2 via ``sys.exit``.
      """
      sf = StringIO() if file is None else file
      with db.connect() as dbc:
          _init_db(dbc)
          # get the status
          with dbc, dbc.cursor() as cur:
              cur.execute('''
  SELECT started_at, finished_at, stage_key FROM stage_status WHERE stage_name = %s
  ''', [stage])
              row = cur.fetchone()
              if not row:
                  _log.error('stage %s not run', stage)
                  sys.exit(2)
              start, end, key = row
              _log.info('stage %s finished at %s', stage, end)
              print('STAGE', stage, file=sf)
              if timestamps:
                  print('START', start, file=sf)

              cur.execute('''
  SELECT dep_name, dep_key
  FROM stage_dep
  WHERE stage_name = %s
  ORDER BY dep_name
  ''', [stage])
              for dn, dk in cur:
                  print('DEP', dn, dk, file=sf)

              cur.execute('''
  SELECT filename, COALESCE(link.checksum, src.checksum)
  FROM source_file src
  JOIN stage_file link USING (filename)
  WHERE stage_name = %s
  ORDER BY filename
  ''', [stage])
              for fn, fh in cur:
                  print('SOURCE', fn, fh, file=sf)

              cur.execute('''
  SELECT st_ns, st_name, oid, kind
  FROM stage_table_oids
  WHERE stage_name = %s
  ORDER BY st_ns, st_name
  ''', [stage])
              for ns, tbl, oid, kind in cur:
                  print(f'TABLE {ns}.{tbl} OID {oid} KIND {kind}', file=sf)

              if timestamps:
                  print('FINISH', end, file=sf)
              if key:
                  print('KEY', key, file=sf)

      if file is None:
          return sf.getvalue()
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...