Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

iter_utils.py 1.9 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
  1. import threading
  2. import queue as Queue
  3. import multiprocessing
  4. import time
  5. class ThisThreadGenerator(object):
  6. def __init__(self, generator_func, user_param=None):
  7. super().__init__()
  8. self.generator_func = generator_func
  9. self.user_param = user_param
  10. self.initialized = False
  11. def __iter__(self):
  12. return self
  13. def __next__(self):
  14. if not self.initialized:
  15. self.initialized = True
  16. self.generator_func = self.generator_func(self.user_param)
  17. return next(self.generator_func)
  18. class SubprocessGenerator(object):
  19. def __init__(self, generator_func, user_param=None, prefetch=2):
  20. super().__init__()
  21. self.prefetch = prefetch
  22. self.generator_func = generator_func
  23. self.user_param = user_param
  24. self.sc_queue = multiprocessing.Queue()
  25. self.cs_queue = multiprocessing.Queue()
  26. self.p = None
  27. def process_func(self):
  28. self.generator_func = self.generator_func(self.user_param)
  29. while True:
  30. while self.prefetch > -1:
  31. try:
  32. gen_data = next (self.generator_func)
  33. except StopIteration:
  34. self.cs_queue.put (None)
  35. return
  36. self.cs_queue.put (gen_data)
  37. self.prefetch -= 1
  38. self.sc_queue.get()
  39. self.prefetch += 1
  40. def __iter__(self):
  41. return self
  42. def __next__(self):
  43. if self.p == None:
  44. self.p = multiprocessing.Process(target=self.process_func, args=())
  45. self.p.daemon = True
  46. self.p.start()
  47. gen_data = self.cs_queue.get()
  48. if gen_data is None:
  49. self.p.terminate()
  50. self.p.join()
  51. raise StopIteration()
  52. self.sc_queue.put (1)
  53. return gen_data
Tip!

Press p to see the previous file, or n to see the next file.

Comments

Loading...