Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

sync.py 2.6 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
  1. import psutil, os, stat, sys, time
  2. from tempfile import NamedTemporaryFile
  3. from watchdog.observers import Observer
  4. from watchdog.events import PatternMatchingEventHandler
  class Sync(object):
      """Watch the working directory and push changed files to a project bucket.

      A watchdog ``Observer`` delivers file-created events to ``add`` and
      file-modified events to ``push``. ``watch`` starts the observer and, when
      stdin is a pipe, also tees the piped text to stdout and uploads it as a
      ``training.log`` file once the pipe reaches EOF.
      """

      def __init__(self, api, project, bucket="default"):
          """Create the event handler and observer; nothing is started here.

          Args:
              api: client whose ``push`` method performs the uploads (its exact
                  contract is defined elsewhere and is not visible here).
              project: project name passed to ``api.push``.
              bucket: bucket name used for uploads; defaults to ``"default"``.
          """
          self._proc = psutil.Process(os.getpid())
          self._api = api
          self._project = project
          self._bucket = bucket
          self._handler = PatternMatchingEventHandler()
          self._handler.on_created = self.add
          self._handler.on_modified = self.push
          self._observer = Observer()
          # Watch only the current working directory, not its subdirectories.
          self._observer.schedule(self._handler, os.path.abspath("."), recursive=False)

      def watch(self, files=None):
          """Start watching for changes, teeing piped stdin if there is one.

          Args:
              files: optional iterable of paths. When non-empty, only events for
                  these paths (made absolute) are handled. ``None`` (the default)
                  or an empty iterable leaves the handler unrestricted.

          The observer is always stopped and joined before returning, also when
          an exception escapes. ``KeyboardInterrupt`` is swallowed so that
          Ctrl-C ends watching quietly.
          """
          if files:
              # NOTE(review): this writes watchdog's private ``_patterns``
              # attribute, because the handler was created without patterns.
              self._handler._patterns = [os.path.abspath(path) for path in files]
          # TODO: upsert command line
          self._observer.start()
          slug = "{project}/{bucket}".format(project=self._project, bucket=self._bucket)
          print("Watching changes for %s" % slug)
          try:
              # The context manager closes (and thereby deletes) the temporary
              # log file even if reading stdin or uploading fails.
              with NamedTemporaryFile("w") as output:
                  # Resolve once instead of re-querying the process tree for the
                  # truthiness check and again for the command line.
                  source = self.source_proc
                  if source:
                      self._tee_stdin(source, output, slug)
                  else:
                      # NOTE(review): without a pipe this only waits one second
                      # before shutting the observer down; confirm intent.
                      time.sleep(1.0)
          except KeyboardInterrupt:
              pass
          finally:
              self._observer.stop()
              self._observer.join()

      def _tee_stdin(self, source, output, slug):
          """Copy piped stdin to stdout and *output*, then upload it as a log.

          Args:
              source: psutil process assumed to feed our stdin; its command line
                  is written first as a header.
              output: writable text file that collects the log.
              slug: ``"project/bucket"`` identifier passed to ``api.push``.
          """
          output.write(" ".join(source.cmdline()) + "\n\n")
          # readline() returns "" only at EOF, i.e. once the writer closes the pipe.
          for line in iter(sys.stdin.readline, ""):
              sys.stdout.write(line)
              output.write(line)
              # TODO: push log every few minutes...
          # Wait for changes: presumably gives the observer a moment to handle
          # final file events before the log is uploaded.
          time.sleep(0.1)
          output.flush()
          print("Pushing log")
          # Reopen by name in binary mode for the upload and close it afterwards.
          # Assumes api.push has consumed the file by the time it returns.
          with open(output.name, "rb") as log_file:
              self._api.push(slug, {"training.log": log_file})

      # TODO: limit / throttle the number of adds / pushes
      def add(self, event):
          """Handle a file-created event by pushing the new file."""
          self.push(event)

      def push(self, event):
          """Upload the file named by a watchdog event.

          Directories, empty files and files that no longer exist are skipped
          (returns ``None``). Otherwise the file's base name is uploaded to the
          configured project and bucket.

          Args:
              event: watchdog event exposing ``src_path``.
          """
          path = event.src_path
          if os.path.isdir(path):
              return None
          try:
              size = os.stat(path).st_size
          except OSError:
              # The file was removed (or became unreadable) between the event and
              # now; raising here would take down the observer thread.
              return None
          if size == 0:
              return None
          file_name = os.path.basename(path)
          print("Pushing {file}".format(file=file_name))
          self._api.push(self._project, [file_name], bucket=self._bucket)

      @property
      def source_proc(self):
          """Return the process presumed to write into our stdin, or ``None``.

          ``None`` is returned when stdin is not a pipe, when the parent process
          is unknown, or when the parent has no child other than this process.
          """
          mode = os.fstat(0).st_mode
          if not stat.S_ISFIFO(mode):
              # stdin is not a pipe
              return None
          parent = self._proc.parent()
          if parent is None:
              return None
          # Heuristic: in a shell pipeline (`producer | this`) both processes are
          # children of the shell, so take the first sibling that is not us.
          # NOTE(review): if the parent has other children, the pick is arbitrary.
          for sibling in parent.children():
              if sibling != self._proc:
                  return sibling
          return None

      def echo(self):
          """Read all of stdin and print it."""
          print(sys.stdin.read())
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...