Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

import_json.rs 5.0 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
  1. use std::io::prelude::*;
  2. use std::io::{BufReader, BufWriter};
  3. use std::fs::{File, read_to_string};
  4. use std::path::PathBuf;
  5. use log::*;
  6. use structopt::StructOpt;
  7. use flate2::bufread::MultiGzDecoder;
  8. use indicatif::{ProgressBar, ProgressStyle};
  9. use sha1::Sha1;
  10. use anyhow::{Result, anyhow};
  11. use serde::{Deserialize};
  12. use toml;
  13. use crate::io::{HashRead, HashWrite, DelimPrinter};
  14. use crate::cleaning::*;
  15. use crate::tsv::split_first;
  16. use crate::db::{DbOpts, CopyRequest};
  17. use crate::tracking::StageOpts;
  18. use crate::logging::set_progress;
  19. use super::Command;
  20. /// Process OpenLib data into format suitable for PostgreSQL import.
  21. #[derive(StructOpt)]
  22. #[structopt(name="import-json")]
  23. pub struct ImportJson {
  24. #[structopt(flatten)]
  25. db: DbOpts,
  26. #[structopt(flatten)]
  27. stage: StageOpts,
  28. /// Truncate the table before importing
  29. #[structopt(long="truncate")]
  30. truncate: bool,
  31. /// TOML spec file that describes the input
  32. #[structopt(name="SPEC")]
  33. spec: PathBuf,
  34. /// Input file
  35. #[structopt(name = "INPUT", parse(from_os_str))]
  36. infile: PathBuf
  37. }
  38. #[derive(Deserialize, Debug)]
  39. enum ColOp {
  40. #[serde(rename="_")]
  41. Skip,
  42. #[serde(rename="str")]
  43. String,
  44. #[serde(rename="json")]
  45. JSON
  46. }
  47. /// Import specification read from TOML
  48. #[derive(Deserialize, Debug)]
  49. struct ImportSpec {
  50. schema: String,
  51. table: String,
  52. columns: Vec<String>,
  53. #[serde(default)]
  54. format: Vec<ColOp>
  55. }
  56. impl ImportSpec {
  57. fn import<R: BufRead, W: Write>(&self, src: &mut R, dst: &mut W) -> Result<usize> {
  58. if self.format.is_empty() {
  59. self.import_raw(src, dst)
  60. } else {
  61. self.import_delim(src, dst)
  62. }
  63. }
  64. fn import_raw<R: BufRead, W: Write>(&self, src: &mut R, dst: &mut W) -> Result<usize> {
  65. let mut jsbuf = String::new();
  66. let mut n = 0;
  67. for line in src.lines() {
  68. let json = line?;
  69. clean_json(&json, &mut jsbuf);
  70. write_pgencoded(dst, jsbuf.as_bytes())?;
  71. dst.write_all(b"\n")?;
  72. n += 1;
  73. }
  74. Ok(n)
  75. }
  76. fn import_delim<R: BufRead, W: Write>(&self, src: &mut R, dst: &mut W) -> Result<usize> {
  77. let mut jsbuf = String::new();
  78. let mut n = 0;
  79. for line in src.lines() {
  80. let mut line = line?;
  81. let mut delim = DelimPrinter::new("\t", "\n");
  82. for i in 0..self.format.len() {
  83. let (fld, rest) = split_first(&line).ok_or_else(|| anyhow!("invalid line"))?;
  84. match self.format[i] {
  85. ColOp::Skip => (),
  86. ColOp::String => {
  87. debug!("writing string field {}", fld);
  88. delim.preface(dst)?;
  89. write_pgencoded(dst, fld.as_bytes())?;
  90. },
  91. ColOp::JSON => {
  92. delim.preface(dst)?;
  93. debug!("writing JSON field {}", fld);
  94. clean_json(&fld, &mut jsbuf);
  95. write_pgencoded(dst, jsbuf.as_bytes())?;
  96. }
  97. }
  98. line = rest.to_string();
  99. }
  100. delim.end(dst)?;
  101. n += 1;
  102. }
  103. Ok(n)
  104. }
  105. }
  106. impl Command for ImportJson {
  107. fn exec(self) -> Result<()> {
  108. info!("reading spec from {:?}", &self.spec);
  109. let spec = read_to_string(&self.spec)?;
  110. let spec: ImportSpec = toml::from_str(&spec)?;
  111. let dbo = self.db.default_schema(&spec.schema);
  112. let dbc = dbo.open()?;
  113. self.stage.begin_stage(&dbc)?;
  114. // Set up the input file, tracking read progress
  115. let infn = &self.infile;
  116. info!("reading from {:?}", infn);
  117. let fs = File::open(infn)?;
  118. let pb = ProgressBar::new(fs.metadata()?.len());
  119. pb.set_style(ProgressStyle::default_bar().template("{elapsed_precise} {bar} {percent}% {bytes}/{total_bytes} (eta: {eta})"));
  120. let _pbl = set_progress(&pb);
  121. // We want to hash the file while we read it
  122. let mut in_hash = Sha1::new();
  123. let read = HashRead::create(fs, &mut in_hash);
  124. // And wrap it in progress
  125. let pbr = pb.wrap_read(read);
  126. let pbr = BufReader::new(pbr);
  127. let gzf = MultiGzDecoder::new(pbr);
  128. let mut bfs = BufReader::new(gzf);
  129. // Set up the output stream, writing to the database
  130. let req = CopyRequest::new(&dbo, &spec.table)?;
  131. let req = req.with_schema(dbo.schema());
  132. let cref: Vec<&str> = spec.columns.iter().map(String::as_str).collect();
  133. let req = req.with_columns(&cref);
  134. let req = req.truncate(self.truncate);
  135. let out = req.open()?;
  136. let mut out_hash = Sha1::new();
  137. let hout = HashWrite::create(out, &mut out_hash);
  138. let mut buf_out = BufWriter::new(hout);
  139. // Actually run the import
  140. let n = spec.import(&mut bfs, &mut buf_out)?;
  141. buf_out.flush()?;
  142. drop(buf_out);
  143. // Grab the hashes and save them to the transcript
  144. let in_hash = in_hash.hexdigest();
  145. let out_hash = out_hash.hexdigest();
  146. let mut t_out = self.stage.open_transcript()?;
  147. info!("loaded {} records with hash {}", n, out_hash);
  148. writeln!(&mut t_out, "SOURCE {:?}", infn)?;
  149. writeln!(&mut t_out, "SHASH {}", in_hash)?;
  150. writeln!(&mut t_out, "HASH {}", out_hash)?;
  151. // All done! Record success and exit.
  152. self.stage.record_file(&dbc, infn, &in_hash)?;
  153. self.stage.end_stage(&dbc, &Some(out_hash))?;
  154. Ok(())
  155. }
  156. }
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...