1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
"use strict";

// Core and third-party dependencies for the Open Library bulk importer.
const util = require('util');
const zlib = require('zlib');
const fs = require('fs');
const through = require('through2');        // object-mode transform streams
const fws = require('flush-write-stream');  // writable stream with a flush hook
const pg = require('pg');                   // PostgreSQL client
const async = require('async');
const throughput = require('./lib/throughput');
const io = require('./lib/io');

// Command-line options (expects --db-url, and optionally --date).
const options = require('yargs').argv;
// Open Library dump date to import; defaults to the 2016-07-31 dump.
const date = options.date || '2016-07-31';

// NOTE(review): ninserts and the three throughput meters below are declared
// but never referenced in this file — confirm whether throughput() self-reports
// or these are dead code.
var ninserts = 0;
var autp = throughput('authors');
var wtp = throughput('works');
var etp = throughput('editions');
/**
 * Create a writable pipe that runs PostgreSQL queries inside a transaction.
 *
 * The first write opens a READ UNCOMMITTED transaction; every 10,000 queries
 * the transaction is committed and a fresh one begun (bounds lock/WAL growth
 * during a long bulk load).  When the stream flushes, the final COMMIT is
 * issued and `finished` is invoked with (err, nqueries).
 *
 * @param {pg.Client} client    connected PostgreSQL client
 * @param {Function}  finished  callback(err, nqueries) called after final COMMIT
 * @returns {stream.Writable}   object-mode stream accepting pg query objects
 */
function runQueries(client, finished) {
  var nqueries = 0;
  var started = false;

  function write(data, enc, next) {
    async.series([
      (cb) => {
        if (started) {
          cb();
        } else {
          client.query('BEGIN ISOLATION LEVEL READ UNCOMMITTED', (err) => {
            // Bug fix: only mark the transaction as open when BEGIN succeeded;
            // previously `started` was set true even on error.
            started = !err;
            cb(err);
          });
        }
      },
      (cb) => client.query(data, cb),
      (done) => {          // renamed from `cb` — it shadowed the series callbacks below
        nqueries += 1;
        if (nqueries % 10000 === 0) {
          // Periodic checkpoint: commit and immediately start a new transaction.
          async.series([
            (cb) => client.query('COMMIT', cb),
            (cb) => client.query('BEGIN ISOLATION LEVEL READ UNCOMMITTED', cb)
          ], done);
        } else {
          process.nextTick(done);
        }
      }
    ], next);
  }

  function flush(cb) {
    client.query('COMMIT', (err) => {
      // Bug fix: the stream's flush callback was never invoked, so the
      // writable never emitted 'finish'.  Complete the stream, then report
      // the result to the caller on the next tick.
      cb(err);
      process.nextTick(finished, err, nqueries);
    });
  }

  return fws.obj(write, flush);
}
/**
 * Record-to-query mappers, keyed by record type.  Each maps one parsed dump
 * record to a named, parameterized INSERT for its table:
 * `<type>_key`, `<type>_<label>`, and the full record as JSON.
 */
const imports = (() => {
  // Build a mapper for one table: `singular` prefixes the column names and
  // the prepared-statement name; `label` is the record field for column two.
  const mapper = (table, singular, label) => (rec) => ({
    text: `INSERT INTO ${table} (${singular}_key, ${singular}_${label}, ${singular}_data) VALUES ($1, $2, $3)`,
    name: `insert-${singular}`,
    values: [rec.key, rec[label], JSON.stringify(rec)]
  });
  return {
    authors: mapper('authors', 'author', 'name'),
    works: mapper('works', 'work', 'title'),
    editions: mapper('editions', 'edition', 'title')
  };
})();
/**
 * Import one Open Library dump file (data/ol_dump_<name>_<date>.txt.gz)
 * into PostgreSQL: gunzip, decode lines to records, map each record to an
 * INSERT, and run the queries in batched transactions.
 *
 * @param {string}   name      one of the keys of `imports` ('authors', ...)
 * @param {Function} callback  callback(err) invoked once when done
 */
function doImport(name, callback) {
  const proc = imports[name];
  if (proc === undefined) {
    return callback(new Error("no such import " + name));
  }
  const client = new pg.Client(options['db-url']);
  async.waterfall([
    client.connect.bind(client),
    (_, cb) => io.openFile(util.format("data/ol_dump_%s_%s.txt.gz", name, date), cb),
    (stream, cb) => {
      // Guard so `cb` fires exactly once even if several stages fail.
      let settled = false;
      const finish = (err, n) => {
        if (!settled) {
          settled = true;
          cb(err, n);
        }
      };
      const unzip = stream.pipe(zlib.createUnzip());
      const lines = unzip.pipe(io.decodeLines());
      const queries = lines.pipe(through.obj((rec, enc, next) => {
        next(null, proc(rec));
      }));
      // Bug fix: 'error' events on the intermediate streams were previously
      // unhandled — a corrupt gzip or decode failure would crash the process
      // instead of reaching the callback.
      for (const s of [stream, unzip, lines, queries]) {
        s.on('error', finish);
      }
      queries.pipe(runQueries(client, finish));
    }
  ], (err) => {
    if (err) {
      console.error("error running %s: %s", name, err);
    } else {
      console.info("finished %s", name);
    }
    // Always close the connection; prefer the pipeline error over any
    // error from client.end().
    client.end((e2) => {
      if (err) {
        callback(err);
      } else if (e2) {
        callback(e2);
      } else {
        callback();
      }
    });
  });
}
// Expose one importer per record type, e.g. exports.authors(cb),
// exports.works(cb), exports.editions(cb).
Object.keys(imports).forEach((name) => {
  module.exports[name] = (cb) => doImport(name, cb);
});
|