extraload.py 7.0 KB

  1. """
  2. Extract, Tranform and Load (ETL) functions for handling ICESat-2 point clouds.
  3. Copies data seamlessly between different array structures and file formats!
  4. """
  5. import functools
  6. import dask
  7. import numpy as np
  8. import pandas as pd
  9. import tqdm
  10. import zarr


def array_to_dataframe(
    array: dask.array.core.Array, colname: str = None, startcol: int = 0
):
    """
    Converts a 1D or 2D data array into a tidy dataframe structure.
    An array of shape (m, n) will turn into a table with m rows and n columns.

    These are the possible conversions:

    - numpy array -> pandas DataFrame
    - dask Array -> dask DataFrame

    Pass in a colname to set the column name. By default, it will automatically
    use the array.name attribute in dask Arrays, but this can be overridden.
    For 2D arrays, columns will be formatted as 'col_0', 'col_1', 'col_2' and
    so on. The startcol argument allows adjustment of the starting column
    number, helpful if you prefer starting from 1, e.g. 'col_1', 'col_2', etc.

    See also https://github.com/dask/dask/issues/5021
    """
    if not colname:
        colname = array.name if hasattr(array, "name") else ""

    if array.ndim == 1:  # 1-dimensional arrays
        columns = colname
    elif array.ndim == 2:  # 2-dimensional arrays
        colname += "_" if colname != "" else ""  # add underscore to name
        columns = [f"{colname}{i+startcol}" for i in range(array.shape[1])]

    try:
        # Attempt dask Array to dask DataFrame conversion
        dataframe: dask.dataframe.core.DataFrame = array.to_dask_dataframe(
            columns=columns
        )
    except AttributeError:
        # Fallback to converting to pandas.DataFrame
        dataframe: pd.DataFrame = pd.DataFrame.from_records(data=array, columns=columns)

    return dataframe
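

# Illustrative usage sketch (not part of the original module): converts a tiny
# in-memory 2D dask array into a dask DataFrame. The 'height' column name and
# the numbers are hypothetical, chosen only to demonstrate the column naming.
def _example_array_to_dataframe() -> pd.DataFrame:
    small_array = dask.array.from_array(np.arange(6).reshape(3, 2), chunks=(3, 2))
    dataframe = array_to_dataframe(array=small_array, colname="height", startcol=1)
    # dataframe is a lazy dask DataFrame with columns 'height_1' and 'height_2';
    # compute() materializes it into a pandas DataFrame
    return dataframe.compute()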


def ndarray_to_parquet(
    ndarray,
    parquetpath: str,
    variables: list = None,
    dropnacols: list = None,
    startcol: int = 1,
    engine: str = "pyarrow",
    **kwargs,
) -> pd.DataFrame:
    """
    Converts an n-dimensional xarray Dataset or Zarr Array into an Apache
    Parquet columnar file via an intermediate Dask/Pandas DataFrame format.
    This is a convenience function that wraps around array_to_dataframe,
    intended to make converting a number of arrays easier.

    Parameters
    ----------
    ndarray : xarray.Dataset or zarr.hierarchy.Group
        An n-dimensional array in xarray containing several coordinate and data
        variables, or a Zarr array containing several variables.
    parquetpath : str
        Filepath to where the resulting parquet file will be stored.
    variables : list
        Name(s) of the variables/columns that will be stored to the parquet
        file. If not provided, all the variables in the zarr group will be
        stored.
    dropnacols : list
        Drop rows containing NaN values in these fields before saving to the
        Parquet file.
    startcol : int
        Adjust the starting column number, helpful if you prefer starting from
        another number like 3, e.g. 'col_3', 'col_4', etc. Default is 1.
    engine : str
        Parquet library to use. Choose from 'auto', 'fastparquet', 'pyarrow'.
        Default is "pyarrow".
    **kwargs : dict
        Extra options to be passed on to pandas.DataFrame.to_parquet.

    Returns
    -------
    df : pandas.DataFrame
        The combined table of variables that was saved to the Parquet file.
    """
    if variables is None:
        try:
            variables = [varname for varname, _ in ndarray.arrays()]
        except AttributeError:
            variables = [c for c in ndarray.coords] + [d for d in ndarray.data_vars]

    if isinstance(ndarray, zarr.hierarchy.Group):
        array_func = lambda varname: dask.array.from_zarr(ndarray[varname])
    else:
        array_func = lambda varname: ndarray[varname].data

    dataframes: list = [
        array_to_dataframe(
            array=array_func(varname), colname=varname, startcol=startcol
        )
        for varname in variables
    ]
    dataframe: dask.dataframe.core.DataFrame = dask.dataframe.concat(
        dfs=dataframes, axis="columns"
    )
    if dropnacols:
        dataframe = dataframe.dropna(subset=dropnacols)

    # Convert to pandas DataFrame first before saving to a single binary
    # parquet file, rather than going directly from a Dask DataFrame to a
    # series of parquet files in a parquet folder. This ensures that cudf can
    # read it later, see https://github.com/rapidsai/cudf/issues/1688
    df: pd.DataFrame = dataframe.compute()
    df.to_parquet(path=parquetpath, engine=engine, **kwargs)

    return df
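

# Illustrative usage sketch (not part of the original module): builds a small
# in-memory zarr group with two 1D variables and saves them to a Parquet file.
# The variable names and the 'example.parquet' output path are hypothetical.
def _example_ndarray_to_parquet() -> pd.DataFrame:
    zarrgroup = zarr.group()
    zarrgroup.create_dataset("x", data=np.linspace(start=0.0, stop=1.0, num=5))
    zarrgroup.create_dataset("h_corr", data=np.array([1.0, 2.0, np.nan, 4.0, 5.0]))
    # Rows where 'h_corr' is NaN are dropped before writing to Parquet
    df = ndarray_to_parquet(
        ndarray=zarrgroup,
        parquetpath="example.parquet",
        variables=["x", "h_corr"],
        dropnacols=["h_corr"],
    )
    return df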


def split_tracks(df: pd.DataFrame, by: str = "referencegroundtrack") -> dict:
    """
    Splits a point cloud of ICESat-2 laser tracks into separate lasers.
    Specifically, this function takes a big pandas.DataFrame and separates it
    into many smaller ones based on a groupby 'by' column. Note that this is a
    hardcoded convenience function that probably has little use elsewhere!

    Parameters
    ----------
    df : cudf.DataFrame or pandas.DataFrame
        A table of X, Y, Z points to be split into separate tracks.
    by : str
        The name of the column to split the dataframe by. Default is
        'referencegroundtrack'.

    Returns
    -------
    track_dict : dict
        A Python dictionary with key: rgtpairname (e.g. "1234_pt2"), and
        value: rgtpairdataframe (a pandas.DataFrame just for that rgtpairname)
    """
    track_dict: dict = {}  # key=rgtpairname, value=rgtpairdataframe
    rgt_groups = df.groupby(by=by)
    for rgt, df_rgt_wide in tqdm.tqdm(rgt_groups, total=len(rgt_groups.groups.keys())):
        df_rgt: pd.DataFrame = wide_to_long(
            df=df_rgt_wide, stubnames=["h_corr", "utc_time"], j="cycle_number"
        )
        # Split one referencegroundtrack into 3 laser pair tracks pt1, pt2, pt3
        df_rgt["pairtrack"]: pd.Series = pd.cut(
            x=df_rgt.y_atc,
            bins=[-np.inf, -100, 100, np.inf],
            labels=("pt1", "pt2", "pt3"),
        )
        pt_groups = df_rgt.groupby(by="pairtrack")
        for pairtrack, df_ in pt_groups:
            if len(df_) > 0:
                rgtpair = f"{rgt:04d}_{pairtrack}"
                track_dict[rgtpair] = df_

    return track_dict
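

# Illustrative usage sketch (not part of the original module): builds a tiny
# wide-format table with the columns split_tracks expects (per-cycle 'h_corr_*'
# and 'utc_time_*', plus 'y_atc' and 'referencegroundtrack') and splits it into
# laser pair tracks. All values below are made up for demonstration only.
def _example_split_tracks() -> dict:
    df = pd.DataFrame(
        data={
            "referencegroundtrack": [1234, 1234, 1234],
            "y_atc": [-3200.0, 0.0, 3200.0],  # one point each in pt1, pt2, pt3
            "h_corr_1": [10.0, 20.0, 30.0],
            "h_corr_2": [11.0, 21.0, 31.0],
            "utc_time_1": pd.to_datetime(["2019-01-01"] * 3),
            "utc_time_2": pd.to_datetime(["2019-04-01"] * 3),
        }
    )
    track_dict = split_tracks(df=df, by="referencegroundtrack")
    # Expect keys '1234_pt1', '1234_pt2' and '1234_pt3'
    return track_dict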


@functools.wraps(wrapped=pd.wide_to_long)
def wide_to_long(
    df: pd.DataFrame,
    stubnames: list,
    i: str = "id",
    j: str = None,
    sep: str = "_",
    suffix: str = "\\d+",
) -> pd.DataFrame:
    """
    A wrapper around pandas.wide_to_long that wraps around pandas.melt!
    Handles setting an index (default is "id") and resetting the second level
    index (the 'j' variable), while dropping NaN values too!

    Documentation for the input arguments is the same as for pd.wide_to_long.
    This convenience function just uses different default arguments for 'i'
    and 'sep'.
    """
    df[i] = df.index
    df_long = pd.wide_to_long(
        df=df, stubnames=stubnames, i=i, j=j, sep=sep, suffix=suffix
    )
    df_long = df_long.reset_index(level=j)

    return df_long.dropna()
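

# Illustrative usage sketch (not part of the original module): reshapes a tiny
# wide table with 'h_corr_1' and 'h_corr_2' columns into long format, with the
# cycle number recovered as its own column. Values are made up.
def _example_wide_to_long() -> pd.DataFrame:
    df = pd.DataFrame(data={"h_corr_1": [10.0, 20.0], "h_corr_2": [11.0, np.nan]})
    df_long = wide_to_long(df=df, stubnames=["h_corr"], j="cycle_number")
    # Three rows remain (the NaN value is dropped), with 'cycle_number' and
    # 'h_corr' columns
    return df_long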