Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

db.rs 3.0 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
  1. use error::{Result, err};
  2. use std::io::prelude::*;
  3. use os_pipe::{pipe, PipeWriter};
  4. use postgres::{Connection, TlsMode};
  5. use structopt::StructOpt;
  6. use std::thread;
  7. pub trait ConnectInfo {
  8. fn db_url(&self) -> Result<String>;
  9. }
  10. impl ConnectInfo for String {
  11. fn db_url(&self) -> Result<String> {
  12. Ok(self.clone())
  13. }
  14. }
  15. impl ConnectInfo for Option<String> {
  16. fn db_url(&self) -> Result<String> {
  17. match self {
  18. Some(ref s) => Ok(s.clone()),
  19. None => Err(err("no URL provided"))
  20. }
  21. }
  22. }
  23. /// Database options
  24. #[derive(StructOpt, Debug, Clone)]
  25. pub struct DbOpts {
  26. /// Database URL to connect to
  27. #[structopt(long="db-url")]
  28. db_url: Option<String>,
  29. /// Database schema
  30. #[structopt(long="db-schema")]
  31. db_schema: Option<String>
  32. }
  33. impl DbOpts {
  34. /// Open the database connection
  35. pub fn open(&self) -> Result<Connection> {
  36. let url = self.url()?;
  37. connect(&url)
  38. }
  39. pub fn url<'a>(&'a self) -> Result<String> {
  40. Ok(match self.db_url {
  41. Some(ref s) => s.clone(),
  42. None => std::env::var("DB_URL")?
  43. })
  44. }
  45. /// Get the DB schema
  46. pub fn schema<'a>(&'a self) -> &'a str {
  47. match self.db_schema {
  48. Some(ref s) => s,
  49. None => "public"
  50. }
  51. }
  52. }
  53. impl ConnectInfo for DbOpts {
  54. fn db_url(&self) -> Result<String> {
  55. self.url()
  56. }
  57. }
  58. pub fn connect(url: &str) -> Result<Connection> {
  59. Ok(Connection::connect(url, TlsMode::None)?)
  60. }
  61. pub struct CopyTarget {
  62. writer: Option<PipeWriter>,
  63. name: String,
  64. thread: Option<thread::JoinHandle<u64>>
  65. }
  66. impl Write for CopyTarget {
  67. fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
  68. self.writer.as_ref().expect("writer missing").write(buf)
  69. }
  70. fn flush(&mut self) -> std::io::Result<()> {
  71. self.writer.as_ref().expect("writer missing").flush()
  72. }
  73. }
  74. impl Drop for CopyTarget {
  75. fn drop(&mut self) {
  76. if let Some(w) = self.writer.take() {
  77. std::mem::drop(w);
  78. }
  79. if let Some(thread) = self.thread.take() {
  80. match thread.join() {
  81. Ok(n) => info!("{}: wrote {} lines", self.name, n),
  82. Err(e) => error!("{}: error: {:?}", self.name, e)
  83. };
  84. } else {
  85. error!("{} already shut down", self.name);
  86. }
  87. }
  88. }
  89. /// Open a writer to copy data into PostgreSQL
  90. pub fn copy_target<C: ConnectInfo>(ci: &C, query: &str, name: &str) -> Result<CopyTarget> {
  91. let url = ci.db_url()?;
  92. let query = query.to_string();
  93. let (mut reader, writer) = pipe()?;
  94. let tb = thread::Builder::new().name(name.to_string());
  95. let jh = tb.spawn(move || {
  96. let query = query;
  97. let db = connect(&url).unwrap();
  98. info!("preparing {}", query);
  99. let stmt = db.prepare(&query).unwrap();
  100. stmt.copy_in(&[], &mut reader).unwrap()
  101. })?;
  102. Ok(CopyTarget {
  103. writer: Some(writer),
  104. name: name.to_string(),
  105. thread: Some(jh)
  106. })
  107. }
  108. /// Truncate a table
  109. pub fn truncate_table<C: ConnectInfo>(ci: &C, table: &str, schema: &str) -> Result<()> {
  110. let url = ci.db_url()?;
  111. let db = connect(&url)?;
  112. let q = format!("TRUNCATE {}.{}", schema, table);
  113. info!("running {}", q);
  114. db.execute(&q, &[])?;
  115. Ok(())
  116. }
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...