Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

db.rs 1.6 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
  1. #[macro_use]
  2. use log;
  3. use error::Result;
  4. use std::io::prelude::*;
  5. use os_pipe::{pipe, PipeWriter};
  6. use postgres::{Connection, TlsMode};
  7. use std::thread;
  8. pub fn db_open(url: &Option<String>) -> Result<Connection> {
  9. let env = std::env::var("DB_URL");
  10. let url = match url {
  11. Some(u) => u.clone(),
  12. None => env?
  13. };
  14. info!("connecting to database {}", url);
  15. Ok(Connection::connect(url, TlsMode::None)?)
  16. }
  17. pub struct CopyTarget {
  18. writer: Option<PipeWriter>,
  19. thread: Option<thread::JoinHandle<u64>>
  20. }
  21. impl Write for CopyTarget {
  22. fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
  23. self.writer.as_ref().expect("writer missing").write(buf)
  24. }
  25. fn flush(&mut self) -> std::io::Result<()> {
  26. self.writer.as_ref().expect("writer missing").flush()
  27. }
  28. }
  29. impl Drop for CopyTarget {
  30. fn drop(&mut self) {
  31. if let Some(w) = self.writer.take() {
  32. std::mem::drop(w);
  33. }
  34. if let Some(thread) = self.thread.take() {
  35. let n = thread.join().unwrap();
  36. info!("wrote {} lines", n);
  37. }
  38. }
  39. }
  40. /// Open a writer to copy data into PostgreSQL
  41. pub fn copy_target(url: &Option<String>, query: &str, name: &str) -> Result<CopyTarget> {
  42. let url = url.as_ref().map(|s| s.clone());
  43. let query = query.to_string();
  44. let (mut reader, writer) = pipe()?;
  45. let tb = thread::Builder::new().name(name.to_string());
  46. let jh = tb.spawn(move || {
  47. let query = query;
  48. let db = db_open(&url).unwrap();
  49. info!("preparing {}", query);
  50. let stmt = db.prepare(&query).unwrap();
  51. stmt.copy_in(&[], &mut reader).unwrap()
  52. })?;
  53. Ok(CopyTarget {
  54. writer: Some(writer),
  55. thread: Some(jh)
  56. })
  57. }
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...