#!/usr/bin/env python
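"""Merge GEOS-FP and EPA gridded netCDF data into a single chunked zarr store.

Which variables to extract, the time/lat/lon bounds, and the filename date
patterns are read from a JSON config file (see the sketch after the imports).
A hypothetical invocation (script name and paths are illustrative, not from
the original source) might look like:

    python merge_to_zarr.py config.json /data/geos_fp /data/epa merged.zarr
"""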
import os
import json
import click
import numpy as np
import xarray as xr
from glob import glob
from datetime import datetime
from collections import defaultdict
from dask.distributed import Client
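
# A minimal sketch of the config layout this script expects, inferred from how
# main() and load_variables() read it; all variable names, weights, attributes,
# dates, and filename patterns below are illustrative, not from the original source:
#
# {
#   "bounds": {
#     "time": ["2020-01-01", "2020-12-31"],
#     "lat": [25.0, 50.0],
#     "lon": [-125.0, -65.0]
#   },
#   "file_exprs": [
#     "GEOS.fp.asm.inst3_2d_asm_Nx.%Y%m%d_%H%M.V01.nc4",
#     "epa_%Y%m%d.nc"
#   ],
#   "geos_fp_variables": [
#     {
#       "name": "t2m",
#       "collection": "inst3_2d_asm_Nx",
#       "attributes": {"units": "K"},
#       "variables": [{"name": "T2M", "weight": 1.0}]
#     }
#   ],
#   "epa_variables": [
#     {
#       "name": "pm25",
#       "attributes": {"units": "ug m-3"},
#       "variables": [{"name": "PM25", "weight": 1.0}]
#     }
#   ]
# }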


def get_date(path, file_exprs):
    """Parse a datetime from a file's basename using the first matching pattern."""
    base = os.path.basename(path)
    dt = None
    for ex in file_exprs:
        try:
            dt = datetime.strptime(base, ex)
            break
        except ValueError:
            continue
    if dt is None:
        raise ValueError(f'Could not parse datetime from {base}')
    return dt
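
# For example (filename and pattern are hypothetical):
#   get_date('GEOS.20200101_0000.nc4', ['GEOS.%Y%m%d_%H%M.nc4'])
#   returns datetime(2020, 1, 1, 0, 0)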


def filter_files(filelist, file_exprs, tstart, tend):
    """Keep only files whose parsed dates fall within [tstart, tend]."""
    tstart = datetime.strptime(tstart, '%Y-%m-%d')
    tend = datetime.strptime(tend, '%Y-%m-%d')
    return [
        file for file in filelist
        if tstart <= get_date(file, file_exprs) <= tend
    ]


def load_variables(basedir, suffix, bounds, variables, file_exprs, use_collections):
    """Load, subset, and combine the requested variables from netCDF files.

    Returns a list of DataArrays, one per entry in `variables`, each built as a
    weighted sum of the source variables it lists.
    """
    if len(variables) == 0:
        return []
    if use_collections:
        # Each variable lives in a per-collection subdirectory of basedir.
        if not all('collection' in v for v in variables):
            raise ValueError('All variables must specify collection')
        subdirs = {
            os.path.join(basedir, v['collection'])
            for v in variables
        }
    else:
        subdirs = {basedir}
    datasets = []
    for subdir in subdirs:
        filelist = glob(os.path.join(subdir, f'*.{suffix}'))
        filelist = filter_files(filelist, file_exprs, *bounds['time'])
        print(f'Loading {len(filelist)} files from {subdir}...')
        ds = xr.open_mfdataset(filelist, join='override', parallel=True)
        print('...done')
        # Subset to the requested time/lat/lon bounds and fix dimension order.
        ds = ds.sel(**{
            k: slice(*v) for k, v in bounds.items()
        }).transpose('time', 'lat', 'lon')
        datasets.append(ds)
    ds = xr.merge(datasets, join='inner')
    # Build each output variable as a weighted sum of its source variables.
    data = [
        sum(
            vi['weight'] * ds[vi['name']]
            for vi in v['variables']
        ).rename(v['name']).assign_attrs(**v['attributes'])
        for v in variables
    ]
    return data


@click.command()
@click.argument('configfile')
@click.argument('geosfpdir')
@click.argument('epadir')
@click.argument('outputfile')
def main(configfile, geosfpdir, epadir, outputfile):
    # Start a local dask cluster so xarray can read and write lazily in parallel.
    client = Client()
    with open(configfile, 'r') as f:
        config = json.load(f)
    bounds = config['bounds']
    epavars = config['epa_variables']
    geosvars = config['geos_fp_variables']
    file_exprs = config['file_exprs']
    # GEOS-FP files are .nc4 and organized by collection; EPA files are flat .nc.
    all_variables = sum([
        load_variables(geosfpdir, 'nc4', bounds, geosvars, file_exprs, True),
        load_variables(epadir, 'nc', bounds, epavars, file_exprs, False)
    ], [])
    merged = xr.merge(all_variables, combine_attrs='drop_conflicts')
    # Rechunk so each chunk spans the full spatial grid but only 8 time steps.
    merged = merged.chunk(
        {
            'time': 8,
            'lat': len(merged.lat),
            'lon': len(merged.lon),
        }
    )
    print(merged)
    # Build the zarr write as a lazy task graph, then execute it on the cluster.
    write_job = merged.to_zarr(
        outputfile, mode='w', compute=False, consolidated=True
    )
    print(f'Writing data, view progress: {client.dashboard_link}')
    write_job.compute()
    print('Done.')


if __name__ == '__main__':
    main()