Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

MPSPSCMRRingData.py 5.7 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
  1. import multiprocessing
  2. from operator import mul
  3. import uuid
  4. from typing import Union
  5. from ..io import FormattedMemoryViewIO
  6. from .MPSharedMemory import MPSharedMemory
  7. class MPSPSCMRRingData:
  8. """
  9. Multiprocess lockless Single Producer, Single Consumer, Multi Reader Ring Data.
  10. Producer knows how many data is read by Consumer (by accessing read_id)
  11. Side readers can read last data without locks.
  12. The data returned is either valid or None.
  13. """
  14. def __init__(self, table_size, heap_size_mb, multi_producer : bool = False):
  15. self._table_size = table_size
  16. self._heap_size = heap_size = heap_size_mb*1024*1024
  17. self._write_lock = multiprocessing.Lock() if multi_producer else None
  18. self._event = multiprocessing.Event()
  19. self._sizeof_uuid = 16
  20. table_item_size = self._table_item_size = 8+8+self._sizeof_uuid
  21. self._table_offset = 8+8
  22. self._heap_offset = self._table_offset + table_size*table_item_size
  23. self._shared_mem = MPSharedMemory(8+8+table_size*table_item_size + heap_size)
  24. self._initialize_mvs()
  25. self._mv_ids[0] = 0 # write_id
  26. self._mv_ids[1] = 0 # read_id
  27. # Initialize first block at 0 index
  28. wid = 0
  29. wid_uuid = uuid.uuid4().bytes
  30. wid_heap_offset = 0
  31. wid_data_size = 0
  32. fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
  33. fmv.seek(self._table_offset + (wid % self._table_size)*self._table_item_size)
  34. fmv.write_fmt('QQ', wid_heap_offset, wid_data_size), fmv.write(wid_uuid)
  35. def _initialize_mvs(self):
  36. mv = self._shared_mem.get_mv()
  37. self._mv_ids = mv.cast('Q')[0:2]
  38. def __getstate__(self):
  39. d = self.__dict__.copy()
  40. d.pop('_mv_ids')
  41. return d
  42. def __setstate__(self, d):
  43. self.__dict__.update(d)
  44. self._initialize_mvs()
  45. def get_write_id(self) -> int: return self._mv_ids[0]
  46. def get_read_id(self) -> int: return self._mv_ids[1]
  47. def write(self, data : Union[bytes, bytearray]):
  48. """
  49. write data incrementing write_id
  50. """
  51. heap_size = self._heap_size
  52. if not isinstance(data, (bytes, bytearray)):
  53. raise ValueError('data must be an instance of bytes or bytearray')
  54. data_size = len(data)
  55. if data_size == 0:
  56. raise ValueError('data_size must be > 0')
  57. data_size_in_heap = data_size+self._sizeof_uuid
  58. data_size_in_heap = ( data_size_in_heap + (-data_size_in_heap & 7) )
  59. if data_size_in_heap > heap_size:
  60. raise Exception('data_size more than heap_size')
  61. fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
  62. wid_uuid = uuid.uuid4().bytes
  63. if self._write_lock is not None:
  64. self._write_lock.acquire()
  65. wid = self._mv_ids[0]
  66. # Read table record of wid
  67. fmv.seek(self._table_offset + (wid % self._table_size)*self._table_item_size)
  68. (wid_heap_offset, wid_data_size), _ = fmv.read_fmt('QQ'), fmv.read(self._sizeof_uuid)
  69. # Calc aligned next offset
  70. wid_heap_offset += self._sizeof_uuid + wid_data_size
  71. wid_heap_offset = ( wid_heap_offset + (-wid_heap_offset & 7) )
  72. # Check if next offset with data size fit remain heap space
  73. if wid_heap_offset+data_size_in_heap >= heap_size:
  74. wid_heap_offset = 0
  75. # Write the data into heap
  76. fmv.seek(self._heap_offset + wid_heap_offset)
  77. fmv.write(wid_uuid)
  78. fmv.write(data)
  79. # Write new table record
  80. wid += 1
  81. fmv.seek(self._table_offset + (wid % self._table_size)*self._table_item_size)
  82. fmv.write_fmt('QQ', wid_heap_offset, data_size), fmv.write(wid_uuid)
  83. # Set new write_id
  84. self._mv_ids[0] = wid
  85. if self._write_lock is not None:
  86. self._write_lock.release()
  87. self._event.set()
  88. def get_by_id(self, id) -> Union[bytearray, None]:
  89. """
  90. get data by id
  91. """
  92. fmv = FormattedMemoryViewIO(self._shared_mem.get_mv())
  93. sizeof_uuid = self._sizeof_uuid
  94. # Read table record
  95. fmv.seek(self._table_offset + (id % self._table_size)*self._table_item_size)
  96. (rid_heap_offset, rid_data_size), rid_uuid = fmv.read_fmt('QQ'), fmv.read(sizeof_uuid)
  97. if rid_data_size == 0:
  98. return None
  99. # Seek to the data in the heap
  100. fmv.seek(self._heap_offset + rid_heap_offset)
  101. # Check data validness
  102. if fmv.read(sizeof_uuid) != rid_uuid:
  103. return None
  104. # read the data
  105. result = fmv.read(rid_data_size)
  106. # Check data validness again
  107. fmv.seek(self._heap_offset + rid_heap_offset)
  108. if fmv.read(sizeof_uuid) != rid_uuid:
  109. return None
  110. return result
  111. def read(self, timeout=0, update_rid=True) -> Union[bytearray, None]:
  112. """
  113. read data incrementing read_id
  114. """
  115. if self._mv_ids[0] == self._mv_ids[1]:
  116. if timeout == 0:
  117. return None
  118. if not self._event.wait(timeout):
  119. return None
  120. self._event.clear()
  121. wid, rid = self._mv_ids[0], self._mv_ids[1]
  122. result = None
  123. while rid < wid:
  124. rid = rid+1
  125. result = self.get_by_id(rid)
  126. if result is not None:
  127. break
  128. if update_rid:
  129. self._mv_ids[1] = rid
  130. return result
  131. # def wait_for_read(self, timeout : float) -> bool:
  132. # """
  133. # returns True if ready to .read()
  134. # """
  135. # result = self._event.wait(timeout)
  136. # if result:
  137. # self._event.clear()
  138. # return result
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...