请选择 进入手机版 | 继续访问电脑版
爱气象,爱气象家园! 

气象家园

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博登陆

只需一步, 快速开始

搜索
查看: 1190|回复: 7

[其他] 使用python并行下载ERA5数据的方法

[复制链接]

新浪微博达人勋

发表于 2020-2-14 15:57:56 | 显示全部楼层 |阅读模式

登录后查看更多精彩内容~

您需要 登录 才可以下载或查看,没有帐号?立即注册 新浪微博登陆

x
本帖最后由 astiny 于 2020-2-26 14:03 编辑

ERA5数据是我们常用的再分析资料之一。它的时空分辨率都比较高。


一般我们下载ERA5数据,可以直接在官网上,手动选择时间、变量等信息,然后点击链接下载;也可以使用ECMWF的Python cds API下载。(如http://bbs.06climate.com/forum.php?mod=viewthread&tid=91210


multiprocessing包是Python中的多进程管理包。使用multiprocessing包进行并行下载,可以有效提高下载的速度。比如Gavin老师写的脚本(见http://climate2weather.cc/2019/05/10/era5-download/


但该脚本也有一些小问题:cds的API仅支持最多12个任务(2020.2.17修改,已咨询ec并确认)同时提交。假如你同时提交多个下载进程,且其中后提交的进程下载速度比先提交的进程快,那么该进程结束,新的进程提交上去,会顶掉最早提交的那个进程,造成该进程无法顺利下载完毕。相信用过Gavin老师那个脚本的同学们,有些已经遇到这个问题了。那么,我们应该如何解决呢?


这个问题主要还是ERA5关于下载的介绍不完整所造成的。在官网的下载教程和实例代码中,仅介绍了retrieve这一种下载方法。这可能对大家造成了一种误解,以为只能使用这一种下载方法。其实不然。
熟悉python面向对象编程的同学们应该可以马上反应过来了:对于一个稍有编程经验的程序员来说,他开发时候,肯定是会把所有的方法(以ERA5的这个api为例,包括retrieve,download,delete等等)都开发在一个类里,不可能只提供一个下载的函数。EC的coding能力我还是比较信任的,他们应该干不出这种事情。


因此,我找到了EC在github上的项目https://github.com/ecmwf/cdsapi,然后在该repository中搜索"delete"。
果然,在https://github.com/ecmwf/cdsapi/blob/a656204578341cfc994a4772496b5aca1d823d49/cdsapi/api.py中,详细介绍了retrieve,download,delete等方法。在提交session然后download后,其实可以通过delete来删除你已经下载完的进程,这样就可以避免新的进程顶掉最早提交的进程了。

好了,说了那么多,一大半对于大多数同学都是没用的。还是代码见吧(基本上是从Gavin老师脚本上改的。运行到现在没出现过任何问题)。这里默认大家都懂怎么用python非并行下载了。不太懂的可以参考上面几个链接。

#############start################

  1. from queue import Queue
  2. from threading import Thread
  3. import cdsapi
  4. import time
  5. from datetime import datetime, timedelta
  6. import os, glob
  7. from multiprocessing import Pool

  8. download_dir = os.getcwd()


  9. def download_ear5(allvar):
  10.     print(allvar)
  11.     variable = allvar[0]
  12.     pressure_level = allvar[1]
  13.     year = allvar[2]
  14.     month = allvar[3]
  15.     day = allvar[4]
  16.     hour = allvar[5]
  17.     file_path = os.path.join(
  18.         download_dir,
  19.         variable,
  20.         pressure_level,
  21.         year,
  22.         month,
  23.         day,
  24.         variable
  25.         + "_"
  26.         + pressure_level
  27.         + "hpa"
  28.         + "_"
  29.         + year
  30.         + month
  31.         + day
  32.         + hour
  33.         + ".nc",
  34.     )
  35.     if not os.path.exists(
  36.         file_path
  37.     ):  #  or (os.path.getsize(file_path) < 1024*1024*1.8) :
  38.         c = cdsapi.Client()
  39.         r = c.retrieve(
  40.             "reanalysis-era5-pressure-levels",
  41.             {
  42.                 "product_type": "reanalysis",
  43.                 "format": "netcdf",
  44.                 "variable": [variable],
  45.                 "pressure_level": [pressure_level],
  46.                 "year": year,
  47.                 "month": [month],
  48.                 "day": [day],
  49.                 "time": [hour + ":00"],
  50.                 "area": [90, 0, 0, 180],
  51.             },
  52.         )
  53.         r.download(
  54.             os.path.join(
  55.                 download_dir,
  56.                 variable,
  57.                 pressure_level,
  58.                 year,
  59.                 month,
  60.                 day,
  61.                 variable
  62.                 + "_"
  63.                 + pressure_level
  64.                 + "hpa"
  65.                 + "_"
  66.                 + year
  67.                 + month
  68.                 + day
  69.                 + hour
  70.                 + ".nc",
  71.             )
  72.         )
  73.         r.delete()
  74.         time.sleep(3)
  75.     return


  76. # 下载脚本
  77. class DownloadWorker(Thread):
  78.     def __init__(self, queue):
  79.         Thread.__init__(self)
  80.         self.queue = queue

  81.     def run(self):
  82.         while True:
  83.             # 从队列中获取任务并扩展tuple
  84.             allvar = self.queue.get()
  85.             download_ear5(allvar)
  86.             self.queue.task_done()


  87. # 主程序
  88. def main():
  89.     start_time = datetime(2017, 1, 1, 0)  # 开始时间
  90.     end_time = datetime(2020, 1, 1, 0)  # 结束时间
  91.     # 创建列表,存储起止时间之间的所有时次
  92.     datetime_list = []
  93.     while start_time <= end_time:
  94.         datetime_list.append(start_time)
  95.         start_time += timedelta(hours=1)
  96.     # 变量列表
  97.     variable_list = [
  98.         "geopotential",
  99.         "specific_humidity",
  100.         "temperature",
  101.         "u_component_of_wind",
  102.         "v_component_of_wind",
  103.     ]
  104.     # 气压层列表
  105.     pressure_level_list = ["100", "200", "500", "700", "850", "925", "1000"]
  106.     # 创建allvar列表,内含variable,pressure_level,year,month,day,hour
  107.     allvar = []
  108.     for idatetime in datetime_list:  # 非并行创建文件夹,同事填充allvar列表
  109.         year = idatetime.strftime("%Y")
  110.         month = idatetime.strftime("%m")
  111.         day = idatetime.strftime("%d")
  112.         hour = idatetime.strftime("%H")
  113.         for pressure_level in pressure_level_list:
  114.             for variable in variable_list:
  115.                 if not os.path.exists(
  116.                     os.path.join(
  117.                         download_dir, variable, pressure_level, year, month, day
  118.                     )
  119.                 ):
  120.                     os.makedirs(
  121.                         os.path.join(
  122.                             download_dir, variable, pressure_level, year, month, day
  123.                         )
  124.                     )
  125.                 allvar.append([variable, pressure_level, year, month, day, hour])

  126.     # 创建一个主进程与工作进程通信
  127.     queue = Queue()

  128.     # 注意,每个用户同时最多接受4个request https://cds.climate.copernicus.eu/vision
  129.     # 创建12个工作线程
  130.     for x in range(12):
  131.         worker = DownloadWorker(queue)
  132.         # 将daemon设置为True将会使主线程退出,即使所有worker都阻塞了
  133.         worker.daemon = False
  134.         worker.start()

  135.     # 将任务以tuple的形式放入队列中
  136.     for link in allvar:
  137.         queue.put((link))

  138.     # 让主线程等待队列完成所有的任务
  139.     queue.join()


  140. if __name__ == "__main__":
  141.     main()
复制代码

#############end###################

接下来是感想和杂谈时间。
1.不能只看教程。还是得养成多看源代码的习惯。不然,只能永远做一个调包侠,coding的技能没法成长。

2.我这里写的脚本,是将每一层、每个变量,单独存成一个文件。这样有一个好处:当你需要用部分数据的时候,不需要读取全部,节省了很多时间。有点和mongodb的感觉一样(笑)。但是,据我在许多气象相关微信群里的经验,有很多同学似乎不太适应用循环的方法读取多个文件。这也许是收了ncl的影响,因为ncl中的addfiles函数,提供了读取整个文件列表中类似格式的nc文件的方法。这固然提供了很多方便,但也许也使得大家下意识地回避了循环这个问题。实际上,我们不仅不应该逃避循环结构,而应该进一步地拥抱循环语句。我相信,对于熟悉数据结构和循环的同学来说,循环应该是一个十分得力的工具。如果不熟悉的话,希望大家也都熟悉起来,因为掌握循环和数据结构(不论对哪种语言来说),都是十分有必要的。


down_era5_surface.py

3.55 KB, 下载次数: 13, 下载积分: 金钱 -5

down_era5.py

3.57 KB, 下载次数: 21, 下载积分: 金钱 -5

密码修改失败请联系微信:mofangbao

新浪微博达人勋

发表于 2020-2-16 17:44:10 | 显示全部楼层
厉害,如何提高coding水平?
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

 楼主| 发表于 2020-2-17 15:10:39 | 显示全部楼层
梁的丰 发表于 2020-2-16 17:44
厉害,如何提高coding水平?

个人经验是,多写多看,面向工程。没太多捷径。
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 2020-2-19 13:50:54 | 显示全部楼层
一直想研究一下并行下载!非常感谢楼主的分享,下了不少功夫!学习一下!
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 2020-2-19 22:25:54 | 显示全部楼层
强的一笔,学习学习{:eb302:}{:eb302:}{:eb302:}{:eb302:}{:eb302:}{:eb302:}
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 2020-2-19 22:33:13 | 显示全部楼层
感谢分享!老师,请教个问题,ERA5数据也包含预报数据下载吧?如何下载呢,我想下载有效波高这类海洋数据!
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 2020-2-20 10:00:16 | 显示全部楼层
感谢LZ分享,学习到了
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

 楼主| 发表于 2020-2-20 13:34:18 | 显示全部楼层
cxbhero 发表于 2020-2-19 22:33
感谢分享!老师,请教个问题,ERA5数据也包含预报数据下载吧?如何下载呢,我想下载有效波高这类海洋数据!

这是再分析呀。。。预报数据只有气象局有。要钱的。
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册 新浪微博登陆

本版积分规则

Copyright ©2011-2014 bbs.06climate.com All Rights Reserved.  Powered by Discuz! (京ICP-10201084)

本站信息均由会员发表,不代表气象家园立场,禁止在本站发表与国家法律相抵触言论

快速回复 返回顶部 返回列表