Browse Source

Add table output support to statuses

Michael Ekstrand 10 months ago
parent
commit
6ef66ae4e0

+ 11
- 2
bookdata/db.py

@@ -162,7 +162,7 @@ class SqlScript:
 
     def _parse(self, lines):
         self.chunks = []
-        self.deps = self._parse_script_header(lines)
+        self.deps, self.tables = self._parse_script_header(lines)
         next_chunk = self._parse_chunk(lines, len(self.chunks) + 1)
         while next_chunk is not None:
             if next_chunk:
@@ -172,6 +172,7 @@ class SqlScript:
     @classmethod
     def _parse_script_header(cls, lines):
         deps = []
+        tables = []
 
         line = lines.peek(None)
         while line is not None:
@@ -190,12 +191,20 @@ class SqlScript:
             if code == 'dep':
                 deps.append(args)
                 next(lines)  # eat line
+            elif code == 'table':
+                parts = args.split('.', 2)
+                if len(parts) > 1:
+                    ns, tbl = parts
+                    tables.append((ns, tbl))
+                else:
+                    tables.append(('public', args))
+                next(lines)  # eat line
             else:  # any other code, we're out of header
                 break
 
             line = lines.peek(None)
 
-        return deps
+        return deps, tables
 
     @classmethod
     def _parse_chunk(cls, lines: peekable, n: int):

+ 21
- 0
bookdata/tracking.py

@@ -43,6 +43,7 @@ def begin_stage(cur, stage):
     ''', [stage])
     cur.execute('DELETE FROM stage_file WHERE stage_name = %s', [stage])
     cur.execute('DELETE FROM stage_dep WHERE stage_name = %s', [stage])
+    cur.execute('DELETE FROM stage_table WHERE stage_name = %s', [stage])
 
 
 def record_dep(cur, stage, dep):
@@ -64,6 +65,26 @@ def record_dep(cur, stage, dep):
     return cur.fetchall()
 
 
+def record_tbl(cur, stage, ns, tbl):
+    """
+    Record table ns.tbl for a stage and return its (oid, kind) row from stage_table_oids.
+    """
+    if hasattr(cur, 'cursor'):
+        # cur is really a connection: open a cursor on it and recurse with that cursor
+        with cur, cur.cursor() as c:
+            return record_tbl(c, stage, ns, tbl)
+
+    _log.info('recording table %s -> %s.%s', stage, ns, tbl)
+    cur.execute('''
+        INSERT INTO stage_table (stage_name, st_ns, st_name)
+        VALUES (%s, %s, %s)
+    ''', [stage, ns, tbl])
+    cur.execute('''
+        SELECT oid, kind FROM stage_table_oids WHERE stage_name = %s AND st_ns = %s AND st_name = %s
+    ''', [stage, ns, tbl])
+    return cur.fetchone()  # oid and kind are NULL (None) if no such table exists (view uses outer joins)
+
+
 def record_file(cur, file, hash, stage=None):
     """
     Record a file and optionally associate it with a stage.

+ 2
- 2
init.status.dvc

@@ -1,7 +1,7 @@
-md5: cee6cdec00547eb53c2b58c279f234a9
+md5: 544f043fb838273acb74201ad880bdd1
 cmd: python run.py stage-status init
 outs:
-- md5: 02988539f4d0fe5131b88ea5d875f06f
+- md5: 1ddbd0ddfedf41ea48b773f4fbdb768e
   path: init.status
   cache: false
   metric: false

+ 1
- 0
schemas/az-schema.sql

@@ -1,4 +1,5 @@
 --- #dep common-schema
+--- #table az.raw_ratings
 CREATE SCHEMA IF NOT EXISTS az;
 
 DROP TABLE IF EXISTS az.raw_ratings CASCADE;

+ 1
- 0
schemas/bx-schema.sql

@@ -1,4 +1,5 @@
 --- #dep common-schema
+--- #table bx.raw_ratings
 CREATE SCHEMA IF NOT EXISTS bx;
 
 DROP TABLE IF EXISTS bx.raw_ratings CASCADE;

+ 4
- 4
schemas/common-schema.dvc

@@ -1,12 +1,12 @@
-md5: f0dda6f3a9eb3c7e9be42fe1934fa646
+md5: ba5459d095f88febe6a7b9f2ec7596f2
 cmd: python ../run.py sql-script common-schema.sql
 deps:
-- md5: b9ecdf15bfef29c34299f5b7ca33be05
+- md5: 9046af622b79bcbc45b5b23f10c67bd6
   path: common-schema.sql
 - path: ../init.status
-  md5: 02988539f4d0fe5131b88ea5d875f06f
+  md5: 1ddbd0ddfedf41ea48b773f4fbdb768e
 outs:
-- md5: d17ebc8f90b642369a92c0597b336d29
+- md5: 626720b0c63d391fc797d612846a015b
   path: common-schema.transcript
   cache: true
   metric: false

+ 1
- 0
schemas/common-schema.sql

@@ -1,4 +1,5 @@
 --- #dep init
+--- #table isbn_id
 --- #step ISBN ID storage
 CREATE TABLE IF NOT EXISTS isbn_id (
   isbn_id SERIAL PRIMARY KEY,

+ 3
- 3
schemas/common-schema.status.dvc

@@ -1,10 +1,10 @@
-md5: 531b63c50cbd3c079c70363386455efb
+md5: 6b01de122141917232350b96712919fb
 cmd: python ../run.py stage-status common-schema
 deps:
-- md5: d17ebc8f90b642369a92c0597b336d29
+- md5: 626720b0c63d391fc797d612846a015b
   path: common-schema.transcript
 outs:
-- md5: eb8515ecc2b9dba8764b2cbfd7005eaa
+- md5: f102f23f1b9fe9ee18dce05284a4bce9
   path: common-schema.status
   cache: false
   metric: false

+ 7
- 0
schemas/gr-schema.sql

@@ -1,4 +1,11 @@
 --- #dep common-schema
+--- #table gr.raw_interaction
+--- #table gr.raw_book
+--- #table gr.raw_work
+--- #table gr.raw_author
+--- #table gr.raw_series
+--- #table gr.raw_book_genres
+
 DROP SCHEMA IF EXISTS gr CASCADE;
 CREATE SCHEMA gr;
 

+ 3
- 0
schemas/loc-mds-schema.sql

@@ -1,4 +1,7 @@
 --- #dep common-schema
+--- #table locmds.book_marc_field
+--- #table locmds.name_marc_field
+
 CREATE SCHEMA IF NOT EXISTS locmds;
 
 DROP TABLE IF EXISTS locmds.book_marc_field CASCADE;

+ 18
- 0
schemas/meta-schema.sql

@@ -26,6 +26,24 @@ CREATE TABLE IF NOT EXISTS stage_dep (
     dep_key VARCHAR NULL
 );
 
+CREATE TABLE IF NOT EXISTS stage_table (  -- one row per table recorded for a stage
+    stage_name VARCHAR NOT NULL REFERENCES stage_status,
+    st_ns VARCHAR NOT NULL DEFAULT 'public',  -- schema (namespace) that holds the table
+    st_name VARCHAR NOT NULL  -- unqualified table name
+);
+-- View mapping each stage_table row to its pg_class oid/relkind (NULL when absent); the DO block tolerates an existing view.
+DO $cv$
+BEGIN
+    CREATE VIEW stage_table_oids
+        AS SELECT stage_name, st_ns, st_name, c.oid AS oid, c.relkind AS kind
+            FROM stage_table
+            LEFT OUTER JOIN pg_namespace ns ON (ns.nspname = st_ns)
+            LEFT OUTER JOIN pg_class c ON (c.relnamespace = ns.oid AND c.relname = st_name);
+EXCEPTION
+    WHEN duplicate_table THEN RETURN;
+END;
+$cv$;
+
 INSERT INTO stage_status (stage_name, started_at, finished_at, stage_key)
 VALUES ('init', NOW(), NOW(), uuid_generate_v4())
 ON CONFLICT (stage_name) DO NOTHING;

+ 4
- 0
schemas/ol-schema.sql

@@ -1,4 +1,8 @@
 --- #dep common-schema
+--- #table ol.author
+--- #table ol.work
+--- #table ol.edition
+
 -- Initial table creation with no constraints or indexes
 CREATE SCHEMA IF NOT EXISTS ol;
 

+ 2
- 0
schemas/viaf-schema.sql

@@ -1,4 +1,6 @@
 --- #dep common-schema
+--- #table viaf.marc_field
+
 --- #step Create VIAF schema
 CREATE SCHEMA IF NOT EXISTS viaf;
 --- #step Delete old table

+ 4
- 0
scripts/sql-script.py

@@ -67,5 +67,9 @@ else:
             # hash the source file
             key.update(h.encode('utf-8'))
         script.execute(dbc, transcript=txf)
+
         with dbc, dbc.cursor() as cur:
+            for ns, tbl in script.tables:
+                oid, kind = tracking.record_tbl(cur, stage, ns, tbl)
+                key.update(f'{ns}.{tbl}:{oid}:{kind}'.encode('utf-8'))
             tracking.end_stage(cur, stage, key=key.hexdigest())

+ 9
- 0
scripts/stage-status.py

@@ -71,6 +71,15 @@ with db.connect() as dbc:
         for fn, fh in cur:
             print('SOURCE', fn, fh, file=sf)
 
+        cur.execute('''
+            SELECT st_ns, st_name, oid, kind
+            FROM stage_table_oids
+            WHERE stage_name = %s
+            ORDER BY st_ns, st_name
+        ''', [stage])
+        for ns, tbl, oid, kind in cur:
+            print(f'TABLE {ns}.{tbl} OID {oid} KIND {kind}', file=sf)
+
         if timestamps:
             print('FINISH', end, file=sf)
         if key: