Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

app.py 5.8 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
  1. import sys
  2. import pandas as pd
  3. import subprocess
  4. import logging
  5. import csv
  6. import json
  7. import numpy as np
  8. from typing import Tuple
  9. """
  10. * @author Leo
  11. * 2019.1.25
  12. *
  13. * @Description: 光伏组串离散率算子入口函数
  14. *
  15. * 实现标准输入格式到算子输入的转换
  16. * 保证不论输入的是一个计算单元,还是多个计算单元,不影响算子实现
  17. 常量定义:
  18. 大写字段名用于输入、输出,小写字段名用于计算过程中的存储字段
  19. """
  20. START_TIME = '07:00'
  21. END_TIME = '17:00'
  22. TF = 30
  23. SAMPLE_INT = 5 # 采用频率,目前规定为5min
  24. I0 = 1
  25. IM = 12
  26. STATION_ID = 'STATION_ID'
  27. DEVICE_ID = 'DEVICE_ID'
  28. TIME_HEADER = 'MONITOR_TIME'
  29. REPORT_TIME = 'ALERT_TIME'
  30. V_HEADER = 'VOLTAGE_VALUE'
  31. CURRENT_HEADER = 'CURRENT_VALUE'
  32. GRP_ID = 'grp_id'
  33. STR_HEADER = 'str_no'
  34. DEVIATION = 'DEVIATION' # 输出结果中 离散率 列名
  35. STR_STATUS = 'STR_STATUS' # 输出结果中 组串状态 列名
  36. COM_FAIL = -1
  37. UNIT_INVALID = -2
  38. STR_INVALID = -3
  39. COM_FAIL_DF = pd.DataFrame({DEVIATION: [COM_FAIL], STR_STATUS: ["-"]}, index=pd.Index([END_TIME]))
  40. UNIT_INVALID_DF = pd.DataFrame({DEVIATION: [UNIT_INVALID], STR_STATUS: ["-"]}, index=pd.Index([END_TIME]))
  41. inputFilePath = sys.argv[1]
  42. outputFilePath = sys.argv[2]
  43. isTrain = sys.argv[3]
  44. # inputFilePath = "/home/leo//docs/znbt/photovoltaics/HuBeiShaYang/fullTable/comb1122.csv"
  45. # outputFilePath = "res.csv"
  46. # isTrain = "false"
  47. logging.basicConfig(level=logging.INFO)
  48. logging.debug(f"input file: {inputFilePath}, output file: {outputFilePath}")
  49. logging.debug('-------- Head of input CSV file: ---------')
  50. head_lines = subprocess.run(['head', inputFilePath], check=True, stdout=subprocess.PIPE)
  51. logging.debug(head_lines)
  52. def get_valid_zone(indf: pd.DataFrame, thr: float):
  53. """获取有效计算区间 [t1, t2]
  54. """
  55. above_i0_flags = indf[CURRENT_HEADER] >= thr
  56. above_i0_vals = indf.loc[above_i0_flags.any(axis=1)]
  57. assert not above_i0_vals.empty # 通过 step 2 的验证,本步中至少应该有一个大于 I0 的电流值
  58. return above_i0_vals.index[0]
  59. def filter_window(indf: pd.DataFrame, start_index) -> pd.DataFrame:
  60. """按照文档第3步要求对过滤和校正组串电流值
  61. """
  62. df2 = indf[start_index:]
  63. df3 = df2[(df2[CURRENT_HEADER] >= -0.5) & (df2[CURRENT_HEADER] <= 12)]
  64. # 如果 df3 长度为 0,直接返回空 DataFrame,在后续计算中不产生效果,
  65. # 所以不需要这里专门标记为 组串无有效值
  66. pd.options.mode.chained_assignment = None
  67. df3[CURRENT_HEADER] = df3[CURRENT_HEADER].apply(lambda x: x if (x > 0.1) else 0)
  68. return df3[[CURRENT_HEADER, V_HEADER]] # 去掉重复的 组串编号 列
  69. def mark_unit(powers: pd.Series, str_info: dict, d_1st: float=None) -> Tuple[float, dict]:
  70. """根据一个计算单元中各个组串的平均功率计算 d 值和组串异常状态
  71. """
  72. for str_no in str_info:
  73. powers[str_no] = np.nan
  74. if powers.dropna().empty:
  75. return (d_1st, str_info)
  76. p_u = np.mean(powers)
  77. d = np.sqrt(sum((powers - p_u).pow(2).dropna())) / p_u
  78. d_1st = d if d_1st is None else d_1st
  79. if d <= 0.05:
  80. return (d_1st, str_info)
  81. elif (d > 0.05) and (d <= 0.1):
  82. str_info[powers.idxmin()] = 'E3'
  83. elif (d > 0.1) and (d <= 0.2):
  84. str_info[powers.idxmin()] = 'E2'
  85. elif d > 0.2:
  86. str_info[powers.idxmin()] = 'E1'
  87. return mark_unit(powers, str_info, d_1st)
def unit_deviation(unit: pd.DataFrame) -> pd.DataFrame:
    """Compute the deviation rate and per-string statuses of one computing unit.

    Implements steps 1-10 of the algorithm document: screen out units with
    flat-lined (communication-failure) or all-below-threshold signals, find
    the valid time window, filter per-string samples, and score the unit via
    mark_unit. Returns a one-row DataFrame indexed by END_TIME with the
    DEVIATION and STR_STATUS columns (sentinel frames for invalid units).
    """
    logging.debug(f'Process unit:\n{unit.head(1)}')
    unit2 = unit[[CURRENT_HEADER, V_HEADER, STR_HEADER]]
    # step 1: one current/voltage column per string, indexed by timestamp.
    pivt = pd.pivot_table(unit2, values=[CURRENT_HEADER,V_HEADER], index=[TIME_HEADER], columns=[STR_HEADER])
    # If every column has some window with zero rolling std, the signals are
    # considered flat-lined — treated as a communication failure.
    if pivt.rolling(int(TF / SAMPLE_INT)).std().min().sum() == 0:
        return COM_FAIL_DF
    # step 2: the unit is invalid when no string ever reaches I0.
    str_grps = unit2.groupby(STR_HEADER)
    if str_grps.apply(lambda x: max(x[CURRENT_HEADER]) < I0).all():
        return UNIT_INVALID_DF
    # step 3: start of the valid zone = first timestamp with any current >= I0.
    start = get_valid_zone(pivt, I0)
    logging.debug(f'Unit start time: {start}')
    # Extra positional args for GroupBy.apply. NOTE(review): the original
    # comment claimed this must be a tuple, but (start) is just start —
    # apply(func, *args) forwards it either way.
    filtered_data = str_grps.apply(filter_window, (start))
    # step 4: unit is invalid when the mean of per-string average currents < I0.
    str_avg = filtered_data.groupby(STR_HEADER).mean()
    if str_avg[CURRENT_HEADER].mean() < I0:
        return UNIT_INVALID_DF
    # step 5: average power per string = product of avg current and avg voltage.
    p_s = str_avg.apply(np.prod, axis=1)
    # steps 6 ~ 9: deviation rate and abnormal-string flags.
    d, str_info = mark_unit(p_s, {})
    # step 10: '|' as the JSON item separator keeps commas out of the value so
    # the CSV written with QUOTE_NONE stays parseable.
    return pd.DataFrame({DEVIATION: [d], STR_STATUS: [json.dumps(str_info, separators=('|', ':'))]},
                        index=pd.Index([END_TIME]))
  116. def main(inf, outf, train):
  117. inp = pd.read_csv(inputFilePath,
  118. usecols=[0, 1, 2, 3, 4],
  119. dtype={STATION_ID: str, DEVICE_ID: str, CURRENT_HEADER: float, V_HEADER: float},
  120. index_col=2,
  121. parse_dates = [TIME_HEADER]).dropna()
  122. inp = inp.between_time(START_TIME, END_TIME)
  123. inp[GRP_ID] = inp[STATION_ID] + inp[DEVICE_ID].apply(lambda x: x[:6]) # 设备标识是一个8位数字组成的字符串,最后两位是组串编号
  124. inp[STR_HEADER] = inp[DEVICE_ID].apply(lambda x: x[6:])
  125. unit_grps = inp.groupby(GRP_ID)
  126. res = unit_grps.apply(unit_deviation)
  127. if isTrain == 'true':
  128. fh = open(outf, 'w', encoding='utf-8')
  129. fh.write("this is train's result")
  130. fh.close()
  131. else:
  132. r2 = res.reset_index(level=GRP_ID)
  133. res.to_csv(outf, encoding='utf-8', index=True, quoting=csv.QUOTE_NONE)
  134. # res.to_csv(outf, encoding='utf-8', index=False,
  135. # columns=[STATION_ID, DEVICE_ID, REPORT_TIME, 'ALERT_TYPE', 'ALERT_LEVEL', JSON_MAT,
  136. # 'IS_PROCESSED', COEF_COL], quoting=csv.QUOTE_NONE, escapechar='|')
  137. if __name__ == '__main__':
  138. main(inputFilePath, outputFilePath, isTrain)
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...