Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

tracking.py 3.1 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
  1. """
  2. Code for supporting import data tracking and relationships.
  3. """
  4. import hashlib
  5. import logging
  6. from pathlib import Path
# Logger for this module; every tracking function here logs its database
# actions at INFO level through it.
_log = logging.getLogger(__name__)
  8. def hash_and_record_file(cur, path, stage=None):
  9. """
  10. Compute the checksum of a file and record it in the database.
  11. """
  12. h = hashlib.md5()
  13. with open(path, 'rb') as f:
  14. data = f.read(8192 * 4)
  15. while data:
  16. h.update(data)
  17. data = f.read(8192 * 4)
  18. hash = h.hexdigest()
  19. path = Path(path).as_posix()
  20. _log.info('recording file %s with hash %s', path, hash)
  21. record_file(cur, path, hash, stage)
  22. return hash
  23. def begin_stage(cur, stage):
  24. """
  25. Record that a stage is beginning.
  26. """
  27. if hasattr(cur, 'cursor'):
  28. # this is a connection
  29. with cur, cur.cursor() as c:
  30. return begin_stage(c, stage)
  31. _log.info('starting or resetting stage %s', stage)
  32. cur.execute('''
  33. INSERT INTO stage_status (stage_name)
  34. VALUES (%s)
  35. ON CONFLICT (stage_name)
  36. DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL
  37. ''', [stage])
  38. cur.execute('DELETE FROM stage_file WHERE stage_name = %s', [stage])
  39. cur.execute('DELETE FROM stage_dep WHERE stage_name = %s', [stage])
  40. def record_dep(cur, stage, dep):
  41. """
  42. Record a dependency for a stage.
  43. """
  44. if hasattr(cur, 'cursor'):
  45. # this is a connection
  46. with cur, cur.cursor() as c:
  47. return record_dep(c, stage, dep)
  48. _log.info('recording dep %s -> %s', stage, dep);
  49. cur.execute('''
  50. INSERT INTO stage_dep (stage_name, dep_name, dep_key)
  51. SELECT %s, stage_name, stage_key
  52. FROM stage_status WHERE stage_name = %s
  53. RETURNING dep_name, dep_key
  54. ''', [stage, dep])
  55. return cur.fetchall()
  56. def record_file(cur, file, hash, stage=None):
  57. """
  58. Record a file and optionally associate it with a stage.
  59. """
  60. if hasattr(cur, 'cursor'):
  61. # this is a connection
  62. with cur, cur.cursor() as c:
  63. return record_file(c, stage)
  64. _log.info('recording checksum %s for file %s', hash, file)
  65. cur.execute("""
  66. INSERT INTO source_file (filename, checksum)
  67. VALUES (%(file)s, %(hash)s)
  68. ON CONFLICT (filename)
  69. DO UPDATE SET checksum = %(hash)s, reg_time = NOW()
  70. """, {'file': file, 'hash': hash})
  71. if stage is not None:
  72. cur.execute("INSERT INTO stage_file (stage_name, filename) VALUES (%s, %s)", [stage, file])
  73. def end_stage(cur, stage, key=None):
  74. """
  75. Record that an import stage has finished.
  76. Args:
  77. cur(psycopg2.connection or psycopg2.cursor): the database connection to use.
  78. stage(string): the name of the stage.
  79. key(string or None): the key (checksum or other key) to record.
  80. """
  81. if hasattr(cur, 'cursor'):
  82. # this is a connection
  83. with cur, cur.cursor() as c:
  84. return end_stage(c, stage, key)
  85. _log.info('finishing stage %s', stage)
  86. cur.execute('''
  87. UPDATE stage_status
  88. SET finished_at = NOW(), stage_key = coalesce(%(key)s, stage_key)
  89. WHERE stage_name = %(stage)s
  90. ''', {'stage': stage, 'key': key})
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...