Skip to content

Commit

Permalink
Merge pull request #42 from TUW-GEO/develop
Browse files Browse the repository at this point in the history
Fix downloading data from multiple data streams (zip format)
  • Loading branch information
wpreimes authored Jan 17, 2025
2 parents 79d07f3 + ebda210 commit cfdb73c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 16 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ Changelog

Unreleased changes in master branch
===================================
-
- Fixed a bug where CDS returned a zip file when downloading data from multiple
streams while the package expected a netcdf file.

Version 0.10.2
==============
Expand Down
18 changes: 14 additions & 4 deletions src/ecmwf_models/era5/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
check_api_ready,
get_first_last_image_date
)
from ecmwf_models.extract import save_ncs_from_nc, save_gribs_from_grib
from ecmwf_models.extract import (
save_ncs_from_nc,
save_gribs_from_grib,
unzip_nc,
)


def split_chunk(timestamps,
Expand Down Expand Up @@ -148,7 +152,7 @@ def download_era5(

request = {
"data_format": "grib" if grb else "netcdf",
"download_format": "unarchived",
"download_format": "zip",
"variable": variables,
"year": [str(y) for y in years],
"month": [str(m).zfill(2) for m in months],
Expand Down Expand Up @@ -364,7 +368,7 @@ def _download(curr_start, curr_end):
fname = "{start}_{end}.{ext}".format(
start=curr_start.strftime("%Y%m%d"),
end=curr_end.strftime("%Y%m%d"),
ext="grb" if grb else "nc")
ext="zip" if grb is False else "grb")

dl_file = os.path.join(downloaded_data_path, fname)

Expand Down Expand Up @@ -412,6 +416,11 @@ def _download(curr_start, curr_end):
keep_original=keep_original,
keep_prelim=keep_prelim)
else:
# Extract and merge nc files from zip
dl_file_new = dl_file.replace('.zip', '.nc')
unzip_nc(dl_file, dl_file_new)
dl_file = dl_file_new

save_ncs_from_nc(
dl_file,
target_path,
Expand Down Expand Up @@ -525,4 +534,5 @@ def download_record_extension(path, dry_run=False, cds_token=None):
enddate=enddate,
cds_token=cds_token,
dry_run=dry_run,
**props['download_settings'])
**props['download_settings']
)
6 changes: 3 additions & 3 deletions src/ecmwf_models/era5/era5_lut.csv
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ convective_inhibition,Convective inhibition,cin,0
convective_precipitation,Convective precipitation,cp,0
convective_rain_rate,Convective rain rate,crr,0
convective_snowfall,Convective snowfall,csf,0
convective_snowfall_rate_water_equivalent,Convective snowfall rate water equivalent,csfr,1
convective_snowfall_rate_water_equivalent,Convective snowfall rate water equivalent,csfr,0
downward_uv_radiation_at_the_surface,Downward UV radiation at the surface,uvb,0
duct_base_height,Duct base height,dctb,0
eastward_gravity_wave_surface_stress,Eastward gravity wave surface stress,lgws,0
Expand Down Expand Up @@ -63,7 +63,7 @@ large_scale_precipitation,Large-scale precipitation,lsp,0
large_scale_precipitation_fraction,Large-scale precipitation fraction,lspf,0
large_scale_rain_rate,Large scale rain rate,lsrr,0
large_scale_snowfall,Large-scale snowfall,lsf,0
large_scale_snowfall_rate_water_equivalent,Large scale snowfall rate water equivalent,lssfr,1
large_scale_snowfall_rate_water_equivalent,Large scale snowfall rate water equivalent,lssfr,0
leaf_area_index_high_vegetation,"Leaf area index, high vegetation",lai_hv,0
leaf_area_index_low_vegetation,"Leaf area index, low vegetation",lai_lv,0
low_cloud_cover,Low cloud cover,lcc,0
Expand Down Expand Up @@ -258,4 +258,4 @@ wave_spectral_directional_width_for_wind_waves,Wave spectral directional width f
wave_spectral_kurtosis,Wave spectral kurtosis,wsk,0
wave_spectral_peakedness,Wave spectral peakedness,wsp,0
wave_spectral_skewness,Wave Spectral Skewness,wss,0
zero_degree_level,Zero degree level,deg0l,0
zero_degree_level,Zero degree level,deg0l,0
53 changes: 51 additions & 2 deletions src/ecmwf_models/extract.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import tempfile
from datetime import datetime
import logging
import os
import pandas as pd
import xarray as xr
from datedown.fname_creator import create_dt_fpath
import zipfile
import shutil
import numpy as np

from ecmwf_models.globals import (IMG_FNAME_TEMPLATE,
IMG_FNAME_DATETIME_FORMAT, EXPVER, SUBDIRS)
Expand All @@ -17,6 +21,49 @@
)


def unzip_nc(
input_zip,
output_nc,
):
"""
Unzip and merge all netcdf files downloaded from CDS. If the zip file
contains only 1 netcdf file, it only be extracted.
Parameters
----------
input_zip: str
Path to the downloaded zip file containing one or more (datastream)
netcdf files.
output_nc: str
Path to the netcdf file to write
"""
with tempfile.TemporaryDirectory() as tmpdir:
with zipfile.ZipFile(input_zip, "r") as zip_ref:
zip_ref.extractall(tmpdir)
ncfiles = [os.path.join(tmpdir, f) for f in os.listdir(tmpdir)
if f.endswith(".nc")]
if len(ncfiles) == 1:
shutil.move(ncfiles[0], output_nc)
else:
# Sometimes CDS returns multiple netcdf files, merge them
ds = [xr.open_dataset(os.path.join(tmpdir, f)) for f in ncfiles]
expvers = []
for d in ds:
if 'expver' in d.coords:
expvers.append(d.coords['expver'].values.astype(int))
if len(expvers) > 0:
expvers = np.array(expvers).max(axis=0)
for d in ds:
d.coords['expver'] = np.array([f"{e:04}" for e in expvers])

ds = xr.combine_by_coords(ds, combine_attrs="override",
compat='override')
ds.to_netcdf(output_nc, encoding={
v: {'zlib': True, 'complevel': 6} for v in ds.data_vars})

os.remove(input_zip)


def save_ncs_from_nc(
input_nc,
output_path,
Expand Down Expand Up @@ -52,6 +99,8 @@ def save_ncs_from_nc(
ext='nc')

nc_in = xr.open_dataset(input_nc, mask_and_scale=True)
if 'valid_time' in nc_in.dims:
nc_in = nc_in.rename_dims({"valid_time": 'time'})
if 'valid_time' in nc_in.variables:
nc_in = nc_in.rename_vars({"valid_time": 'time'})

Expand All @@ -67,12 +116,12 @@ def save_ncs_from_nc(
for k, v in grid.items():
f.write(f"{k} = {v}\n")

for time in nc_in["time"].values:
for i, time in enumerate(nc_in["time"].values):
subset = nc_in.sel({"time": time})

# Expver identifies preliminary data
if 'expver' in subset:
expver = str(subset['expver'].values)
expver = str(np.atleast_1d(subset['expver'].values)[i])
subset = subset.drop_vars('expver')
try:
ext = EXPVER[expver]
Expand Down
28 changes: 22 additions & 6 deletions tests/tests_era5/test_era5_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import xarray as xr
import pytest
import tempfile
import zipfile

from c3s_sm.misc import read_summary_yml

Expand Down Expand Up @@ -60,7 +61,9 @@ def test_download_with_cdo_not_installed():
"ecmwf_models-test-data", "download",
"era5_example_downloaded_raw.nc")
save_ncs_from_nc(
infile, out_path, 'ERA5', grid=grid, keep_original=True)
infile, out_path, 'ERA5', grid=grid,
keep_original=True)


def test_dry_download_nc_era5():
with tempfile.TemporaryDirectory() as dl_path:
Expand All @@ -72,8 +75,14 @@ def test_dry_download_nc_era5():
os.path.dirname(os.path.abspath(__file__)), '..',
"ecmwf_models-test-data", "download",
"era5_example_downloaded_raw.nc")
trgt = os.path.join(dl_path, 'temp_downloaded', '20100101_20100101.nc')
shutil.copyfile(thefile, trgt)

assert os.path.exists(thefile)

trgt = os.path.join(dl_path, "temp_downloaded",
"20100101_20100101.zip")
with zipfile.ZipFile(trgt, 'w') as zip:
# Add the file to the ZIP archive
zip.write(thefile, arcname="20100101_20100101.nc")

assert os.path.isfile(trgt)

Expand Down Expand Up @@ -176,9 +185,16 @@ def test_download_nc_era5_regridding():
os.path.dirname(os.path.abspath(__file__)), '..',
"ecmwf_models-test-data", "download",
"era5_example_downloaded_raw.nc")
shutil.copyfile(
thefile,
os.path.join(dl_path, 'temp_downloaded', '20100101_20100101.nc'))

assert os.path.exists(thefile)

trgt = os.path.join(dl_path, "temp_downloaded",
"20100101_20100101.zip")
with zipfile.ZipFile(trgt, 'w') as zip:
# Add the file to the ZIP archive
zip.write(thefile, arcname="20100101_20100101.nc")

assert os.path.isfile(trgt)

startdate = enddate = datetime(2010, 1, 1)

Expand Down

0 comments on commit cfdb73c

Please sign in to comment.