请选择 进入手机版 | 继续访问电脑版
爱气象,爱气象家园! 

气象家园

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博登陆

只需一步, 快速开始

搜索
查看: 274|回复: 7

[其他] 使用python并行下载ERA5数据的方法

[复制链接]

新浪微博达人勋

发表于 2020-2-14 15:57:56 | 显示全部楼层 |阅读模式

登录后查看更多精彩内容~

您需要 登录 才可以下载或查看,没有帐号?立即注册 新浪微博登陆

x
本帖最后由 astiny 于 2020-2-17 18:54 编辑

ERA5数据是我们常用的再分析资料之一。它的时空分辨率都比较高。


一般我们下载ERA5数据,可以直接在官网上,手动选择时间、变量等信息,然后点击链接下载;也可以使用ECMWF的Python cds API下载。(如http://bbs.06climate.com/forum.php?mod=viewthread&tid=91210


multiprocessing包是Python中的多进程管理包。使用multiprocessing包进行并行下载,可以有效提高下载的速度。比如Gavin老师写的脚本(见http://climate2weather.cc/2019/05/10/era5-download/


但该脚本也有一些小问题:cds的API仅支持最多12个任务(2020.2.17修改,已咨询ec并确认)同时提交。假如你同时提交多个下载进程,且其中后提交的进程下载速度比先提交的进程快,那么该进程结束,新的进程提交上去,会顶掉最早提交的那个进程,造成该进程无法顺利下载完毕。相信用过Gavin老师那个脚本的同学们,有些已经遇到这个问题了。那么,我们应该如何解决呢?


这个问题主要还是ERA5关于下载的介绍不完整所造成的。在官网的下载教程和实例代码中,仅介绍了retrieve这一种下载方法。这可能对大家造成了一种误解,以为只能使用这一种下载方法。其实不然。
熟悉python面向对象编程的同学们应该可以马上反应过来了:对于一个稍有编程经验的程序员来说,他开发时候,肯定是会把所有的方法(以ERA5的这个api为例,包括retrieve,download,delete等等)都开发在一个类里,不可能只提供一个下载的函数。EC的coding能力我还是比较信任的,他们应该干不出这种事情。


因此,我找到了EC在github上的项目https://github.com/ecmwf/cdsapi,然后在该repository中搜索"delete"。
果然,在https://github.com/ecmwf/cdsapi/blob/a656204578341cfc994a4772496b5aca1d823d49/cdsapi/api.py中,详细介绍了retrieve,download,delete等方法。在提交session然后download后,其实可以通过delete来删除你已经下载完的进程,这样就可以避免新的进程顶掉最早提交的进程了。

好了,说了那么多,一大半对于大多数同学都是没用的。还是代码见吧(基本上是从Gavin老师脚本上改的。运行到现在没出现过任何问题)。这里默认大家都懂怎么用python非并行下载了。不太懂的可以参考上面几个链接。

#############start###################
from queue import Queue
from threading import Thread
import cdsapi
import time
from datetime import datetime,timedelta
import os,glob
from multiprocessing import Pool
download_dir = os.getcwd()
def download_ear5(allvar):
    print(allvar)
    variable = allvar[0]
    pressure_level= allvar[1]
    year= allvar[2]
    month= allvar[3]
    day= allvar[4]
    hour= allvar[5]
    file_path = os.path.join(download_dir,variable,pressure_level,year,month,day,variable+"_"+pressure_level+'hpa'+"_"+year+month+day+hour+".nc")
    if (not os.path.exists(file_path))  or (os.path.getsize(file_path) < 1024*1024*1.8) :
        c = cdsapi.Client()
        r = c.retrieve(
            'reanalysis-era5-pressure-levels',
            {
                'product_type': 'reanalysis',
                'format': 'netcdf',
                'variable': [variable],
                'pressure_level': [pressure_level],
                'year': year,
                'month': [month],
                'day': [day],
                'time': [hour+':00'],
                #'area' : [32, 118, 27, 124],
            },
            )
        r.download(os.path.join(download_dir,variable,pressure_level,year,month,day,variable+"_"+pressure_level+'hpa'+"_"+year+month+day+hour+".nc"))
        r.delete()
    return

#下载脚本
class DownloadWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # 从队列中获取任务并扩展tuple
            allvar = self.queue.get()
            download_ear5(allvar)
            self.queue.task_done()
#主程序
def main():
    start_time  =  datetime(2017,1,1,0)  # 开始时间
    end_time = datetime(2020,1,1,0)      # 结束时间
    #创建列表,存储起止时间之间的所有时次
    datetime_list = []   
    while start_time <= end_time:
        datetime_list.append(start_time)
        start_time +=  timedelta(hours=1)
    #变量列表
    variable_list = ['geopotential', 'specific_humidity', 'temperature','u_component_of_wind', 'v_component_of_wind']
    #气压层列表
    pressure_level_list = ['100','200', '500','700', '850', '925', '1000']
    #创建allvar列表,内含variable,pressure_level,year,month,day,hour
    allvar = []
    for idatetime in datetime_list:  #非并行创建文件夹,同时填充allvar列表
        year = idatetime.strftime('%Y')
        month = idatetime.strftime('%m')
        day = idatetime.strftime('%d')
        hour = idatetime.strftime('%H')
        for pressure_level in pressure_level_list:
            for variable in variable_list:
                if not os.path.exists(os.path.join(download_dir,variable,pressure_level,year,month,day)):
                    os.makedirs(os.path.join(download_dir,variable,pressure_level,year,month,day))
                allvar.append([variable,pressure_level,year,month,day,hour])
    job_number= 12
    if len(allvar)%job_number==0:
        sub_mission_len = int(len(allvar)/job_number)
    else:
        sub_mission_len = int(len(allvar)/job_number)+1
    for i in range(sub_mission_len):
        if i*4+4  > len(allvar):
            allvar_part = allvar[i*job_number:]
        else:
            allvar_part = allvar[i*job_number:i*job_number+job_number]
        p = Pool(len(allvar_part))
        for i in range(len(allvar_part)):
            p.apply_async(download_ear5, args=(allvar_part,))
        #print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        #print('All subprocesses done.')
if __name__ == '__main__':
    main()

#############end###################

接下来是感想和杂谈时间。
1.不能只看教程。还是得养成多看源代码的习惯。不然,只能永远做一个调包侠,coding的技能没法成长。

2.我这里写的脚本,是将每一层、每个变量,单独存成一个文件。这样有一个好处:当你需要用部分数据的时候,不需要读取全部,节省了很多时间。有点和mongodb的感觉一样(笑)。但是,据我在许多气象相关微信群里的经验,有很多同学似乎不太适应用循环的方法读取多个文件。这也许是收了ncl的影响,因为ncl中的addfiles函数,提供了读取整个文件列表中类似格式的nc文件的方法。这固然提供了很多方便,但也许也使得大家下意识地回避了循环这个问题。实际上,我们不仅不应该逃避循环结构,而应该进一步地拥抱循环语句。我相信,对于熟悉数据结构和循环的同学来说,循环应该是一个十分得力的工具。如果不熟悉的话,希望大家也都熟悉起来,因为掌握循环和数据结构(不论对哪种语言来说),都是十分有必要的。


down_era5.py

3.6 KB, 下载次数: 8, 下载积分: 金钱 -5

密码修改失败请联系qq:937062711

新浪微博达人勋

发表于 2020-2-16 17:44:10 | 显示全部楼层
厉害,如何提高coding水平?
密码修改失败请联系qq:937062711
回复 支持 反对

使用道具 举报

新浪微博达人勋

 楼主| 发表于 7 天前 | 显示全部楼层
梁的丰 发表于 2020-2-16 17:44
厉害,如何提高coding水平?

个人经验是,多写多看,面向工程。没太多捷径。
密码修改失败请联系qq:937062711
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 5 天前 | 显示全部楼层
一直想研究一下并行下载!非常感谢楼主的分享,下了不少功夫!学习一下!
密码修改失败请联系qq:937062711
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 5 天前 | 显示全部楼层
强的一笔,学习学习{:eb302:}{:eb302:}{:eb302:}{:eb302:}{:eb302:}{:eb302:}
密码修改失败请联系qq:937062711
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 5 天前 | 显示全部楼层
感谢分享!老师,请教个问题,ERA5数据也包含预报数据下载吧?如何下载呢,我想下载有效波高这类海洋数据!
密码修改失败请联系qq:937062711
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 4 天前 | 显示全部楼层
感谢LZ分享,学习到了
密码修改失败请联系qq:937062711
回复 支持 反对

使用道具 举报

新浪微博达人勋

 楼主| 发表于 4 天前 | 显示全部楼层
cxbhero 发表于 2020-2-19 22:33
感谢分享!老师,请教个问题,ERA5数据也包含预报数据下载吧?如何下载呢,我想下载有效波高这类海洋数据!

这是再分析呀。。。预报数据只有气象局有。要钱的。
密码修改失败请联系qq:937062711
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册 新浪微博登陆

本版积分规则

Copyright ©2011-2014 bbs.06climate.com All Rights Reserved.  Powered by Discuz! (京ICP-10201084)

本站信息均由会员发表,不代表气象家园立场,禁止在本站发表与国家法律相抵触言论

快速回复 返回顶部 返回列表