Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

model_dag.py 4.1 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
  1. from airflow import DAG
  2. from airflow.providers.standard.operators.python import PythonOperator
  3. from datetime import datetime
  4. import sys
  5. import os
  6. from pathlib import Path
  7. # Add project root to Python path
  8. # Handle both local execution and Airflow execution
  9. if 'airflow' in __file__:
  10. # Running from Airflow dags directory - adjust for WSL/Ubuntu
  11. project_root = Path.home() / "MachineLearning Pipeline"
  12. else:
  13. # Running from project directory
  14. project_root = Path(__file__).parent
  15. # Add both project root and src directory to Python path
  16. sys.path.insert(0, str(project_root))
  17. sys.path.insert(0, str(project_root / "src"))
  18. # Change working directory to project root for relative paths to work
  19. os.chdir(str(project_root))
  20. # Import pipeline modules
  21. try:
  22. from src.mlpipeline.pipeline.data_ingestion_pipeline import DataIngestionTrainingPipeline
  23. from src.mlpipeline.pipeline.data_validation_pipeline import DataValidationTrainingPipeline
  24. from src.mlpipeline.pipeline.data_transformation_pipeline import DataTransformationTrainingPipeline
  25. from src.mlpipeline.pipeline.model_trainer_pipeline import ModelTrainerTrainingPipeline
  26. from src.mlpipeline.pipeline.model_evaluation_pipeline import ModelEvaluationTrainingPipeline
  27. from src.mlpipeline.logging import logger
  28. except ImportError as e:
  29. print(f"Import error: {e}")
  30. print(f"Current working directory: {os.getcwd()}")
  31. print(f"Python path: {sys.path}")
  32. raise
  33. # Task 1: Data Ingestion
  34. def data_ingestion():
  35. logger.info(">>>>>> Data Ingestion Stage started <<<<<<")
  36. data_ingestion_pipeline = DataIngestionTrainingPipeline()
  37. data_ingestion_pipeline.initiate_data_ingestion()
  38. logger.info(">>>>>> Data Ingestion Stage completed <<<<<<")
  39. return "Data ingestion completed successfully"
  40. # Task 2: Data Validation
  41. def data_validation():
  42. logger.info(">>>>>> Data Validation Stage started <<<<<<")
  43. data_validation_pipeline = DataValidationTrainingPipeline()
  44. data_validation_pipeline.initiate_data_validation()
  45. logger.info(">>>>>> Data Validation Stage completed <<<<<<")
  46. return "Data validation completed successfully"
  47. # Task 3: Data Transformation
  48. def data_transformation():
  49. logger.info(">>>>>> Data Transformation Stage started <<<<<<")
  50. data_transformation_pipeline = DataTransformationTrainingPipeline()
  51. data_transformation_pipeline.initiate_data_transformation()
  52. logger.info(">>>>>> Data Transformation Stage completed <<<<<<")
  53. return "Data transformation completed successfully"
  54. # Task 4: Model Training
  55. def model_training():
  56. logger.info(">>>>>> Model Training Stage started <<<<<<")
  57. model_trainer_pipeline = ModelTrainerTrainingPipeline()
  58. model_trainer_pipeline.initiate_model_trainer()
  59. logger.info(">>>>>> Model Training Stage completed <<<<<<")
  60. return "Model training completed successfully"
  61. # Task 5: Model Evaluation
  62. def model_evaluation():
  63. logger.info(">>>>>> Model Evaluation Stage started <<<<<<")
  64. model_evaluation_pipeline = ModelEvaluationTrainingPipeline()
  65. model_evaluation_pipeline.initiate_model_evaluation()
  66. logger.info(">>>>>> Model Evaluation Stage completed <<<<<<")
  67. return "Model evaluation completed successfully"
  68. # Define the DAG
  69. with DAG(
  70. dag_id='ml_pipeline_dag',
  71. start_date=datetime(2025, 7, 12),
  72. schedule=None,
  73. catchup=False
  74. ) as dag:
  75. data_ingestion_task = PythonOperator(
  76. task_id='data_ingestion',
  77. python_callable=data_ingestion
  78. )
  79. data_validation_task = PythonOperator(
  80. task_id='data_validation',
  81. python_callable=data_validation
  82. )
  83. data_transformation_task = PythonOperator(
  84. task_id='data_transformation',
  85. python_callable=data_transformation
  86. )
  87. model_training_task = PythonOperator(
  88. task_id='model_training',
  89. python_callable=model_training
  90. )
  91. model_evaluation_task = PythonOperator(
  92. task_id='model_evaluation',
  93. python_callable=model_evaluation
  94. )
  95. # Set task dependencies
  96. data_ingestion_task >> data_validation_task >> data_transformation_task >> model_training_task >> model_evaluation_task
Tip!

Press p or ← to see the previous file, or n or → to see the next file

Comments

Loading...