Browse Source

Refactor Rust and add transcripts to ntriples

Michael Ekstrand 6 months ago
parent
commit
9940c3a73d
8 changed files with 200 additions and 86 deletions
  1. 34
    0
      Cargo.lock
  2. 1
    0
      Cargo.toml
  3. 8
    11
      src/commands/import_json.rs
  4. 13
    13
      src/commands/parse_isbns/mod.rs
  5. 8
    10
      src/commands/parse_marc.rs
  6. 14
    14
      src/commands/pcat.rs
  7. 2
    1
      src/db.rs
  8. 120
    37
      src/tracking.rs

+ 34
- 0
Cargo.lock

@@ -46,6 +46,11 @@ name = "autocfg"
 version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
+[[package]]
+name = "autocfg"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
 [[package]]
 name = "base64"
 version = "0.6.0"
@@ -75,6 +80,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow 1.0.26 (registry+https://github.com/rust-lang/crates.io-index)",
  "console 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "derive_more 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "fallible-iterator 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "flate2 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -216,6 +222,25 @@ dependencies = [
  "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "crypto-mac"
 version = "0.5.2"
@@ -371,6 +396,11 @@ name = "matches"
 version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
+[[package]]
+name = "maybe-uninit"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
 [[package]]
 name = "md5"
 version = "0.3.8"
@@ -1095,6 +1125,7 @@ dependencies = [
 "checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee"
 "checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652"
 "checksum autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a6d640bee2da49f60a4068a7fae53acde8982514ab7bae8b8cea9e88cbcfd799"
+"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
 "checksum base64 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96434f987501f0ed4eb336a411e0631ecd1afa11574fe148587adc4ff96143c9"
 "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12"
 "checksum block-buffer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a076c298b9ecdb530ed9d967e74a6027d6a7478924520acddcddc24c1c8ab3ab"
@@ -1113,6 +1144,8 @@ dependencies = [
 "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e"
 "checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
 "checksum crc32fast 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e91d5240c6975ef33aeb5f148f35275c25eda8e8a5f95abe421978b05b8bf192"
+"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
+"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
 "checksum crypto-mac 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0999b4ff4d3446d4ddb19a63e9e00c1876e75cd7000d20e57a693b4b3f08d958"
 "checksum derive_more 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "64fba494f26a7e7906d1b4123b46d17717670a9b199cba941bf1a3885d917f0f"
 "checksum digest 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "03b072242a8cbaf9c145665af9d250c59af3b958f83ed6824e13533cf76d5b90"
@@ -1134,6 +1167,7 @@ dependencies = [
 "checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c"
 "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
 "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
+"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
 "checksum md5 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "79c56d6a0b07f9e19282511c83fc5b086364cbae4ba8c7d5f190c3d9b0425a48"
 "checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a"
 "checksum memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"

+ 1
- 0
Cargo.toml

@@ -23,3 +23,4 @@ glob = "0.3"
 anyhow = "1.0.26"
 serde = { version="1.0", features=["derive"] }
 toml = "^0.5"
+crossbeam-channel = "~0.4.2"

+ 8
- 11
src/commands/import_json.rs

@@ -13,7 +13,7 @@ use anyhow::{Result};
 use serde::{Deserialize};
 use toml;
 
-use crate::io::{HashRead, HashWrite, DelimPrinter};
+use crate::io::{HashWrite, DelimPrinter};
 use crate::cleaning::*;
 use crate::db::{DbOpts, CopyRequest};
 use crate::tracking::StageOpts;
@@ -124,7 +124,7 @@ impl Command for ImportJson {
     let dbo = self.db.default_schema(&spec.schema);
 
     let dbc = dbo.open()?;
-    self.stage.begin_stage(&dbc)?;
+    let mut stage = self.stage.begin_stage(&dbc)?;
 
     // Set up the input file, tracking read progress
     let infn = &self.infile;
@@ -135,8 +135,8 @@ impl Command for ImportJson {
     let _pbl = set_progress(&pb);
 
     // We want to hash the file while we read it
-    let mut in_hash = Sha1::new();
-    let read = HashRead::create(fs, &mut in_hash);
+    let mut in_sf = stage.source_file(infn);
+    let read = in_sf.wrap_read(fs);
     // And wrap it in progress
     let pbr = pb.wrap_read(read);
     let pbr = BufReader::new(pbr);
@@ -160,17 +160,14 @@ impl Command for ImportJson {
     drop(buf_out);
 
     // Grab the hashes and save them to the transcript
-    let in_hash = in_hash.hexdigest();
+    let in_hash = in_sf.record()?;
     let out_hash = out_hash.hexdigest();
-    let mut t_out = self.stage.open_transcript()?;
     info!("loaded {} records with hash {}", n, out_hash);
-    writeln!(&mut t_out, "SOURCE {:?}", infn)?;
-    writeln!(&mut t_out, "SHASH {}", in_hash)?;
-    writeln!(&mut t_out, "HASH {}", out_hash)?;
+    writeln!(&mut stage, "READ {:?} {}", infn, in_hash)?;
+    writeln!(&mut stage, "HASH {}", out_hash)?;
 
     // All done! Record success and exit.
-    self.stage.record_file(&dbc, infn, &in_hash)?;
-    self.stage.end_stage(&dbc, &Some(out_hash))?;
+    stage.end(&Some(out_hash))?;
     Ok(())
   }
 }

+ 13
- 13
src/commands/parse_isbns/mod.rs

@@ -1,3 +1,4 @@
+use std::io::prelude::*;
 use std::path::{Path,PathBuf};
 use std::fs::{File, OpenOptions};
 use std::io::{BufReader, BufWriter};
@@ -13,7 +14,7 @@ use fallible_iterator::FallibleIterator;
 
 use super::Command;
 use crate::db::DbOpts;
-use crate::tracking::StageOpts;
+use crate::tracking::{StageOpts};
 
 mod parsers;
 mod sources;
@@ -164,14 +165,14 @@ impl ParseISBNs {
 impl Command for ParseISBNs {
   fn exec(self) -> Result<()> {
     let db = self.db.open()?;
-    let mut tx = self.stage.open_transcript()?;
+    let mut stage = self.stage.begin_stage(&db)?;
     let writer: Box<dyn WriteISBNs> = if let Some(ref tbl) = self.out_table {
       info!("opening output table {}", tbl);
-      writeln!(tx, "DEST TABLE {}", tbl)?;
+      writeln!(stage, "DEST TABLE {}", tbl)?;
       Box::new(DBWriter::new(&self.db, tbl)?)
     } else if let Some(ref path) = self.out_file {
       info!("opening output file {:?}", path);
-      writeln!(tx, "DEST FILE {:?}", path)?;
+      writeln!(stage, "DEST FILE {:?}", path)?;
       let out = OpenOptions::new().write(true).create(true).truncate(true).open(path)?;
       let buf = BufWriter::new(out);
       Box::new(FileWriter {
@@ -180,26 +181,25 @@ impl Command for ParseISBNs {
     } else {
       Box::new(NullWriter {})
     };
-    self.stage.begin_stage(&db)?;
     let stats = if let Some(ref tbl) = self.src_table {
-      writeln!(tx, "SOURCE TABLE {}", tbl)?;
+      writeln!(stage, "SOURCE TABLE {}", tbl)?;
       let n = self.scan_db(&db, tbl, writer)?;
       n
     } else if let Some(ref path) = self.src_file {
-      writeln!(tx, "SOURCE FILE {:?}", path)?;
+      writeln!(stage, "SOURCE FILE {:?}", path)?;
       self.scan_file(&path, writer)?
     } else {
       error!("no source data specified");
       return Err(anyhow!("no source data"));
     };
-    writeln!(tx, "{} RECORDS", stats.total)?;
-    writeln!(tx, "{} IMPORTED", stats.valid)?;
-    writeln!(tx, "{} UNMATCHED", stats.unmatched)?;
-    writeln!(tx, "{} IGNORED", stats.ignored)?;
+    writeln!(stage, "{} RECORDS", stats.total)?;
+    writeln!(stage, "{} IMPORTED", stats.valid)?;
+    writeln!(stage, "{} UNMATCHED", stats.unmatched)?;
+    writeln!(stage, "{} IGNORED", stats.ignored)?;
     if let Some(ref h) = stats.hash {
-      writeln!(tx, "OUT HASH {}", h)?;
+      writeln!(stage, "OUT HASH {}", h)?;
     }
-    self.stage.end_stage(&db, &stats.hash)?;
+    stage.end(&stats.hash)?;
     info!("processed {} ISBN records", stats.total);
     info!("matched {}, ignored {}, and {} were unmatched",
           stats.valid, stats.ignored, stats.unmatched);

+ 8
- 10
src/commands/parse_marc.rs

@@ -18,7 +18,7 @@ use anyhow::{Result, anyhow};
 use crate::cleaning::write_pgencoded;
 use crate::tsv::split_first;
 use crate::tracking::StageOpts;
-use crate::io::{HashWrite, HashRead};
+use crate::io::{HashWrite};
 use crate::db::{DbOpts, CopyRequest};
 use crate::logging;
 use super::Command;
@@ -215,8 +215,7 @@ impl Command for ParseMarc {
     let out = HashWrite::create(out, &mut out_h);
     let mut out = BufWriter::new(out);
 
-    self.stage.begin_stage(&db)?;
-    let mut tx = self.stage.open_transcript()?;
+    let mut stage = self.stage.begin_stage(&db)?;
 
     let mut count = 0;
 
@@ -227,11 +226,11 @@ impl Command for ParseMarc {
       let pb = ProgressBar::new(fs.metadata()?.len());
       pb.set_style(ProgressStyle::default_bar().template("{elapsed_precise} {bar} {percent}% {bytes}/{total_bytes} (eta: {eta})"));
       let _pbs = logging::set_progress(&pb);
-      let mut in_h = Sha1::new();
+      let mut in_sf = stage.source_file(inf);
       let pbr = pb.wrap_read(fs);
       let pbr = BufReader::new(pbr);
       let gzf = MultiGzDecoder::new(pbr);
-      let gzf = HashRead::create(gzf, &mut in_h);
+      let gzf = in_sf.wrap_read(gzf);
       let mut bfs = BufReader::new(gzf);
       let nrecs = if self.linemode {
         process_delim_file(&mut bfs, &mut out, count)
@@ -241,10 +240,9 @@ impl Command for ParseMarc {
       drop(bfs);
       match nrecs {
         Ok(n) => {
-          let in_h = in_h.hexdigest();
-          self.stage.record_file(&db, inf, &in_h)?;
           info!("processed {} records from {:?}", n, inf);
-          writeln!(&mut tx, "READ {:?} {} {}", inf, n, in_h)?;
+          let hash = in_sf.record()?;
+          writeln!(&mut stage, "READ {:?} {} {}", inf, n, hash)?;
           count += n;
         },
         Err(e) => {
@@ -256,8 +254,8 @@ impl Command for ParseMarc {
 
     drop(out);
     let out_h = out_h.hexdigest();
-    writeln!(&mut tx, "COPY {}", out_h)?;
-    self.stage.end_stage(&db, &Some(out_h))?;
+    writeln!(&mut stage, "COPY {}", out_h)?;
+    stage.end(&Some(out_h))?;
 
     Ok(())
   }

+ 14
- 14
src/commands/pcat.rs

@@ -10,8 +10,8 @@ use sha1::Sha1;
 use anyhow::Result;
 
 use crate::db::{DbOpts, CopyRequest};
-use crate::tracking::StageOpts;
-use crate::io::{HashRead, HashWrite};
+use crate::tracking::{Stage, StageOpts};
+use crate::io::{HashWrite};
 use crate::logging::set_progress;
 use super::Command;
 
@@ -41,7 +41,7 @@ pub struct PCat {
 }
 
 /// Cat a file from input to output, hashing on the way.
-fn cat_file<P: AsRef<Path>, W: Write>(inf: P, out: &mut W) -> Result<String> {
+fn cat_file<'o, 'c, P: AsRef<Path>, W: Write>(stage: &mut Stage<'o, 'c>, inf: P, out: &mut W) -> Result<()> {
   let inf: &Path = inf.as_ref();
   let fstr = inf.to_string_lossy();
   info!("opening file {:?}", inf);
@@ -50,12 +50,14 @@ fn cat_file<P: AsRef<Path>, W: Write>(inf: P, out: &mut W) -> Result<String> {
   pb.set_style(ProgressStyle::default_bar().template(PB_STYLE));
   pb.set_prefix(&fstr);
   let _pbs = set_progress(&pb);
-  let mut hash = Sha1::new();
-  let read = HashRead::create(fs, &mut hash);
+  let mut sf = stage.source_file(inf);
+  let read = sf.wrap_read(fs);
   let mut pbr = pb.wrap_read(read);
   io::copy(&mut pbr, out)?;
   drop(pbr);
-  Ok(hash.hexdigest())
+  let hash = sf.record()?;
+  writeln!(stage, "READ {:?} {}", inf, hash)?; // newline-terminated so successive READ records stay on separate transcript lines
+  Ok(())
 }
 
 impl Command for PCat {
@@ -71,37 +73,35 @@ impl PCat {
   fn raw_cat(&self) -> Result<()> {
     let stdout = io::stdout();
     let mut out = stdout.lock();
+    let mut stage = self.stage.empty();
 
     for inf in &self.infiles {
-      cat_file(inf, &mut out)?;
+      cat_file(&mut stage, inf, &mut out)?;
     }
     Ok(())
   }
 
   fn db_cat(&self, table: &str) -> Result<()> {
     let db = self.dbo.open()?;
-    self.stage.begin_stage(&db)?;
+    let mut stage = self.stage.begin_stage(&db)?;
     let mut req = CopyRequest::new(&self.dbo, table)?.truncate(true);
     if let Some(ref fmt) = self.format {
       req = req.with_format(fmt);
     }
     info!("copying to table {}", table);
-    let mut txout = self.stage.open_transcript()?;
-    writeln!(&mut txout, "COPY TO {}", table)?;
+    writeln!(stage, "COPY TO {}", table)?;
     let out = req.open()?;
     let mut out_hash = Sha1::new();
     let mut out = HashWrite::create(out, &mut out_hash);
 
     for inf in &self.infiles {
       let inf = inf.as_path();
-      let hash = cat_file(inf, &mut out)?;
-      self.stage.record_file(&db, inf, &hash)?;
-      writeln!(&mut txout, "READ {:?} {}", inf, hash)?;
+      cat_file(&mut stage, inf, &mut out)?;
     }
 
     drop(out);
     let hash = out_hash.hexdigest();
-    self.stage.end_stage(&db, &Some(hash))?;
+    stage.end(&Some(hash))?;
     Ok(())
   }
 }

+ 2
- 1
src/db.rs

@@ -4,8 +4,9 @@ use log::*;
 
 use anyhow::{anyhow, Result};
 use os_pipe::{pipe, PipeWriter};
-use postgres::{Connection, TlsMode};
+use postgres::{TlsMode};
 use structopt::StructOpt;
+pub use postgres::Connection;
 
 use std::thread;
 

+ 120
- 37
src/tracking.rs

@@ -1,14 +1,18 @@
 use std::io;
+use std::io::Write;
 use std::path::{Path, PathBuf};
 use std::fs::OpenOptions;
 
 use anyhow::Result;
+use sha1::Sha1;
 use structopt::StructOpt;
 use postgres::Connection;
 
 use log::*;
 
-/// An import stage
+use crate::io::HashRead;
+
+/// Options controlling the import stage
 #[derive(StructOpt, Debug, Clone)]
 pub struct StageOpts {
   /// Stage name
@@ -24,9 +28,23 @@ pub struct StageOpts {
   transcript: Option<PathBuf>,
 }
 
+/// An import stage.  Writing to the stage writes to its transcript file.
+pub struct Stage<'o, 'c> {
+  options: &'o StageOpts,
+  cxn: Option<&'c Connection>,
+  transcript: Box<dyn io::Write>
+}
+
+/// A source file for a stage
+pub struct StageSource<'s> {
+  stage: &'s Stage<'s, 's>,
+  path: String,
+  hash: Sha1
+}
+
 impl StageOpts {
   /// Start the stage
-  pub fn begin_stage(&self, cxn: &Connection) -> Result<()> {
+  pub fn begin_stage<'o, 'c>(&'o self, cxn: &'c Connection) -> Result<Stage<'o, 'c>> {
     match self.stage {
       Some (ref s) => {
         info!("beginning stage {}", s);
@@ -47,53 +65,118 @@ impl StageOpts {
         warn!("no stage specified");
       }
     };
-    Ok(())
+    let w: Box<dyn io::Write> = match self.transcript {
+      Some(ref p) => {
+        Box::new(OpenOptions::new().write(true).create(true).truncate(true).open(p)?)
+      },
+      None => Box::new(io::stdout())
+    };
+    Ok(Stage {
+      options: self,
+      cxn: Some(cxn),
+      transcript: w
+    })
   }
 
+  /// Create a stage with no database connection: database actions are skipped and the transcript goes to stderr.
+  pub fn empty<'o>(&'o self) -> Stage<'o, 'o> {
+    Stage {
+      options: self,
+      cxn: None,
+      transcript: Box::new(io::stderr())
+    }
+  }
+}
+
+impl <'o, 'c> Write for Stage<'o, 'c> {
+  fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+    self.transcript.write(buf)
+  }
+
+  fn flush(&mut self) -> io::Result<()> {
+    self.transcript.flush()
+  }
+
+  fn write_fmt(&mut self, fmt: std::fmt::Arguments) -> io::Result<()> {
+    self.transcript.write_fmt(fmt)
+  }
+}
+
+impl <'o,'c> Stage<'o,'c> {
   /// End the stage
-  pub fn end_stage(&self, cxn: &Connection, key: &Option<String>) -> Result<()> {
-    match self.stage {
+  pub fn end(self, key: &Option<String>) -> Result<()> {
+    match self.options.stage {
       Some (ref s) => {
         info!("finishing stage {}", s);
-        cxn.execute("UPDATE stage_status
-                     SET finished_at = NOW(), stage_key = $2
-                     WHERE stage_name = $1",
-                    &[s, &key])?
+        self.db_action(|db| {
+          Ok(db.execute("UPDATE stage_status
+                         SET finished_at = NOW(), stage_key = $2
+                         WHERE stage_name = $1",
+                        &[s, &key])?)
+        })?;
       },
-      None => 0
+      None => ()
     };
     Ok(())
   }
 
-  /// Record a file
-  pub fn record_file<P: AsRef<Path>>(&self, cxn: &Connection, path: P, hash: &str) -> Result<()> {
-    let path: &Path = path.as_ref();
-    let name = path.to_string_lossy();
-    info!("recording checksum {} for file {}", hash, name);
-    cxn.execute("INSERT INTO source_file (filename, checksum)
-                 VALUES ($1, $2)
-                 ON CONFLICT (filename)
-                 DO UPDATE SET checksum = $2, reg_time = NOW()",
-                &[&name, &hash])?;
-    match self.stage {
-      Some (ref s) => {
-        debug!("attaching to stage {}", s);
-        cxn.execute("INSERT INTO stage_file (stage_name, filename)
-                     VALUES ($1, $2)",
-                    &[s, &name])?
-      },
-      None => 0
-    };
+  fn db_action<F, R, Rt>(&self, func: F) -> Result<Option<R>> where F: FnOnce(&Connection) -> Rt, Rt: Into<Result<R>> {
+    match self.cxn {
+      Some(ref c) => Ok(Some(func(c).into()?)), // connection present: run the action, propagate its error, wrap its value in Some
+      None => Ok(None) // no connection: the action is never called
+    }
+  }
+
+  /// Record a source file with its hash
+  pub fn record_file<P: AsRef<Path>>(&self, path: P, hash: &str) -> Result<()> {
+    let sf = self.source_file(path);
+    sf.record_hash(hash)?; // propagate database errors instead of discarding the Result
     Ok(())
   }
 
-  pub fn open_transcript(&self) -> Result<Box<dyn io::Write>> {
-    let w: Box<dyn io::Write> = match self.transcript {
-      Some(ref p) => {
-        Box::new(OpenOptions::new().write(true).create(true).truncate(true).open(p)?)
-      },
-      None => Box::new(io::stdout())
-    };
-    Ok(w)
+  /// Set up to record a file with its reader, to both source and transcript
+  pub fn source_file<'s, P: AsRef<Path>>(&'s self, path: P) -> StageSource<'s> {
+    let path: &Path = path.as_ref();
+    StageSource {
+      stage: self,
+      path: path.to_string_lossy().to_string(),
+      hash: Sha1::new()
+    }
+  }
+}
+
+impl <'s> StageSource<'s> {
+  /// Wrap a reader to compute this file's hash
+  pub fn wrap_read<'a, R: io::Read>(&'a mut self, read: R) -> HashRead<'a, R> {
+    HashRead::create(read, &mut self.hash)
+  }
+
+  /// Record the accumulated file hash (and return it)
+  pub fn record(self) -> Result<String> {
+    let hash = self.hash.hexdigest();
+    self.record_hash(&hash)?;
+    Ok(hash)
+  }
+
+  fn record_hash(&self, hash: &str) -> Result<()> {
+    info!("recording checksum {} for file {}", hash, &self.path);
+    self.stage.db_action(|db| {
+      db.execute("INSERT INTO source_file (filename, checksum)
+                  VALUES ($1, $2)
+                  ON CONFLICT (filename)
+                  DO UPDATE SET checksum = $2, reg_time = NOW()",
+                 &[&self.path, &hash])?;
+      match self.stage.options.stage {
+        Some (ref s) => {
+          debug!("attaching to stage {}", s);
+          db.execute("INSERT INTO stage_file (stage_name, filename)
+                      VALUES ($1, $2)",
+                     &[s, &self.path])?
+        },
+        None => 0
+      };
+      Ok(())
+    })?;
+    Ok(())
   }
 }