tracking.rs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. use std::io;
  2. use std::io::Write;
  3. use std::path::{Path, PathBuf};
  4. use std::fs::OpenOptions;
  5. use anyhow::Result;
  6. use sha1::Sha1;
  7. use structopt::StructOpt;
  8. use postgres::Connection;
  9. use log::*;
  10. use crate::io::HashRead;
  11. /// Options controlling the import stage
  12. #[derive(StructOpt, Debug, Clone)]
  13. pub struct StageOpts {
  14. /// Stage name
  15. #[structopt(long="stage", short="s")]
  16. stage: Option<String>,
  17. /// Stage dependencies
  18. #[structopt(long="stage-dep", short="D")]
  19. deps: Vec<String>,
  20. /// Transcript file
  21. #[structopt(long="transcript", short="T")]
  22. transcript: Option<PathBuf>,
  23. }
  24. /// An import stage. Writing to the stage writes to its transcript file.
  25. pub struct Stage<'o, 'c> {
  26. options: &'o StageOpts,
  27. cxn: Option<&'c Connection>,
  28. transcript: Box<dyn io::Write>
  29. }
  30. /// A source file for a stage
  31. pub struct StageSource<'s> {
  32. stage: &'s Stage<'s, 's>,
  33. path: String,
  34. hash: Sha1
  35. }
  36. impl StageOpts {
  37. /// Start the stage
  38. pub fn begin_stage<'o, 'c>(&'o self, cxn: &'c Connection) -> Result<Stage<'o, 'c>> {
  39. match self.stage {
  40. Some (ref s) => {
  41. info!("beginning stage {}", s);
  42. cxn.execute("INSERT INTO stage_status (stage_name)
  43. VALUES ($1)
  44. ON CONFLICT (stage_name)
  45. DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL",
  46. &[s])?;
  47. cxn.execute("DELETE FROM stage_file WHERE stage_name = $1", &[s])?;
  48. cxn.execute("DELETE FROM stage_dep WHERE stage_name = $1", &[s])?;
  49. for d in &self.deps {
  50. cxn.execute("INSERT INTO stage_dep (stage_name, dep_name, dep_key)
  51. SELECT $1, stage_name, stage_key
  52. FROM stage_status WHERE stage_name = $2", &[s, &d])?;
  53. }
  54. },
  55. None => {
  56. warn!("no stage specified");
  57. }
  58. };
  59. let w: Box<dyn io::Write> = match self.transcript {
  60. Some(ref p) => {
  61. Box::new(OpenOptions::new().write(true).create(true).truncate(true).open(p)?)
  62. },
  63. None => Box::new(io::stdout())
  64. };
  65. Ok(Stage {
  66. options: self,
  67. cxn: Some(cxn),
  68. transcript: w
  69. })
  70. }
  71. /// Create a no-op stage.
  72. pub fn empty<'o>(&'o self) -> Stage<'o, 'o> {
  73. Stage {
  74. options: self,
  75. cxn: None,
  76. transcript: Box::new(io::stderr())
  77. }
  78. }
  79. }
  80. impl <'o, 'c> Write for Stage<'o, 'c> {
  81. fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
  82. self.transcript.write(buf)
  83. }
  84. fn flush(&mut self) -> io::Result<()> {
  85. self.transcript.flush()
  86. }
  87. fn write_fmt(&mut self, fmt: std::fmt::Arguments) -> io::Result<()> {
  88. self.transcript.write_fmt(fmt)
  89. }
  90. }
  91. impl <'o,'c> Stage<'o,'c> {
  92. /// End the stage
  93. pub fn end(self, key: &Option<String>) -> Result<()> {
  94. match self.options.stage {
  95. Some (ref s) => {
  96. info!("finishing stage {}", s);
  97. self.db_action(|db| {
  98. Ok(db.execute("UPDATE stage_status
  99. SET finished_at = NOW(), stage_key = $2
  100. WHERE stage_name = $1",
  101. &[s, &key])?)
  102. })?;
  103. },
  104. None => ()
  105. };
  106. Ok(())
  107. }
  108. fn db_action<F, R, Rt>(&self, func: F) -> Result<Option<R>> where F: FnOnce(&Connection) -> Rt, Rt: Into<Result<R>> {
  109. match self.cxn {
  110. Some(ref c) => Ok(Some(func(c).into()?)),
  111. None => Ok(None)
  112. }
  113. }
  114. /// Record a source file with its hash
  115. pub fn record_file<P: AsRef<Path>>(&self, path: P, hash: &str) -> Result<()> {
  116. let sf = self.source_file(path);
  117. sf.record_hash(hash)?;
  118. Ok(())
  119. }
  120. /// Set up to record a file with its reader, to both source and transcript
  121. pub fn source_file<'s, P: AsRef<Path>>(&'s self, path: P) -> StageSource<'s> {
  122. let path: &Path = path.as_ref();
  123. StageSource {
  124. stage: self,
  125. path: path.to_string_lossy().to_string(),
  126. hash: Sha1::new()
  127. }
  128. }
  129. }
  130. impl <'s> StageSource<'s> {
  131. /// Wrap a reader to compute this file's hash
  132. pub fn wrap_read<'a, R: io::Read>(&'a mut self, read: R) -> HashRead<'a, R> {
  133. HashRead::create(read, &mut self.hash)
  134. }
  135. /// Record the accumulated file hash (and return it)
  136. pub fn record(self) -> Result<String> {
  137. let hash = self.hash.hexdigest();
  138. self.record_hash(&hash)?;
  139. Ok(hash)
  140. }
  141. fn record_hash(&self, hash: &str) -> Result<()> {
  142. info!("recording checksum {} for file {}", hash, &self.path);
  143. self.stage.db_action(|db| {
  144. db.execute("INSERT INTO source_file (filename, checksum)
  145. VALUES ($1, $2)
  146. ON CONFLICT (filename)
  147. DO UPDATE SET checksum = $2, reg_time = NOW()",
  148. &[&self.path, &hash])?;
  149. match self.stage.options.stage {
  150. Some (ref s) => {
  151. debug!("attaching to stage {}", s);
  152. db.execute("INSERT INTO stage_file (stage_name, filename)
  153. VALUES ($1, $2)",
  154. &[s, &self.path])?
  155. },
  156. None => 0
  157. };
  158. Ok(())
  159. })?;
  160. Ok(())
  161. }
  162. }