graph.py 9.8 KB


  1. """
  2. Utilities for loading & working with the book identifier graph.
  3. """
  4. import logging
  5. import pandas as pd
  6. import numpy as np
  7. from graph_tool import Graph
  8. from .schema import *
  9. _log = logging.getLogger(__name__)
  10. class MinGraphBuilder:
  11. def __init__(self):
  12. self.graph = Graph(directed=False)
  13. self.codes = []
  14. self.labels = []
  15. self.sources = []
  16. def add_nodes(self, df, ns):
  17. n = len(df)
  18. _log.info('adding %d nodes to graph', n)
  19. start = self.graph.num_vertices()
  20. vs = self.graph.add_vertex(n)
  21. end = self.graph.num_vertices()
  22. assert end - start == n
  23. nodes = pd.Series(np.arange(start, end, dtype='i4'), index=df['id'])
  24. self.codes.append(df['id'].values + ns.offset)
  25. self.labels.append(df['id'].values)
  26. self.sources.append(np.full(n, ns.code, dtype='i2'))
  27. return nodes
  28. def add_edges(self, f, src, dst):
  29. _log.info('adding %d edges to graph', len(f))
  30. edges = np.zeros((len(f), 2), dtype='i4')
  31. edges[:, 0] = src.loc[f.iloc[:, 0]]
  32. edges[:, 1] = dst.loc[f.iloc[:, 1]]
  33. self.graph.add_edge_list(edges)
  34. def finish(self):
  35. _log.info('setting code attributes')
  36. code_a = self.graph.new_vp('int64_t')
  37. code_a.a[:] = np.concatenate(self.codes)
  38. self.graph.vp['code'] = code_a
  39. _log.info('setting label attributes')
  40. label_a = self.graph.new_vp('int64_t')
  41. label_a.a[:] = np.concatenate(self.labels)
  42. self.graph.vp['label'] = label_a
  43. _log.info('setting source attributes')
  44. source_a = self.graph.new_vp('int16_t')
  45. source_a.a[:] = np.concatenate(self.sources)
  46. self.graph.vp['source'] = source_a
  47. return self.graph
  48. class FullGraphBuilder:
  49. def __init__(self):
  50. self.graph = Graph(directed=False)
  51. self.codes = []
  52. self.sources = []
  53. self.labels = []
  54. self.attrs = set()
  55. def add_nodes(self, df, ns):
  56. n = len(df)
  57. _log.info('adding %d nodes to graph', n)
  58. start = self.graph.num_vertices()
  59. vs = self.graph.add_vertex(n)
  60. end = self.graph.num_vertices()
  61. assert end - start == n
  62. nodes = pd.Series(np.arange(start, end, dtype='i4'), index=df['id'])
  63. self.codes.append(df['id'].values + ns.offset)
  64. self.sources.append(np.full(n, ns.code, dtype='i2'))
  65. if 'label' in df.columns:
  66. self.labels += list(df['label'].values)
  67. else:
  68. self.labels += list(df['id'].astype('str').values)
  69. for c in df.columns:
  70. if c in ['id', 'label']:
  71. continue
  72. if c not in self.attrs:
  73. vp = self.graph.new_vp('string')
  74. self.graph.vp[c] = vp
  75. self.attrs.add(c)
  76. else:
  77. vp = self.graph.vp[c]
  78. for v, val in zip(vs, df[c].values):
  79. vp[v] = val
  80. return nodes
  81. def add_edges(self, f, src, dst):
  82. _log.info('adding %d edges to graph', len(f))
  83. edges = np.zeros((len(f), 2), dtype='i4')
  84. edges[:, 0] = src.loc[f.iloc[:, 0]]
  85. edges[:, 1] = dst.loc[f.iloc[:, 1]]
  86. self.graph.add_edge_list(edges)
  87. def finish(self):
  88. _log.info('setting code attributes')
  89. code_a = self.graph.new_vp('int64_t')
  90. code_a.a[:] = np.concatenate(self.codes)
  91. self.graph.vp['code'] = code_a
  92. _log.info('setting source attributes')
  93. source_a = self.graph.new_vp('string')
  94. for v, s in zip(self.graph.vertices(), np.concatenate(self.sources)):
  95. source_a[v] = src_label_rev[s]
  96. self.graph.vp['source'] = source_a
  97. _log.info('setting source attributes')
  98. label_a = self.graph.new_vp('string')
  99. for v, l in zip(self.graph.vertices(), self.labels):
  100. label_a[v] = l
  101. self.graph.vp['label'] = label_a
  102. return self.graph
  103. class GraphLoader:
  104. cluster = None
  105. isbn_table = 'isbn_id'
  106. def set_cluster(self, cluster, cur):
  107. _log.info('restricting graph load to cluster %s', cluster)
  108. self.cluster = cluster
  109. self.isbn_table = 'gc_isbns'
  110. cur.execute('''
  111. CREATE TEMPORARY TABLE gc_isbns
  112. AS SELECT isbn_id, isbn
  113. FROM isbn_cluster JOIN isbn_id USING (isbn_id)
  114. WHERE cluster = %s
  115. ''', [self.cluster])
  116. def q_isbns(self, full=True):
  117. if full:
  118. return f'SELECT isbn_id AS id, isbn FROM {self.isbn_table}'
  119. else:
  120. return f'SELECT isbn_id AS id FROM {self.isbn_table}'
  121. @property
  122. def limit(self):
  123. if self.isbn_table == 'isbn_id':
  124. return ''
  125. else:
  126. return f'JOIN {self.isbn_table} USING (isbn_id)'
  127. def q_loc_nodes(self, full=False):
  128. if full:
  129. return f'''
  130. SELECT DISTINCT rec_id AS id, title
  131. FROM locmds.book_rec_isbn {self.limit}
  132. LEFT JOIN locmds.book_title USING (rec_id)
  133. '''
  134. else:
  135. return f'''
  136. SELECT DISTINCT rec_id AS id
  137. FROM locmds.book_rec_isbn {self.limit}
  138. '''
  139. def q_loc_edges(self):
  140. return f'''
  141. SELECT isbn_id, rec_id
  142. FROM locmds.book_rec_isbn {self.limit}
  143. '''
  144. def q_ol_edition_nodes(self, full=False):
  145. if full:
  146. return f'''
  147. SELECT DISTINCT
  148. edition_id AS id, edition_key AS label,
  149. NULLIF(edition_data->>'title', '') AS title
  150. FROM ol.isbn_link {self.limit}
  151. JOIN ol.edition USING (edition_id)
  152. '''
  153. else:
  154. return f'''
  155. SELECT DISTINCT edition_id AS id
  156. FROM ol.isbn_link {self.limit}
  157. '''
  158. def q_ol_work_nodes(self, full=False):
  159. if full:
  160. return f'''
  161. SELECT DISTINCT
  162. work_id AS id, work_key AS label,
  163. NULLIF(work_data->>'title', '') AS title
  164. FROM ol.isbn_link {self.limit}
  165. JOIN ol.work USING (work_id)
  166. '''
  167. else:
  168. return f'''
  169. SELECT DISTINCT work_id AS id
  170. FROM ol.isbn_link {self.limit}
  171. WHERE work_id IS NOT NULL
  172. '''
  173. def q_ol_edition_edges(self):
  174. return f'''
  175. SELECT DISTINCT isbn_id, edition_id
  176. FROM ol.isbn_link {self.limit}
  177. '''
  178. def q_ol_work_edges(self):
  179. return f'''
  180. SELECT DISTINCT edition_id, work_id
  181. FROM ol.isbn_link {self.limit}
  182. WHERE work_id IS NOT NULL
  183. '''
  184. def q_gr_book_nodes(self, full=False):
  185. return f'''
  186. SELECT DISTINCT gr_book_id AS id
  187. FROM gr.book_isbn {self.limit}
  188. '''
  189. def q_gr_work_nodes(self, full=False):
  190. if full:
  191. return f'''
  192. SELECT DISTINCT gr_work_id AS id, work_title AS title
  193. FROM gr.book_isbn {self.limit}
  194. JOIN gr.book_ids ids USING (gr_book_id)
  195. LEFT JOIN gr.work_title USING (gr_work_id)
  196. WHERE ids.gr_work_id IS NOT NULL
  197. '''
  198. else:
  199. return f'''
  200. SELECT DISTINCT gr_work_id AS id
  201. FROM gr.book_isbn {self.limit}
  202. JOIN gr.book_ids ids USING (gr_book_id)
  203. WHERE ids.gr_work_id IS NOT NULL
  204. '''
  205. def q_gr_book_edges(self, full=False):
  206. return f'''
  207. SELECT DISTINCT isbn_id, gr_book_id
  208. FROM gr.book_isbn {self.limit}
  209. '''
  210. def q_gr_work_edges(self):
  211. return f'''
  212. SELECT DISTINCT gr_book_id, gr_work_id
  213. FROM gr.book_isbn {self.limit}
  214. JOIN gr.book_ids ids USING (gr_book_id)
  215. WHERE ids.gr_work_id IS NOT NULL
  216. '''
  217. def load_graph(self, cxn, full=False):
  218. if full:
  219. gb = FullGraphBuilder()
  220. else:
  221. gb = MinGraphBuilder()
  222. _log.info('fetching ISBNs')
  223. isbns = pd.read_sql_query(self.q_isbns(full), cxn)
  224. isbn_nodes = gb.add_nodes(isbns.rename({'isbn': 'label'}), ns_isbn)
  225. _log.info('fetching LOC records')
  226. loc_recs = pd.read_sql_query(self.q_loc_nodes(full), cxn)
  227. loc_nodes = gb.add_nodes(loc_recs, ns_loc_rec)
  228. _log.info('fetching LOC ISBN links')
  229. loc_edges = pd.read_sql_query(self.q_loc_edges(), cxn)
  230. gb.add_edges(loc_edges, isbn_nodes, loc_nodes)
  231. _log.info('fetching OL editions')
  232. ol_eds = pd.read_sql_query(self.q_ol_edition_nodes(full), cxn)
  233. ol_e_nodes = gb.add_nodes(ol_eds, ns_edition)
  234. _log.info('fetching OL works')
  235. ol_wks = pd.read_sql_query(self.q_ol_work_nodes(full), cxn)
  236. ol_w_nodes = gb.add_nodes(ol_wks, ns_work)
  237. _log.info('fetching OL ISBN edges')
  238. ol_ie_edges = pd.read_sql_query(self.q_ol_edition_edges(), cxn)
  239. gb.add_edges(ol_ie_edges, isbn_nodes, ol_e_nodes)
  240. _log.info('fetching OL edition/work edges')
  241. ol_ew_edges = pd.read_sql_query(self.q_ol_work_edges(), cxn)
  242. gb.add_edges(ol_ew_edges, ol_e_nodes, ol_w_nodes)
  243. _log.info('fetching GR books')
  244. gr_books = pd.read_sql_query(self.q_gr_book_nodes(full), cxn)
  245. gr_b_nodes = gb.add_nodes(gr_books, ns_gr_book)
  246. _log.info('fetching GR ISBN edges')
  247. gr_ib_edges = pd.read_sql_query(self.q_gr_book_edges(), cxn)
  248. gb.add_edges(gr_ib_edges, isbn_nodes, gr_b_nodes)
  249. _log.info('fetching GR works')
  250. gr_works = pd.read_sql_query(self.q_gr_work_nodes(full), cxn)
  251. gr_w_nodes = gb.add_nodes(gr_works, ns_gr_work)
  252. _log.info('fetching GR work/edition edges')
  253. gr_bw_edges = pd.read_sql_query(self.q_gr_work_edges(), cxn)
  254. gb.add_edges(gr_bw_edges, gr_b_nodes, gr_w_nodes)
  255. g = gb.finish()
  256. _log.info('imported %s', g)
  257. return g
Tip!

Press p or to see the previous file or, n or to see the next file