Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

MPWeakHeap.py 6.9 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
  1. import multiprocessing
  2. import uuid
  3. from typing import Union
  4. from ..io import FormattedMemoryViewIO
  5. from .MPSharedMemory import MPSharedMemory
  6. class MPWeakHeap:
  7. """
  8. Multiprocess weak heap.
  9. heap structure
  10. |ring_head_block_offset block block block ...|
  11. block structure:
  12. ---
  13. (8) block_size
  14. (8) data_size
  15. (16) UUID sig
  16. ...data...
  17. """
  18. class DataRef:
  19. def __init__(self, block_offset, uuid : bytes):
  20. self._block_offset = block_offset
  21. self._uuid = uuid
  22. def __init__(self, size_mb : int):
  23. self._heap_size = size_mb * 1024 * 1024 # should be 16 byte aligned
  24. self._shared_mem = MPSharedMemory(self._heap_size)
  25. self._lock = multiprocessing.Lock()
  26. # Initialize heap structure
  27. self._ring_head_block_offset = 0
  28. self._first_block_offset = 8
  29. self._block_header_size = 8+8+16
  30. self._block_data_start_offset = 8+8+16
  31. fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
  32. fmv.seek(self._ring_head_block_offset), fmv.write_fmt('q', self._first_block_offset)
  33. # Entire block
  34. fmv.seek(self._first_block_offset)
  35. fmv.write_fmt('qq', self._heap_size-self._first_block_offset, 0), fmv.write(uuid.uuid4().bytes)
  36. def add_data(self, data : Union[bytes, bytearray, memoryview] ) -> 'MPWeakHeap.DataRef':
  37. """
  38. add the data to the head of ring
  39. data
  40. """
  41. heap_size = self._heap_size
  42. block_header_size = self._block_header_size
  43. if isinstance(data, memoryview):
  44. data = data.cast('B')
  45. if not data.contiguous:
  46. raise ValueError('data as memoryview should be contiguous')
  47. data_size = data.nbytes
  48. else:
  49. data_size = len(data)
  50. lock = self._lock
  51. fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
  52. lock.acquire()
  53. # start from ring_head_block_offset
  54. fmv.seek(self._ring_head_block_offset)
  55. cur_block_offset, = fmv.read_fmt('q')
  56. while True:
  57. fmv.seek(cur_block_offset)
  58. block_size, = fmv.get_fmt('q')
  59. block_free_size = block_size - block_header_size
  60. if data_size <= block_free_size:
  61. # the space of the block is enough for the data
  62. block_new_size = block_header_size + ( data_size + (-data_size & 7) )
  63. block_remain_size = block_size-block_new_size
  64. if block_remain_size >= block_header_size:
  65. # the remain space of the block is enough for next block, split the block
  66. next_block_offset = cur_block_offset + block_new_size
  67. fmv.seek(next_block_offset), fmv.write_fmt('qq', block_remain_size, 0), fmv.write(uuid.uuid4().bytes)
  68. else:
  69. # otherwise do not split
  70. next_block_offset = cur_block_offset + block_size
  71. if next_block_offset >= heap_size:
  72. next_block_offset = self._first_block_offset
  73. block_new_size = block_size
  74. # update current block structure
  75. uid = uuid.uuid4().bytes
  76. fmv.seek(cur_block_offset), fmv.write_fmt('qq', block_new_size, data_size ), fmv.write(uid)
  77. # update ring_head_block_offset
  78. fmv.seek(self._ring_head_block_offset), fmv.write_fmt('q', next_block_offset)
  79. lock.release()
  80. # write the data into the block
  81. fmv.seek(cur_block_offset+self._block_data_start_offset)
  82. fmv.write(data)
  83. return MPWeakHeap.DataRef(cur_block_offset, uid)
  84. else:
  85. # the space of the block is not enough for the daata
  86. is_first_block = cur_block_offset == self._first_block_offset
  87. is_last_block = (cur_block_offset+block_size) >= heap_size
  88. if is_last_block:
  89. if is_first_block:
  90. lock.release()
  91. raise Exception(f'Not enough space in MPWeakHeap to allocate {data_size}')
  92. # if it is last block, leave it unchanged, and continue with first block
  93. cur_block_offset = self._first_block_offset
  94. continue
  95. else:
  96. # not last block, merge with next block
  97. # get next block size
  98. fmv.seek(cur_block_offset+block_size)
  99. next_block_size, = fmv.get_fmt('q')
  100. # erase data of next block
  101. fmv.write_fmt('qq', 0, 0), fmv.write(uuid.uuid4().bytes)
  102. # overwrite current block size with expanded block size
  103. fmv.seek(cur_block_offset)
  104. fmv.write_fmt('q', block_size+next_block_size)
  105. # continue with the same expanded block
  106. continue
  107. def get_data(self, data_ref : 'MPWeakHeap.DataRef') -> Union[bytearray, None]:
  108. """
  109. Get data
  110. if data is overwritten already, None will be returned
  111. """
  112. lock = self._lock
  113. fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
  114. # short lock to get data info
  115. fmv.seek(data_ref._block_offset)
  116. lock.acquire()
  117. (_, data_size), uuid = fmv.read_fmt('qq'), fmv.read(16)
  118. lock.release()
  119. # Check valid UUID
  120. if data_ref._uuid != uuid:
  121. return None
  122. # read the data
  123. result = fmv.read(data_size)
  124. # short lock again to validate that the reference is still valid,
  125. # thus we read valid data
  126. fmv.seek(data_ref._block_offset)
  127. lock.acquire()
  128. (_, data_size), uuid = fmv.read_fmt('qq'), fmv.read(16)
  129. lock.release()
  130. # Check valid UUID
  131. if data_ref._uuid != uuid:
  132. return None
  133. return result
  134. def summary(self) -> str:
  135. """
  136. returns a string with summary of heap
  137. """
  138. result = []
  139. heap_size = self._heap_size
  140. fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
  141. lock = self._lock
  142. lock.acquire()
  143. head_block_offset, = fmv.read_fmt('q')
  144. cur_block_offset = self._first_block_offset
  145. block_id = 0
  146. while cur_block_offset != self._heap_size:
  147. fmv.seek(cur_block_offset)
  148. (block_size, data_size), sig = fmv.read_fmt('qq'), fmv.read(16)
  149. s = ''
  150. if cur_block_offset == head_block_offset:
  151. s += f'[{block_id} HEAD]:'
  152. else:
  153. s += f'[{block_id}]:'
  154. if data_size != 0:
  155. s += f'block_size: {block_size} data_size:{data_size}'
  156. else:
  157. s += f'block_size: {block_size} empty'
  158. result.append(s)
  159. block_id += 1
  160. cur_block_offset += block_size
  161. lock.release()
  162. return '\n'.join(result)
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...