爱气象,爱气象家园! 

气象家园

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博登陆

只需一步, 快速开始

搜索
查看: 254|回复: 7

在NCEI逐日数据处理当中使用的Rust代码及其数据

[复制链接]

新浪微博达人勋

发表于 2025-2-7 13:55:47 | 显示全部楼层 |阅读模式

登录后查看更多精彩内容~

您需要 登录 才可以下载或查看,没有帐号?立即注册 新浪微博登陆

x
本帖最后由 Hellohistory 于 2025-2-7 14:12 编辑

第一次在家园发帖,还是很开心的,我的工作和气象没有任何关系,但是对于怀揣着这份热爱希望能交到更多的好朋友
这次处理的是NCEI的1929年至2024年的逐日数据,数据量很大,我写了一些代码顺利将数据写入到mysql当中,希望各位朋友能批判我工作当中不到位的地方,共同进步!!!
  1. use csv::{ReaderBuilder, StringRecord};
  2. use indicatif::{ProgressBar, ProgressStyle};
  3. use mysql::prelude::*;
  4. use mysql::*;
  5. use std::error::Error;
  6. use std::path::{Path, PathBuf};
  7. use walkdir::WalkDir;
  8. use chrono::NaiveDate;

  9. // 定义一个别名
  10. type StdResult<T> = std::result::Result<T, Box<dyn Error>>;

  11. /// ---------------------------
  12. /// 数据库连接信息
  13. /// ---------------------------
  14. const MYSQL_USER: &str = "";
  15. const MYSQL_PASSWORD: &str = "";
  16. const MYSQL_HOST: &str = "localhost";
  17. const MYSQL_PORT: u16 = 3306;
  18. const MYSQL_DATABASE: &str = "ncei_day_weather_data";

  19. /// ---------------------------
  20. /// CSV 文件中需要的每日气象列 (除去 STATION, LAT, LON, ELEVATION, NAME)
  21. /// 下面将它们存储到更合适的数据库类型 (DATE, DOUBLE, INT, 等)
  22. ///
  23. /// 这里列的顺序要和代码里解析逻辑对应
  24. /// ---------------------------
  25. static DAILY_FIELD_NAMES: &[&str] = &[
  26.     "date",
  27.     "temp",
  28.     "temp_attributes",
  29.     "dewp",
  30.     "dewp_attributes",
  31.     "slp",
  32.     "slp_attributes",
  33.     "stp",
  34.     "stp_attributes",
  35.     "visib",
  36.     "visib_attributes",
  37.     "wdsp",
  38.     "wdsp_attributes",
  39.     "mxspd",
  40.     "gust",
  41.     "max",
  42.     "max_attributes",
  43.     "min",
  44.     "min_attributes",
  45.     "prcp",
  46.     "prcp_attributes",
  47.     "sndp",
  48.     "frshtt",
  49. ];

  50. ///
  51. /// main 函数
  52. ///
  53. fn main() -> StdResult<()> {
  54.     // ---------------------------
  55.     // 要处理的 CSV 文件所在目录
  56.     // ---------------------------
  57.     let input_dir = Path::new(r"G:\气象数据\美国国家环境信息中心(NCEI)\逐日气象数据");
  58.    
  59.     let base_url = format!("mysql://{}:{}@{}:{}", MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT);
  60.     let base_opts = Opts::from_url(&base_url)?;
  61.     let mut base_conn = Conn::new(base_opts)?;
  62.     create_database_if_not_exists(&mut base_conn, MYSQL_DATABASE)?;

  63.     // 连接到具体数据库
  64.     let url_with_db = format!(
  65.         "mysql://{}:{}@{}:{}/{}",
  66.         MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE
  67.     );
  68.     let opts = Opts::from_url(&url_with_db)?;
  69.     let pool = Pool::new(opts)?;
  70.     let mut conn = pool.get_conn()?;

  71.     // 确保站点信息表存在
  72.     create_station_info_table_if_not_exists(&mut conn)?;

  73.     // 收集所有 .csv 文件路径
  74.     let files = collect_csv_files(input_dir);
  75.     let total_files = files.len();
  76.     println!("共找到 {} 个 CSV 文件,开始处理...", total_files);

  77.     // 进度条
  78.     let pb = ProgressBar::new(total_files as u64);
  79.     pb.set_style(
  80.         ProgressStyle::default_bar()
  81.             .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} 文件 ({eta})")
  82.             .unwrap()
  83.             .progress_chars("#>-"),
  84.     );

  85.     let mut success_count = 0_usize;
  86.     let mut error_count = 0_usize;

  87.     for path in files {
  88.         let station_code = match path.file_stem() {
  89.             Some(os_str) => os_str.to_string_lossy().to_string(),
  90.             None => {
  91.                 eprintln!("无法获取文件名: {}", path.display());
  92.                 error_count += 1;
  93.                 pb.inc(1);
  94.                 continue;
  95.             }
  96.         };

  97.         match process_single_csv(&mut conn, &path, &station_code) {
  98.             Ok(_) => success_count += 1,
  99.             Err(e) => {
  100.                 eprintln!("\n&#10060; 处理失败: {},错误: {}", path.display(), e);
  101.                 error_count += 1;
  102.             }
  103.         }

  104.         pb.inc(1);
  105.     }

  106.     pb.finish_with_message("所有文件处理结束");

  107.     println!("\n处理结果:");
  108.     println!("&#9989; 成功: {} 个文件", success_count);
  109.     println!("&#10060; 失败: {} 个文件", error_count);

  110.     Ok(())
  111. }

  112. /// ---------------------------
  113. /// 创建数据库(如果不存在)
  114. /// ---------------------------
  115. fn create_database_if_not_exists(conn: &mut Conn, db_name: &str) -> StdResult<()> {
  116.     let create_db_sql = format!("CREATE DATABASE IF NOT EXISTS `{}`", db_name);
  117.     conn.query_drop(create_db_sql)?;
  118.     Ok(())
  119. }

  120. /// ---------------------------
  121. /// station_info 表
  122. /// ---------------------------
  123. fn create_station_info_table_if_not_exists(conn: &mut PooledConn) -> StdResult<()> {
  124.     let create_sql = r#"
  125.         CREATE TABLE IF NOT EXISTS `station_info` (
  126.             `station_code` VARCHAR(20) NOT NULL,
  127.             `latitude` DOUBLE DEFAULT NULL,
  128.             `longitude` DOUBLE DEFAULT NULL,
  129.             `elevation` DOUBLE DEFAULT NULL,
  130.             `station_name` VARCHAR(255) DEFAULT NULL,
  131.             PRIMARY KEY (`station_code`)
  132.         ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
  133.     "#;
  134.     conn.exec_drop(create_sql, ())?;
  135.     Ok(())
  136. }

  137. /// ---------------------------
  138. /// 收集目录下所有 .csv 文件
  139. /// ---------------------------
  140. fn collect_csv_files(input_dir: &Path) -> Vec<PathBuf> {
  141.     WalkDir::new(input_dir)
  142.         .into_iter()
  143.         .filter_map(|e| e.ok()) // 过滤异常
  144.         .filter(|e| e.file_type().is_file()) // 只要文件
  145.         .filter(|e| {
  146.             e.path()
  147.                 .extension()
  148.                 .and_then(|ext| ext.to_str())
  149.                 .map(|s| s.eq_ignore_ascii_case("csv"))
  150.                 .unwrap_or(false)
  151.         })
  152.         .map(|e| e.path().to_path_buf())
  153.         .collect()
  154. }

  155. /// ---------------------------
  156. /// 处理单个 CSV
  157. /// 1) 抽取/更新站点信息到 station_info
  158. /// 2) 创建 (或升级) weather_data_{station} 表
  159. /// 3) 批量插入每日数据,使用更细的列类型
  160. /// ---------------------------
  161. fn process_single_csv(conn: &mut PooledConn, csv_path: &Path, station_code: &str) -> StdResult<()> {
  162.     let mut reader = ReaderBuilder::new().flexible(true).from_path(csv_path)?;

  163.     // 读取表头
  164.     let headers = reader.headers()?.clone();
  165.     if headers.len() < 2 {
  166.         return Err(format!("表头列数太少: {}", headers.len()).into());
  167.     }

  168.     // 全部读入内存(示例做法)
  169.     let mut all_records = vec![];
  170.     for rec in reader.records() {
  171.         let record = rec?;
  172.         all_records.push(record);
  173.     }

  174.     // 先更新 station_info (这里假设第一行足够代表该站点信息)
  175.     if !all_records.is_empty() {
  176.         let first = &all_records[0];
  177.         upsert_station_info(conn, station_code, first)?;
  178.     }

  179.     // 为该站点创建 / 升级数据表
  180.     let table_name = format!("weather_data_{}", station_code);
  181.     create_daily_table_if_not_exists(conn, &table_name)?;

  182.     // 把每日数据解析为 Rust 结构,然后批量插入
  183.     let mut parsed_rows = Vec::new();
  184.     for record in &all_records {
  185.         if let Some(row) = parse_csv_record_to_daily_row(record) {
  186.             parsed_rows.push(row);
  187.         } else {
  188.             // 说明此行校验/解析失败或属于异常范围,直接跳过
  189.             continue;
  190.         }
  191.     }

  192.     batch_insert_daily_rows(conn, &table_name, &parsed_rows)?;

  193.     Ok(())
  194. }

  195. /// ---------------------------
  196. /// station_info 插入或更新
  197. /// (ON DUPLICATE KEY UPDATE)
  198. /// ---------------------------
  199. fn upsert_station_info(conn: &mut PooledConn, station_code: &str, rec: &StringRecord) -> StdResult<()> {
  200.     // 原先的 CSV 顺序: [0:STATION, 1:DATE, 2:LAT, 3:LON, 4:ELEV, 5:NAME, 6:TEMP...]
  201.     let lat = parse_f64_or_null(rec.get(2));
  202.     let lon = parse_f64_or_null(rec.get(3));
  203.     let elev = parse_f64_or_null(rec.get(4));
  204.     let name = rec.get(5).unwrap_or("").trim();

  205.     let sql = r#"
  206.         INSERT INTO station_info (station_code, latitude, longitude, elevation, station_name)
  207.         VALUES (:code, :lat, :lon, :elev, :sname)
  208.         ON DUPLICATE KEY UPDATE
  209.             latitude = VALUES(latitude),
  210.             longitude = VALUES(longitude),
  211.             elevation = VALUES(elevation),
  212.             station_name = VALUES(station_name)
  213.     "#;

  214.     conn.exec_drop(
  215.         sql,
  216.         params! {
  217.             "code" => station_code,
  218.             "lat" => lat,
  219.             "lon" => lon,
  220.             "elev" => elev,
  221.             "sname" => name,
  222.         },
  223.     )?;

  224.     Ok(())
  225. }

  226. /// ---------------------------
  227. /// 创建每日数据表 (更细分的类型)
  228. /// ---------------------------
  229. fn create_daily_table_if_not_exists(conn: &mut PooledConn, table_name: &str) -> StdResult<()> {
  230.     // 下面每个字段都给了更具体的类型:
  231.     //  date -> DATE
  232.     //  temp, dewp, slp, stp, visib, wdsp, mxspd, gust, max, min, prcp, sndp -> DOUBLE
  233.     //  *_attributes -> VARCHAR(50)
  234.     //  frshtt -> INT
  235.     // 可根据实际数据分布再做细调
  236.     let create_sql = format!(
  237.         r#"
  238.         CREATE TABLE IF NOT EXISTS `{table_name}` (
  239.             `date` DATE NOT NULL,
  240.             `temp` DOUBLE DEFAULT NULL,
  241.             `temp_attributes` VARCHAR(50) DEFAULT NULL,
  242.             `dewp` DOUBLE DEFAULT NULL,
  243.             `dewp_attributes` VARCHAR(50) DEFAULT NULL,
  244.             `slp` DOUBLE DEFAULT NULL,
  245.             `slp_attributes` VARCHAR(50) DEFAULT NULL,
  246.             `stp` DOUBLE DEFAULT NULL,
  247.             `stp_attributes` VARCHAR(50) DEFAULT NULL,
  248.             `visib` DOUBLE DEFAULT NULL,
  249.             `visib_attributes` VARCHAR(50) DEFAULT NULL,
  250.             `wdsp` DOUBLE DEFAULT NULL,
  251.             `wdsp_attributes` VARCHAR(50) DEFAULT NULL,
  252.             `mxspd` DOUBLE DEFAULT NULL,
  253.             `gust` DOUBLE DEFAULT NULL,
  254.             `max` DOUBLE DEFAULT NULL,
  255.             `max_attributes` VARCHAR(50) DEFAULT NULL,
  256.             `min` DOUBLE DEFAULT NULL,
  257.             `min_attributes` VARCHAR(50) DEFAULT NULL,
  258.             `prcp` DOUBLE DEFAULT NULL,
  259.             `prcp_attributes` VARCHAR(50) DEFAULT NULL,
  260.             `sndp` DOUBLE DEFAULT NULL,
  261.             `frshtt` INT DEFAULT NULL,
  262.             UNIQUE KEY idx_unique_date(`date`)
  263.         ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
  264.         "#,
  265.     );
  266.     conn.exec_drop(create_sql, ())?;
  267.     Ok(())
  268. }

  269. /// ---------------------------
  270. /// 代表每日气象的解析后结构
  271. /// 注意:对应 DAILY_FIELD_NAMES 的顺序
  272. /// ---------------------------
  273. #[derive(Debug)]
  274. struct DailyRow {
  275.     date: NaiveDate,          // DATE
  276.     temp: Option<f64>,        // DOUBLE
  277.     temp_attr: Option<String>,
  278.     dewp: Option<f64>,
  279.     dewp_attr: Option<String>,
  280.     slp: Option<f64>,
  281.     slp_attr: Option<String>,
  282.     stp: Option<f64>,
  283.     stp_attr: Option<String>,
  284.     visib: Option<f64>,
  285.     visib_attr: Option<String>,
  286.     wdsp: Option<f64>,
  287.     wdsp_attr: Option<String>,
  288.     mxspd: Option<f64>,
  289.     gust: Option<f64>,
  290.     max: Option<f64>,
  291.     max_attr: Option<String>,
  292.     min: Option<f64>,
  293.     min_attr: Option<String>,
  294.     prcp: Option<f64>,
  295.     prcp_attr: Option<String>,
  296.     sndp: Option<f64>,
  297.     frshtt: Option<i32>,
  298. }

  299. /// ---------------------------
  300. /// 从 CSV 的一行 StringRecord 中解析出一个 DailyRow
  301. /// 如果解析/校验失败,返回 None
  302. /// ---------------------------
  303. fn parse_csv_record_to_daily_row(rec: &StringRecord) -> Option<DailyRow> {
  304.     // 这里注意对应 CSV 的字段顺序:
  305.     //   0 STATION
  306.     //   1 DATE
  307.     //   2 LAT
  308.     //   3 LON
  309.     //   4 ELEV
  310.     //   5 NAME
  311.     //   6 TEMP
  312.     //   7 TEMP_ATTRIBUTES
  313.     //   8 DEWP
  314.     //   9 DEWP_ATTRIBUTES
  315.     //   10 SLP
  316.     //   11 SLP_ATTRIBUTES
  317.     //   12 STP
  318.     //   13 STP_ATTRIBUTES
  319.     //   14 VISIB
  320.     //   15 VISIB_ATTRIBUTES
  321.     //   16 WDSP
  322.     //   17 WDSP_ATTRIBUTES
  323.     //   18 MXSPD
  324.     //   19 GUST
  325.     //   20 MAX
  326.     //   21 MAX_ATTRIBUTES
  327.     //   22 MIN
  328.     //   23 MIN_ATTRIBUTES
  329.     //   24 PRCP
  330.     //   25 PRCP_ATTRIBUTES
  331.     //   26 SNDP
  332.     //   27 FRSHTT
  333.     //
  334.     // 这里要小心索引越界,如果 CSV 某些列缺失,会导致 rec.get(...) = None。
  335.     // 为简化,先检查长度,如果不够就返回 None。
  336.     if rec.len() < 28 {
  337.         eprintln!("行列数不足,无法解析: {:?}", rec);
  338.         return None;
  339.     }

  340.     // 1) 解析 date (第 1 列)
  341.     let date_str = rec.get(1).unwrap().trim();
  342.     let date_parsed = match parse_date_ymd(date_str) {
  343.         Some(d) => d,
  344.         None => {
  345.             eprintln!("日期格式不符或无法解析: {}", date_str);
  346.             return None;
  347.         }
  348.     };

  349.     // 2) 依次解析其他字段
  350.     //    对数值进行 parse_f64_or_null,并做简单的范围过滤(可选)
  351.     //    也可以把过于离谱的数据直接当 None 或者整行舍弃。
  352.     let temp = parse_f64_or_null(rec.get(6));
  353.     // 例如,若温度超出 -200 ~ 190 范围,我们视为离谱 -> 跳过整行
  354.     if let Some(t) = temp {
  355.         if t < -200.0 || t > 190.0 {
  356.             eprintln!("温度超出合理范围 ({:.1}), 跳过行: {:?}", t, rec);
  357.             return None;
  358.         }
  359.     }
  360.     let temp_attr = parse_str_or_null(rec.get(7));
  361.     let dewp = parse_f64_or_null(rec.get(8));
  362.     let dewp_attr = parse_str_or_null(rec.get(9));
  363.     let slp = parse_f64_or_null(rec.get(10));
  364.     let slp_attr = parse_str_or_null(rec.get(11));
  365.     let stp = parse_f64_or_null(rec.get(12));
  366.     let stp_attr = parse_str_or_null(rec.get(13));
  367.     let visib = parse_f64_or_null(rec.get(14));
  368.     let visib_attr = parse_str_or_null(rec.get(15));
  369.     let wdsp = parse_f64_or_null(rec.get(16));
  370.     let wdsp_attr = parse_str_or_null(rec.get(17));
  371.     let mxspd = parse_f64_or_null(rec.get(18));
  372.     let gust = parse_f64_or_null(rec.get(19));
  373.     let max = parse_f64_or_null(rec.get(20));
  374.     let max_attr = parse_str_or_null(rec.get(21));
  375.     let min = parse_f64_or_null(rec.get(22));
  376.     let min_attr = parse_str_or_null(rec.get(23));
  377.     let prcp = parse_f64_or_null(rec.get(24));
  378.     let prcp_attr = parse_str_or_null(rec.get(25));
  379.     let sndp = parse_f64_or_null(rec.get(26));
  380.     // 解析 frshtt 为 int
  381.     let frshtt = parse_i32_or_null(rec.get(27));

  382.     Some(DailyRow {
  383.         date: date_parsed,
  384.         temp,
  385.         temp_attr,
  386.         dewp,
  387.         dewp_attr,
  388.         slp,
  389.         slp_attr,
  390.         stp,
  391.         stp_attr,
  392.         visib,
  393.         visib_attr,
  394.         wdsp,
  395.         wdsp_attr,
  396.         mxspd,
  397.         gust,
  398.         max,
  399.         max_attr,
  400.         min,
  401.         min_attr,
  402.         prcp,
  403.         prcp_attr,
  404.         sndp,
  405.         frshtt,
  406.     })
  407. }

  408. /// ---------------------------
  409. /// 辅助函数:解析日期字符串 (yyyy/MM/dd) -> NaiveDate
  410. /// 也可以加更多格式兼容
  411. /// ---------------------------
  412. fn parse_date_ymd(s: &str) -> Option<NaiveDate> {
  413.     let s = s.trim();
  414.     if s.is_empty() {
  415.         return None;
  416.     }

  417.     // 尝试用 yyyy-MM-dd
  418.     if let Ok(d) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
  419.         return Some(d);
  420.     }

  421.     // 尝试用 yyyy/MM/dd
  422.     if let Ok(d) = NaiveDate::parse_from_str(s, "%Y/%m/%d") {
  423.         return Some(d);
  424.     }

  425.     None
  426. }

  427. /// ---------------------------
  428. /// 辅助函数:解析浮点数,失败或空时返回 None
  429. /// ---------------------------
  430. /// 将字符串解析为 f64;若为空字符串、"9999.9" 或解析失败则返回 None
  431. fn parse_f64_or_null(s: Option<&str>) -> Option<f64> {
  432.     match s {
  433.         Some(txt) => {
  434.             let trimmed = txt.trim();
  435.             // 如果本身就为空字符串
  436.             if trimmed.is_empty() {
  437.                 return None;
  438.             }
  439.             // 如果等于 "9999.9",也视为无效 => 返回 None
  440.             if trimmed == "9999.9" {
  441.                 return None;
  442.             }
  443.             // 正常解析
  444.             trimmed.parse::<f64>().ok()
  445.         }
  446.         None => None,
  447.     }
  448. }


  449. /// ---------------------------
  450. /// 辅助函数:解析整型,失败或空时返回 None
  451. /// ---------------------------
  452. fn parse_i32_or_null(s: Option<&str>) -> Option<i32> {
  453.     match s {
  454.         Some(txt) => {
  455.             let trimmed = txt.trim();
  456.             if trimmed.is_empty() {
  457.                 None
  458.             } else {
  459.                 trimmed.parse::<i32>().ok()
  460.             }
  461.         }
  462.         None => None,
  463.     }
  464. }

  465. /// ---------------------------
  466. /// 辅助函数:若字符串非空则返回Some,否则None
  467. /// ---------------------------
  468. fn parse_str_or_null(s: Option<&str>) -> Option<String> {
  469.     s.map(|v| {
  470.         let t = v.trim();
  471.         if t.is_empty() {
  472.             None
  473.         } else {
  474.             Some(t.to_string())
  475.         }
  476.     }).flatten()
  477. }

  478. /// ---------------------------
  479. /// 批量插入解析后的 DailyRow
  480. /// ---------------------------
  481. fn batch_insert_daily_rows(
  482.     conn: &mut PooledConn,
  483.     table_name: &str,
  484.     rows: &[DailyRow],
  485. ) -> StdResult<()> {
  486.     if rows.is_empty() {
  487.         return Ok(());
  488.     }

  489.     // 构造 INSERT IGNORE 语句
  490.     let columns_str = DAILY_FIELD_NAMES.join(", ");

  491.     // 生成单行占位符,如 "(?, ?, ?, ...)"
  492.     let placeholders_single_row = {
  493.         let mut ph = String::new();
  494.         for (i, _) in DAILY_FIELD_NAMES.iter().enumerate() {
  495.             if i == 0 {
  496.                 ph.push('?');
  497.             } else {
  498.                 ph.push_str(", ?");
  499.             }
  500.         }
  501.         format!("({})", ph)
  502.     };

  503.     // 一次插入多少行
  504.     let chunk_size = 500;
  505.     for chunk in rows.chunks(chunk_size) {
  506.         let mut sql = format!("INSERT IGNORE INTO `{}` ({}) VALUES ", table_name, columns_str);
  507.         let placeholders_joined = vec![placeholders_single_row.clone(); chunk.len()].join(", ");
  508.         sql.push_str(&placeholders_joined);

  509.         // 收集所有参数
  510.         let mut params: Vec<Value> = Vec::new();

  511.         for row in chunk {
  512.             // 将 struct 字段一个个转成 Value
  513.             // 注意顺序要与 DAILY_FIELD_NAMES 对应
  514.             // 1) date (DATE)
  515.             params.push(Value::from(row.date.format("%Y-%m-%d").to_string()));
  516.             // 2) temp (DOUBLE)
  517.             params.push(opt_f64_to_value(row.temp));
  518.             // 3) temp_attributes (VARCHAR)
  519.             params.push(opt_str_to_value(&row.temp_attr));
  520.             // 4) dewp
  521.             params.push(opt_f64_to_value(row.dewp));
  522.             // 5) dewp_attr
  523.             params.push(opt_str_to_value(&row.dewp_attr));
  524.             // 6) slp
  525.             params.push(opt_f64_to_value(row.slp));
  526.             // 7) slp_attr
  527.             params.push(opt_str_to_value(&row.slp_attr));
  528.             // 8) stp
  529.             params.push(opt_f64_to_value(row.stp));
  530.             // 9) stp_attr
  531.             params.push(opt_str_to_value(&row.stp_attr));
  532.             // 10) visib
  533.             params.push(opt_f64_to_value(row.visib));
  534.             // 11) visib_attr
  535.             params.push(opt_str_to_value(&row.visib_attr));
  536.             // 12) wdsp
  537.             params.push(opt_f64_to_value(row.wdsp));
  538.             // 13) wdsp_attr
  539.             params.push(opt_str_to_value(&row.wdsp_attr));
  540.             // 14) mxspd
  541.             params.push(opt_f64_to_value(row.mxspd));
  542.             // 15) gust
  543.             params.push(opt_f64_to_value(row.gust));
  544.             // 16) max
  545.             params.push(opt_f64_to_value(row.max));
  546.             // 17) max_attr
  547.             params.push(opt_str_to_value(&row.max_attr));
  548.             // 18) min
  549.             params.push(opt_f64_to_value(row.min));
  550.             // 19) min_attr
  551.             params.push(opt_str_to_value(&row.min_attr));
  552.             // 20) prcp
  553.             params.push(opt_f64_to_value(row.prcp));
  554.             // 21) prcp_attr
  555.             params.push(opt_str_to_value(&row.prcp_attr));
  556.             // 22) sndp
  557.             params.push(opt_f64_to_value(row.sndp));
  558.             // 23) frshtt
  559.             params.push(opt_i32_to_value(row.frshtt));
  560.         }

  561.         conn.exec_drop(sql, params)?;
  562.     }

  563.     Ok(())
  564. }

  565. /// ---------------------------
  566. /// 将 Option<f64> 转为 mysql::Value
  567. /// ---------------------------
  568. fn opt_f64_to_value(opt: Option<f64>) -> Value {
  569.     match opt {
  570.         Some(v) => Value::from(v),
  571.         None => Value::NULL,
  572.     }
  573. }

  574. /// ---------------------------
  575. /// 将 Option<i32> 转为 mysql::Value
  576. /// ---------------------------
  577. fn opt_i32_to_value(opt: Option<i32>) -> Value {
  578.     match opt {
  579.         Some(v) => Value::from(v),
  580.         None => Value::NULL,
  581.     }
  582. }

  583. /// ---------------------------
  584. /// 将 Option<String> 转为 mysql::Value
  585. /// ---------------------------
  586. fn opt_str_to_value(opt: &Option<String>) -> Value {
  587.     match opt {
  588.         Some(s) => Value::from(s.as_str()),
  589.         None => Value::NULL,
  590.     }
  591. }
复制代码
这个代码的作用是将逐日数据首先进行简单的判断,数据是否合理,太过于离谱的数据会被筛选掉,另外这个温度应该是华氏度,为了便于理解,我也把代码写上了详细的注释,我把数据库导出为sql,需要的朋友可以自行探索一下
Cargo.toml
  1. [package]  
  2. name = "NCEI_day"  
  3. version = "0.1.0"  
  4. edition = "2021"  
  5.   
  6. [dependencies]  
  7. csv = "1.3.1"  
  8. walkdir = "2.5.0"  
  9. indicatif = "0.17.11"  
  10. mysql = "23.0.1"  
  11. chrono = "0.4.39"
复制代码


百度网盘:17NobkMbMoKPp2pv0jdUdeA?pwd=ca7c
第一次发帖,出现问题还望多多海涵!!!


密码修改失败请联系微信:mofangbao

新浪微博达人勋

发表于 2025-2-7 18:46:15 | 显示全部楼层
本帖最后由 Lancelot 于 2025-2-7 18:49 编辑

想法是好的
但最大的问题是这里没人懂Rust,甚至极少有人会用数据库+SQL
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

 楼主| 发表于 2025-2-7 23:45:06 | 显示全部楼层
Lancelot 发表于 2025-2-7 18:46
想法是好的
但最大的问题是这里没人懂Rust,甚至极少有人会用数据库+SQL

最近研究Rust的数据处理性能,刚好可以用在气象数据处理上,可能还是我理解的太浅显了
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 2025-2-8 08:46:33 | 显示全部楼层
解析csv、构造SQL语句等等各方面几乎都比python麻烦,有条件的话试试sqlalchemy配合pandas,看看效率上和rust相比如何
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 2025-2-8 13:12:16 | 显示全部楼层
Hellohistory 发表于 2025-2-7 23:45
最近研究Rust的数据处理性能,刚好可以用在气象数据处理上,可能还是我理解的太浅显了

虽然都说Rust性能高,但对于非业务人员来说更看重灵活性
当然我没研究过Rust性能到底在哪里好(
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

 楼主| 发表于 2025-2-8 19:09:46 来自手机 | 显示全部楼层
墨家大宝 发表于 2025-2-8 08:46
解析csv、构造SQL语句等等各方面几乎都比python麻烦,有条件的话试试sqlalchemy配合pandas,看看效率上和ru ...

这个的性能瓶颈主要是io和移除异常值,Python消耗的内存比编译型语言多多了,io差不多的
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

 楼主| 发表于 2025-2-8 19:12:02 来自手机 | 显示全部楼层
Lancelot 发表于 2025-2-8 13:12
虽然都说Rust性能高,但对于非业务人员来说更看重灵活性
当然我没研究过Rust性能到底在哪里好(

rust主要是媲美c++的性能和媲美Python的库调用,我觉得主要是语法上的复杂性和所有权的难度
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

新浪微博达人勋

发表于 2025-2-9 10:29:19 | 显示全部楼层
Hellohistory 发表于 2025-2-8 19:09
这个的性能瓶颈主要是io和移除异常值,Python消耗的内存比编译型语言多多了,io差不多的

且不说python调用的包底层可能就是C、Fortran、rust实现的,单看python程序运行时多消耗的内存是否值得自己从顶层用rust写一遍来改善,换句话说你的精力和内存哪个更有价值。

当然这是建立在目标为完成工作的前提上,如果目标是发展自己的编程兴趣和能力,重新造轮子还是非常有意思的。
密码修改失败请联系微信:mofangbao
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册 新浪微博登陆

本版积分规则

Copyright ©2011-2014 bbs.06climate.com All Rights Reserved.  Powered by Discuz! (京ICP-10201084)

本站信息均由会员发表,不代表气象家园立场,禁止在本站发表与国家法律相抵触言论

快速回复 返回顶部 返回列表