From a85d121427f1e42e51148c7414274c32528a57b0 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Fri, 17 Jan 2025 13:37:52 +0100 Subject: [PATCH 1/3] Fix downloading files when data from multiple datta streams should be combined --- src/ecmwf_models/era5/download.py | 18 ++++++++--- src/ecmwf_models/era5/era5_lut.csv | 6 ++-- src/ecmwf_models/extract.py | 50 ++++++++++++++++++++++++++++-- 3 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/ecmwf_models/era5/download.py b/src/ecmwf_models/era5/download.py index ac2e0e8..4a73ba9 100644 --- a/src/ecmwf_models/era5/download.py +++ b/src/ecmwf_models/era5/download.py @@ -24,7 +24,11 @@ check_api_ready, get_first_last_image_date ) -from ecmwf_models.extract import save_ncs_from_nc, save_gribs_from_grib +from ecmwf_models.extract import ( + save_ncs_from_nc, + save_gribs_from_grib, + unzip_nc, +) def split_chunk(timestamps, @@ -148,7 +152,7 @@ def download_era5( request = { "data_format": "grib" if grb else "netcdf", - "download_format": "unarchived", + "download_format": "zip", "variable": variables, "year": [str(y) for y in years], "month": [str(m).zfill(2) for m in months], @@ -364,7 +368,7 @@ def _download(curr_start, curr_end): fname = "{start}_{end}.{ext}".format( start=curr_start.strftime("%Y%m%d"), end=curr_end.strftime("%Y%m%d"), - ext="grb" if grb else "nc") + ext="zip") dl_file = os.path.join(downloaded_data_path, fname) @@ -412,6 +416,11 @@ def _download(curr_start, curr_end): keep_original=keep_original, keep_prelim=keep_prelim) else: + # Extract and merge nc files from zip + dl_file_new = dl_file.replace('.zip', '.nc') + unzip_nc(dl_file, dl_file_new) + dl_file = dl_file_new + save_ncs_from_nc( dl_file, target_path, @@ -525,4 +534,5 @@ def download_record_extension(path, dry_run=False, cds_token=None): enddate=enddate, cds_token=cds_token, dry_run=dry_run, - **props['download_settings']) + **props['download_settings'] + ) diff --git a/src/ecmwf_models/era5/era5_lut.csv b/src/ecmwf_models/era5/era5_lut.csv index 5038aec..2c5ab43 100644 --- a/src/ecmwf_models/era5/era5_lut.csv +++ b/src/ecmwf_models/era5/era5_lut.csv @@ -23,7 +23,7 @@ convective_inhibition,Convective inhibition,cin,0 convective_precipitation,Convective precipitation,cp,0 convective_rain_rate,Convective rain rate,crr,0 convective_snowfall,Convective snowfall,csf,0 -convective_snowfall_rate_water_equivalent,Convective snowfall rate water equivalent,csfr,1 +convective_snowfall_rate_water_equivalent,Convective snowfall rate water equivalent,csfr,0 downward_uv_radiation_at_the_surface,Downward UV radiation at the surface,uvb,0 duct_base_height,Duct base height,dctb,0 eastward_gravity_wave_surface_stress,Eastward gravity wave surface stress,lgws,0 @@ -63,7 +63,7 @@ large_scale_precipitation,Large-scale precipitation,lsp,0 large_scale_precipitation_fraction,Large-scale precipitation fraction,lspf,0 large_scale_rain_rate,Large scale rain rate,lsrr,0 large_scale_snowfall,Large-scale snowfall,lsf,0 -large_scale_snowfall_rate_water_equivalent,Large scale snowfall rate water equivalent,lssfr,1 +large_scale_snowfall_rate_water_equivalent,Large scale snowfall rate water equivalent,lssfr,0 leaf_area_index_high_vegetation,"Leaf area index, high vegetation",lai_hv,0 leaf_area_index_low_vegetation,"Leaf area index, low vegetation",lai_lv,0 low_cloud_cover,Low cloud cover,lcc,0 @@ -258,4 +258,4 @@ wave_spectral_directional_width_for_wind_waves,Wave spectral directional width f wave_spectral_kurtosis,Wave spectral kurtosis,wsk,0 wave_spectral_peakedness,Wave spectral peakedness,wsp,0 wave_spectral_skewness,Wave Spectral Skewness,wss,0 -zero_degree_level,Zero degree level,deg0l,0 \ No newline at end of file +zero_degree_level,Zero degree level,deg0l,0 diff --git a/src/ecmwf_models/extract.py b/src/ecmwf_models/extract.py index e5648f6..8a54c18 100644 --- a/src/ecmwf_models/extract.py +++ b/src/ecmwf_models/extract.py @@ -1,9 +1,13 @@ +import tempfile from datetime import datetime import logging import os import pandas as pd import xarray as xr from datedown.fname_creator import create_dt_fpath +import zipfile +import shutil +import numpy as np from ecmwf_models.globals import (IMG_FNAME_TEMPLATE, IMG_FNAME_DATETIME_FORMAT, EXPVER, SUBDIRS) @@ -17,6 +21,48 @@ ) +def unzip_nc( + input_zip, + output_nc, +): + """ + Unzip and merge all netcdf files downloaded from CDS. If the zip file + contains only 1 netcdf file, it only be extracted. + + Parameters + ---------- + input_zip: str + Path to the downloaded zip file containing one or more (datastream) + netcdf files. + output_nc: str + Path to the netcdf file to write + """ + with tempfile.TemporaryDirectory() as tmpdir: + with zipfile.ZipFile(input_zip, "r") as zip_ref: + zip_ref.extractall(tmpdir) + ncfiles = [f for f in os.listdir(tmpdir) if f.endswith(".nc")] + if len(ncfiles) == 1: + shutil.move(ncfiles[0], output_nc) + else: + # Sometimes CDS returns multiple netcdf files, merge them + ds = [xr.open_dataset(os.path.join(tmpdir, f)) for f in ncfiles] + expvers = [] + for d in ds: + if 'expver' in d.coords: + expvers.append(d.coords['expver'].values.astype(int)) + if len(expvers) > 0: + expvers = np.array(expvers).max(axis=0) + for d in ds: + d.coords['expver'] = np.array([f"{e:04}" for e in expvers]) + + ds = xr.combine_by_coords(ds, combine_attrs="override", + compat='override') + ds.to_netcdf(output_nc, encoding={ + v: {'zlib': True, 'complevel': 6} for v in ds.data_vars}) + + os.remove(input_zip) + + def save_ncs_from_nc( input_nc, output_path, @@ -67,12 +113,12 @@ def save_ncs_from_nc( for k, v in grid.items(): f.write(f"{k} = {v}\n") - for time in nc_in["time"].values: + for i, time in enumerate(nc_in["time"].values): subset = nc_in.sel({"time": time}) # Expver identifies preliminary data if 'expver' in subset: - expver = str(subset['expver'].values) + expver = str(subset['expver'].values[i]) subset = subset.drop_vars('expver') try: ext = EXPVER[expver] From 7b7a184bf57a838bced0e10f7e4a80ce879ee7d4 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Fri, 17 Jan 2025 14:18:15 +0100 Subject: [PATCH 2/3] Fix tests --- src/ecmwf_models/era5/download.py | 2 +- src/ecmwf_models/extract.py | 7 +++++-- tests/tests_era5/test_era5_download.py | 28 ++++++++++++++++++++------ 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/ecmwf_models/era5/download.py b/src/ecmwf_models/era5/download.py index 4a73ba9..f6d56eb 100644 --- a/src/ecmwf_models/era5/download.py +++ b/src/ecmwf_models/era5/download.py @@ -368,7 +368,7 @@ def _download(curr_start, curr_end): fname = "{start}_{end}.{ext}".format( start=curr_start.strftime("%Y%m%d"), end=curr_end.strftime("%Y%m%d"), - ext="zip") + ext="zip" if grb is False else "grb") dl_file = os.path.join(downloaded_data_path, fname) diff --git a/src/ecmwf_models/extract.py b/src/ecmwf_models/extract.py index 8a54c18..ae9fadb 100644 --- a/src/ecmwf_models/extract.py +++ b/src/ecmwf_models/extract.py @@ -40,7 +40,8 @@ def unzip_nc( with tempfile.TemporaryDirectory() as tmpdir: with zipfile.ZipFile(input_zip, "r") as zip_ref: zip_ref.extractall(tmpdir) - ncfiles = [f for f in os.listdir(tmpdir) if f.endswith(".nc")] + ncfiles = [os.path.join(tmpdir, f) for f in os.listdir(tmpdir) + if f.endswith(".nc")] if len(ncfiles) == 1: shutil.move(ncfiles[0], output_nc) else: @@ -98,6 +99,8 @@ def save_ncs_from_nc( ext='nc') nc_in = xr.open_dataset(input_nc, mask_and_scale=True) + if 'valid_time' in nc_in.dims: + nc_in = nc_in.rename_dims({"valid_time": 'time'}) if 'valid_time' in nc_in.variables: nc_in = nc_in.rename_vars({"valid_time": 'time'}) @@ -118,7 +121,7 @@ def save_ncs_from_nc( # Expver identifies preliminary data if 'expver' in subset: - expver = str(subset['expver'].values[i]) + expver = str(np.atleast_1d(subset['expver'].values)[i]) subset = subset.drop_vars('expver') try: ext = EXPVER[expver] diff --git a/tests/tests_era5/test_era5_download.py b/tests/tests_era5/test_era5_download.py index 9f1d309..28ffb38 100644 --- a/tests/tests_era5/test_era5_download.py +++ b/tests/tests_era5/test_era5_download.py @@ -30,6 +30,7 @@ import xarray as xr import pytest import tempfile +import zipfile from c3s_sm.misc import read_summary_yml @@ -60,7 +61,9 @@ def test_download_with_cdo_not_installed(): "ecmwf_models-test-data", "download", "era5_example_downloaded_raw.nc") save_ncs_from_nc( - infile, out_path, 'ERA5', grid=grid, keep_original=True) + infile, out_path, 'ERA5', grid=grid, + keep_original=True) + def test_dry_download_nc_era5(): with tempfile.TemporaryDirectory() as dl_path: @@ -72,8 +75,14 @@ def test_dry_download_nc_era5(): os.path.dirname(os.path.abspath(__file__)), '..', "ecmwf_models-test-data", "download", "era5_example_downloaded_raw.nc") - trgt = os.path.join(dl_path, 'temp_downloaded', '20100101_20100101.nc') - shutil.copyfile(thefile, trgt) + + assert os.path.exists(thefile) + + trgt = os.path.join(dl_path, "temp_downloaded", + "20100101_20100101.zip") + with zipfile.ZipFile(trgt, 'w') as zip: + # Add the file to the ZIP archive + zip.write(thefile, arcname="20100101_20100101.nc") assert os.path.isfile(trgt) @@ -176,9 +185,16 @@ def test_download_nc_era5_regridding(): os.path.dirname(os.path.abspath(__file__)), '..', "ecmwf_models-test-data", "download", "era5_example_downloaded_raw.nc") - shutil.copyfile( - thefile, - os.path.join(dl_path, 'temp_downloaded', '20100101_20100101.nc')) + + assert os.path.exists(thefile) + + trgt = os.path.join(dl_path, "temp_downloaded", + "20100101_20100101.zip") + with zipfile.ZipFile(trgt, 'w') as zip: + # Add the file to the ZIP archive + zip.write(thefile, arcname="20100101_20100101.nc") + + assert os.path.isfile(trgt) startdate = enddate = datetime(2010, 1, 1) From ebda210960ccdbe8767dff32bcfef60130a29c7f Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Fri, 17 Jan 2025 14:19:39 +0100 Subject: [PATCH 3/3] Fix tests --- CHANGELOG.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ba3c98c..238bc32 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,7 +4,8 @@ Changelog Unreleased changes in master branch =================================== -- +- Fixed a bug where CDS returned a zip file when downloading data from multiple + streams while the package expected a netcdf file. Version 0.10.2 ==============