From 7c88c967062ffe8671d5bf62ba8c2213885f9b75 Mon Sep 17 00:00:00 2001
From: Ummon <greg.burri@gmail.com>
Date: Sat, 14 Dec 2019 13:01:31 +0100
Subject: [PATCH] Optimize day 07 part 2 with a thread pool

---
 Cargo.toml   |  3 ++-
 src/day07.rs | 57 +++++++++++++++++++++++++++++-----------------------
 2 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c7438f7..c456a17 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,4 +7,5 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-itertools = "0.8"
\ No newline at end of file
+itertools = "0.8"
+threadpool = "1.7"
\ No newline at end of file
diff --git a/src/day07.rs b/src/day07.rs
index 0d00d1c..61a370a 100644
--- a/src/day07.rs
+++ b/src/day07.rs
@@ -1,7 +1,7 @@
 use super::intcode;
 use itertools::Itertools;
-use std::sync::mpsc::{ self, Sender, Receiver };
-use std::thread::{ self, JoinHandle };
+use std::sync::{ Arc, Barrier, mpsc::{ self, Sender, Receiver }, atomic::{ AtomicI64, Ordering } };
+use threadpool::ThreadPool;
 
 fn last_thruster_signal(code: &[i64], phase_setting: &[i64]) -> i64 {
     phase_setting.iter().fold(0, |last_output, input| intcode::execute_op_code(&code, &[*input, last_output])[0])
@@ -29,11 +29,11 @@ impl intcode::IO for Stage {
     // Send to the output channel.
     fn write(&mut self, value: i64) {
         self.last_produced_value = value;
-        self.output_channel.send(value);
+        self.output_channel.send(value).unwrap_or_default();
     }
 }
 
-fn last_thruster_signal_with_feedback_loop(code: &[i64], phase_setting: &[i64]) -> i64 {
+fn last_thruster_signal_with_feedback_loop(code: &[i64], phase_setting: &[i64], pool: &ThreadPool) -> i64 {
     let n = phase_setting.len();
 
     let mut senders = Vec::<Sender<i64>>::new();
@@ -41,8 +41,8 @@ fn last_thruster_signal_with_feedback_loop(code: &[i64], phase_setting: &[i64])
 
     for (i, (s, r)) in (0 .. n).map(|i| (i, mpsc::channel::<i64>())) {
         // Initial values.
-        s.send(phase_setting[i]);
-        if i == 0 { s.send(0); }
+        s.send(phase_setting[i]).unwrap_or_default();
+        if i == 0 { s.send(0).unwrap_or_default(); }
 
         senders.insert(if i == 0 { 0 } else { i - 1 }, s);
         receivers.push(r);
@@ -51,28 +51,33 @@ fn last_thruster_signal_with_feedback_loop(code: &[i64], phase_setting: &[i64])
     // Prepare each pair of received and sender for the each stages.
     let mut channels: Vec<(Receiver<i64>, Sender<i64>)> = receivers.drain(..).zip(senders.drain(..)).collect();
 
-    let mut join_handles: Vec<JoinHandle<i64>> =
-        channels
-            .drain(..)
-            .map(
-                |(receiver, sender)| {
-                    let code_copy = Vec::<i64>::from(code);
-                    thread::spawn(
-                        move || {
-                            let mut stage = Stage { input_channel: receiver, output_channel: sender, last_produced_value: 0 };
-                            intcode::execute_op_code_with_custom_io(&code_copy, &mut stage);
-                            stage.last_produced_value
-                        }
-                    )
+    let result = Arc::new(AtomicI64::new(0));
+    let barrier = Arc::new(Barrier::new(2));
+
+    for (i, (receiver, sender)) in channels.drain(..).enumerate() {
+        let code_copy = Vec::<i64>::from(code);
+        let barrier = barrier.clone();
+        let result = result.clone();
+
+        pool.execute(
+            move || {
+                let mut stage = Stage { input_channel: receiver, output_channel: sender, last_produced_value: 0 };
+                intcode::execute_op_code_with_custom_io(&code_copy, &mut stage);
+                if i == 4 {
+                    result.store(stage.last_produced_value, Ordering::Relaxed);
+                    barrier.wait();
                 }
-            )
-            .collect();
+            }
+        )
+    }
 
-    join_handles.pop().unwrap().join().unwrap()
+    barrier.wait();
+    result.load(Ordering::Relaxed)
 }
 
 pub fn find_largest_last_thruster_signal_with_feedback_loop(code: &[i64]) -> i64 {
-    (5i64 ..= 9i64).permutations(5).map(|phase_setting| last_thruster_signal_with_feedback_loop(&code, &phase_setting)).max().unwrap()
+    let pool = ThreadPool::new(5);
+    (5i64 ..= 9i64).permutations(5).map(|phase_setting| last_thruster_signal_with_feedback_loop(&code, &phase_setting, &pool)).max().unwrap()
 }
 
 #[cfg(test)]
@@ -104,13 +109,15 @@ mod tests {
     fn part2_sample_1() {
         let code = vec![3,26,1001,26,-4,26,3,27,1002,27,2,27,1,27,26,27,4,27,1001,28,-1,28,1005,28,6,99,0,0,5];
         let phase_setting = [9,8,7,6,5];
-        assert_eq!(last_thruster_signal_with_feedback_loop(&code, &phase_setting), 139_629_729);
+        let pool = ThreadPool::new(5);
+        assert_eq!(last_thruster_signal_with_feedback_loop(&code, &phase_setting, &pool), 139_629_729);
     }
 
     #[test]
     fn part2_sample_2() {
         let code = vec![3,52,1001,52,-5,52,3,53,1,52,56,54,1007,54,5,55,1005,55,26,1001,54,-5,54,1105,1,12,1,53,54,53,1008,54,0,55,1001,55,1,55,2,53,55,53,4,53,1001,56,-1,56,1005,56,6,99,0,0,0,0,10];
         let phase_setting = [9,7,8,5,6];
-        assert_eq!(last_thruster_signal_with_feedback_loop(&code, &phase_setting), 18_216);
+        let pool = ThreadPool::new(5);
+        assert_eq!(last_thruster_signal_with_feedback_loop(&code, &phase_setting, &pool), 18_216);
     }
 }
\ No newline at end of file
-- 
2.49.0