1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
- from pyspark.sql import SparkSession
- from pyspark.sql.types import StructType, StructField
- from pyspark.sql.functions import pandas_udf, concat, col, PandasUDFType
- from pyspark.sql.types import IntegerType, StringType, DoubleType
- import logging
- from coef_matrix import classify
spark = SparkSession.builder.master("local[*]").appName('demo').getOrCreate()

# Raw CSV parts; columns are positional (_c0, _c1, ...).
# Assumed layout (from the stripping below — TODO confirm against the data):
#   _c0 = '[timestamp', _c1.._c3 = group-id fragments, _c4 = string number, _c6 = 'value]'
df = spark.read.csv('/user/cloudera-dev/hbgfalgodata/2018112*/*/part-*')


def rm_left_paren(x):
    """Strip a single leading '[' if present."""
    return x[1:] if x.startswith('[') else x


def rm_right_paren(x):
    """Strip a single trailing ']' if present."""
    return x[:-1] if x.endswith(']') else x


def get_day(x):
    """Return the token before the first space (the date part of a timestamp)."""
    return x.split(' ')[0]


# Scalar pandas UDFs: each maps a pandas Series of strings element-wise.
format_time = pandas_udf(lambda s: s.map(rm_left_paren), StringType())
format_val = pandas_udf(lambda s: s.map(rm_right_paren), StringType())
get_day_udf = pandas_udf(lambda s: s.map(get_day), StringType())

df1 = df.withColumn('time', format_time(col('_c0')))
df2 = df1.withColumn('grpid', concat(col('_c1'), col('_c2'), col('_c3')))
# NOTE(review): the original mixed references (df1._c6, df1.time) into later
# dataframes; col() reads the same input columns without crossing lineages.
df3 = df2.withColumn('current', format_val(col('_c6')))
df4 = df3.withColumn('day', get_day_udf(col('time')))
inp = df4.select('grpid', 'day', 'time', col('_c4').alias('strno'), 'current')
# To match the single-process version, the 'grpid' and 'day' columns would
# need to be merged into a single 'cid' column.
# Output schema of the grouped-map UDF: one coefficient row per (cid, strno).
schema = StructType([
    StructField('cid', StringType(), True),
    StructField('strno', StringType(), True),
    StructField('coef', DoubleType(), True),
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def cluster_coef(pdf):
    """Compute coefficients for one Spark (grpid, day) partition.

    Receives the partition as a pandas DataFrame, re-groups it by the
    (grpid, day) index, applies `classify` per group, and returns the
    flattened result rows (expected to line up with `schema`).
    """
    by_key = pdf.set_index(['grpid', 'day'])
    out = by_key.groupby(by_key.index).apply(classify).reset_index()
    # The group key is a (grpid, day) tuple; stringify it so the first
    # column is a plain string — presumably mapped to 'cid' by position
    # in the output schema (verify against the Spark version in use).
    out['level_0'] = out['level_0'].astype(str)
    return out
# Run the grouped-map UDF once per (grpid, day) group, then pull the full
# result to the driver and dump it as CSV.
# NOTE(review): toPandas() materializes everything in driver memory — fine
# only while the coefficient table stays small.
coefs = inp.groupBy(['grpid', 'day']).apply(cluster_coef)
res = coefs.toPandas()
res.to_csv('res_pyspark.csv', index=False)
|