sparkjob.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, concat, col, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import logging

from coef_matrix import classify

spark = SparkSession.builder.master("local[*]").appName('demo').getOrCreate()

# Raw CSV partitions for the 2018112* date range; without an explicit schema
# every column arrives as a string named _c0, _c1, ...
df = spark.read.csv('/user/cloudera-dev/hbgfalgodata/2018112*/*/part-*')

# Strip the brackets that the raw dump wraps around the timestamp and value fields.
rm_left_paren = lambda x: x[1:] if x.startswith('[') else x
format_time = pandas_udf(lambda x: x.map(rm_left_paren), StringType())
rm_right_paren = lambda x: x[:-1] if x.endswith(']') else x
format_val = pandas_udf(lambda x: x.map(rm_right_paren), StringType())

# 'YYYY-MM-DD hh:mm:ss' -> 'YYYY-MM-DD'
get_day = lambda x: x.split(' ')[0]
get_day_udf = pandas_udf(lambda x: x.map(get_day), StringType())

df1 = df.withColumn('time', format_time(col('_c0')))
df2 = df1.withColumn('grpid', concat(col('_c1'), col('_c2'), col('_c3')))
df3 = df2.withColumn('current', format_val(col('_c6')))
df4 = df3.withColumn('day', get_day_udf(col('time')))
inp = df4.select('grpid', 'day', 'time', col('_c4').alias('strno'), 'current')

# To stay consistent with the single-process version, the 'grpid' and 'day'
# columns need to be merged into a single 'cid' column.
schema = StructType([StructField("cid", StringType(), True),
                     StructField("strno", StringType(), True),
                     StructField("coef", DoubleType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def cluster_coef(pdf):
    pdf1 = pdf.set_index(['grpid', 'day'])
    grps = pdf1.groupby(pdf1.index)
    pdf2 = grps.apply(classify)
    pdf3 = pdf2.reset_index()
    # The (grpid, day) group key comes back as an unnamed tuple index, so
    # reset_index() labels it 'level_0'; stringify it and rename it to 'cid'
    # so the returned columns line up with the declared schema (grouped-map
    # results are matched against the schema by column name).
    pdf3['level_0'] = pdf3['level_0'].astype(str)
    pdf3 = pdf3.rename(columns={'level_0': 'cid'})
    return pdf3

coefs = inp.groupBy(['grpid', 'day']).apply(cluster_coef)
# toPandas() collects all coefficients to the driver; fine for a small result.
res = coefs.toPandas()
res.to_csv('res_pyspark.csv', index=False)
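The job imports classify from a local coef_matrix module that is not part of this file. For reference, here is a minimal hypothetical stand-in; the normalized-mean coefficient it computes is purely an assumption, and only the contract is implied by the code above: take the rows of one (grpid, day) group and return per-strno coefficients indexed by strno, so that reset_index() in cluster_coef yields exactly the level_0/strno/coef columns.

# Hypothetical stand-in for coef_matrix.classify; the real module is not shown.
import pandas as pd

def classify(group: pd.DataFrame) -> pd.DataFrame:
    # Placeholder coefficient: per-strno mean of 'current', normalized by the
    # group-wide mean. The real classify presumably does the actual clustering.
    vals = group['current'].astype(float)
    coef = vals.groupby(group['strno'].values).mean() / vals.mean()
    out = pd.DataFrame({'coef': coef.astype(float)})
    out.index = out.index.astype(str)
    out.index.name = 'strno'
    return out

Note that every column of inp is still a string at this point (spark.read.csv was called without a schema), which is why the stand-in casts 'current' to float before aggregating.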