Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

IOThreadLinesReader.py 1.3 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
  1. import threading
  2. import time
  3. from io import IOBase
  4. from typing import List
  5. from collections import deque
  6. class IOThreadLinesReader:
  7. """
  8. continuously reads lines from IO in background thread.
  9. """
  10. def __init__(self, io : IOBase, max_lines=None):
  11. self._io = io
  12. self._lock = threading.Lock()
  13. self._lines = deque(maxlen=max_lines)
  14. threading.Thread(target=self._proc, daemon=True).start()
  15. def _proc(self):
  16. io = self._io
  17. lock = self._lock
  18. lines = self._lines
  19. while not io.closed and io.readable():
  20. line = io.readline()
  21. lock.acquire()
  22. lines.append(line.decode('utf-8').rstrip())
  23. lock.release()
  24. if len(line) == 0:
  25. break
  26. time.sleep(0.01)
  27. def get_lines(self, wait_new=True, till_eof=False) -> List[str]:
  28. """
  29. """
  30. lock = self._lock
  31. lines = self._lines
  32. result = []
  33. while True:
  34. if len(lines) != 0:
  35. lock.acquire()
  36. result += lines
  37. lines.clear()
  38. lock.release()
  39. if till_eof and len(result[-1]) != 0:
  40. continue
  41. return result
  42. if not till_eof and not wait_new:
  43. return None
  44. time.sleep(0.001)
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...