devi_spark.py

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, TimestampType)
from pyspark.sql.functions import (pandas_udf, PandasUDFType, concat,
                                   substring, col, hour)
import logging
import sys
import pandas as pd
import numpy as np
import subprocess
import csv
import json
from typing import Tuple
  14. """
  15. * @author Leo
  16. * 2019.2.21
  17. *
  18. * @Description: 光伏组串离散率算子的 Spark 实现
  19. *
  20. * 实现标准输入格式到算子输入的转换
  21. * 保证不论输入的是一个计算单元,还是多个计算单元,不影响算子实现
  22. 常量定义:
  23. 大写字段名用于输入、输出,小写字段名用于计算过程中的存储字段
  24. """
START_TIME = 7
END_TIME = 17
TF = 30
SAMPLE_INT = 5  # sampling interval, currently fixed at 5 min
I0 = 1
IM = 12
STATION_ID = 'STATION_ID'
DEVICE_ID = 'DEVICE_ID'
TIME_HEADER = 'MONITOR_TIME'
REPORT_TIME = 'ALERT_TIME'
V_HEADER = 'VOLTAGE_VALUE'
CURRENT_HEADER = 'CURRENT_VALUE'
GRP_ID = 'grp_id'
STR_HEADER = 'str_no'
DEVIATION = 'DEVIATION'    # name of the deviation-rate column in the output
STR_STATUS = 'STR_STATUS'  # name of the string-status column in the output
COM_FAIL = -1
UNIT_INVALID = -2
STR_INVALID = -3
# Error results are built as floats so they match the DoubleType output schema.
COM_FAIL_DF = pd.DataFrame({DEVIATION: [float(COM_FAIL)], STR_STATUS: ["-"]},
                           index=pd.Index([END_TIME]))
UNIT_INVALID_DF = pd.DataFrame({DEVIATION: [float(UNIT_INVALID)], STR_STATUS: ["-"]},
                               index=pd.Index([END_TIME]))
# inputFilePath = sys.argv[1]
# outputFilePath = sys.argv[2]
# isTrain = sys.argv[3]
inputFilePath = "/home/leo//docs/znbt/photovoltaics/HuBeiShaYang/fullTable/comb1122.csv"
# inputFilePath = "/user/hdfs/pvtest/deviation1122.csv"
outputFilePath = "res.csv"
isTrain = "false"

# Switch to logging.DEBUG to see the per-unit traces and the input preview below.
logging.basicConfig(level=logging.INFO)
logging.debug(f"input file: {inputFilePath}, output file: {outputFilePath}")
logging.debug('-------- Head of input CSV file: ---------')
head_lines = subprocess.run(['head', inputFilePath], check=True, stdout=subprocess.PIPE)
logging.debug(head_lines)

def get_valid_zone(indf: pd.DataFrame, thr: float):
    """Find the start of the valid computation window [t1, t2]."""
    above_i0_flags = indf[CURRENT_HEADER] >= thr
    above_i0_vals = indf.loc[above_i0_flags.any(axis=1)]
    # Step 2 already verified that at least one current value exceeds I0.
    assert not above_i0_vals.empty
    return above_i0_vals.index[0]

def filter_window(indf: pd.DataFrame, start_index) -> pd.DataFrame:
    """Filter and correct the string current values as required by step 3 of the spec."""
    df2 = indf[start_index:]
    # Keep plausible currents only. If the result is empty, returning an empty
    # DataFrame is harmless downstream, so the string does not need to be
    # explicitly marked as "no valid values" here.
    df3 = df2[(df2[CURRENT_HEADER] >= -0.5) & (df2[CURRENT_HEADER] <= IM)].copy()
    df3[CURRENT_HEADER] = df3[CURRENT_HEADER].apply(lambda x: x if x > 0.1 else 0)
    return df3[[CURRENT_HEADER, V_HEADER]]  # drop the duplicated string-number column

def mark_unit(powers: pd.Series, str_info: dict, d_1st: float = None) -> Tuple[float, dict]:
    """Compute the deviation value d and the abnormal-string statuses from the
    average power of each string in a computing unit."""
    # Strings already marked abnormal are excluded from this pass.
    for str_no in str_info:
        powers[str_no] = np.nan
    if powers.dropna().empty:
        return (d_1st, str_info)
    p_u = np.mean(powers)
    d = np.sqrt(sum((powers - p_u).pow(2).dropna())) / p_u
    d_1st = d if d_1st is None else d_1st
    if d <= 0.05:
        return (d_1st, str_info)
    elif (d > 0.05) and (d <= 0.1):
        str_info[powers.idxmin()] = 'E3'
    elif (d > 0.1) and (d <= 0.2):
        str_info[powers.idxmin()] = 'E2'
    elif d > 0.2:
        str_info[powers.idxmin()] = 'E1'
    return mark_unit(powers, str_info, d_1st)
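
# Worked example with illustrative numbers (not taken from any real plant data):
# for per-string average powers P = [3000, 3100, 2500],
#   p_u ≈ 2866.7 and d = sqrt(133.3**2 + 233.3**2 + 366.7**2) / 2866.7 ≈ 0.16,
# so the weakest string (2500) is marked 'E2' (0.1 < d <= 0.2) and mark_unit
# recurses with that string excluded; the second pass gives d ≈ 0.02 <= 0.05,
# so the first-pass d and the accumulated status dict are returned.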

unitSchema = StructType([
    StructField(DEVIATION, DoubleType(), True),
    StructField(STR_STATUS, StringType(), True),
])


@pandas_udf(unitSchema, PandasUDFType.GROUPED_MAP)
def unit_deviation(unit: pd.DataFrame) -> pd.DataFrame:
    """Compute the deviation rate and string statuses for one computing unit."""
    logging.debug(f'Process unit:\n{unit.head(1)}')
    logging.debug(f'Process unit:\n{unit.info()}')
    unit2 = unit[[TIME_HEADER, CURRENT_HEADER, V_HEADER, STR_HEADER]]
    # step 1: flat readings (zero rolling std over the TF window) are treated
    # as a communication failure
    pivt = pd.pivot_table(unit2, values=[CURRENT_HEADER, V_HEADER],
                          index=[TIME_HEADER], columns=[STR_HEADER])
    if pivt.rolling(int(TF / SAMPLE_INT)).std().min().sum() == 0:
        return COM_FAIL_DF
    # step 2: the unit is invalid when no string ever reaches I0
    str_grps = unit2.groupby(STR_HEADER)
    if str_grps.apply(lambda x: max(x[CURRENT_HEADER]) < I0).all():
        return UNIT_INVALID_DF
    # step 3
    start = get_valid_zone(pivt, I0)
    logging.debug(f'Unit start time: {start}')
    # extra positional arguments are forwarded to filter_window
    filtered_data = str_grps.apply(filter_window, start)
    # step 4
    str_avg = filtered_data.groupby(STR_HEADER).mean()
    if str_avg[CURRENT_HEADER].mean() < I0:
        return UNIT_INVALID_DF
    # step 5: average power per string = mean current * mean voltage
    p_s = str_avg.apply(np.prod, axis=1)
    # step 6 ~ 9
    d, str_info = mark_unit(p_s, {})
    # step 10
    return pd.DataFrame({DEVIATION: [d], STR_STATUS: [json.dumps(str_info, separators=('|', ':'))]},
                        index=pd.Index([END_TIME]))

def main(inf, outf, train):
    spark = SparkSession.builder.master("local[*]").appName('pv-deviation').getOrCreate()
    dfSchema = StructType([
        StructField(STATION_ID, StringType(), True),
        StructField(DEVICE_ID, StringType(), True),
        StructField(TIME_HEADER, TimestampType(), True),
        StructField(V_HEADER, DoubleType(), True),
        StructField(CURRENT_HEADER, DoubleType(), True),
    ])
    df = spark.read.format("csv").option("header", "true").schema(dfSchema).load(inf)
    df2 = df.filter((hour(col(TIME_HEADER)) >= START_TIME) & (hour(col(TIME_HEADER)) <= END_TIME))
    # Group id = station id + first 6 characters of the device id
    # (Spark's substring() is 1-based; a position of 0 behaves like 1 here).
    df3 = df2.withColumn(GRP_ID, substring(col(DEVICE_ID), 0, 6)).withColumn(
        GRP_ID, concat(col(STATION_ID), col(GRP_ID)))
    # String number = characters 7-8 of the device id
    df4 = df3.withColumn(STR_HEADER, substring(col(DEVICE_ID), 7, 2))
    unit_grps = df4.groupBy(GRP_ID)
    res = unit_grps.apply(unit_deviation)
    if train == 'true':
        with open(outf, 'w', encoding='utf-8') as fh:
            fh.write("this is train's result")
    else:
        # r2 = res.toPandas().reset_index(level=GRP_ID)
        r2 = res.toPandas().reset_index()
        r2.to_csv(outf, encoding='utf-8', index=True, quoting=csv.QUOTE_NONE)


if __name__ == '__main__':
    main(inputFilePath, outputFilePath, isTrain)
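
The script is currently wired for a local run with hardcoded paths. Below is a minimal sketch of exercising the core mark_unit logic on synthetic numbers; the string numbers and power values are illustrative only, and it assumes mark_unit (with its pandas/numpy imports) is available in the session, e.g. after pasting the function into a REPL.

import pandas as pd

# Average power per string (synthetic values for illustration).
powers = pd.Series({'01': 3000.0, '02': 3100.0, '03': 2500.0})
d, status = mark_unit(powers.copy(), {})
print(d)       # first-pass deviation, about 0.16 for these numbers
print(status)  # {'03': 'E2'}: the weakest string is flagged

To run the full job against a cluster or a different file, one would presumably restore the commented-out sys.argv handling at the top of the script and launch it with spark-submit, passing the input path, output path, and the isTrain flag as the three application arguments.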