Register
Login
Resources
Docs Blog Datasets Glossary Case Studies Tutorials & Webinars
Product
Data Engine LLMs Platform Enterprise
Pricing Explore
Connect to our Discord channel

app.scala 1.5 KB

You have to be logged in to leave a comment. Sign In
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
  1. import org.apache.spark.sql.types.{StructType, StructField
  2. ,StringType, TimestampType
  3. ,DoubleType}
  4. import org.apache.spark.sql.functions.{concat, substring}
  5. val START_TIME = 7
  6. val END_TIME = 17
  7. val TF = 30
  8. val SAMPLE_INT = 5 // 采用频率,目前规定为5min
  9. val I0 = 1
  10. val IM = 12
  11. val STATION_ID = "STATION_ID"
  12. val DEVICE_ID = "DEVICE_ID"
  13. val TIME_HEADER = "MONITOR_TIME"
  14. val REPORT_TIME = "ALERT_TIME"
  15. val V_HEADER = "VOLTAGE_VALUE"
  16. val CURRENT_HEADER = "CURRENT_VALUE"
  17. val GRP_ID = "grp_id"
  18. val STR_HEADER = "str_no"
  19. val DEVIATION = "DEVIATION" // 输出结果中 离散率 列名
  20. val STR_STATUS = "STR_STATUS" // 输出结果中 组串状态 列名
  21. val COM_FAIL = -1
  22. val UNIT_INVALID = -2
  23. val STR_INVALID = -3
  24. val dfSchema = StructType(Array(
  25. StructField(STATION_ID, StringType, true)
  26. ,StructField(DEVICE_ID, StringType, true)
  27. ,StructField(TIME_HEADER, TimestampType, true)
  28. ,StructField(V_HEADER, DoubleType, true)
  29. ,StructField(CURRENT_HEADER, DoubleType, true)
  30. ))
  31. val df = spark.read.format("csv").option("header", "true").schema(dfSchema).load("comb1122.csv")
  32. val df2 = df.filter(hour($"MONITOR_TIME") >= START_TIME && hour($"MONITOR_TIME") <= END_TIME)
  33. val df3 = df2.withColumn(GRP_ID, substring(col(DEVICE_ID), 0, 6)).withColumn(
  34. GRP_ID, concat(col(STATION_ID), col(GRP_ID)))
  35. val df4 = df3.withColumn(STR_HEADER, substring(col(DEVICE_ID), 7, 2))
  36. val unit_grps = df4.groupBy(GRP_ID)
  37. val res = unit_grps.apply(unit_deviation)
Tip!

Press p to see the previous file, or n to see the next file

Comments

Loading...