- 积分
- 6169
- 贡献
-
- 精华
- 在线时间
- 小时
- 注册时间
- 2013-9-1
- 最后登录
- 1970-1-1
|
登录后查看更多精彩内容~
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
本帖最后由 astiny 于 2020-2-26 14:03 编辑
ERA5数据是我们常用的再分析资料之一。它的时空分辨率都比较高。
一般我们下载ERA5数据,可以直接在官网上,手动选择时间、变量等信息,然后点击链接下载;也可以使用ECMWF的Python cds API下载。(如http://bbs.06climate.com/forum.php?mod=viewthread&tid=91210)
multiprocessing包是Python中的多进程管理包。使用multiprocessing包进行并行下载,可以有效提高下载的速度。比如Gavin老师写的脚本(见http://climate2weather.cc/2019/05/10/era5-download/)
但该脚本也有一些小问题:cds的API仅支持最多12个任务(2020.2.17修改,已咨询ec并确认)同时提交。假如你同时提交多个下载进程,且其中后提交的进程下载速度比先提交的进程快,那么该进程结束,新的进程提交上去,会顶掉最早提交的那个进程,造成该进程无法顺利下载完毕。相信用过Gavin老师那个脚本的同学们,有些已经遇到这个问题了。那么,我们应该如何解决呢?
这个问题主要还是ERA5关于下载的介绍不完整所造成的。在官网的下载教程和实例代码中,仅介绍了retrieve这一种下载方法。这可能对大家造成了一种误解,以为只能使用这一种下载方法。其实不然。
熟悉python面向对象编程的同学们应该可以马上反应过来了:对于一个稍有编程经验的程序员来说,他开发时候,肯定是会把所有的方法(以ERA5的这个api为例,包括retrieve,download,delete等等)都开发在一个类里,不可能只提供一个下载的函数。EC的coding能力我还是比较信任的,他们应该干不出这种事情。
因此,我找到了EC在github上的项目https://github.com/ecmwf/cdsapi,然后在该repository中搜索"delete"。
果然,在https://github.com/ecmwf/cdsapi/blob/a656204578341cfc994a4772496b5aca1d823d49/cdsapi/api.py中,详细介绍了retrieve,download,delete等方法。在提交session然后download后,其实可以通过delete来删除你已经下载完的进程,这样就可以避免新的进程顶掉最早提交的进程了。
好了,说了那么多,一大半对于大多数同学都是没用的。还是代码见吧(基本上是从Gavin老师脚本上改的。运行到现在没出现过任何问题)。这里默认大家都懂怎么用python非并行下载了。不太懂的可以参考上面几个链接。
#############start################
- from queue import Queue
- from threading import Thread
- import cdsapi
- import time
- from datetime import datetime, timedelta
- import os, glob
- from multiprocessing import Pool
- download_dir = os.getcwd()
- def download_ear5(allvar):
- print(allvar)
- variable = allvar[0]
- pressure_level = allvar[1]
- year = allvar[2]
- month = allvar[3]
- day = allvar[4]
- hour = allvar[5]
- file_path = os.path.join(
- download_dir,
- variable,
- pressure_level,
- year,
- month,
- day,
- variable
- + "_"
- + pressure_level
- + "hpa"
- + "_"
- + year
- + month
- + day
- + hour
- + ".nc",
- )
- if not os.path.exists(
- file_path
- ): # or (os.path.getsize(file_path) < 1024*1024*1.8) :
- c = cdsapi.Client()
- r = c.retrieve(
- "reanalysis-era5-pressure-levels",
- {
- "product_type": "reanalysis",
- "format": "netcdf",
- "variable": [variable],
- "pressure_level": [pressure_level],
- "year": year,
- "month": [month],
- "day": [day],
- "time": [hour + ":00"],
- "area": [90, 0, 0, 180],
- },
- )
- r.download(
- os.path.join(
- download_dir,
- variable,
- pressure_level,
- year,
- month,
- day,
- variable
- + "_"
- + pressure_level
- + "hpa"
- + "_"
- + year
- + month
- + day
- + hour
- + ".nc",
- )
- )
- r.delete()
- time.sleep(3)
- return
- # 下载脚本
- class DownloadWorker(Thread):
- def __init__(self, queue):
- Thread.__init__(self)
- self.queue = queue
- def run(self):
- while True:
- # 从队列中获取任务并扩展tuple
- allvar = self.queue.get()
- download_ear5(allvar)
- self.queue.task_done()
- # 主程序
- def main():
- start_time = datetime(2017, 1, 1, 0) # 开始时间
- end_time = datetime(2020, 1, 1, 0) # 结束时间
- # 创建列表,存储起止时间之间的所有时次
- datetime_list = []
- while start_time <= end_time:
- datetime_list.append(start_time)
- start_time += timedelta(hours=1)
- # 变量列表
- variable_list = [
- "geopotential",
- "specific_humidity",
- "temperature",
- "u_component_of_wind",
- "v_component_of_wind",
- ]
- # 气压层列表
- pressure_level_list = ["100", "200", "500", "700", "850", "925", "1000"]
- # 创建allvar列表,内含variable,pressure_level,year,month,day,hour
- allvar = []
- for idatetime in datetime_list: # 非并行创建文件夹,同事填充allvar列表
- year = idatetime.strftime("%Y")
- month = idatetime.strftime("%m")
- day = idatetime.strftime("%d")
- hour = idatetime.strftime("%H")
- for pressure_level in pressure_level_list:
- for variable in variable_list:
- if not os.path.exists(
- os.path.join(
- download_dir, variable, pressure_level, year, month, day
- )
- ):
- os.makedirs(
- os.path.join(
- download_dir, variable, pressure_level, year, month, day
- )
- )
- allvar.append([variable, pressure_level, year, month, day, hour])
- # 创建一个主进程与工作进程通信
- queue = Queue()
- # 注意,每个用户同时最多接受4个request https://cds.climate.copernicus.eu/vision
- # 创建12个工作线程
- for x in range(12):
- worker = DownloadWorker(queue)
- # 将daemon设置为True将会使主线程退出,即使所有worker都阻塞了
- worker.daemon = False
- worker.start()
- # 将任务以tuple的形式放入队列中
- for link in allvar:
- queue.put((link))
- # 让主线程等待队列完成所有的任务
- queue.join()
- if __name__ == "__main__":
- main()
复制代码
#############end###################
接下来是感想和杂谈时间。
1.不能只看教程。还是得养成多看源代码的习惯。不然,只能永远做一个调包侠,coding的技能没法成长。
2.我这里写的脚本,是将每一层、每个变量,单独存成一个文件。这样有一个好处:当你需要用部分数据的时候,不需要读取全部,节省了很多时间。有点和mongodb的感觉一样(笑)。但是,据我在许多气象相关微信群里的经验,有很多同学似乎不太适应用循环的方法读取多个文件。这也许是收了ncl的影响,因为ncl中的addfiles函数,提供了读取整个文件列表中类似格式的nc文件的方法。这固然提供了很多方便,但也许也使得大家下意识地回避了循环这个问题。实际上,我们不仅不应该逃避循环结构,而应该进一步地拥抱循环语句。我相信,对于熟悉数据结构和循环的同学来说,循环应该是一个十分得力的工具。如果不熟悉的话,希望大家也都熟悉起来,因为掌握循环和数据结构(不论对哪种语言来说),都是十分有必要的。
|
|