Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

tracking.rs 4.9 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
  1. use std::io;
  2. use std::io::Write;
  3. use std::path::{Path, PathBuf};
  4. use std::fs::OpenOptions;
  5. use anyhow::Result;
  6. use sha1::Sha1;
  7. use structopt::StructOpt;
  8. use postgres::Connection;
  9. use log::*;
  10. use crate::io::HashRead;
  11. /// Options controlling the import stage
  12. #[derive(StructOpt, Debug, Clone)]
  13. pub struct StageOpts {
  14. /// Stage name
  15. #[structopt(long="stage", short="s")]
  16. stage: Option<String>,
  17. /// Stage dependencies
  18. #[structopt(long="stage-dep", short="D")]
  19. deps: Vec<String>,
  20. /// Transcript file
  21. #[structopt(long="transcript", short="T")]
  22. transcript: Option<PathBuf>,
  23. }
  24. /// An import stage. Writing to the stage writes to its transcript file.
  25. pub struct Stage<'o, 'c> {
  26. options: &'o StageOpts,
  27. cxn: Option<&'c Connection>,
  28. transcript: Box<dyn io::Write>
  29. }
  30. /// A source file for a stage
  31. pub struct StageSource<'s> {
  32. stage: &'s Stage<'s, 's>,
  33. path: String,
  34. hash: Sha1
  35. }
  36. impl StageOpts {
  37. /// Start the stage
  38. pub fn begin_stage<'o, 'c>(&'o self, cxn: &'c Connection) -> Result<Stage<'o, 'c>> {
  39. match self.stage {
  40. Some (ref s) => {
  41. info!("beginning stage {}", s);
  42. cxn.execute("INSERT INTO stage_status (stage_name)
  43. VALUES ($1)
  44. ON CONFLICT (stage_name)
  45. DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL",
  46. &[s])?;
  47. cxn.execute("DELETE FROM stage_file WHERE stage_name = $1", &[s])?;
  48. cxn.execute("DELETE FROM stage_dep WHERE stage_name = $1", &[s])?;
  49. for d in &self.deps {
  50. cxn.execute("INSERT INTO stage_dep (stage_name, dep_name, dep_key)
  51. SELECT $1, stage_name, stage_key
  52. FROM stage_status WHERE stage_name = $2", &[s, &d])?;
  53. }
  54. },
  55. None => {
  56. warn!("no stage specified");
  57. }
  58. };
  59. let w: Box<dyn io::Write> = match self.transcript {
  60. Some(ref p) => {
  61. Box::new(OpenOptions::new().write(true).create(true).truncate(true).open(p)?)
  62. },
  63. None => Box::new(io::stdout())
  64. };
  65. Ok(Stage {
  66. options: self,
  67. cxn: Some(cxn),
  68. transcript: w
  69. })
  70. }
  71. /// Create a no-op stage.
  72. pub fn empty<'o>(&'o self) -> Stage<'o, 'o> {
  73. Stage {
  74. options: self,
  75. cxn: None,
  76. transcript: Box::new(io::stderr())
  77. }
  78. }
  79. }
  80. impl <'o, 'c> Write for Stage<'o, 'c> {
  81. fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
  82. self.transcript.write(buf)
  83. }
  84. fn flush(&mut self) -> io::Result<()> {
  85. self.transcript.flush()
  86. }
  87. fn write_fmt(&mut self, fmt: std::fmt::Arguments) -> io::Result<()> {
  88. self.transcript.write_fmt(fmt)
  89. }
  90. }
  91. impl <'o,'c> Stage<'o,'c> {
  92. /// End the stage
  93. pub fn end(self, key: &Option<String>) -> Result<()> {
  94. match self.options.stage {
  95. Some (ref s) => {
  96. info!("finishing stage {}", s);
  97. self.db_action(|db| {
  98. Ok(db.execute("UPDATE stage_status
  99. SET finished_at = NOW(), stage_key = $2
  100. WHERE stage_name = $1",
  101. &[s, &key])?)
  102. })?;
  103. },
  104. None => ()
  105. };
  106. Ok(())
  107. }
  108. fn db_action<F, R, Rt>(&self, func: F) -> Result<Option<R>> where F: FnOnce(&Connection) -> Rt, Rt: Into<Result<R>> {
  109. match self.cxn {
  110. Some(ref c) => Ok(Some(func(c).into()?)),
  111. None => Ok(None)
  112. }
  113. }
  114. /// Record a source file with its hash
  115. pub fn record_file<P: AsRef<Path>>(&self, path: P, hash: &str) -> Result<()> {
  116. let sf = self.source_file(path);
  117. sf.record_hash(hash);
  118. Ok(())
  119. }
  120. /// Set up to record a file with its reader, to both source and transcript
  121. pub fn source_file<'s, P: AsRef<Path>>(&'s self, path: P) -> StageSource<'s> {
  122. let path: &Path = path.as_ref();
  123. StageSource {
  124. stage: self,
  125. path: path.to_string_lossy().to_string(),
  126. hash: Sha1::new()
  127. }
  128. }
  129. }
  130. impl <'s> StageSource<'s> {
  131. /// Wrap a reader to compute this file's hash
  132. pub fn wrap_read<'a, R: io::Read>(&'a mut self, read: R) -> HashRead<'a, R> {
  133. HashRead::create(read, &mut self.hash)
  134. }
  135. /// Record the accumulated file hash (and return it)
  136. pub fn record(self) -> Result<String> {
  137. let hash = self.hash.hexdigest();
  138. self.record_hash(&hash)?;
  139. Ok(hash)
  140. }
  141. fn record_hash(&self, hash: &str) -> Result<()> {
  142. info!("recording checksum {} for file {}", hash, &self.path);
  143. self.stage.db_action(|db| {
  144. db.execute("INSERT INTO source_file (filename, checksum)
  145. VALUES ($1, $2)
  146. ON CONFLICT (filename)
  147. DO UPDATE SET checksum = $2, reg_time = NOW()",
  148. &[&self.path, &hash])?;
  149. match self.stage.options.stage {
  150. Some (ref s) => {
  151. debug!("attaching to stage {}", s);
  152. db.execute("INSERT INTO stage_file (stage_name, filename)
  153. VALUES ($1, $2)",
  154. &[s, &self.path])?
  155. },
  156. None => 0
  157. };
  158. Ok(())
  159. })?;
  160. Ok(())
  161. }
  162. }
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...