From 6a130458c55de738ecced00989c74cc7125e17fe Mon Sep 17 00:00:00 2001 From: jan Date: Sat, 1 Oct 2016 19:38:00 +0200 Subject: jetzt warten wir auch diff --git a/src/main.rs b/src/main.rs index cb8b02d..8a3787b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,9 @@ use std::fs::File; use std::env; use std::path::Path; use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; +use std::time; mod pre_process; mod section; @@ -31,44 +33,59 @@ fn main() { let raw_files = env::var("RAW_FILES").unwrap_or("characters".into()); let out_files = env::var("OUT").unwrap_or("json".into()); let base_path = Path::new(&raw_files); - let out_path = Path::new(&out_files); + let active_threads = Arc::new(AtomicUsize::new(0)); let files: Arc>> = Arc::new(Mutex::new(WalkDir::new(base_path).min_depth(1).into_iter().filter_map(|e| e.ok()).collect())); - for 1..MAX_THREADS { - - std::thread(move || { - let entry: Option = None; - { - entry = files.lock().unwrap().pop(); - - if entry.is_none() { - println!("thread finished"); - return; + for i in 1..MAX_THREADS { + let files = files.clone(); + let out_files = out_files.clone(); + active_threads.fetch_add(1, Ordering::SeqCst); + let active_threads = active_threads.clone(); + thread::spawn(move || { + let out_path = Path::new(&out_files); + loop { + let entry: Option; + { + entry = files.lock().unwrap().pop(); + + if entry.is_none() { + break; + } + } + let entry = entry.unwrap(); + + let mut f = File::open(entry.path()).expect("could not open file"); + let mut buf = String::new(); + if let Err(_) = f.read_to_string(&mut buf) { + println!("invalid file: {}", entry.path().to_str().unwrap()); + continue; } - } - let entry = entry.unwrap(); - - let mut f = File::open(entry.path()).expect("could not open file"); - let mut buf = String::new(); - if let Err(_) = f.read_to_string(&mut buf) { - println!("invalid file: {}", entry.path().to_str().unwrap()); - continue; - } - let buf = pre_process::strip_irrelevant_content(&buf); + let buf = pre_process::strip_irrelevant_content(&buf); - let mut char = Character::new(); - char.parse(&buf); + let mut char = Character::new(); + char.parse(&buf); - let json = serde_json::to_string(&char).unwrap(); + let json = serde_json::to_string(&char).unwrap(); - let out_file = out_path.join(entry.file_name().to_str().unwrap().replace("html", "json")); - let mut o = File::create(&out_file).unwrap(); - o.write_all(json.as_bytes()).unwrap(); - println!("{:?}", out_file); + let out_file = out_path.join(entry.file_name().to_str().unwrap().replace("html", "json")); + let mut o = File::create(&out_file).unwrap(); + o.write_all(json.as_bytes()).unwrap(); + println!("{:?}", out_file); + } + println!("thread {} finished", i); + active_threads.fetch_sub(1, Ordering::SeqCst); }); } + + loop { + if active_threads.load(Ordering::SeqCst) == 0 { + println!("all threads finished."); + break; + } + thread::sleep(time::Duration::from_millis(100)); + } } -- cgit v0.10.1