- 积分
- 19
- 贡献
-
- 精华
- 在线时间
- 小时
- 注册时间
- 2024-8-19
- 最后登录
- 1970-1-1

|
登录后查看更多精彩内容~
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
本帖最后由 Hellohistory 于 2025-2-7 14:12 编辑
第一次在家园发帖,还是很开心的。我的工作和气象没有任何关系,但我怀揣着对气象的这份热爱,希望能交到更多的好朋友。
这次处理的是 NCEI 1929 年至 2024 年的逐日数据,数据量很大。我写了一些代码,顺利地把数据写入了 MySQL,希望各位朋友能批评指正我工作中不到位的地方,共同进步!!!
- use csv::{ReaderBuilder, StringRecord};
- use indicatif::{ProgressBar, ProgressStyle};
- use mysql::prelude::*;
- use mysql::*;
- use std::error::Error;
- use std::path::{Path, PathBuf};
- use walkdir::WalkDir;
- use chrono::NaiveDate;
/// Result type used by every fallible function in this program: any error, boxed.
///
/// Written with a fully qualified `Result` path, presumably so it cannot be confused
/// with a `Result` brought in by the `use mysql::*` glob import (TODO confirm).
type StdResult<T> = core::result::Result<T, Box<dyn Error + 'static>>;
// MySQL connection settings. MYSQL_USER / MYSQL_PASSWORD must be filled in before
// running; being `const`s, they are compiled into the binary.
const MYSQL_HOST: &str = "localhost";
const MYSQL_PORT: u16 = 3306;
const MYSQL_USER: &str = "";
const MYSQL_PASSWORD: &str = "";
/// Database that holds `station_info` and the per-station `weather_data_*` tables.
const MYSQL_DATABASE: &str = "ncei_day_weather_data";
/// Columns of every `weather_data_{station}` table, in insertion order.
///
/// STATION, LATITUDE, LONGITUDE, ELEVATION and NAME are not listed here; they are
/// stored once per station in `station_info`. This order must match the order in
/// which `batch_insert_daily_rows` pushes the values of a `DailyRow`.
static DAILY_FIELD_NAMES: &[&str] = &[
    "date",
    // measurement, then its attribute (observation count) column
    "temp", "temp_attributes",
    "dewp", "dewp_attributes",
    "slp", "slp_attributes",
    "stp", "stp_attributes",
    "visib", "visib_attributes",
    "wdsp", "wdsp_attributes",
    // measurements without an attribute column
    "mxspd",
    "gust",
    "max", "max_attributes",
    "min", "min_attributes",
    "prcp", "prcp_attributes",
    "sndp",
    "frshtt",
];
- ///
- /// main 函数
- ///
- fn main() -> StdResult<()> {
- // ---------------------------
- // 要处理的 CSV 文件所在目录
- // ---------------------------
- let input_dir = Path::new(r"G:\气象数据\美国国家环境信息中心(NCEI)\逐日气象数据");
-
- let base_url = format!("mysql://{}:{}@{}:{}", MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT);
- let base_opts = Opts::from_url(&base_url)?;
- let mut base_conn = Conn::new(base_opts)?;
- create_database_if_not_exists(&mut base_conn, MYSQL_DATABASE)?;
- // 连接到具体数据库
- let url_with_db = format!(
- "mysql://{}:{}@{}:{}/{}",
- MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE
- );
- let opts = Opts::from_url(&url_with_db)?;
- let pool = Pool::new(opts)?;
- let mut conn = pool.get_conn()?;
- // 确保站点信息表存在
- create_station_info_table_if_not_exists(&mut conn)?;
- // 收集所有 .csv 文件路径
- let files = collect_csv_files(input_dir);
- let total_files = files.len();
- println!("共找到 {} 个 CSV 文件,开始处理...", total_files);
- // 进度条
- let pb = ProgressBar::new(total_files as u64);
- pb.set_style(
- ProgressStyle::default_bar()
- .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} 文件 ({eta})")
- .unwrap()
- .progress_chars("#>-"),
- );
- let mut success_count = 0_usize;
- let mut error_count = 0_usize;
- for path in files {
- let station_code = match path.file_stem() {
- Some(os_str) => os_str.to_string_lossy().to_string(),
- None => {
- eprintln!("无法获取文件名: {}", path.display());
- error_count += 1;
- pb.inc(1);
- continue;
- }
- };
- match process_single_csv(&mut conn, &path, &station_code) {
- Ok(_) => success_count += 1,
- Err(e) => {
- eprintln!("\n❌ 处理失败: {},错误: {}", path.display(), e);
- error_count += 1;
- }
- }
- pb.inc(1);
- }
- pb.finish_with_message("所有文件处理结束");
- println!("\n处理结果:");
- println!("✅ 成功: {} 个文件", success_count);
- println!("❌ 失败: {} 个文件", error_count);
- Ok(())
- }
- /// ---------------------------
- /// 创建数据库(如果不存在)
- /// ---------------------------
- fn create_database_if_not_exists(conn: &mut Conn, db_name: &str) -> StdResult<()> {
- let create_db_sql = format!("CREATE DATABASE IF NOT EXISTS `{}`", db_name);
- conn.query_drop(create_db_sql)?;
- Ok(())
- }
- /// ---------------------------
- /// station_info 表
- /// ---------------------------
- fn create_station_info_table_if_not_exists(conn: &mut PooledConn) -> StdResult<()> {
- let create_sql = r#"
- CREATE TABLE IF NOT EXISTS `station_info` (
- `station_code` VARCHAR(20) NOT NULL,
- `latitude` DOUBLE DEFAULT NULL,
- `longitude` DOUBLE DEFAULT NULL,
- `elevation` DOUBLE DEFAULT NULL,
- `station_name` VARCHAR(255) DEFAULT NULL,
- PRIMARY KEY (`station_code`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
- "#;
- conn.exec_drop(create_sql, ())?;
- Ok(())
- }
- /// ---------------------------
- /// 收集目录下所有 .csv 文件
- /// ---------------------------
- fn collect_csv_files(input_dir: &Path) -> Vec<PathBuf> {
- WalkDir::new(input_dir)
- .into_iter()
- .filter_map(|e| e.ok()) // 过滤异常
- .filter(|e| e.file_type().is_file()) // 只要文件
- .filter(|e| {
- e.path()
- .extension()
- .and_then(|ext| ext.to_str())
- .map(|s| s.eq_ignore_ascii_case("csv"))
- .unwrap_or(false)
- })
- .map(|e| e.path().to_path_buf())
- .collect()
- }
- /// ---------------------------
- /// 处理单个 CSV
- /// 1) 抽取/更新站点信息到 station_info
- /// 2) 创建 (或升级) weather_data_{station} 表
- /// 3) 批量插入每日数据,使用更细的列类型
- /// ---------------------------
- fn process_single_csv(conn: &mut PooledConn, csv_path: &Path, station_code: &str) -> StdResult<()> {
- let mut reader = ReaderBuilder::new().flexible(true).from_path(csv_path)?;
- // 读取表头
- let headers = reader.headers()?.clone();
- if headers.len() < 2 {
- return Err(format!("表头列数太少: {}", headers.len()).into());
- }
- // 全部读入内存(示例做法)
- let mut all_records = vec![];
- for rec in reader.records() {
- let record = rec?;
- all_records.push(record);
- }
- // 先更新 station_info (这里假设第一行足够代表该站点信息)
- if !all_records.is_empty() {
- let first = &all_records[0];
- upsert_station_info(conn, station_code, first)?;
- }
- // 为该站点创建 / 升级数据表
- let table_name = format!("weather_data_{}", station_code);
- create_daily_table_if_not_exists(conn, &table_name)?;
- // 把每日数据解析为 Rust 结构,然后批量插入
- let mut parsed_rows = Vec::new();
- for record in &all_records {
- if let Some(row) = parse_csv_record_to_daily_row(record) {
- parsed_rows.push(row);
- } else {
- // 说明此行校验/解析失败或属于异常范围,直接跳过
- continue;
- }
- }
- batch_insert_daily_rows(conn, &table_name, &parsed_rows)?;
- Ok(())
- }
- /// ---------------------------
- /// station_info 插入或更新
- /// (ON DUPLICATE KEY UPDATE)
- /// ---------------------------
- fn upsert_station_info(conn: &mut PooledConn, station_code: &str, rec: &StringRecord) -> StdResult<()> {
- // 原先的 CSV 顺序: [0:STATION, 1:DATE, 2:LAT, 3:LON, 4:ELEV, 5:NAME, 6:TEMP...]
- let lat = parse_f64_or_null(rec.get(2));
- let lon = parse_f64_or_null(rec.get(3));
- let elev = parse_f64_or_null(rec.get(4));
- let name = rec.get(5).unwrap_or("").trim();
- let sql = r#"
- INSERT INTO station_info (station_code, latitude, longitude, elevation, station_name)
- VALUES (:code, :lat, :lon, :elev, :sname)
- ON DUPLICATE KEY UPDATE
- latitude = VALUES(latitude),
- longitude = VALUES(longitude),
- elevation = VALUES(elevation),
- station_name = VALUES(station_name)
- "#;
- conn.exec_drop(
- sql,
- params! {
- "code" => station_code,
- "lat" => lat,
- "lon" => lon,
- "elev" => elev,
- "sname" => name,
- },
- )?;
- Ok(())
- }
- /// ---------------------------
- /// 创建每日数据表 (更细分的类型)
- /// ---------------------------
- fn create_daily_table_if_not_exists(conn: &mut PooledConn, table_name: &str) -> StdResult<()> {
- // 下面每个字段都给了更具体的类型:
- // date -> DATE
- // temp, dewp, slp, stp, visib, wdsp, mxspd, gust, max, min, prcp, sndp -> DOUBLE
- // *_attributes -> VARCHAR(50)
- // frshtt -> INT
- // 可根据实际数据分布再做细调
- let create_sql = format!(
- r#"
- CREATE TABLE IF NOT EXISTS `{table_name}` (
- `date` DATE NOT NULL,
- `temp` DOUBLE DEFAULT NULL,
- `temp_attributes` VARCHAR(50) DEFAULT NULL,
- `dewp` DOUBLE DEFAULT NULL,
- `dewp_attributes` VARCHAR(50) DEFAULT NULL,
- `slp` DOUBLE DEFAULT NULL,
- `slp_attributes` VARCHAR(50) DEFAULT NULL,
- `stp` DOUBLE DEFAULT NULL,
- `stp_attributes` VARCHAR(50) DEFAULT NULL,
- `visib` DOUBLE DEFAULT NULL,
- `visib_attributes` VARCHAR(50) DEFAULT NULL,
- `wdsp` DOUBLE DEFAULT NULL,
- `wdsp_attributes` VARCHAR(50) DEFAULT NULL,
- `mxspd` DOUBLE DEFAULT NULL,
- `gust` DOUBLE DEFAULT NULL,
- `max` DOUBLE DEFAULT NULL,
- `max_attributes` VARCHAR(50) DEFAULT NULL,
- `min` DOUBLE DEFAULT NULL,
- `min_attributes` VARCHAR(50) DEFAULT NULL,
- `prcp` DOUBLE DEFAULT NULL,
- `prcp_attributes` VARCHAR(50) DEFAULT NULL,
- `sndp` DOUBLE DEFAULT NULL,
- `frshtt` INT DEFAULT NULL,
- UNIQUE KEY idx_unique_date(`date`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
- "#,
- );
- conn.exec_drop(create_sql, ())?;
- Ok(())
- }
- /// ---------------------------
- /// 代表每日气象的解析后结构
- /// 注意:对应 DAILY_FIELD_NAMES 的顺序
- /// ---------------------------
- #[derive(Debug)]
- struct DailyRow {
- date: NaiveDate, // DATE
- temp: Option<f64>, // DOUBLE
- temp_attr: Option<String>,
- dewp: Option<f64>,
- dewp_attr: Option<String>,
- slp: Option<f64>,
- slp_attr: Option<String>,
- stp: Option<f64>,
- stp_attr: Option<String>,
- visib: Option<f64>,
- visib_attr: Option<String>,
- wdsp: Option<f64>,
- wdsp_attr: Option<String>,
- mxspd: Option<f64>,
- gust: Option<f64>,
- max: Option<f64>,
- max_attr: Option<String>,
- min: Option<f64>,
- min_attr: Option<String>,
- prcp: Option<f64>,
- prcp_attr: Option<String>,
- sndp: Option<f64>,
- frshtt: Option<i32>,
- }
- /// ---------------------------
- /// 从 CSV 的一行 StringRecord 中解析出一个 DailyRow
- /// 如果解析/校验失败,返回 None
- /// ---------------------------
- fn parse_csv_record_to_daily_row(rec: &StringRecord) -> Option<DailyRow> {
- // 这里注意对应 CSV 的字段顺序:
- // 0 STATION
- // 1 DATE
- // 2 LAT
- // 3 LON
- // 4 ELEV
- // 5 NAME
- // 6 TEMP
- // 7 TEMP_ATTRIBUTES
- // 8 DEWP
- // 9 DEWP_ATTRIBUTES
- // 10 SLP
- // 11 SLP_ATTRIBUTES
- // 12 STP
- // 13 STP_ATTRIBUTES
- // 14 VISIB
- // 15 VISIB_ATTRIBUTES
- // 16 WDSP
- // 17 WDSP_ATTRIBUTES
- // 18 MXSPD
- // 19 GUST
- // 20 MAX
- // 21 MAX_ATTRIBUTES
- // 22 MIN
- // 23 MIN_ATTRIBUTES
- // 24 PRCP
- // 25 PRCP_ATTRIBUTES
- // 26 SNDP
- // 27 FRSHTT
- //
- // 这里要小心索引越界,如果 CSV 某些列缺失,会导致 rec.get(...) = None。
- // 为简化,先检查长度,如果不够就返回 None。
- if rec.len() < 28 {
- eprintln!("行列数不足,无法解析: {:?}", rec);
- return None;
- }
- // 1) 解析 date (第 1 列)
- let date_str = rec.get(1).unwrap().trim();
- let date_parsed = match parse_date_ymd(date_str) {
- Some(d) => d,
- None => {
- eprintln!("日期格式不符或无法解析: {}", date_str);
- return None;
- }
- };
- // 2) 依次解析其他字段
- // 对数值进行 parse_f64_or_null,并做简单的范围过滤(可选)
- // 也可以把过于离谱的数据直接当 None 或者整行舍弃。
- let temp = parse_f64_or_null(rec.get(6));
- // 例如,若温度超出 -200 ~ 190 范围,我们视为离谱 -> 跳过整行
- if let Some(t) = temp {
- if t < -200.0 || t > 190.0 {
- eprintln!("温度超出合理范围 ({:.1}), 跳过行: {:?}", t, rec);
- return None;
- }
- }
- let temp_attr = parse_str_or_null(rec.get(7));
- let dewp = parse_f64_or_null(rec.get(8));
- let dewp_attr = parse_str_or_null(rec.get(9));
- let slp = parse_f64_or_null(rec.get(10));
- let slp_attr = parse_str_or_null(rec.get(11));
- let stp = parse_f64_or_null(rec.get(12));
- let stp_attr = parse_str_or_null(rec.get(13));
- let visib = parse_f64_or_null(rec.get(14));
- let visib_attr = parse_str_or_null(rec.get(15));
- let wdsp = parse_f64_or_null(rec.get(16));
- let wdsp_attr = parse_str_or_null(rec.get(17));
- let mxspd = parse_f64_or_null(rec.get(18));
- let gust = parse_f64_or_null(rec.get(19));
- let max = parse_f64_or_null(rec.get(20));
- let max_attr = parse_str_or_null(rec.get(21));
- let min = parse_f64_or_null(rec.get(22));
- let min_attr = parse_str_or_null(rec.get(23));
- let prcp = parse_f64_or_null(rec.get(24));
- let prcp_attr = parse_str_or_null(rec.get(25));
- let sndp = parse_f64_or_null(rec.get(26));
- // 解析 frshtt 为 int
- let frshtt = parse_i32_or_null(rec.get(27));
- Some(DailyRow {
- date: date_parsed,
- temp,
- temp_attr,
- dewp,
- dewp_attr,
- slp,
- slp_attr,
- stp,
- stp_attr,
- visib,
- visib_attr,
- wdsp,
- wdsp_attr,
- mxspd,
- gust,
- max,
- max_attr,
- min,
- min_attr,
- prcp,
- prcp_attr,
- sndp,
- frshtt,
- })
- }
- /// ---------------------------
- /// 辅助函数:解析日期字符串 (yyyy/MM/dd) -> NaiveDate
- /// 也可以加更多格式兼容
- /// ---------------------------
- fn parse_date_ymd(s: &str) -> Option<NaiveDate> {
- let s = s.trim();
- if s.is_empty() {
- return None;
- }
- // 尝试用 yyyy-MM-dd
- if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
- return Some(d);
- }
- // 尝试用 yyyy/MM/dd
- if let Ok(d) = NaiveDate::parse_from_str(s, "%Y/%m/%d") {
- return Some(d);
- }
- None
- }
/// Parses an optional numeric CSV field into a finite `f64`.
///
/// Returns `None` when the field is absent, blank, unparseable, not finite, or equal to
/// the 9999.9 missing-value marker.
/// - The marker is compared numerically, so spellings such as "9999.90" are also
///   treated as missing.
/// - Non-finite values ("NaN", "inf", which `str::parse::<f64>` accepts) are rejected
///   because MySQL DOUBLE columns cannot store them. A single such value would
///   otherwise make the whole batch insert fail.
fn parse_f64_or_null(s: Option<&str>) -> Option<f64> {
    const MISSING: f64 = 9999.9;
    // An empty or whitespace-only field fails `parse` and therefore yields `None`.
    s.map(str::trim)
        .and_then(|text| text.parse::<f64>().ok())
        .filter(|value| value.is_finite() && *value != MISSING)
}
/// Parses an optional integer CSV field.
///
/// Surrounding whitespace is ignored. An absent, blank or non-integer field yields
/// `None`.
fn parse_i32_or_null(s: Option<&str>) -> Option<i32> {
    let field = s?.trim();
    if field.is_empty() {
        return None;
    }
    field.parse::<i32>().ok()
}
/// Returns the trimmed field text as an owned `String`.
///
/// Returns `None` when the field is absent or contains only whitespace.
fn parse_str_or_null(s: Option<&str>) -> Option<String> {
    // `filter` + `map` instead of `map(..).flatten()` (clippy::map_flatten).
    s.map(str::trim)
        .filter(|text| !text.is_empty())
        .map(str::to_string)
}
- /// ---------------------------
- /// 批量插入解析后的 DailyRow
- /// ---------------------------
- fn batch_insert_daily_rows(
- conn: &mut PooledConn,
- table_name: &str,
- rows: &[DailyRow],
- ) -> StdResult<()> {
- if rows.is_empty() {
- return Ok(());
- }
- // 构造 INSERT IGNORE 语句
- let columns_str = DAILY_FIELD_NAMES.join(", ");
- // 生成单行占位符,如 "(?, ?, ?, ...)"
- let placeholders_single_row = {
- let mut ph = String::new();
- for (i, _) in DAILY_FIELD_NAMES.iter().enumerate() {
- if i == 0 {
- ph.push('?');
- } else {
- ph.push_str(", ?");
- }
- }
- format!("({})", ph)
- };
- // 一次插入多少行
- let chunk_size = 500;
- for chunk in rows.chunks(chunk_size) {
- let mut sql = format!("INSERT IGNORE INTO `{}` ({}) VALUES ", table_name, columns_str);
- let placeholders_joined = vec![placeholders_single_row.clone(); chunk.len()].join(", ");
- sql.push_str(&placeholders_joined);
- // 收集所有参数
- let mut params: Vec<Value> = Vec::new();
- for row in chunk {
- // 将 struct 字段一个个转成 Value
- // 注意顺序要与 DAILY_FIELD_NAMES 对应
- // 1) date (DATE)
- params.push(Value::from(row.date.format("%Y-%m-%d").to_string()));
- // 2) temp (DOUBLE)
- params.push(opt_f64_to_value(row.temp));
- // 3) temp_attributes (VARCHAR)
- params.push(opt_str_to_value(&row.temp_attr));
- // 4) dewp
- params.push(opt_f64_to_value(row.dewp));
- // 5) dewp_attr
- params.push(opt_str_to_value(&row.dewp_attr));
- // 6) slp
- params.push(opt_f64_to_value(row.slp));
- // 7) slp_attr
- params.push(opt_str_to_value(&row.slp_attr));
- // 8) stp
- params.push(opt_f64_to_value(row.stp));
- // 9) stp_attr
- params.push(opt_str_to_value(&row.stp_attr));
- // 10) visib
- params.push(opt_f64_to_value(row.visib));
- // 11) visib_attr
- params.push(opt_str_to_value(&row.visib_attr));
- // 12) wdsp
- params.push(opt_f64_to_value(row.wdsp));
- // 13) wdsp_attr
- params.push(opt_str_to_value(&row.wdsp_attr));
- // 14) mxspd
- params.push(opt_f64_to_value(row.mxspd));
- // 15) gust
- params.push(opt_f64_to_value(row.gust));
- // 16) max
- params.push(opt_f64_to_value(row.max));
- // 17) max_attr
- params.push(opt_str_to_value(&row.max_attr));
- // 18) min
- params.push(opt_f64_to_value(row.min));
- // 19) min_attr
- params.push(opt_str_to_value(&row.min_attr));
- // 20) prcp
- params.push(opt_f64_to_value(row.prcp));
- // 21) prcp_attr
- params.push(opt_str_to_value(&row.prcp_attr));
- // 22) sndp
- params.push(opt_f64_to_value(row.sndp));
- // 23) frshtt
- params.push(opt_i32_to_value(row.frshtt));
- }
- conn.exec_drop(sql, params)?;
- }
- Ok(())
- }
- /// ---------------------------
- /// 将 Option<f64> 转为 mysql::Value
- /// ---------------------------
- fn opt_f64_to_value(opt: Option<f64>) -> Value {
- match opt {
- Some(v) => Value::from(v),
- None => Value::NULL,
- }
- }
- /// ---------------------------
- /// 将 Option<i32> 转为 mysql::Value
- /// ---------------------------
- fn opt_i32_to_value(opt: Option<i32>) -> Value {
- match opt {
- Some(v) => Value::from(v),
- None => Value::NULL,
- }
- }
- /// ---------------------------
- /// 将 Option<String> 转为 mysql::Value
- /// ---------------------------
- fn opt_str_to_value(opt: &Option<String>) -> Value {
- match opt {
- Some(s) => Value::from(s.as_str()),
- None => Value::NULL,
- }
- }
复制代码 这段代码会先对逐日数据做简单的合理性检查,明显离谱的数据会被过滤掉(例如温度超出 -200 ~ 190 的整行会被跳过,9999.9 被视为缺失值)。另外,数据中的温度单位应该是华氏度。为了便于理解,我给代码加了详细的注释。我还把数据库导出成了 SQL 文件,需要的朋友可以自行下载研究。
Cargo.toml
[package]
name = "NCEI_day"
version = "0.1.0"
edition = "2021"

[dependencies]
# CSV reading (`ReaderBuilder`, `StringRecord`).
csv = "1.3.1"
# Recursive directory traversal that finds the input `.csv` files.
walkdir = "2.5.0"
# Progress bar shown while the files are processed.
indicatif = "0.17.11"
# MySQL client (`Conn`, `Pool`, `Value`, `params!`).
mysql = "23.0.1"
# `NaiveDate` parsing of the DATE column. Dates are sent to MySQL as formatted
# strings, so the mysql crate's chrono integration is not needed.
chrono = "0.4.39"
复制代码
百度网盘:17NobkMbMoKPp2pv0jdUdeA?pwd=ca7c
第一次发帖,出现问题还望多多海涵!!!
|
|