Cluster with graph-tool

Michael Ekstrand, 9 months ago · commit 4cba326732
4 changed files with 66 additions and 130 deletions:

  1. bookdata/graph.py (+7, -0)
  2. data/.gitignore (+1, -0)
  3. integrate/cluster.dvc (+8, -3)
  4. scripts/cluster.py (+50, -127)

bookdata/graph.py (+7, -0)

@@ -15,6 +15,7 @@ class MinGraphBuilder:
     def __init__(self):
         self.graph = Graph(directed=False)
         self.codes = []
+        self.labels = []
         self.sources = []
 
     def add_nodes(self, df, ns):
@@ -26,6 +27,7 @@ class MinGraphBuilder:
         assert end - start == n
         nodes = pd.Series(np.arange(start, end, dtype='i4'), index=df['id'])
         self.codes.append(df['id'].values + ns.offset)
+        self.labels.append(df['id'].values)
         self.sources.append(np.full(n, ns.code, dtype='i2'))
         return nodes
 
@@ -42,6 +44,11 @@ class MinGraphBuilder:
         code_a.a[:] = np.concatenate(self.codes)
         self.graph.vp['code'] = code_a
 
+        _log.info('setting label attributes')
+        label_a = self.graph.new_vp('int64_t')
+        label_a.a[:] = np.concatenate(self.labels)
+        self.graph.vp['label'] = label_a
+
         _log.info('setting source attributes')
         source_a = self.graph.new_vp('int16_t')
         source_a.a[:] = np.concatenate(self.sources)
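
For context, a minimal sketch of the property pattern this hunk extends: graph-tool vertex properties are allocated with new_vp, bulk-filled through their .a array view, and registered on the graph. The toy arrays below are illustrative stand-ins, not the real ISBN ids:

    import numpy as np
    from graph_tool import Graph

    g = Graph(directed=False)
    g.add_vertex(4)

    # same pattern as the new 'label' property: allocate, bulk-fill, register
    label_a = g.new_vp('int64_t')
    label_a.a[:] = np.concatenate([np.array([10, 11], dtype='i8'),
                                   np.array([20, 21], dtype='i8')])
    g.vp['label'] = label_a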

data/.gitignore (+1, -0)

@@ -15,3 +15,4 @@
 /loc-names
 /viaf-clusters-marc21.xml.gz
 /goodreads_book_genres_initial.json.gz
+/id-graph.gt

integrate/cluster.dvc (+8, -3)

@@ -1,4 +1,4 @@
-md5: ad2e17aa21807a9b8a24acdb32cc687c
+md5: fe0bbfb4546afbdad7f6b1fb99663d82
 cmd: python run.py cluster -T integrate/cluster.transcript
 wdir: ..
 deps:
@@ -11,11 +11,16 @@ deps:
 outs:
 - path: pgstat://cluster
   cache: false
-  md5: 1ba18b3a375cdc1e9a4e85f86a54125b
+  md5: 5f16cbd55fc3e29e87993c7cd5da2582
   metric: false
   persist: false
-- md5: 180d6560df96ae1e0a5503527c2005a0
+- md5: 31d30b80240253b83fa50a2031bb2bc4
   path: integrate/cluster.transcript
   cache: true
   metric: false
   persist: false
+- path: data/id-graph.gt
+  md5: 9d962202cbc2d72e8a94a7b6f858f19c
+  cache: true
+  metric: false
+  persist: false
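
DVC caches the new data/id-graph.gt output under the md5 recorded above; for a single-file output the hash is a plain MD5 of the file contents. A quick way to check a local copy (assuming the file has been pulled or regenerated):

    import hashlib

    h = hashlib.md5()
    with open('data/id-graph.gt', 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            h.update(chunk)
    # should print 9d962202cbc2d72e8a94a7b6f858f19c for the version this commit pins
    print(h.hexdigest())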

scripts/cluster.py (+50, -127)

@@ -1,12 +1,10 @@
 """
 Usage:
-    cluster.py [-T FILE] [SCOPE]
+    cluster.py [-T FILE]
 
 Options:
     -T FILE
         Write transcript to FILE.
-    SCOPE
-        Cluster SCOPE.
 """
 import os
 import sys
@@ -22,79 +20,15 @@ from docopt import docopt
 import pandas as pd
 import numpy as np
 
+from graph_tool.all import label_components
+
 from bookdata import db, tracking, script_log
+from bookdata.graph import GraphLoader
+from bookdata.schema import *
 
 _log = script_log(__name__)
 
 
-class scope_loc_mds:
-    name = 'LOC-MDS'
-    schema = 'locmds'
-
-    node_query = dedent('''
-        SELECT isbn_id, MIN(bc_of_loc_rec(rec_id)) AS record
-        FROM locmds.book_rec_isbn GROUP BY isbn_id
-    ''')
-
-    edge_query = dedent('''
-        SELECT DISTINCT l.isbn_id AS left_isbn, r.isbn_id AS right_isbn
-        FROM locmds.book_rec_isbn l JOIN locmds.book_rec_isbn r ON (l.rec_id = r.rec_id)
-    ''')
-
-
-class scope_ol:
-    name = 'OpenLibrary'
-    schema = 'ol'
-
-    node_query = dedent('''
-        SELECT isbn_id, MIN(book_code) AS record
-        FROM ol.isbn_link GROUP BY isbn_id
-    ''')
-
-    edge_query = dedent('''
-        SELECT DISTINCT l.isbn_id AS left_isbn, r.isbn_id AS right_isbn
-        FROM ol.isbn_link l JOIN ol.isbn_link r ON (l.book_code = r.book_code)
-    ''')
-
-
-class scope_gr:
-    name = 'GoodReads'
-    schema = 'gr'
-
-    node_query = dedent('''
-        SELECT DISTINCT isbn_id, MIN(book_code) AS record
-        FROM gr.book_isbn GROUP BY isbn_id
-    ''')
-
-    edge_query = dedent('''
-        SELECT DISTINCT l.isbn_id AS left_isbn, r.isbn_id AS right_isbn
-        FROM gr.book_isbn l JOIN gr.book_isbn r ON (l.book_code = r.book_code)
-    ''')
-
-
-class scope_loc_id:
-    name = 'LOC'
-    schema = 'locid'
-
-    node_query = dedent('''
-        SELECT isbn_id, MIN(book_code) AS record
-        FROM locid.isbn_link GROUP BY isbn_id
-    ''')
-
-    edge_query = dedent('''
-        SELECT DISTINCT l.isbn_id AS left_isbn, r.isbn_id AS right_isbn
-        FROM locid.isbn_link l JOIN locid.isbn_link r ON (l.book_code = r.book_code)
-    ''')
-
-
-_all_scopes = ['ol', 'gr', 'loc-mds']
-
-
-def get_scope(name):
-    n = name.replace('-', '_')
-    return globals()[f'scope_{n}']
-
-
 def cluster_isbns(isbn_recs, edges):
     """
     Compute ISBN clusters.
@@ -146,22 +80,23 @@ def _make_clusters(clusters, ls, rs):
     return iters
 
 
-def _import_clusters(dbc, schema, frame):
+def _import_clusters(dbc, frame):
     with dbc.cursor() as cur:
-        schema_i = sql.Identifier(schema)
         _log.info('creating cluster table')
-        cur.execute(sql.SQL('DROP TABLE IF EXISTS {}.isbn_cluster CASCADE').format(schema_i))
+        cur.execute(sql.SQL('DROP TABLE IF EXISTS isbn_cluster CASCADE'))
         cur.execute(sql.SQL('''
-            CREATE TABLE {}.isbn_cluster (
+            CREATE TABLE isbn_cluster (
                 isbn_id INTEGER NOT NULL,
                 cluster INTEGER NOT NULL
             )
-        ''').format(schema_i))
-        _log.info('loading %d clusters into %s.isbn_cluster', len(frame), schema)
-        db.save_table(dbc, sql.SQL('{}.isbn_cluster').format(schema_i), frame)
-        cur.execute(sql.SQL('ALTER TABLE {}.isbn_cluster ADD PRIMARY KEY (isbn_id)').format(schema_i))
-        cur.execute(sql.SQL('CREATE INDEX isbn_cluster_idx ON {}.isbn_cluster (cluster)').format(schema_i))
-        cur.execute(sql.SQL('ANALYZE {}.isbn_cluster').format(schema_i))
+        '''))
+        _log.info('loading %d clusters into isbn_cluster', len(frame))
+
+    db.save_table(dbc, sql.SQL('isbn_cluster'), frame)
+    with dbc.cursor() as cur:
+        cur.execute(sql.SQL('ALTER TABLE isbn_cluster ADD PRIMARY KEY (isbn_id)'))
+        cur.execute(sql.SQL('CREATE INDEX isbn_cluster_idx ON isbn_cluster (cluster)'))
+        cur.execute(sql.SQL('ANALYZE isbn_cluster'))
 
 
 def _hash_frame(df):
@@ -171,56 +106,44 @@ def _hash_frame(df):
     return hash.hexdigest()
 
 
-def cluster(scope, txout):
+def cluster(txout):
     "Cluster ISBNs"
-    with db.connect() as dbc:
-        _log.info('preparing to cluster scope %s', scope)
-        if scope:
-            step = f'{scope}-cluster'
-            schema = get_scope(scope).schema
-            scopes = [scope]
-        else:
-            step = 'cluster'
-            schema = 'public'
-            scopes = _all_scopes
-
-        with dbc:
-            tracking.begin_stage(dbc, step)
-
-            isbn_recs = []
-            isbn_edges = []
-            for scope in scopes:
-                sco = get_scope(scope)
-                _log.info('reading ISBNs for %s', scope)
-                irs = db.load_table(dbc, sco.node_query)
-                n_hash = _hash_frame(irs)
-                isbn_recs.append(irs)
-                print('READ NODES', scope, n_hash, file=txout)
-
-                _log.info('reading edges for %s', scope)
-                ies = db.load_table(dbc, sco.edge_query)
-                e_hash = _hash_frame(ies)
-                isbn_edges.append(ies)
-                print('READ EDGES', scope, e_hash, file=txout)
-
-            isbn_recs = pd.concat(isbn_recs, ignore_index=True)
-            isbn_edges = pd.concat(isbn_edges, ignore_index=True)
-
-            _log.info('clustering %s ISBN records with %s edges',
-                      number(len(isbn_recs)), number(len(isbn_edges)))
-            loc_clusters = cluster_isbns(isbn_recs, isbn_edges)
-            _log.info('saving cluster records to database')
-            _import_clusters(dbc, schema, loc_clusters)
-
-            c_hash = _hash_frame(loc_clusters)
-            print('WRITE CLUSTERS', c_hash, file=txout)
-
-            tracking.end_stage(dbc, step, c_hash)
+    with db.connect() as dbc, dbc:
+        tracking.begin_stage(dbc, 'cluster')
+
+        with db.engine().connect() as cxn:
+            _log.info('loading graph')
+            gl = GraphLoader()
+            g = gl.load_graph(cxn, False)
+
+        print('NODES', g.num_vertices(), file=txout)
+        print('EDGES', g.num_edges(), file=txout)
+
+        _log.info('finding connected components')
+        comps, hist = label_components(g)
+        _log.info('found %d components, largest has %s items', len(hist), np.max(hist))
+        print('COMPONENTS', len(hist), file=txout)
+
+        _log.info('saving cluster records to database')
+        is_isbn = g.vp.source.a == ns_isbn.code
+        clusters = pd.DataFrame({
+            'isbn_id': g.vp.label.a[is_isbn],
+            'cluster': comps.a[is_isbn]
+        })
+        _import_clusters(dbc, clusters)
+
+        _log.info('saving ID graph')
+        g.vp['cluster'] = comps
+        g.save('data/id-graph.gt')
+
+        c_hash = _hash_frame(clusters)
+        print('WRITE CLUSTERS', c_hash, file=txout)
+
+        tracking.end_stage(dbc, 'cluster', c_hash)
 
 
 opts = docopt(__doc__)
 tx_fn = opts.get('-T', None)
-scope = opts.get('SCOPE', None)
 
 if tx_fn == '-' or not tx_fn:
     tx_out = sys.stdout
@@ -228,4 +151,4 @@ else:
     _log.info('writing transcript to %s', tx_fn)
     tx_out = open(tx_fn, 'w')
 
-cluster(scope, tx_out)
+cluster(tx_out)
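
Downstream, the saved graph can be consumed directly. A minimal sketch of reading the artifact back (the property names and the ns_isbn namespace code are taken from the diff above; load_graph is graph-tool's standard loader):

    import pandas as pd
    from graph_tool.all import load_graph

    from bookdata.schema import ns_isbn  # namespace codes, as imported in the script

    g = load_graph('data/id-graph.gt')
    is_isbn = g.vp.source.a == ns_isbn.code

    clusters = pd.DataFrame({
        'isbn_id': g.vp.label.a[is_isbn],
        'cluster': g.vp.cluster.a[is_isbn],
    })
    print(clusters['cluster'].nunique(), 'clusters over', len(clusters), 'ISBNs')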