1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
- const util = require('util');
- const zlib = require('zlib');
- const fs = require('fs');
- const through = require('through2');
- const stream = require('stream');
- const childProcess = require('child_process');
- const pg = require('pg');
- const async = require('async');
- const logger = require('gulplog');
- const throughput = require('./throughput');
- const io = require('./io');
- const pgu = require('./pgutil');
// Module-level state: an insert counter and one throughput tracker per dump type.
// NOTE(review): none of these bindings are read anywhere else in this file as
// seen here — confirm they are still needed before relying on them.
let ninserts = 0;
const autp = throughput('authors');
const wtp = throughput('works');
const etp = throughput('editions');
/**
 * Decode one line of a tab-separated dump whose final field is JSON.
 *
 * Everything up to and including the last tab is discarded; the remainder is
 * converted to a string and parsed as JSON.
 *
 * @param {Buffer|string} buf - a single line of input
 * @returns {*} the parsed JSON value from the last field
 * @throws {Error} if the line contains no tab character
 * @throws {SyntaxError} if the last field is not valid JSON
 */
function decodeLine(buf) {
  const lastTab = buf.lastIndexOf('\t');
  if (lastTab === -1) throw new Error("no tab found in line");

  const jsonText = buf.slice(lastTab + 1).toString();
  return JSON.parse(jsonText);
}
/**
 * Import definitions, keyed by dump name ('authors', 'works', 'editions').
 *
 * Fields, as consumed by the importer:
 *   table       - destination table for `\copy`
 *   pfx         - column prefix: rows go into `<pfx>_key` and `<pfx>_data`
 *   label_field - column that receives the record's label
 *   label       - record property copied into `label_field`
 */
const imports = {
  authors: { table: 'ol_author', pfx: 'author', label_field: 'author_name', label: 'name' },
  works: { table: 'ol_work', pfx: 'work', label_field: 'work_title', label: 'title' },
  editions: { table: 'ol_edition', pfx: 'edition', label_field: 'edition_title', label: 'title' },
};
/**
 * Bulk-load one gzipped dump file into PostgreSQL through `psql`'s `\copy`.
 *
 * Reads `data/ol_dump_<name>_<date>.txt.gz`, decompresses it, decodes each line
 * with `decodeLine`, and streams `[key, label, JSON]` rows into
 * `\copy <table> (<pfx>_key, <label_field>, <pfx>_data) FROM STDIN`.
 *
 * @param {string} name - key into `imports` ('authors', 'works' or 'editions')
 * @param {string} date - date component of the dump file name
 * @returns {Promise<void>} resolves once all input has been written to psql
 *   AND psql has exited with status 0
 * @throws {Error} (as a rejection) for an unknown `name`, an error in any
 *   pipeline stage, failure to spawn psql, or a non-zero psql exit / signal
 */
async function doImport(name, date) {
  const def = imports[name];
  if (def === undefined) {
    throw new Error("no such import " + name);
  }

  const copyCmd = `\\copy ${def.table} (${def.pfx}_key, ${def.label_field}, ${def.pfx}_data) FROM STDIN`;
  const cp = childProcess.spawn('psql', ['-c', copyCmd], {
    stdio: ['pipe', process.stdout, process.stderr]
  });

  // A failed COPY is only visible through psql's exit status, so completion
  // means "psql exited cleanly", not merely "we finished writing its stdin".
  const psqlDone = new Promise((ok, fail) => {
    cp.on('error', fail); // e.g. psql could not be spawned
    cp.on('close', (code, signal) => {
      if (code === 0) {
        ok();
      } else if (signal) {
        fail(new Error(`psql killed by signal ${signal}`));
      } else {
        fail(new Error(`psql exited with code ${code}`));
      }
    });
  });

  // stream.pipeline reports an error from ANY stage (chained .pipe() only
  // surfaced errors from the last one) and tears down the remaining stages.
  const pipeDone = new Promise((ok, fail) => {
    stream.pipeline(
      io.openFile(util.format("data/ol_dump_%s_%s.txt.gz", name, date)),
      zlib.createUnzip(),
      io.decodeLines(decodeLine),
      new stream.Transform({
        objectMode: true,
        transform(rec, enc, cb) {
          cb(null, [rec.key, rec[def.label], JSON.stringify(rec)]);
        }
      }),
      pgu.encodePGText(),
      cp.stdin,
      (err) => {
        if (err) {
          fail(err);
          // Best effort: do not leave psql running on a broken input stream.
          // NOTE(review): pipeline closes psql's stdin first, which psql may
          // treat as end-of-data and commit the rows received so far — confirm.
          cp.kill();
        } else {
          ok();
        }
      }
    );
  });

  await Promise.all([pipeDone, psqlDone]);
}
// Publish one importer per entry in `imports`, e.g. `exports.works(date)`,
// each being `doImport` with its `name` argument pre-bound.
Object.keys(imports).forEach((importName) => {
  module.exports[importName] = doImport.bind(null, importName);
});
|