Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

ol-import.js 2.0 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
  1. const util = require('util');
  2. const zlib = require('zlib');
  3. const fs = require('fs');
  4. const through = require('through2');
  5. const stream = require('stream');
  6. const childProcess = require('child_process');
  7. const pg = require('pg');
  8. const async = require('async');
  9. const logger = require('gulplog');
  10. const throughput = require('./throughput');
  11. const io = require('./io');
  12. const pgu = require('./pgutil');
  // Module-level bookkeeping: a running insert counter plus one throughput
  // tracker per dump type. The trackers are created eagerly, in this order.
  let ninserts = 0;
  const autp = throughput('authors');
  const wtp = throughput('works');
  const etp = throughput('editions');
  /**
   * Parse the JSON payload stored in the last tab-separated field of a line.
   *
   * @param {Buffer|string} buf - one raw line from the dump
   * @returns {*} the value produced by JSON.parse on the final field
   * @throws {Error} when the line contains no tab character
   * @throws {SyntaxError} when the final field is not valid JSON
   */
  function decodeLine(buf) {
    const tabPos = buf.lastIndexOf('\t');
    if (tabPos >= 0) {
      // Everything after the final tab is the JSON document.
      return JSON.parse(buf.slice(tabPos + 1).toString());
    }
    throw new Error("no tab found in line");
  }
  /**
   * Import definitions, keyed by dump name.
   *
   *   table       - destination table
   *   pfx         - column-name prefix (used for `<pfx>_key` and `<pfx>_data`)
   *   label_field - destination column that receives the record's label
   *   label       - property of the decoded record that is copied into label_field
   */
  const imports = {
    authors: { table: 'ol_author', pfx: 'author', label_field: 'author_name', label: 'name' },
    works: { table: 'ol_work', pfx: 'work', label_field: 'work_title', label: 'title' },
    editions: { table: 'ol_edition', pfx: 'edition', label_field: 'edition_title', label: 'title' }
  };
  /**
   * Stream one gzipped dump file into PostgreSQL through `psql \copy`.
   *
   * Reads `data/ol_dump_<name>_<date>.txt.gz`, decodes each line with
   * decodeLine, turns every record into a `[key, label, json]` row and feeds
   * the encoded rows to a `psql` child process on its stdin.
   *
   * @param {string} name - key of the import definition in `imports`
   * @param {string} date - date component of the dump file name
   * @returns {Promise<void>} resolves once every row has been written AND
   *   psql has exited with status 0
   * @throws {Error} (as a rejection) when `name` is not a defined import, when
   *   any stage of the stream pipeline fails, when psql cannot be started, or
   *   when psql exits with a non-zero status or is killed by a signal
   */
  async function doImport(name, date) {
    // Own-property check: a plain `imports[name]` lookup would accept
    // inherited keys such as "toString" or "constructor".
    if (!Object.prototype.hasOwnProperty.call(imports, name)) {
      throw new Error("no such import " + name);
    }
    const def = imports[name];

    const copyCommand = `\\copy ${def.table} (${def.pfx}_key, ${def.label_field}, ${def.pfx}_data) FROM STDIN`;
    const cp = childProcess.spawn('psql', ['-c', copyCommand], {
      stdio: ['pipe', process.stdout, process.stderr]
    });

    // Settles when psql is gone. Without this, a failed spawn would surface as
    // an unhandled 'error' event and a failed COPY would be reported as success.
    const psqlExited = new Promise((ok, fail) => {
      cp.on('error', fail);
      cp.on('close', (code, signal) => {
        if (code === 0) {
          ok();
        } else if (signal) {
          fail(new Error(`psql was terminated by signal ${signal}`));
        } else {
          fail(new Error(`psql exited with code ${code}`));
        }
      });
    });

    // stream.pipeline forwards an error from ANY stage to the callback and
    // tears the other stages down; chained .pipe() calls only let us observe
    // errors on the final stream.
    const rowsWritten = new Promise((ok, fail) => {
      stream.pipeline(
        io.openFile(util.format("data/ol_dump_%s_%s.txt.gz", name, date)),
        zlib.createUnzip(),
        io.decodeLines(decodeLine),
        new stream.Transform({
          objectMode: true,
          transform(rec, enc, cb) {
            cb(null, [rec.key, rec[def.label], JSON.stringify(rec)]);
          }
        }),
        // presumably serialises each row array into COPY text format -- see ./pgutil
        pgu.encodePGText(),
        cp.stdin,
        (err) => (err ? fail(err) : ok())
      );
    });

    // Promise.all observes both promises, so whichever fails second does not
    // become an unhandled rejection.
    await Promise.all([rowsWritten, psqlExited]);
  }
  // Export one importer per definition: module.exports.<name>(date) runs
  // doImport with the dump name already bound as the first argument.
  Object.keys(imports).forEach((name) => {
    module.exports[name] = doImport.bind(null, name);
  });
Tip!

Press p to see the previous file, or n to see the next file.

Comments

Loading...