tracking.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
"""
Code for supporting import data tracking and relationships.

Provides helpers that record, in the database, when import stages start and
finish, which source files (with checksums), dependency stages, and tables
each stage is associated with, plus a function that reports a stage's
recorded status as text.
"""
import hashlib
import logging
import sys
from io import StringIO
from pathlib import Path

from . import db
# Module-level logger shared by all of the tracking helpers below.
_log = logging.getLogger(__name__)
  10. def _init_db(dbc):
  11. # initialize database, in case nothing has been run
  12. with dbc, dbc.cursor() as cur:
  13. cur.execute(db.meta_schema)
  14. def hash_and_record_file(cur, path, stage=None):
  15. """
  16. Compute the checksum of a file and record it in the database.
  17. """
  18. h = hashlib.md5()
  19. with open(path, 'rb') as f:
  20. data = f.read(8192 * 4)
  21. while data:
  22. h.update(data)
  23. data = f.read(8192 * 4)
  24. hash = h.hexdigest()
  25. path = Path(path).as_posix()
  26. _log.info('recording file %s with hash %s', path, hash)
  27. record_file(cur, path, hash, stage)
  28. return hash
  29. def begin_stage(cur, stage):
  30. """
  31. Record that a stage is beginning.
  32. """
  33. if hasattr(cur, 'cursor'):
  34. # this is a connection
  35. with cur, cur.cursor() as c:
  36. return begin_stage(c, stage)
  37. _log.info('starting or resetting stage %s', stage)
  38. cur.execute('''
  39. INSERT INTO stage_status (stage_name)
  40. VALUES (%s)
  41. ON CONFLICT (stage_name)
  42. DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL
  43. ''', [stage])
  44. cur.execute('DELETE FROM stage_file WHERE stage_name = %s', [stage])
  45. cur.execute('DELETE FROM stage_dep WHERE stage_name = %s', [stage])
  46. cur.execute('DELETE FROM stage_table WHERE stage_name = %s', [stage])
  47. def record_dep(cur, stage, dep):
  48. """
  49. Record a dependency for a stage.
  50. """
  51. if hasattr(cur, 'cursor'):
  52. # this is a connection
  53. with cur, cur.cursor() as c:
  54. return record_dep(c, stage, dep)
  55. _log.info('recording dep %s -> %s', stage, dep);
  56. cur.execute('''
  57. INSERT INTO stage_dep (stage_name, dep_name, dep_key)
  58. SELECT %s, stage_name, stage_key
  59. FROM stage_status WHERE stage_name = %s
  60. RETURNING dep_name, dep_key
  61. ''', [stage, dep])
  62. return cur.fetchall()
  63. def record_tbl(cur, stage, ns, tbl):
  64. """
  65. Record a table associated with a stage.
  66. """
  67. if hasattr(cur, 'cursor'):
  68. # this is a connection
  69. with cur, cur.cursor() as c:
  70. return record_tbl(c, stage, ns, tbl)
  71. _log.info('recording table %s -> %s.%s', stage, ns, tbl);
  72. cur.execute('''
  73. INSERT INTO stage_table (stage_name, st_ns, st_name)
  74. VALUES (%s, %s, %s)
  75. ''', [stage, ns, tbl])
  76. cur.execute('''
  77. SELECT oid, kind FROM stage_table_oids WHERE stage_name = %s AND st_ns = %s AND st_name = %s
  78. ''', [stage, ns, tbl])
  79. return cur.fetchone()
  80. def record_file(cur, file, hash, stage=None):
  81. """
  82. Record a file and optionally associate it with a stage.
  83. """
  84. if hasattr(cur, 'cursor'):
  85. # this is a connection
  86. with cur, cur.cursor() as c:
  87. return record_file(c, stage)
  88. _log.info('recording checksum %s for file %s', hash, file)
  89. cur.execute("""
  90. INSERT INTO source_file (filename, checksum)
  91. VALUES (%(file)s, %(hash)s)
  92. ON CONFLICT (filename)
  93. DO UPDATE SET checksum = %(hash)s, reg_time = NOW()
  94. """, {'file': file, 'hash': hash})
  95. if stage is not None:
  96. cur.execute("INSERT INTO stage_file (stage_name, filename) VALUES (%s, %s)", [stage, file])
  97. def end_stage(cur, stage, key=None):
  98. """
  99. Record that an import stage has finished.
  100. Args:
  101. cur(psycopg2.connection or psycopg2.cursor): the database connection to use.
  102. stage(string): the name of the stage.
  103. key(string or None): the key (checksum or other key) to record.
  104. """
  105. if hasattr(cur, 'cursor'):
  106. # this is a connection
  107. with cur, cur.cursor() as c:
  108. return end_stage(c, stage, key)
  109. _log.info('finishing stage %s', stage)
  110. cur.execute('''
  111. UPDATE stage_status
  112. SET finished_at = NOW(), stage_key = coalesce(%(key)s, stage_key)
  113. WHERE stage_name = %(stage)s
  114. ''', {'stage': stage, 'key': key})
  115. def stage_exists(stage):
  116. "Query whether we have data for a stage"
  117. with db.connect() as dbc, dbc.cursor() as cur:
  118. _init_db(dbc)
  119. cur.execute('SELECT COUNT(*) FROM stage_status WHERE stage_name = %s', [stage])
  120. count, = cur.fetchone()
  121. _log.debug('have %d records for stage %s', count, stage)
  122. return count
  123. def stage_status(stage, file=None, *, timestamps=False):
  124. if file is None:
  125. sf = StringIO()
  126. else:
  127. sf = file
  128. with db.connect() as dbc:
  129. _init_db(dbc)
  130. # get the status
  131. with dbc, dbc.cursor() as cur:
  132. cur.execute('''
  133. SELECT started_at, finished_at, stage_key FROM stage_status WHERE stage_name = %s
  134. ''', [stage])
  135. row = cur.fetchone()
  136. if not row:
  137. _log.error('stage %s not run', stage)
  138. sys.exit(2)
  139. start, end, key = row
  140. _log.info('stage %s finished at %s', stage, end)
  141. print('STAGE', stage, file=sf)
  142. if timestamps:
  143. print('START', start, file=sf)
  144. cur.execute('''
  145. SELECT dep_name, dep_key
  146. FROM stage_dep
  147. WHERE stage_name = %s
  148. ORDER BY dep_name
  149. ''', [stage])
  150. for dn, dk in cur:
  151. print('DEP', dn, dk, file=sf)
  152. cur.execute('''
  153. SELECT filename, COALESCE(link.checksum, src.checksum)
  154. FROM source_file src
  155. JOIN stage_file link USING (filename)
  156. WHERE stage_name = %s
  157. ORDER BY filename
  158. ''', [stage])
  159. for fn, fh in cur:
  160. print('SOURCE', fn, fh, file=sf)
  161. cur.execute('''
  162. SELECT st_ns, st_name, oid, kind
  163. FROM stage_table_oids
  164. WHERE stage_name = %s
  165. ORDER BY st_ns, st_name
  166. ''', [stage])
  167. for ns, tbl, oid, kind in cur:
  168. print(f'TABLE {ns}.{tbl} OID {oid} KIND {kind}', file=sf)
  169. if timestamps:
  170. print('FINISH', end, file=sf)
  171. if key:
  172. print('KEY', key, file=sf)
  173. if file is None:
  174. return sf.getvalue()
Tip!

Press p or to see the previous file or, n or to see the next file