Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix downloading data from multiple data streams (zip format) #42

Merged
merged 4 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ Changelog

Unreleased changes in master branch
===================================
-
- Fixed a bug where CDS returned a zip file when downloading data from multiple
streams while the package expected a netcdf file.

Version 0.10.2
==============
Expand Down
18 changes: 14 additions & 4 deletions src/ecmwf_models/era5/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
check_api_ready,
get_first_last_image_date
)
from ecmwf_models.extract import save_ncs_from_nc, save_gribs_from_grib
from ecmwf_models.extract import (
save_ncs_from_nc,
save_gribs_from_grib,
unzip_nc,
)


def split_chunk(timestamps,
Expand Down Expand Up @@ -148,7 +152,7 @@ def download_era5(

request = {
"data_format": "grib" if grb else "netcdf",
"download_format": "unarchived",
"download_format": "zip",
"variable": variables,
"year": [str(y) for y in years],
"month": [str(m).zfill(2) for m in months],
Expand Down Expand Up @@ -364,7 +368,7 @@ def _download(curr_start, curr_end):
fname = "{start}_{end}.{ext}".format(
start=curr_start.strftime("%Y%m%d"),
end=curr_end.strftime("%Y%m%d"),
ext="grb" if grb else "nc")
ext="zip" if grb is False else "grb")

dl_file = os.path.join(downloaded_data_path, fname)

Expand Down Expand Up @@ -412,6 +416,11 @@ def _download(curr_start, curr_end):
keep_original=keep_original,
keep_prelim=keep_prelim)
else:
# Extract and merge nc files from zip
dl_file_new = dl_file.replace('.zip', '.nc')
unzip_nc(dl_file, dl_file_new)
dl_file = dl_file_new

save_ncs_from_nc(
dl_file,
target_path,
Expand Down Expand Up @@ -525,4 +534,5 @@ def download_record_extension(path, dry_run=False, cds_token=None):
enddate=enddate,
cds_token=cds_token,
dry_run=dry_run,
**props['download_settings'])
**props['download_settings']
)
6 changes: 3 additions & 3 deletions src/ecmwf_models/era5/era5_lut.csv
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ convective_inhibition,Convective inhibition,cin,0
convective_precipitation,Convective precipitation,cp,0
convective_rain_rate,Convective rain rate,crr,0
convective_snowfall,Convective snowfall,csf,0
convective_snowfall_rate_water_equivalent,Convective snowfall rate water equivalent,csfr,1
convective_snowfall_rate_water_equivalent,Convective snowfall rate water equivalent,csfr,0
downward_uv_radiation_at_the_surface,Downward UV radiation at the surface,uvb,0
duct_base_height,Duct base height,dctb,0
eastward_gravity_wave_surface_stress,Eastward gravity wave surface stress,lgws,0
Expand Down Expand Up @@ -63,7 +63,7 @@ large_scale_precipitation,Large-scale precipitation,lsp,0
large_scale_precipitation_fraction,Large-scale precipitation fraction,lspf,0
large_scale_rain_rate,Large scale rain rate,lsrr,0
large_scale_snowfall,Large-scale snowfall,lsf,0
large_scale_snowfall_rate_water_equivalent,Large scale snowfall rate water equivalent,lssfr,1
large_scale_snowfall_rate_water_equivalent,Large scale snowfall rate water equivalent,lssfr,0
leaf_area_index_high_vegetation,"Leaf area index, high vegetation",lai_hv,0
leaf_area_index_low_vegetation,"Leaf area index, low vegetation",lai_lv,0
low_cloud_cover,Low cloud cover,lcc,0
Expand Down Expand Up @@ -258,4 +258,4 @@ wave_spectral_directional_width_for_wind_waves,Wave spectral directional width f
wave_spectral_kurtosis,Wave spectral kurtosis,wsk,0
wave_spectral_peakedness,Wave spectral peakedness,wsp,0
wave_spectral_skewness,Wave Spectral Skewness,wss,0
zero_degree_level,Zero degree level,deg0l,0
zero_degree_level,Zero degree level,deg0l,0
53 changes: 51 additions & 2 deletions src/ecmwf_models/extract.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import tempfile
from datetime import datetime
import logging
import os
import pandas as pd
import xarray as xr
from datedown.fname_creator import create_dt_fpath
import zipfile
import shutil
import numpy as np

from ecmwf_models.globals import (IMG_FNAME_TEMPLATE,
IMG_FNAME_DATETIME_FORMAT, EXPVER, SUBDIRS)
Expand All @@ -17,6 +21,49 @@
)


def unzip_nc(
        input_zip,
        output_nc,
):
    """
    Extract the netcdf content of a zip archive downloaded from CDS.

    If the zip file contains exactly one netcdf file, it is only extracted
    and moved to `output_nc`. If it contains several netcdf files, they are
    merged into a single, zlib-compressed netcdf file written to
    `output_nc`. The zip file is deleted after successful processing.

    Parameters
    ----------
    input_zip: str
        Path to the downloaded zip file containing one or more (datastream)
        netcdf files.
    output_nc: str
        Path to the netcdf file to write

    Raises
    ------
    ValueError
        If the zip file contains no ``.nc`` file at its top level. The zip
        file is kept in this case.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(input_zip, "r") as zip_ref:
            zip_ref.extractall(tmpdir)

        # Sorted so that the merge order (and therefore which attributes
        # win under "override") does not depend on the file system.
        ncfiles = sorted(
            os.path.join(tmpdir, f) for f in os.listdir(tmpdir)
            if f.endswith(".nc"))

        if not ncfiles:
            # Fail early instead of writing an empty file and deleting
            # the only copy of the downloaded data.
            raise ValueError(
                f"No netcdf file found in zip archive: {input_zip}")

        if len(ncfiles) == 1:
            shutil.move(ncfiles[0], output_nc)
        else:
            # Sometimes CDS returns multiple netcdf files, merge them
            datasets = []
            try:
                for ncfile in ncfiles:
                    datasets.append(xr.open_dataset(ncfile))

                expvers = [
                    d.coords['expver'].values.astype(int)
                    for d in datasets if 'expver' in d.coords
                ]
                if expvers:
                    # Element-wise maximum over all files, then assign the
                    # same zero-padded 'expver' labels to every dataset so
                    # the coordinate agrees when the datasets are combined.
                    expvers = np.array(expvers).max(axis=0)
                    labels = np.array([f"{e:04}" for e in expvers])
                    for d in datasets:
                        d.coords['expver'] = labels

                merged = xr.combine_by_coords(
                    datasets, combine_attrs="override", compat='override')
                # Must be written while the source files are still open and
                # the temporary directory still exists.
                merged.to_netcdf(output_nc, encoding={
                    v: {'zlib': True, 'complevel': 6}
                    for v in merged.data_vars})
            finally:
                # Release the file handles so the temporary directory can
                # be removed (open handles block deletion on some systems).
                for d in datasets:
                    d.close()

    os.remove(input_zip)


def save_ncs_from_nc(
input_nc,
output_path,
Expand Down Expand Up @@ -52,6 +99,8 @@ def save_ncs_from_nc(
ext='nc')

nc_in = xr.open_dataset(input_nc, mask_and_scale=True)
if 'valid_time' in nc_in.dims:
nc_in = nc_in.rename_dims({"valid_time": 'time'})
if 'valid_time' in nc_in.variables:
nc_in = nc_in.rename_vars({"valid_time": 'time'})

Expand All @@ -67,12 +116,12 @@ def save_ncs_from_nc(
for k, v in grid.items():
f.write(f"{k} = {v}\n")

for time in nc_in["time"].values:
for i, time in enumerate(nc_in["time"].values):
subset = nc_in.sel({"time": time})

# Expver identifies preliminary data
if 'expver' in subset:
expver = str(subset['expver'].values)
expver = str(np.atleast_1d(subset['expver'].values)[i])
subset = subset.drop_vars('expver')
try:
ext = EXPVER[expver]
Expand Down
28 changes: 22 additions & 6 deletions tests/tests_era5/test_era5_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import xarray as xr
import pytest
import tempfile
import zipfile

from c3s_sm.misc import read_summary_yml

Expand Down Expand Up @@ -60,7 +61,9 @@ def test_download_with_cdo_not_installed():
"ecmwf_models-test-data", "download",
"era5_example_downloaded_raw.nc")
save_ncs_from_nc(
infile, out_path, 'ERA5', grid=grid, keep_original=True)
infile, out_path, 'ERA5', grid=grid,
keep_original=True)


def test_dry_download_nc_era5():
with tempfile.TemporaryDirectory() as dl_path:
Expand All @@ -72,8 +75,14 @@ def test_dry_download_nc_era5():
os.path.dirname(os.path.abspath(__file__)), '..',
"ecmwf_models-test-data", "download",
"era5_example_downloaded_raw.nc")
trgt = os.path.join(dl_path, 'temp_downloaded', '20100101_20100101.nc')
shutil.copyfile(thefile, trgt)

assert os.path.exists(thefile)

trgt = os.path.join(dl_path, "temp_downloaded",
"20100101_20100101.zip")
with zipfile.ZipFile(trgt, 'w') as zip:
# Add the file to the ZIP archive
zip.write(thefile, arcname="20100101_20100101.nc")

assert os.path.isfile(trgt)

Expand Down Expand Up @@ -176,9 +185,16 @@ def test_download_nc_era5_regridding():
os.path.dirname(os.path.abspath(__file__)), '..',
"ecmwf_models-test-data", "download",
"era5_example_downloaded_raw.nc")
shutil.copyfile(
thefile,
os.path.join(dl_path, 'temp_downloaded', '20100101_20100101.nc'))

assert os.path.exists(thefile)

trgt = os.path.join(dl_path, "temp_downloaded",
"20100101_20100101.zip")
with zipfile.ZipFile(trgt, 'w') as zip:
# Add the file to the ZIP archive
zip.write(thefile, arcname="20100101_20100101.nc")

assert os.path.isfile(trgt)

startdate = enddate = datetime(2010, 1, 1)

Expand Down
Loading