Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

stage.rs 2.5 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
  1. use crate::error::{Result};
  2. use std::io;
  3. use std::path::{Path, PathBuf};
  4. use std::fs::OpenOptions;
  5. use structopt::StructOpt;
  6. use postgres::Connection;
  7. use log::*;
  8. /// An import stage
  9. #[derive(StructOpt, Debug, Clone)]
  10. pub struct StageOpts {
  11. /// Stage name
  12. #[structopt(long="stage", short="s")]
  13. stage: Option<String>,
  14. /// Transcript file
  15. #[structopt(long="transcript", short="T")]
  16. transcript: Option<PathBuf>,
  17. }
  18. impl StageOpts {
  19. /// Start the stage
  20. pub fn begin_stage(&self, cxn: &Connection) -> Result<()> {
  21. match self.stage {
  22. Some (ref s) => {
  23. info!("beginning stage {}", s);
  24. cxn.execute("INSERT INTO stage_status (stage_name)
  25. VALUES ($1)
  26. ON CONFLICT (stage_name)
  27. DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL",
  28. &[s])?;
  29. cxn.execute("DELETE FROM stage_file WHERE stage_name = $1", &[s])?
  30. },
  31. None => {
  32. warn!("no stage specified");
  33. 0
  34. }
  35. };
  36. Ok(())
  37. }
  38. /// End the stage
  39. pub fn end_stage(&self, cxn: &Connection, key: &Option<String>) -> Result<()> {
  40. match self.stage {
  41. Some (ref s) => {
  42. info!("finishing stage {}", s);
  43. cxn.execute("UPDATE stage_status
  44. SET finished_at = NOW(), stage_key = $2
  45. WHERE stage_name = $1",
  46. &[s, &key])?
  47. },
  48. None => 0
  49. };
  50. Ok(())
  51. }
  52. /// Record a file
  53. pub fn record_file<P: AsRef<Path>>(&self, cxn: &Connection, path: P, hash: &str) -> Result<()> {
  54. let path: &Path = path.as_ref();
  55. let name = path.to_string_lossy();
  56. info!("recording checksum {} for file {}", hash, name);
  57. cxn.execute("INSERT INTO source_file (filename, checksum)
  58. VALUES ($1, $2)
  59. ON CONFLICT (filename)
  60. DO UPDATE SET checksum = $2, reg_time = NOW()",
  61. &[&name, &hash])?;
  62. match self.stage {
  63. Some (ref s) => {
  64. debug!("attaching to stage {}", s);
  65. cxn.execute("INSERT INTO stage_file (stage_name, filename)
  66. VALUES ($1, $2)",
  67. &[s, &name])?
  68. },
  69. None => 0
  70. };
  71. Ok(())
  72. }
  73. pub fn open_transcript(&self) -> Result<Box<dyn io::Write>> {
  74. let w: Box<dyn io::Write> = match self.transcript {
  75. Some(ref p) => {
  76. Box::new(OpenOptions::new().write(true).create(true).truncate(true).open(p)?)
  77. },
  78. None => Box::new(io::stdout())
  79. };
  80. Ok(w)
  81. }
  82. }
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...