1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
- # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- # SPDX-License-Identifier: MIT-0
- """A CLI to create or update and run pipelines."""
- from __future__ import absolute_import
- import argparse
- import json
- import sys
- from pipelines._utils import get_pipeline_driver, convert_struct
def main():  # pragma: no cover
    """Create or update the pipeline, start an execution, and wait for it.

    Command-line options (parsed from ``sys.argv``):
        -n / --module-name: module name of the pipeline to import (required).
        --role-arn: role ARN handed to ``pipeline.upsert`` (required).
        --kwargs: string forwarded unchanged to ``get_pipeline_driver``.
        --description: description handed to ``pipeline.upsert``.
        --tags: string run through ``convert_struct`` before being handed
            to ``pipeline.upsert``.

    Exits with status 2 (after printing the help text) when the module name
    or the role ARN is missing, and with status 1 when any step between
    loading the pipeline and listing the execution steps raises.
    """
    # The text must be passed as ``description``: the first positional
    # parameter of ArgumentParser is ``prog``, which would place this
    # sentence in the "usage:" line instead of the help body.
    parser = argparse.ArgumentParser(
        description="Creates or updates and runs the pipeline for the pipeline script."
    )

    parser.add_argument(
        "-n",
        "--module-name",
        dest="module_name",
        type=str,
        help="The module name of the pipeline to import.",
    )
    parser.add_argument(
        "-kwargs",
        "--kwargs",
        dest="kwargs",
        default=None,
        help="Dict string of keyword arguments for the pipeline generation (if supported)",
    )
    parser.add_argument(
        "-role-arn",
        "--role-arn",
        dest="role_arn",
        type=str,
        help="The role arn for the pipeline service execution role.",
    )
    parser.add_argument(
        "-description",
        "--description",
        dest="description",
        type=str,
        default=None,
        help="The description of the pipeline.",
    )
    parser.add_argument(
        "-tags",
        "--tags",
        dest="tags",
        default=None,
        help="""List of dict strings of '[{"Key": "string", "Value": "string"}, ..]'""",
    )
    args = parser.parse_args()

    # Both options are mandatory; show the help and use exit status 2 when
    # either one is absent.
    if args.module_name is None or args.role_arn is None:
        parser.print_help()
        sys.exit(2)

    # Converted outside the try block, so a failure here propagates as-is.
    tags = convert_struct(args.tags)

    try:
        pipeline = get_pipeline_driver(args.module_name, args.kwargs)
        print("###### Creating/updating a SageMaker Pipeline with the following definition:")
        # Round-trip through json so the definition is printed indented with
        # sorted keys.
        parsed = json.loads(pipeline.definition())
        print(json.dumps(parsed, indent=2, sort_keys=True))

        upsert_response = pipeline.upsert(
            role_arn=args.role_arn, description=args.description, tags=tags
        )
        print("\n###### Created/Updated SageMaker Pipeline: Response received:")
        print(upsert_response)

        execution = pipeline.start()
        print(f"\n###### Execution started with PipelineExecutionArn: {execution.arn}")

        print("Waiting for the execution to finish...")
        description = execution.describe()
        print(f"\n###### Execution started with description: {description}")
        execution.wait()

        print("\n#####Execution completed. Execution step details:")

        print(execution.list_steps())
        # TODO: print the status?
    except Exception as e:  # pylint: disable=W0703
        # Top-level boundary: report the failure and signal it via exit status 1.
        print(f"Exception: {e}")
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|