import datetime
import json
from collections import OrderedDict
from typing import Dict

import numpy as np
import ray
import ray.train.torch  # NOQA: F401 (imported but unused)
import typer
from ray.data import Dataset
from ray.train.torch.torch_predictor import TorchPredictor
from sklearn.metrics import precision_recall_fscore_support
from snorkel.slicing import PandasSFApplier, slicing_function
from typing_extensions import Annotated

from madewithml import predict, utils
from madewithml.config import logger

# Initialize Typer CLI app
app = typer.Typer()


def get_overall_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Dict:  # pragma: no cover, eval workload
    """Get overall performance metrics.

    Args:
        y_true (np.ndarray): ground truth labels.
        y_pred (np.ndarray): predicted labels.

    Returns:
        Dict: overall metrics.
    """
    metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")
    overall_metrics = {
        "precision": metrics[0],
        "recall": metrics[1],
        "f1": metrics[2],
        "num_samples": np.float64(len(y_true)),
    }
    return overall_metrics
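
# Illustrative call (a sketch with made-up labels, not part of the evaluation
# workload): with y_true=[0, 1, 1] and y_pred=[0, 1, 0], the support-weighted
# average over both classes gives roughly
#   {"precision": 0.833, "recall": 0.667, "f1": 0.667, "num_samples": 3.0}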


def get_per_class_metrics(y_true: np.ndarray, y_pred: np.ndarray, class_to_index: Dict) -> Dict:  # pragma: no cover, eval workload
    """Get per class performance metrics.

    Args:
        y_true (np.ndarray): ground truth labels.
        y_pred (np.ndarray): predicted labels.
        class_to_index (Dict): dictionary mapping class to index.

    Returns:
        Dict: per class metrics.
    """
    per_class_metrics = {}
    metrics = precision_recall_fscore_support(y_true, y_pred, average=None)
    for i, _class in enumerate(class_to_index):
        per_class_metrics[_class] = {
            "precision": metrics[0][i],
            "recall": metrics[1][i],
            "f1": metrics[2][i],
            "num_samples": np.float64(metrics[3][i]),
        }
    sorted_per_class_metrics = OrderedDict(sorted(per_class_metrics.items(), key=lambda tag: tag[1]["f1"], reverse=True))
    return sorted_per_class_metrics


@slicing_function()
def nlp_llm(x):  # pragma: no cover, eval workload
    """NLP projects that use LLMs."""
    nlp_project = "natural-language-processing" in x.tag
    llm_terms = ["transformer", "llm", "bert"]
    llm_project = any(s.lower() in x.text.lower() for s in llm_terms)
    return nlp_project and llm_project


@slicing_function()
def short_text(x):  # pragma: no cover, eval workload
    """Projects with short titles and descriptions."""
    return len(x.text.split()) < 8  # less than 8 words
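
# How these slices are consumed downstream (a sketch, assuming a DataFrame with
# "tag" and "text" columns): PandasSFApplier returns a NumPy record array with
# one 0/1 indicator column per slicing function, e.g.
#   slices = PandasSFApplier([nlp_llm, short_text]).apply(df)
#   slices.dtype.names  # ("nlp_llm", "short_text")
# Each column can then be cast to a boolean mask over y_true/y_pred, which is
# exactly what get_slice_metrics below does.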


def get_slice_metrics(y_true: np.ndarray, y_pred: np.ndarray, ds: Dataset) -> Dict:  # pragma: no cover, eval workload
    """Get performance metrics for slices.

    Args:
        y_true (np.ndarray): ground truth labels.
        y_pred (np.ndarray): predicted labels.
        ds (Dataset): Ray dataset with labels.

    Returns:
        Dict: performance metrics for slices.
    """
    slice_metrics = {}
    df = ds.to_pandas()
    df["text"] = df["title"] + " " + df["description"]
    slices = PandasSFApplier([nlp_llm, short_text]).apply(df)
    for slice_name in slices.dtype.names:
        mask = slices[slice_name].astype(bool)
        if sum(mask):
            metrics = precision_recall_fscore_support(y_true[mask], y_pred[mask], average="micro")
            slice_metrics[slice_name] = {}
            slice_metrics[slice_name]["precision"] = metrics[0]
            slice_metrics[slice_name]["recall"] = metrics[1]
            slice_metrics[slice_name]["f1"] = metrics[2]
            slice_metrics[slice_name]["num_samples"] = len(y_true[mask])
    return slice_metrics
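
# The resulting dict looks like (values illustrative):
#   {"nlp_llm": {"precision": ..., "recall": ..., "f1": ..., "num_samples": ...},
#    "short_text": {...}}
# Micro averaging is used here because each slice is scored as a single pooled
# set of predictions rather than averaged across classes.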


@app.command()
def evaluate(
    run_id: Annotated[str, typer.Option(help="id of the specific run to load from")] = None,
    dataset_loc: Annotated[str, typer.Option(help="dataset (with labels) to evaluate on")] = None,
    results_fp: Annotated[str, typer.Option(help="location to save evaluation results to")] = None,
) -> Dict:  # pragma: no cover, eval workload
    """Evaluate on the holdout dataset.

    Args:
        run_id (str): id of the specific run to load from. Defaults to None.
        dataset_loc (str): dataset (with labels) to evaluate on. Defaults to None.
        results_fp (str, optional): location to save evaluation results to. Defaults to None.

    Returns:
        Dict: model's performance metrics on the dataset.
    """
    # Load
    ds = ray.data.read_csv(dataset_loc)
    best_checkpoint = predict.get_best_checkpoint(run_id=run_id)
    predictor = TorchPredictor.from_checkpoint(best_checkpoint)

    # y_true
    preprocessor = predictor.get_preprocessor()
    preprocessed_ds = preprocessor.transform(ds)
    values = preprocessed_ds.select_columns(cols=["targets"]).take_all()
    y_true = np.stack([item["targets"] for item in values])

    # y_pred
    z = predictor.predict(data=ds.to_pandas())["predictions"]
    y_pred = np.stack(z).argmax(1)

    # Metrics
    metrics = {
        "timestamp": datetime.datetime.now().strftime("%B %d, %Y %I:%M:%S %p"),
        "run_id": run_id,
        "overall": get_overall_metrics(y_true=y_true, y_pred=y_pred),
        "per_class": get_per_class_metrics(y_true=y_true, y_pred=y_pred, class_to_index=preprocessor.class_to_index),
        "slices": get_slice_metrics(y_true=y_true, y_pred=y_pred, ds=ds),
    }
    logger.info(json.dumps(metrics, indent=2))
    if results_fp:  # pragma: no cover, saving results
        utils.save_dict(d=metrics, path=results_fp)
    return metrics


if __name__ == "__main__":  # pragma: no cover, checked during evaluation workload
    app()
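
# Example invocation (a sketch; the run id and file paths are placeholders,
# and Typer exposes the function arguments as kebab-case CLI options):
#   python madewithml/evaluate.py \
#       --run-id $RUN_ID \
#       --dataset-loc datasets/holdout.csv \
#       --results-fp results/evaluation_results.json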