Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

cluster.py 4.4 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
  1. """
  2. Usage:
  3. cluster.py [-T FILE]
  4. Options:
  5. -T FILE
  6. Write transcript to FILE.
  7. """
  8. import os
  9. import sys
  10. import gzip
  11. import threading
  12. from textwrap import dedent
  13. from functools import reduce
  14. from natural.number import number
  15. from psycopg2 import sql
  16. import hashlib
  17. from docopt import docopt
  18. import pandas as pd
  19. import numpy as np
  20. from graph_tool.all import label_components
  21. from bookdata import db, tracking, script_log
  22. from bookdata.graph import GraphLoader
  23. from bookdata.schema import *
# Module-level logger, configured by bookdata's script_log helper.
_log = script_log(__name__)
  25. def cluster_isbns(isbn_recs, edges):
  26. """
  27. Compute ISBN clusters.
  28. """
  29. _log.info('initializing isbn vector')
  30. isbns = isbn_recs.groupby('isbn_id').record.min()
  31. index = isbns.index
  32. clusters = isbns.values
  33. _log.info('mapping edge IDs')
  34. edges = edges.assign(left_ino=index.get_indexer(edges.left_isbn).astype('i4'))
  35. assert np.all(edges.left_ino >= 0)
  36. edges = edges.assign(right_ino=index.get_indexer(edges.right_isbn).astype('i4'))
  37. assert np.all(edges.right_ino >= 0)
  38. _log.info('clustering')
  39. iters = _make_clusters(clusters, edges.left_ino.values, edges.right_ino.values)
  40. isbns = isbns.reset_index(name='cluster')
  41. _log.info('produced %s clusters in %d iterations',
  42. number(isbns.cluster.nunique()), iters)
  43. return isbns.loc[:, ['isbn_id', 'cluster']]
  44. def _make_clusters(clusters, ls, rs):
  45. """
  46. Compute book clusters. The input is initial cluster assignments and the left and right
  47. indexes for co-occuring ISBN edges; these are ISBNs that have connections to the same
  48. record in the bipartite ISBN-record graph.
  49. Args:
  50. clusters(ndarray): the initial cluster assignments
  51. ls(ndarray): the indexes of the left hand side of edges
  52. rs(ndarray): the indexes of the right hand side of edges
  53. """
  54. iters = 0
  55. nchanged = len(ls)
  56. while nchanged > 0:
  57. iters = iters + 1
  58. cdf = pd.DataFrame({
  59. 'idx': rs,
  60. 'cluster': np.minimum(clusters[ls], clusters[rs])
  61. })
  62. c = cdf.groupby('idx')['cluster'].min()
  63. nchanged = np.sum(c.values != clusters[c.index])
  64. _log.info('iteration %d changed %d clusters', iters, nchanged)
  65. clusters[c.index] = c.values
  66. return iters
def _import_clusters(dbc, frame):
    """
    Replace the ``isbn_cluster`` table with the assignments in ``frame``.

    Args:
        dbc: an open psycopg2 database connection.
        frame(pandas.DataFrame): cluster assignments with ``isbn_id`` and
            ``cluster`` columns.
    """
    with dbc.cursor() as cur:
        _log.info('creating cluster table')
        # Drop any previous clustering; CASCADE also removes dependent objects.
        cur.execute(sql.SQL('DROP TABLE IF EXISTS isbn_cluster CASCADE'))
        cur.execute(sql.SQL('''
            CREATE TABLE isbn_cluster (
                isbn_id INTEGER NOT NULL,
                cluster INTEGER NOT NULL
            )
        '''))
    _log.info('loading %d clusters into isbn_cluster', len(frame))
    db.save_table(dbc, sql.SQL('isbn_cluster'), frame)
    with dbc.cursor() as cur:
        # Add the key and index after the bulk load, then refresh planner stats.
        cur.execute(sql.SQL('ALTER TABLE isbn_cluster ADD PRIMARY KEY (isbn_id)'))
        cur.execute(sql.SQL('CREATE INDEX isbn_cluster_idx ON isbn_cluster (cluster)'))
        cur.execute(sql.SQL('ANALYZE isbn_cluster'))
  83. def _hash_frame(df):
  84. hash = hashlib.md5()
  85. for c in df.columns:
  86. hash.update(df[c].values.data)
  87. return hash.hexdigest()
def cluster(txout):
    """
    Cluster ISBNs.

    Loads the identifier graph, labels its connected components, stores the
    resulting ISBN clusters in the ``isbn_cluster`` table, saves the annotated
    graph to ``data/id-graph.gt``, and records stage-tracking information.

    Args:
        txout: a writable text file object receiving the transcript lines.
    """
    with db.connect() as dbc, dbc:
        # `with dbc` makes the whole stage one transaction on the connection.
        tracking.begin_stage(dbc, 'cluster')

        with db.engine().connect() as cxn:
            _log.info('loading graph')
            gl = GraphLoader()
            # NOTE(review): the second argument presumably toggles an optional
            # loading feature — confirm against GraphLoader.load_graph.
            g = gl.load_graph(cxn, False)

        print('NODES', g.num_vertices(), file=txout)
        print('EDGES', g.num_edges(), file=txout)

        _log.info('finding connected components')
        comps, hist = label_components(g)
        _log.info('found %d components, largest has %s items', len(hist), np.max(hist))
        print('COMPONENTS', len(hist), file=txout)

        _log.info('saving cluster records to database')
        # Keep only vertices in the ISBN namespace; their labels hold ISBN IDs.
        is_isbn = g.vp.source.a == ns_isbn.code
        clusters = pd.DataFrame({
            'isbn_id': g.vp.label.a[is_isbn],
            'cluster': comps.a[is_isbn]
        })
        _import_clusters(dbc, clusters)

        _log.info('saving ID graph')
        g.vp['cluster'] = comps
        g.save('data/id-graph.gt')

        # Fingerprint the cluster frame so downstream stages can detect changes.
        c_hash = _hash_frame(clusters)
        print('WRITE CLUSTERS', c_hash, file=txout)
        tracking.end_stage(dbc, 'cluster', c_hash)
  115. opts = docopt(__doc__)
  116. tx_fn = opts.get('-T', None)
  117. if tx_fn == '-' or not tx_fn:
  118. tx_out = sys.stdout
  119. else:
  120. _log.info('writing transcript to %s', tx_fn)
  121. tx_out = open(tx_fn, 'w')
  122. cluster(tx_out)
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...