Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

coef_matrix.py 4.2 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
  1. # Created by: leo
  2. # Created on: 2018.11.14
  3. import pandas as pd
  4. import numpy as np
  5. import logging
  6. from max_current import (CURRENT_HEADER, TIME_HEADER, CLUSTER_HEADER, OUTLIER,
  7. CONST_ZERO, CONST_NONZERO, get_thr_from_coefs)
  8. def coef_median(inp: pd.DataFrame, min_sample_no: int = 2) -> pd.Series:
  9. """
  10. 基于相关系数矩阵列中值法计算指定数据集所在的时间窗口内所有组串的相关系数向量
  11. :param inp: 包含时间和电流数值的 Dataframe,需包含3列:时间戳, 组串ID, 电流值
  12. :param min_sample_no: 最小有效观测数,数据清洗后如果小于此值则返回异常值
  13. :returns: 包含每个组串与最大电流所在组串比较的相关系数向量,长度为 inp 内包含所有组串数量
  14. """
  15. if len(inp) == 0:
  16. return pd.Series([OUTLIER])
  17. # 若输入为纵表,转换为宽表(组串ID作列名)
  18. if CLUSTER_HEADER in inp.columns:
  19. raw = pd.pivot_table(inp, values=CURRENT_HEADER, index=[TIME_HEADER], columns=[CLUSTER_HEADER])
  20. data = raw.dropna()
  21. data.index = pd.to_datetime(data.index)
  22. else:
  23. data = inp
  24. # 校验有效数据长度,若低于最小阈值则返回异常指标序列
  25. if len(data) < min_sample_no:
  26. return pd.Series([OUTLIER])
  27. coef_matrix = data.corr()
  28. coef_med = coef_matrix.median()
  29. if coef_med.dropna().empty:
  30. return pd.Series([OUTLIER])
  31. # 保证电流高的组串相关系数高
  32. coefs = coef_med if data[coef_med.idxmax()].mean() >= data[coef_med.idxmin()].mean() else 1 - coef_med
  33. # mark strings with constant current
  34. consts = coefs[coefs.isna()]
  35. for idx in consts.index:
  36. if sum(abs(data[idx])) < 1e-7:
  37. coefs[idx] = CONST_ZERO
  38. else:
  39. coefs[idx] = CONST_NONZERO
  40. return coefs
  41. def string_coef_matrix(inp: pd.DataFrame, str_id: int) -> float:
  42. """
  43. 基于相关系数矩阵方法计算指定数据集所在的窗口时间内指定组串的电流相关性系数
  44. :param inp: 包含时间和电流数值的 Dataframe,需包含3列:时间戳, 组串ID, 电流值
  45. :param str_id: 指定组串的ID
  46. :return: 指定组串与最大电流所在组串比较的相关系数
  47. """
  48. coefs = coef_median(inp)
  49. if len(coefs) == 1:
  50. return coefs[0]
  51. if str_id in coefs.index:
  52. return coefs[str_id]
  53. else:
  54. return OUTLIER
  55. def calc_matrix_coefs(inp: pd.DataFrame) -> pd.DataFrame:
  56. """
  57. 基于相关系数矩阵方法计算所有组串在输入 DataFrame 上的所有相关系数,作为后续聚类的输入
  58. :param inp: 包含多个月份一个汇流箱内所有组串电流值的数据集,需包含3列:时间戳, 组串ID, 电流值
  59. :return: DataFrame, index 为时间,列为输入 DataFrame 中包含的所有组串ID
  60. """
  61. raw = pd.pivot_table(inp, values=CURRENT_HEADER, index=[TIME_HEADER], columns=[CLUSTER_HEADER])
  62. data = raw.dropna()
  63. data.index = pd.to_datetime(data.index)
  64. daily = data.groupby(pd.Grouper(freq='D'))
  65. coef_tbl = {str_id: daily.apply(string_coef_matrix, str_id) for str_id in data.columns}
  66. return pd.DataFrame(coef_tbl)
  67. def train_median_thresholds(inp: pd.DataFrame) -> np.ndarray:
  68. """
  69. 光伏故障分类算子训练函数,计算分隔阈值
  70. :param inp: 包含多个月份一个汇流箱内所有组串电流值的数据集,需包含3列:时间戳, 组串ID, 电流值
  71. :param thr: 月度最大电流的百分比,低于此阈值的数据被分入低电流组
  72. :return: 二元组,分别为正常、关注、告警组的分隔阈值
  73. """
  74. coefs = calc_matrix_coefs(inp)
  75. return get_thr_from_coefs(coefs)
  76. def classify(inp: pd.DataFrame) -> pd.DataFrame:
  77. """
  78. 根据输入参数对输出数据做状态分类
  79. :param inp: 输入电流数据,需包含3列:时间戳, 组串ID, 电流值
  80. :return: 输入数据集中每个组串的电流中值,为了配合PySpark 的 PandasUDFType.GROUPED_MAP,
  81. 类型改为 DataFrame
  82. """
  83. # 需要将电流值由字符串类型转为实数类型
  84. fix_type = inp.astype({'current': 'float'})
  85. coefs = coef_median(fix_type)
  86. return coefs.to_frame('coef')
Tip!

Press p or to see the previous file or, n or to see the next file

Comments

Loading...