Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

query-eval-stream.js 2.4 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
  1. "use strict";
  2. const miss = require('mississippi');
  3. const async = require('async');
  4. const logger = require('gulplog');
  /**
   * Build a writable sink that runs every chunk written to it as a query on a
   * PostgreSQL client, committing in batches.
   *
   * Each chunk is either a query accepted by `client.query`, or a function that
   * receives the first row of the most recent query flagged with `returns` and
   * produces the query to run.
   *
   * @param {object|string} [db] - An existing client (anything non-string and
   *   truthy; it is used as-is and never closed here), or a string / falsy value
   *   that is passed to `new pg.Client(db)` on the first write. In the latter
   *   case the client is created lazily and ended on flush.
   * @param {object} [options]
   * @param {number} [options.batchSize=10000] - Number of queries between
   *   COMMITs. A falsy value disables the explicit BEGIN and batch commits.
   * @param {boolean} [options.native] - Use `pg.native` instead of `pg` when the
   *   client is created here.
   * @param {object} [options.logger] - Optional logger; its `debug`, `info` and
   *   `error` methods are called.
   * @returns {*} The result of `miss.to.obj(write, flush)` (an object-mode
   *   writable stream, per mississippi's documentation).
   */
  function runQueries(db, options) {
    const BEGIN = 'BEGIN ISOLATION LEVEL READ UNCOMMITTED';

    const opts = options == null ? {} : options;
    const batchSize = opts.batchSize === undefined ? 10000 : opts.batchSize;

    // A falsy or string `db` means the connection is opened (and later ended)
    // by this stream; anything else is treated as a caller-owned client.
    const manage = !db || typeof db === 'string';

    let client = manage ? undefined : db;
    let nqueries = 0;
    let started = false;
    let lastRV;

    // Lazily create and connect the client when this stream owns it.
    function ensureClient(cb) {
      if (client) {
        cb();
        return;
      }
      // Loaded on demand so `pg` is only required when a client must be built.
      let pg = require('pg');
      if (opts.native) {
        pg = pg.native;
      }
      client = new pg.Client(db);
      client.connect((err) => cb(err));
    }

    // Open the first transaction; skipped when batching is disabled.
    function ensureTransaction(cb) {
      if (started || !batchSize) {
        cb();
        return;
      }
      client.query(BEGIN, (err) => {
        started = true;
        cb(err);
      });
    }

    // Count the query just run and, on a batch boundary, COMMIT and re-BEGIN.
    function commitIfBatchFull(cb) {
      nqueries += 1;
      if (!batchSize || nqueries % batchSize !== 0) {
        process.nextTick(cb);
        return;
      }
      if (opts.logger) {
        opts.logger.debug('committing');
      }
      async.series([
        (done) => client.query('COMMIT', done),
        (done) => client.query(BEGIN, done),
      ], cb);
    }

    // Stream write handler: run one chunk as a query.
    function write(data, enc, next) {
      // Holds the chunk until (and unless) it is resolved from a factory
      // function, so error reports can describe the query that actually ran.
      let query = data;

      async.series([
        ensureClient,
        ensureTransaction,
        (cb) => {
          if (typeof query === 'function') {
            query = query(lastRV);
          }
          client.query(query, (err, result) => {
            if (err) {
              cb(err);
              return;
            }
            if (query.returns) {
              lastRV = result.rows[0];
            }
            cb();
          });
        },
        commitIfBatchFull,
      ], (err) => {
        if (err) {
          // Prefer the resolved query's fields; fall back to the raw chunk
          // (e.g. a factory's function name) when they are absent.
          const name = (query && query.name) || data.name;
          const text = (query && query.text) || data.text;
          if (opts.logger) {
            // Log the failure reason from the error, not from the query object.
            opts.logger.error('error in query %s: %s', name, err.message);
            opts.logger.error('query text: %s', text);
          }
          err.message = `in query ${name}: ${err.message}`;
          err.query = data;
        }
        next(err);
      });
    }

    // Stream flush handler: final COMMIT, then end the client if we created it.
    function flush(cb) {
      if (!client) {
        cb();
        return;
      }
      if (opts.logger) {
        opts.logger.info('flushing stream');
      }
      client.query('COMMIT', (err) => {
        if (!manage) {
          cb(err);
          return;
        }
        client.end((endErr) => cb(err || endErr));
      });
    }

    return miss.to.obj(write, flush);
  }
  99. module.exports = runQueries;
Tip!

Press p to see the previous file or n to see the next file

Comments

Loading...