import_json.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. use std::io::prelude::*;
  2. use std::io::{BufReader, BufWriter};
  3. use std::fs::{File, read_to_string};
  4. use std::path::PathBuf;
  5. use log::*;
  6. use structopt::StructOpt;
  7. use flate2::bufread::MultiGzDecoder;
  8. use indicatif::{ProgressBar, ProgressStyle};
  9. use sha1::Sha1;
  10. use anyhow::{Result};
  11. use serde::{Deserialize};
  12. use toml;
  13. use crate::io::{HashRead, HashWrite, DelimPrinter};
  14. use crate::cleaning::*;
  15. use crate::db::{DbOpts, CopyRequest};
  16. use crate::tracking::StageOpts;
  17. use crate::logging::set_progress;
  18. use super::Command;
/// Process OpenLib data into format suitable for PostgreSQL import.
#[derive(StructOpt)]
#[structopt(name="import-json")]
pub struct ImportJson {
    // Shared database connection options (flattened into this command's CLI).
    #[structopt(flatten)]
    db: DbOpts,
    // Shared stage-tracking options used to record import provenance.
    #[structopt(flatten)]
    stage: StageOpts,
    /// Truncate the table before importing
    #[structopt(long="truncate")]
    truncate: bool,
    /// TOML spec file that describes the input
    #[structopt(name="SPEC")]
    spec: PathBuf,
    /// Input file
    #[structopt(name = "INPUT", parse(from_os_str))]
    infile: PathBuf
}
/// Per-column operation applied when importing delimited input.
///
/// Deserialized from the `format` array of the TOML import spec; the serde
/// renames are the short codes spec files use.
#[derive(Deserialize, Debug)]
enum ColOp {
    // "_": skip this column entirely (no output written)
    #[serde(rename="_")]
    Skip,
    // "str": emit the field as-is, pg-encoded
    #[serde(rename="str")]
    String,
    // "json": run the field through clean_json before pg-encoding
    #[serde(rename="json")]
    JSON
}
/// Import specification read from TOML
#[derive(Deserialize, Debug)]
struct ImportSpec {
    // Default database schema for the target table.
    schema: String,
    // Name of the table to import into.
    table: String,
    // Column names passed to the database copy request.
    columns: Vec<String>,
    // Per-column operations for delimited input; defaults to empty,
    // which selects the raw JSON-lines import path (see ImportSpec::import).
    #[serde(default)]
    format: Vec<ColOp>
}
  55. impl ImportSpec {
  56. fn import<R: BufRead, W: Write>(&self, src: &mut R, dst: &mut W) -> Result<usize> {
  57. if self.format.is_empty() {
  58. self.import_raw(src, dst)
  59. } else {
  60. self.import_delim(src, dst)
  61. }
  62. }
  63. fn import_raw<R: BufRead, W: Write>(&self, src: &mut R, dst: &mut W) -> Result<usize> {
  64. let mut jsbuf = String::new();
  65. let mut n = 0;
  66. for line in src.lines() {
  67. let json = line?;
  68. clean_json(&json, &mut jsbuf);
  69. write_pgencoded(dst, jsbuf.as_bytes())?;
  70. dst.write_all(b"\n")?;
  71. n += 1;
  72. }
  73. Ok(n)
  74. }
  75. fn import_delim<R: BufRead, W: Write>(&self, src: &mut R, dst: &mut W) -> Result<usize> {
  76. let mut jsbuf = String::new();
  77. let mut n = 0;
  78. for line in src.lines() {
  79. let line = line?;
  80. let mut delim = DelimPrinter::new("\t", "\n");
  81. let split = line.split("\t");
  82. for (fld, fc) in split.zip(&self.format) {
  83. match fc {
  84. ColOp::Skip => (),
  85. ColOp::String => {
  86. delim.preface(dst)?;
  87. write_pgencoded(dst, fld.as_bytes())?;
  88. },
  89. ColOp::JSON => {
  90. delim.preface(dst)?;
  91. clean_json(&fld, &mut jsbuf);
  92. write_pgencoded(dst, jsbuf.as_bytes())?;
  93. }
  94. }
  95. }
  96. delim.end(dst)?;
  97. n += 1;
  98. }
  99. info!("processed {} lines", n);
  100. Ok(n)
  101. }
  102. }
impl Command for ImportJson {
    /// Run the import: read the TOML spec, stream the gzipped input file
    /// through the import pipeline into a database copy, and record the
    /// input/output hashes in the stage transcript.
    fn exec(self) -> Result<()> {
        info!("reading spec from {:?}", &self.spec);
        let spec = read_to_string(&self.spec)?;
        let spec: ImportSpec = toml::from_str(&spec)?;
        // default the schema from the spec (DbOpts may override it)
        let dbo = self.db.default_schema(&spec.schema);
        let dbc = dbo.open()?;
        self.stage.begin_stage(&dbc)?;
        // Set up the input file, tracking read progress
        let infn = &self.infile;
        info!("reading from {:?}", infn);
        let fs = File::open(infn)?;
        // progress bar sized by the compressed file length
        let pb = ProgressBar::new(fs.metadata()?.len());
        pb.set_style(ProgressStyle::default_bar().template("{elapsed_precise} {bar} {percent}% {bytes}/{total_bytes} (eta: {eta})"));
        let _pbl = set_progress(&pb);
        // We want to hash the file while we read it
        let mut in_hash = Sha1::new();
        let read = HashRead::create(fs, &mut in_hash);
        // And wrap it in progress
        let pbr = pb.wrap_read(read);
        // MultiGzDecoder requires a BufRead input; its output is buffered
        // again for the line-oriented reads in ImportSpec::import
        let pbr = BufReader::new(pbr);
        let gzf = MultiGzDecoder::new(pbr);
        let mut bfs = BufReader::new(gzf);
        // Set up the output stream, writing to the database
        let req = CopyRequest::new(&dbo, &spec.table)?;
        let req = req.with_schema(dbo.schema());
        let cref: Vec<&str> = spec.columns.iter().map(String::as_str).collect();
        let req = req.with_columns(&cref);
        let req = req.truncate(self.truncate);
        let out = req.open()?;
        // hash the data as it is written to the database
        let mut out_hash = Sha1::new();
        let hout = HashWrite::create(out, &mut out_hash);
        let mut buf_out = BufWriter::new(hout);
        // Actually run the import
        let n = spec.import(&mut bfs, &mut buf_out)?;
        // flush and drop the writer so the copy is complete before we
        // read back the hashes
        buf_out.flush()?;
        drop(buf_out);
        // Grab the hashes and save them to the transcript
        let in_hash = in_hash.hexdigest();
        let out_hash = out_hash.hexdigest();
        let mut t_out = self.stage.open_transcript()?;
        info!("loaded {} records with hash {}", n, out_hash);
        writeln!(&mut t_out, "SOURCE {:?}", infn)?;
        writeln!(&mut t_out, "SHASH {}", in_hash)?;
        writeln!(&mut t_out, "HASH {}", out_hash)?;
        // All done! Record success and exit.
        self.stage.record_file(&dbc, infn, &in_hash)?;
        self.stage.end_stage(&dbc, &Some(out_hash))?;
        Ok(())
    }
}
Tip!

Press p or to see the previous file or, n or to see the next file