Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

tracking.rs 2.9 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
  1. use std::io;
  2. use std::path::{Path, PathBuf};
  3. use std::fs::OpenOptions;
  4. use anyhow::Result;
  5. use structopt::StructOpt;
  6. use postgres::Connection;
  7. use log::*;
  8. /// An import stage
  9. #[derive(StructOpt, Debug, Clone)]
  10. pub struct StageOpts {
  11. /// Stage name
  12. #[structopt(long="stage", short="s")]
  13. stage: Option<String>,
  14. /// Stage dependencies
  15. #[structopt(long="stage-dep", short="D")]
  16. deps: Vec<String>,
  17. /// Transcript file
  18. #[structopt(long="transcript", short="T")]
  19. transcript: Option<PathBuf>,
  20. }
  21. impl StageOpts {
  22. /// Start the stage
  23. pub fn begin_stage(&self, cxn: &Connection) -> Result<()> {
  24. match self.stage {
  25. Some (ref s) => {
  26. info!("beginning stage {}", s);
  27. cxn.execute("INSERT INTO stage_status (stage_name)
  28. VALUES ($1)
  29. ON CONFLICT (stage_name)
  30. DO UPDATE SET started_at = now(), finished_at = NULL, stage_key = NULL",
  31. &[s])?;
  32. cxn.execute("DELETE FROM stage_file WHERE stage_name = $1", &[s])?;
  33. cxn.execute("DELETE FROM stage_dep WHERE stage_name = $1", &[s])?;
  34. for d in &self.deps {
  35. cxn.execute("INSERT INTO stage_dep (stage_name, dep_name, dep_key)
  36. SELECT $1, stage_name, stage_key
  37. FROM stage_status WHERE stage_name = $2", &[s, &d])?;
  38. }
  39. },
  40. None => {
  41. warn!("no stage specified");
  42. }
  43. };
  44. Ok(())
  45. }
  46. /// End the stage
  47. pub fn end_stage(&self, cxn: &Connection, key: &Option<String>) -> Result<()> {
  48. match self.stage {
  49. Some (ref s) => {
  50. info!("finishing stage {}", s);
  51. cxn.execute("UPDATE stage_status
  52. SET finished_at = NOW(), stage_key = $2
  53. WHERE stage_name = $1",
  54. &[s, &key])?
  55. },
  56. None => 0
  57. };
  58. Ok(())
  59. }
  60. /// Record a file
  61. pub fn record_file<P: AsRef<Path>>(&self, cxn: &Connection, path: P, hash: &str) -> Result<()> {
  62. let path: &Path = path.as_ref();
  63. let name = path.to_string_lossy();
  64. info!("recording checksum {} for file {}", hash, name);
  65. cxn.execute("INSERT INTO source_file (filename, checksum)
  66. VALUES ($1, $2)
  67. ON CONFLICT (filename)
  68. DO UPDATE SET checksum = $2, reg_time = NOW()",
  69. &[&name, &hash])?;
  70. match self.stage {
  71. Some (ref s) => {
  72. debug!("attaching to stage {}", s);
  73. cxn.execute("INSERT INTO stage_file (stage_name, filename)
  74. VALUES ($1, $2)",
  75. &[s, &name])?
  76. },
  77. None => 0
  78. };
  79. Ok(())
  80. }
  81. pub fn open_transcript(&self) -> Result<Box<dyn io::Write>> {
  82. let w: Box<dyn io::Write> = match self.transcript {
  83. Some(ref p) => {
  84. Box::new(OpenOptions::new().write(true).create(true).truncate(true).open(p)?)
  85. },
  86. None => Box::new(io::stdout())
  87. };
  88. Ok(w)
  89. }
  90. }
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...