Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

db.rs 1.8 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
  1. #[macro_use]
  2. use log;
  3. use error::Result;
  4. use std::io::prelude::*;
  5. use os_pipe::{pipe, PipeWriter};
  6. use postgres::{Connection, TlsMode};
  7. use std::thread;
  8. pub fn db_open(url: &Option<String>) -> Result<Connection> {
  9. let env = std::env::var("DB_URL");
  10. let url = match url {
  11. Some(u) => u.clone(),
  12. None => env?
  13. };
  14. info!("connecting to database {}", url);
  15. Ok(Connection::connect(url, TlsMode::None)?)
  16. }
  17. pub struct CopyTarget {
  18. writer: Option<PipeWriter>,
  19. name: String,
  20. thread: Option<thread::JoinHandle<u64>>
  21. }
  22. impl Write for CopyTarget {
  23. fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
  24. self.writer.as_ref().expect("writer missing").write(buf)
  25. }
  26. fn flush(&mut self) -> std::io::Result<()> {
  27. self.writer.as_ref().expect("writer missing").flush()
  28. }
  29. }
  30. impl Drop for CopyTarget {
  31. fn drop(&mut self) {
  32. if let Some(w) = self.writer.take() {
  33. std::mem::drop(w);
  34. }
  35. if let Some(thread) = self.thread.take() {
  36. match thread.join() {
  37. Ok(n) => info!("{}: wrote {} lines", self.name, n),
  38. Err(e) => error!("{}: error: {:?}", self.name, e)
  39. };
  40. } else {
  41. error!("{} already shut down", self.name);
  42. }
  43. }
  44. }
  45. /// Open a writer to copy data into PostgreSQL
  46. pub fn copy_target(url: &Option<String>, query: &str, name: &str) -> Result<CopyTarget> {
  47. let url = url.as_ref().map(|s| s.clone());
  48. let query = query.to_string();
  49. let (mut reader, writer) = pipe()?;
  50. let tb = thread::Builder::new().name(name.to_string());
  51. let jh = tb.spawn(move || {
  52. let query = query;
  53. let db = db_open(&url).unwrap();
  54. info!("preparing {}", query);
  55. let stmt = db.prepare(&query).unwrap();
  56. stmt.copy_in(&[], &mut reader).unwrap()
  57. })?;
  58. Ok(CopyTarget {
  59. writer: Some(writer),
  60. name: name.to_string(),
  61. thread: Some(jh)
  62. })
  63. }
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...