
Load graphs into graph-tool

Michael Ekstrand, 9 months ago
commit 5fbe0cc672

5 changed files with 294 additions and 81 deletions

  1. bookdata/db.py (+12, -0)
  2. bookdata/graph.py (+219, -0)
  3. bookdata/schema.py (+22, -5)
  4. environment.yml (+1, -0)
  5. scripts/inspect-cluster.py (+40, -76)

bookdata/db.py (+12, -0)

@@ -19,6 +19,7 @@ from more_itertools import peekable
 import psycopg2, psycopg2.errorcodes
 from psycopg2 import sql
 from psycopg2.pool import ThreadedConnectionPool
+from sqlalchemy import create_engine
 import sqlparse
 import git
 
@@ -28,6 +29,7 @@ _log = logging.getLogger(__name__)
 _ms_path = Path(__file__).parent.parent / 'schemas' / 'meta-schema.sql'
 meta_schema = _ms_path.read_text()
 _pool = None
+_engine = None
 
 # DB configuration info
 class DBConfig:
@@ -107,6 +109,16 @@ def connect():
         _pool.putconn(conn)
 
 
+def engine():
+    "Get an SQLAlchemy engine"
+    global _engine
+    if _engine is None:
+        _log.info('connecting to %s', db_url())
+        _engine = create_engine(db_url())
+
+    return _engine
+
+
 def _tokens(s, start=-1, skip_ws=True, skip_cm=True):
     i, t = s.token_next(start, skip_ws=skip_ws, skip_cm=skip_cm)
     while t is not None:
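
The new engine() helper pairs naturally with pandas. A minimal sketch of the intended use, assuming the database reachable via db_url() contains the isbn_id table (the query itself is only illustrative):

    import pandas as pd
    from bookdata import db

    # The module-level engine is created lazily on first call and reused afterwards.
    isbns = pd.read_sql_query('SELECT isbn_id, isbn FROM isbn_id LIMIT 10', db.engine())
    print(isbns)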

bookdata/graph.py (+219, -0)

@@ -0,0 +1,219 @@
+"""
+Utilities for loading & working with the book identifier graph.
+"""
+
+import logging
+
+import pandas as pd
+import numpy as np
+from graph_tool import Graph
+from .schema import *
+
+_log = logging.getLogger(__name__)
+
+class GraphLoader:
+    cluster = None
+    isbn_table = 'isbn_id'
+
+    def set_cluster(self, cluster, cur):
+        _log.info('restricting graph load to cluster %s', cluster)
+        self.cluster = cluster
+        self.isbn_table = 'gc_isbns'
+        cur.execute('''
+            CREATE TEMPORARY TABLE gc_isbns
+            AS SELECT isbn_id, isbn
+            FROM isbn_cluster JOIN isbn_id USING (isbn_id)
+            WHERE cluster = %s
+        ''', [self.cluster])
+
+    def q_isbns(self):
+        return f'SELECT isbn_id AS id, isbn FROM {self.isbn_table}'
+
+    @property
+    def limit(self):
+        if self.isbn_table == 'isbn_id':
+            return ''
+        else:
+            return f'JOIN {self.isbn_table} USING (isbn_id)'
+
+    def q_loc_nodes(self, full=False):
+        if full:
+            return f'''
+                SELECT DISTINCT rec_id AS id, title
+                FROM locmds.book_rec_isbn {self.limit}
+                LEFT JOIN locmds.book_title USING (rec_id)
+            '''
+        else:
+            return f'''
+                SELECT DISTINCT rec_id AS id
+                FROM locmds.book_rec_isbn {self.limit}
+                '''
+
+    def q_loc_edges(self):
+        return f'''
+            SELECT isbn_id, rec_id
+            FROM locmds.book_rec_isbn {self.limit}
+        '''
+
+    def q_ol_edition_nodes(self, full=False):
+        if full:
+            return f'''
+                SELECT DISTINCT
+                    edition_id AS id, edition_key AS label,
+                    NULLIF(edition_data->>'title', '') AS title
+                FROM ol.isbn_link {self.limit}
+                JOIN ol.edition USING (edition_id)
+            '''
+        else:
+            return f'''
+                SELECT DISTINCT edition_id AS id
+                FROM ol.isbn_link {self.limit}
+            '''
+
+    def q_ol_work_nodes(self, full=False):
+        if full:
+            return f'''
+                SELECT DISTINCT
+                    work_id AS id, work_key AS label,
+                    NULLIF(work_data->>'title', '') AS title
+                FROM ol.isbn_link {self.limit}
+                JOIN ol.work USING (work_id)
+            '''
+        else:
+            return f'''
+                SELECT DISTINCT work_id AS id
+                FROM ol.isbn_link {self.limit}
+                WHERE work_id IS NOT NULL
+            '''
+
+    def q_ol_edition_edges(self):
+        return f'''
+            SELECT DISTINCT isbn_id, edition_id
+            FROM ol.isbn_link {self.limit}
+        '''
+
+    def q_ol_work_edges(self):
+        return f'''
+            SELECT DISTINCT edition_id, work_id
+            FROM ol.isbn_link {self.limit}
+            WHERE work_id IS NOT NULL
+        '''
+
+    def q_gr_book_nodes(self, full=False):
+        return f'''
+            SELECT DISTINCT gr_book_id AS id
+            FROM gr.book_isbn {self.limit}
+        '''
+
+    def q_gr_work_nodes(self, full=False):
+        if full:
+            return f'''
+                SELECT DISTINCT gr_work_id AS id, work_title
+                FROM gr.book_isbn {self.limit}
+                JOIN gr.book_ids ids USING (gr_book_id)
+                LEFT JOIN gr.work_title USING (gr_work_id)
+                WHERE ids.gr_work_id IS NOT NULL
+            '''
+        else:
+            return f'''
+                SELECT DISTINCT gr_work_id AS id
+                FROM gr.book_isbn {self.limit}
+                JOIN gr.book_ids ids USING (gr_book_id)
+                WHERE ids.gr_work_id IS NOT NULL
+            '''
+
+    def q_gr_book_edges(self, full=False):
+        return f'''
+            SELECT DISTINCT isbn_id, gr_book_id
+            FROM gr.book_isbn {self.limit}
+        '''
+
+    def q_gr_work_edges(self):
+        return f'''
+            SELECT DISTINCT gr_book_id, gr_work_id
+            FROM gr.book_isbn {self.limit}
+            JOIN gr.book_ids ids USING (gr_book_id)
+            WHERE ids.gr_work_id IS NOT NULL
+        '''
+
+    def load_minimal_graph(self, cxn):
+        g = Graph(directed=False)
+        codes = []
+        sources = []
+
+        def add_nodes(df, ns, src):
+            n = len(df)
+            _log.info('adding %d nodes to graph', n)
+            start = g.num_vertices()
+            vs = g.add_vertex(n)
+            end = g.num_vertices()
+            assert end - start == n
+            nodes = pd.Series(np.arange(start, end, dtype='i4'), index=df['id'])
+            codes.append(df['id'].values + ns)
+            sources.append(np.full(n, src, dtype='i2'))
+            return nodes
+
+        def add_edges(f, src, dst):
+            _log.info('adding %d edges to graph', len(f))
+            edges = np.zeros((len(f), 2), dtype='i4')
+            edges[:, 0] = src.loc[f.iloc[:, 0]]
+            edges[:, 1] = dst.loc[f.iloc[:, 1]]
+            g.add_edge_list(edges)
+
+        _log.info('fetching ISBNs')
+        isbns = pd.read_sql_query(self.q_isbns(), cxn)
+        isbn_nodes = add_nodes(isbns.drop(columns=['isbn']), ns_isbn, 9)
+
+        _log.info('fetching LOC records')
+        loc_recs = pd.read_sql_query(self.q_loc_nodes(False), cxn)
+        loc_nodes = add_nodes(loc_recs, ns_rec, 3)
+
+        _log.info('fetching LOC ISBN links')
+        loc_edges = pd.read_sql_query(self.q_loc_edges(), cxn)
+        add_edges(loc_edges, isbn_nodes, loc_nodes)
+
+        _log.info('fetching OL editions')
+        ol_eds = pd.read_sql_query(self.q_ol_edition_nodes(False), cxn)
+        ol_e_nodes = add_nodes(ol_eds, ns_edition, 2)
+
+        _log.info('fetching OL works')
+        ol_wks = pd.read_sql_query(self.q_ol_work_nodes(False), cxn)
+        ol_w_nodes = add_nodes(ol_wks, ns_work, 1)
+
+        _log.info('fetching OL ISBN edges')
+        ol_ie_edges = pd.read_sql_query(self.q_ol_edition_edges(), cxn)
+        add_edges(ol_ie_edges, isbn_nodes, ol_e_nodes)
+
+        _log.info('fetching OL edition/work edges')
+        ol_ew_edges = pd.read_sql_query(self.q_ol_work_edges(), cxn)
+        add_edges(ol_ew_edges, ol_e_nodes, ol_w_nodes)
+
+        _log.info('fetching GR books')
+        gr_books = pd.read_sql_query(self.q_gr_book_nodes(False), cxn)
+        gr_b_nodes = add_nodes(gr_books, ns_gr_book, 5)
+
+        _log.info('fetching GR ISBN edges')
+        gr_ib_edges = pd.read_sql_query(self.q_gr_book_edges(), cxn)
+        add_edges(gr_ib_edges, isbn_nodes, gr_b_nodes)
+
+        _log.info('fetching GR works')
+        gr_works = pd.read_sql_query(self.q_gr_work_nodes(False), cxn)
+        gr_w_nodes = add_nodes(gr_works, ns_gr_work, 4)
+
+        _log.info('fetching GR work/edition edges')
+        gr_bw_edges = pd.read_sql_query(self.q_gr_work_edges(), cxn)
+        add_edges(gr_bw_edges, gr_b_nodes, gr_w_nodes)
+
+        _log.info('setting code attributes')
+        code_a = g.new_vp('int64_t')
+        code_a.a[:] = np.concatenate(codes)
+        g.vp['code'] = code_a
+
+        _log.info('setting source attributes')
+        source_a = g.new_vp('int16_t')
+        source_a.a[:] = np.concatenate(sources)
+        g.vp['source'] = source_a
+
+        _log.info('imported %s', g)
+
+        return g
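
A minimal sketch of how GraphLoader.load_minimal_graph is meant to be driven; it mirrors the full_graph path added to the inspect script below, and the 'code' and 'source' property names are the ones the loader sets:

    from bookdata import db
    from bookdata.graph import GraphLoader

    gl = GraphLoader()
    with db.engine().connect() as cxn:
        g = gl.load_minimal_graph(cxn)

    # Each vertex carries its namespaced identifier and an integer source label.
    print(g.num_vertices(), g.num_edges())
    print(g.vp['code'].a[:5], g.vp['source'].a[:5])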

bookdata/schema.py (+22, -5)

@@ -2,8 +2,25 @@
 Data schema information for the book data tools.
 """
 
-"Bases for for numeric ID number spaces."
-numspaces = dict(work=100000000, edition=200000000, rec=300000000,
-                 gr_work=400000000, gr_book=500000000,
-                 loc_work=600000000, loc_instance=700000000,
-                 isbn=900000000)
+import pandas as pd
+
+ns_work=100000000
+ns_edition=200000000
+ns_rec=300000000
+ns_gr_work=400000000
+ns_gr_book=500000000
+ns_loc_work=600000000
+ns_loc_instance=700000000
+ns_isbn=900000000
+
+src_labels = pd.Series({
+    'OL-W': 1,
+    'OL-E': 2,
+    'LOC': 3,
+    'GR-W': 4,
+    'GR-B': 5,
+    'LOC-W': 6,
+    'LOC-I': 7,
+    'ISBN': 9
+})
+src_label_rev = pd.Series(src_labels.index, index=src_labels.values)
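
The ns_* offsets and src_labels are what load_minimal_graph uses for the per-vertex 'code' and 'source' attributes. A small sketch of the mapping, using an illustrative ISBN ID:

    from bookdata.schema import ns_isbn, src_labels, src_label_rev

    isbn_id = 12345                      # illustrative value
    code = isbn_id + ns_isbn             # 900012345: the namespaced vertex code
    src = src_labels['ISBN']             # 9: the integer source label
    assert src_label_rev[src] == 'ISBN'  # reverse lookup back to the label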

environment.yml (+1, -0)

@@ -16,6 +16,7 @@ dependencies:
 - colorama
 - seaborn
 - sqlalchemy
+- graph-tool
 - sqlparse
 - rust>=1.40
 - postgresql>=12

scripts/inspect-cluster.py (+40, -76)

@@ -2,9 +2,10 @@
 Inspect a book cluster.
 
 Usage:
-    inspect-cluster.py [options] --stats
-    inspect-cluster.py [options] --records CLUSTER
-    inspect-cluster.py [options] --graph CLUSTER
+    inspect-idgraph.py [options] --stats
+    inspect-idgraph.py [options] --records CLUSTER
+    inspect-idgraph.py [options] --graph CLUSTER
+    inspect-idgraph.py [options] --full-graph
 
 Options:
     -o FILE
@@ -25,6 +26,7 @@ from docopt import docopt
 import pandas as pd
 
 from bookdata import tracking, db, script_log
+from bookdata.graph import GraphLoader
 
 
 class GMLWriter:
@@ -204,87 +206,48 @@ def graph(dbc, out, opts):
     gw.node_attr('category')
     gw.node_attr('title')
 
+    gl = GraphLoader()
+
     with dbc.cursor() as cur:
-        cur.execute('''
-            CREATE TEMPORARY TABLE gc_isbns
-            AS SELECT isbn_id, isbn
-            FROM isbn_cluster JOIN isbn_id USING (isbn_id)
-            WHERE cluster = %s
-        ''', [cluster])
+        gl.set_cluster(cluster, cur)
 
         _log.info('fetching ISBNs')
-        cur.execute('SELECT * FROM gc_isbns')
+        cur.execute(gl.q_isbns())
         for iid, isbn in cur:
             gw.node(id=f'i{iid}', label=isbn, category='ISBN')
 
         _log.info('fetching LOC records')
-        cur.execute('''
-            SELECT DISTINCT rec_id, title
-            FROM gc_isbns
-            JOIN locmds.book_rec_isbn USING (isbn_id)
-            LEFT JOIN locmds.book_title USING (rec_id)
-        ''')
+        cur.execute(gl.q_loc_nodes(True))
         for rid, title in cur:
             gw.node(id=f'l{rid}', label=rid, category='LOC', title=title)
 
         _log.info('fetching LOC ISBN links')
-        cur.execute('''
-            SELECT isbn_id, rec_id
-            FROM gc_isbns
-            JOIN locmds.book_rec_isbn USING (isbn_id)
-        ''')
+        cur.execute(gl.q_loc_edges())
         for iid, rid in cur:
             gw.edge(source=f'l{rid}', target=f'i{iid}')
 
         _log.info('fetching OL editions')
-        cur.execute('''
-            SELECT DISTINCT
-                edition_id, edition_key,
-                NULLIF(edition_data->>'title', '') AS title
-            FROM gc_isbns
-            JOIN ol.isbn_link USING (isbn_id)
-            JOIN ol.edition USING (edition_id)
-        ''')
+        cur.execute(gl.q_ol_edition_nodes(True))
         for eid, ek, e_title in cur:
             gw.node(id=f'ole{eid}', label=ek, category='OLE', title=e_title)
 
         _log.info('fetching OL works')
-        cur.execute('''
-            SELECT DISTINCT
-                work_id, work_key,
-                NULLIF(work_data->>'title', '') AS title
-            FROM gc_isbns
-            JOIN ol.isbn_link USING (isbn_id)
-            JOIN ol.work USING (work_id)
-        ''')
+        cur.execute(gl.q_ol_work_nodes(True))
         for wid, wk, w_title in cur:
             gw.node(id=f'olw{wid}', label=wk, category='OLW', title=w_title)
 
         _log.info('fetching OL ISBN edges')
-        cur.execute('''
-            SELECT DISTINCT isbn_id, edition_id
-            FROM gc_isbns
-            JOIN ol.isbn_link USING (isbn_id)
-        ''')
+        cur.execute(gl.q_ol_edition_edges())
         for iid, eid in cur:
             gw.edge(source=f'ole{eid}', target=f'i{iid}')
 
         _log.info('fetching OL edition/work edges')
-        cur.execute('''
-            SELECT DISTINCT edition_id, work_id
-            FROM gc_isbns
-            JOIN ol.isbn_link USING (isbn_id)
-            WHERE work_id IS NOT NULL
-        ''')
+        cur.execute(gl.q_ol_work_edges())
         for eid, wid in cur:
             gw.edge(source=f'ole{eid}', target=f'olw{wid}')
 
         _log.info('fetching GR books')
-        cur.execute('''
-            SELECT DISTINCT isbn_id, gr_book_id
-            FROM gc_isbns
-            JOIN gr.book_isbn USING (isbn_id)
-        ''')
+        cur.execute(gl.q_gr_book_edges())
         bids = set()
         for iid, bid in cur:
             if bid not in bids:
@@ -293,32 +256,30 @@ def graph(dbc, out, opts):
             gw.edge(source=f'grb{bid}', target=f'i{iid}')
 
         _log.info('fetching GR works')
-        cur.execute('''
-            SELECT DISTINCT gr_work_id, work_title
-            FROM gc_isbns
-            JOIN gr.book_isbn USING (isbn_id)
-            JOIN gr.book_ids ids USING (gr_book_id)
-            LEFT JOIN gr.work_title USING (gr_work_id)
-            WHERE ids.gr_work_id IS NOT NULL
-        ''')
+        cur.execute(gl.q_gr_work_nodes(True))
         for wid, title in cur:
             gw.node(id=f'grw{wid}', label=wid, category='GRW', title=title)
 
         _log.info('fetching GR work/edition edges')
-        cur.execute('''
-            SELECT DISTINCT gr_work_id, gr_book_id
-            FROM gc_isbns
-            JOIN gr.book_isbn USING (isbn_id)
-            JOIN gr.book_ids ids USING (gr_book_id)
-            WHERE ids.gr_work_id IS NOT NULL
-        ''')
-        for wid, bid in cur:
+        cur.execute(gl.q_gr_work_edges())
+        for bid, wid in cur:
             gw.edge(source=f'grw{wid}', target=f'grb{bid}')
 
     gw.finish()
     _log.info('exported graph')
 
 
+def full_graph(opts):
+    gl = GraphLoader()
+    with db.engine().connect() as cxn:
+        g = gl.load_minimal_graph(cxn)
+
+
+    ofn = opts['-o']
+    _log.info('saving graph to %s', ofn)
+    g.save(ofn)
+
+
 _log = script_log(__name__)
 opts = docopt(__doc__)
 
@@ -327,10 +288,13 @@ if opts['-o']:
 else:
     out = sys.stdout
 
-with db.connect() as dbc:
-    if opts['--stats']:
-        stats(dbc, out, opts)
-    elif opts['--records']:
-        records(dbc, out, opts)
-    elif opts['--graph']:
-        graph(dbc, out, opts)
+if opts['--full-graph']:
+    full_graph(opts)
+else:
+    with db.connect() as dbc:
+        if opts['--stats']:
+            stats(dbc, out, opts)
+        elif opts['--records']:
+            records(dbc, out, opts)
+        elif opts['--graph']:
+            graph(dbc, out, opts)
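
The new --full-graph mode writes the minimal identifier graph with g.save(ofn), so it can be reloaded directly in graph-tool. A sketch, assuming the file name passed via -o was id-graph.gt (the name is illustrative):

    from graph_tool import load_graph

    g = load_graph('id-graph.gt')
    print(g)
    # The vertex properties written by load_minimal_graph are available again.
    print(g.vp['code'].a[:10], g.vp['source'].a[:10])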