- 积分
- 9532
- 贡献
-
- 精华
- 在线时间
- 小时
- 注册时间
- 2017-6-25
- 最后登录
- 1970-1-1
|
登录后查看更多精彩内容~
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
本帖最后由 墨家大宝 于 2020-8-29 18:54 编辑
师姐想下载亿些CMIP6数据,全球日数据太大了硬盘不够,而且只想要中国范围的月平均数据。她推给我一篇介绍在shell脚本中用wget+cdo实现类似需求的方式,而我并不懂shell和cdo,大概看了一下后觉得用Python应该可以做得更好一些。
在我理解中,shell脚本中用wget+cdo的实现方法为:循环使用wget下载原始数据到本地硬盘上,每下载完一条后通过cdo命令将数据处理成所需要的另存到硬盘,再将原始数据删除。这样做思路清晰,但有两个地方我认为可以改进:
- 多线程下载
- 原始数据直接在内存中就处理,省去硬盘写入、读取、删除的开销,硬盘上只保存最终所需要的数据
思路为: urllib 多线程下载,下载数据用 io.BytesIO 保存在内存, xarray 读取下载数据并处理保存到硬盘。- from urllib.request import Request, urlopen
- from io import BytesIO
- from multiprocessing.dummy import Pool
- from os.path import exists
- import json
- from datetime import datetime
- import xarray
- from get_url import nc_urls
- threads = 10
- lon_w, lon_e = 70, 140
- lat_s, lat_n = 15, 55
- save_to = r'./cmip6_china_monthmean'
- base_headers = {
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
- 'AppleWebKit/537.36 (KHTML, like Gecko) '
- 'Chrome/74.0.3729.131 Safari/537.36',
- 'Connection': 'Keep-Alive'
- }
- pool = Pool(threads)
- lon_slice = slice(lon_w, lon_e)
- lat_slice = slice(lat_s, lat_n)
- def worker(info):
- nc_url, pos_start, pos_end, n, cache = info
- headers = {**base_headers, 'range': f'bytes={pos_start}-{pos_end}'}
- while True:
- try:
- cache[n] = urlopen(Request(nc_url, headers=headers), timeout=5).read()
- return
- except Exception as e:
- continue
- done_json = f'{save_to}/done.json'
- if exists(done_json):
- with open(done_json) as f:
- done = json.load(f)
- else:
- done = {}
- for nc_name, nc_url in nc_urls.items():
- if nc_name in done:
- print(f'{nc_name}\n已处理过,跳过')
- continue
- cache = [None] * threads
- print(f'{nc_name}\n{datetime.now()}开始下载')
- while True:
- try:
- nc_bytes = int(urlopen(Request(nc_url, headers=base_headers), timeout=5).headers['content-length'])
- break
- except Exception as e:
- continue
- subsize = nc_bytes // threads
- worker_info = [[nc_url, x*subsize, (x+1)*subsize-1, x, cache] for x in range(threads)]
- worker_info[-1][2] = nc_bytes - 1
- pool.map(worker, worker_info)
- print(f'{datetime.now()}下载完成')
- f = BytesIO(b''.join(cache))
- del cache
- f.seek(0)
- ds = xarray.open_dataset(f).sel(lon=lon_slice, lat=lat_slice)
- ds = ds.groupby(ds.time.astype('datetime64[M]')).mean()
- ds.to_netcdf(f'{save_to}/{nc_name[:-3]}_china_monthmean.nc')
- print(f'{datetime.now()}处理完成')
- del ds, f
- done[nc_name] = nc_url
- with open(done_json, 'w') as f:
- json.dump(done, f, indent=4)
- pool.close()
- pool.join()
复制代码
其中 get_url.py 是用来从网站提供的sh脚本中提取下载链接的,内容如下:
- from os.path import basename, exists
- from glob import glob
- import re
- import json
- wget_sh_glob = './pr_ssp245/wget*.sh'
- nc_urls_json = './pr_ssp245/nc_urls.json'
- if exists(nc_urls_json):
- with open(nc_urls_json) as f:
- nc_urls = json.load(f)
- else:
- nc_urls = {}
- for sh_path in glob(wget_sh_glob):
- with open(sh_path) as f:
- txt = f.read()
- for nc_url in re.findall(r'(http://\S+?.nc)\'', txt):
- nc_name = basename(nc_url)
- if nc_name in nc_urls:
- print(f'{nc_name}')
- else:
- nc_urls[nc_name] = nc_url
- with open(nc_urls_json, 'w') as f:
- json.dump(nc_urls, f, indent=4)
复制代码
效果如图:
收获如下:
- 虽然不太优雅,但第一次成功尝试多线程,以后有的是进步空间
- 第一次尝试直接读取内存中的二进制nc文件(需要安装 h5netcdf 包)
存在的问题:
- 有些数据从如1850年开始计时,所以需要安装`cftime`包
- 下载过程中没有失败跳过判断(需求多的话我再加上吧)
- 第一次用 io.BytesIO ,尝试过多线程直接按照各自位置向 io.BytesIO 中存,但失败了,所以妥协了先用字节串列表存数据,再拼接到 io.BytesIO 中,希望擅长多线程或者相关“内存临时文件”编程的前辈指教一下
|
|