1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
- use error::{Result, err};
- use std::io::prelude::*;
- use os_pipe::{pipe, PipeWriter};
- use postgres::{Connection, TlsMode};
- use structopt::StructOpt;
- use std::thread;
- pub trait ConnectInfo {
- fn db_url(&self) -> Result<String>;
- }
- impl ConnectInfo for String {
- fn db_url(&self) -> Result<String> {
- Ok(self.clone())
- }
- }
- impl ConnectInfo for Option<String> {
- fn db_url(&self) -> Result<String> {
- match self {
- Some(ref s) => Ok(s.clone()),
- None => Err(err("no URL provided"))
- }
- }
- }
- /// Database options
- #[derive(StructOpt, Debug, Clone)]
- pub struct DbOpts {
- /// Database URL to connect to
- #[structopt(long="db-url")]
- db_url: Option<String>,
- /// Database schema
- #[structopt(long="db-schema")]
- db_schema: Option<String>
- }
- impl DbOpts {
- /// Open the database connection
- pub fn open(&self) -> Result<Connection> {
- let url = self.url()?;
- connect(&url)
- }
- pub fn url<'a>(&'a self) -> Result<String> {
- Ok(match self.db_url {
- Some(ref s) => s.clone(),
- None => std::env::var("DB_URL")?
- })
- }
- /// Get the DB schema
- pub fn schema<'a>(&'a self) -> &'a str {
- match self.db_schema {
- Some(ref s) => s,
- None => "public"
- }
- }
- }
- impl ConnectInfo for DbOpts {
- fn db_url(&self) -> Result<String> {
- self.url()
- }
- }
- pub fn connect(url: &str) -> Result<Connection> {
- Ok(Connection::connect(url, TlsMode::None)?)
- }
- pub struct CopyTarget {
- writer: Option<PipeWriter>,
- name: String,
- thread: Option<thread::JoinHandle<u64>>
- }
- impl Write for CopyTarget {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- self.writer.as_ref().expect("writer missing").write(buf)
- }
- fn flush(&mut self) -> std::io::Result<()> {
- self.writer.as_ref().expect("writer missing").flush()
- }
- }
- impl Drop for CopyTarget {
- fn drop(&mut self) {
- if let Some(w) = self.writer.take() {
- std::mem::drop(w);
- }
- if let Some(thread) = self.thread.take() {
- match thread.join() {
- Ok(n) => info!("{}: wrote {} lines", self.name, n),
- Err(e) => error!("{}: error: {:?}", self.name, e)
- };
- } else {
- error!("{} already shut down", self.name);
- }
- }
- }
- /// Open a writer to copy data into PostgreSQL
- pub fn copy_target<C: ConnectInfo>(ci: &C, query: &str, name: &str) -> Result<CopyTarget> {
- let url = ci.db_url()?;
- let query = query.to_string();
- let (mut reader, writer) = pipe()?;
-
- let tb = thread::Builder::new().name(name.to_string());
- let jh = tb.spawn(move || {
- let query = query;
- let db = connect(&url).unwrap();
- info!("preparing {}", query);
- let stmt = db.prepare(&query).unwrap();
- stmt.copy_in(&[], &mut reader).unwrap()
- })?;
- Ok(CopyTarget {
- writer: Some(writer),
- name: name.to_string(),
- thread: Some(jh)
- })
- }
- /// Truncate a table
- pub fn truncate_table<C: ConnectInfo>(ci: &C, table: &str, schema: &str) -> Result<()> {
- let url = ci.db_url()?;
- let db = connect(&url)?;
- let q = format!("TRUNCATE {}.{}", schema, table);
- info!("running {}", q);
- db.execute(&q, &[])?;
- Ok(())
- }
|