Check the alive UDP socket
[stakingWatchdog.git] / src / main.rs
1 /*
2 * API Reference: https://ethereum.github.io/beacon-APIs/
3 */
4
5 use std::{
6 net::UdpSocket,
7 sync::{Arc, Mutex},
8 thread,
9 time::{self, Duration},
10 };
11
12 use anyhow::Result;
13 use lettre::{
14 message::header::ContentType, transport::smtp::authentication::Credentials, Message,
15 SmtpTransport, Transport,
16 };
17 use serde::Deserialize;
18
19 use crate::config::Config;
20
21 mod config;
22
/// Path of the configuration file read at startup.
const FILE_CONF: &str = "config.ron";

const SECS_PER_MINUTE: u64 = 60;
const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;

/// Minimum duration of one iteration of the main monitoring loop (10 s).
const CHECK_PERIOD: Duration = Duration::from_secs(10);

/// Read timeout of the check-alive UDP socket (10 s).
const PING_TIMEOUT: Duration = Duration::from_secs(10);

/// Minimum delay between two error e-mails (2 h).
const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * SECS_PER_HOUR);

/// Minimum delay between two "No error detected" messages (15 min).
const STATE_PRINT_PERIOD: Duration = Duration::from_secs(15 * SECS_PER_MINUTE);

/// Base URI prepended to every HTTP request made by `check_validators`.
const BASE_URI: &str = "http://localhost:5052/eth/v1/";
29
30 fn main() -> Result<()> {
31 println!("Staking Watchdog");
32
33 let config = Config::read(FILE_CONF)?;
34
35 println!(
36 "Configuration: {:?}",
37 Config {
38 smtp_password: "*****".to_string(),
39 ..config.clone()
40 }
41 );
42
43 let check_alive_error_mutex = start_check_alive_thread();
44
45 let mut time_last_email_send = time::Instant::now() - EMAIL_RESEND_PERIOD;
46 let mut time_last_state_printed = time::Instant::now() - STATE_PRINT_PERIOD;
47 let mut error_state = false;
48
49 loop {
50 let time_beginning_loop = time::Instant::now();
51
52 if let Err(error) = check_validators(&config.pub_keys)
53 .as_ref()
54 .and(check_alive_error_mutex.lock().unwrap().as_ref())
55 {
56 error_state = true;
57 println!("Error: {:?}", error);
58 if time::Instant::now() - time_last_email_send >= EMAIL_RESEND_PERIOD {
59 // Send e-mail.
60 println!("Sending email...");
61 match send_email(
62 "Staking ERROR",
63 &format!("Error: {:?}", error),
64 &config.smtp_login,
65 &config.smtp_password,
66 ) {
67 Err(email_error) => println!("Error sending email: {:?}", email_error),
68 _ => {
69 println!("Email successfully sent");
70 time_last_email_send = time::Instant::now();
71 }
72 }
73 }
74 } else {
75 if error_state {
76 error_state = false;
77 println!("End of erroneous state");
78 }
79
80 if time::Instant::now() - time_last_state_printed >= STATE_PRINT_PERIOD {
81 println!("No error detected");
82 time_last_state_printed = time::Instant::now();
83 }
84 }
85
86 let elapsed = time::Instant::now() - time_beginning_loop;
87
88 if elapsed < CHECK_PERIOD {
89 let to_wait = CHECK_PERIOD - elapsed;
90 thread::sleep(to_wait);
91 }
92 }
93 }
94
95 #[derive(Debug)]
96 enum CheckError {
97 HttpError(String),
98 NotSync,
99 InvalidSyncStatus,
100 NodeHavingIssues,
101 UnknownCodeFromHealthCheck(u16),
102 ReqwestError(reqwest::Error),
103 ValidatorError { pub_key: String, message: String },
104 ValidatorStatusError { pub_key: String, message: String },
105 CheckAlive,
106 }
107
108 impl From<reqwest::Error> for CheckError {
109 fn from(value: reqwest::Error) -> Self {
110 CheckError::ReqwestError(value)
111 }
112 }
113
114 #[derive(Deserialize, Debug)]
115 struct JsonValidatorState {
116 data: JsonValidatorStateData,
117 }
118
119 #[derive(Deserialize, Debug)]
120 struct JsonValidatorStateData {
121 status: String,
122 }
123
124 #[derive(Deserialize, Debug)]
125 struct JsonError {
126 code: u16,
127 message: String,
128 }
129
130 fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> {
131 let url = BASE_URI;
132 let client = reqwest::blocking::Client::new();
133
134 let request_health = client
135 .get(format!("{url}node/health"))
136 .header("accept", "application/json");
137 match request_health.send() {
138 Ok(resp) => {
139 // println!("{resp:?}"); // For debug.
140 match resp.status().as_u16() {
141 200 => (),
142 206 => return Err(CheckError::NotSync),
143 400 => return Err(CheckError::InvalidSyncStatus),
144 503 => return Err(CheckError::NodeHavingIssues),
145 code => return Err(CheckError::UnknownCodeFromHealthCheck(code)),
146 }
147 }
148 Err(error) => {
149 println!("{error:?}");
150 return Err(CheckError::HttpError(error.to_string()));
151 }
152 }
153
154 for pub_key in pub_keys {
155 let request = client
156 .get(format!("{url}beacon/states/head/validators/0x{pub_key}"))
157 .header("accept", "application/json");
158 match request.send() {
159 Ok(resp) => {
160 // println!("{resp:?}"); // For debug.
161 match resp.status().as_u16() {
162 200 => {
163 let json: JsonValidatorState = resp.json()?;
164 if json.data.status != "active_ongoing" {
165 return Err(CheckError::ValidatorStatusError {
166 pub_key: pub_key.clone(),
167 message: format!("Status: {}", json.data.status),
168 });
169 }
170 }
171 code => {
172 let json: JsonError = resp.json()?;
173 return Err(CheckError::ValidatorError {
174 pub_key: pub_key.clone(),
175 message: format!(
176 "Http error code: {}, message: {}",
177 code, json.message
178 ),
179 });
180 }
181 }
182 }
183 Err(error) => {
184 return Err(CheckError::ValidatorError {
185 pub_key: pub_key.clone(),
186 message: error.to_string(),
187 });
188 }
189 }
190 }
191
192 Ok(())
193 }
194
195 fn send_email(title: &str, body: &str, login: &str, pass: &str) -> Result<()> {
196 let email = Message::builder()
197 .message_id(None)
198 .from("Staking Watchdog <redmine@d-lan.net>".parse()?)
199 .to("Greg Burri <greg.burri@gmail.com>".parse()?)
200 .subject(title)
201 .header(ContentType::TEXT_PLAIN)
202 .body(body.to_string())?;
203
204 let creds = Credentials::new(login.to_string(), pass.to_string());
205
206 // Open a remote connection to gmail
207 let mailer = SmtpTransport::relay("mail.gandi.net")?
208 .credentials(creds)
209 .build();
210
211 // Send the email
212 let response = mailer.send(&email)?;
213
214 println!("{:?}", response);
215
216 Ok(())
217 }
218
219 fn start_check_alive_thread() -> Arc<Mutex<std::result::Result<(), CheckError>>> {
220 let check_alive_error_mutex: Arc<Mutex<std::result::Result<(), CheckError>>> =
221 Arc::new(Mutex::new(Ok(())));
222 let check_alive_error_mutex_copy = check_alive_error_mutex.clone();
223
224 let _thread_check_alive_handle = thread::spawn(move || {
225 let socket = UdpSocket::bind("0.0.0.0:8739").unwrap();
226 socket.set_read_timeout(Some(PING_TIMEOUT)).unwrap();
227
228 let mut buffer = [0u8; 8];
229
230 loop {
231 match socket.recv_from(&mut buffer) {
232 Ok((size, src)) => {
233 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
234 if size == 8 {
235 *check_alive_error = Ok(());
236 socket.send_to(&buffer, &src).unwrap();
237 } else {
238 *check_alive_error = Err(CheckError::CheckAlive);
239 }
240 }
241 Err(_error) => {
242 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
243 *check_alive_error = Err(CheckError::CheckAlive);
244 }
245 }
246 }
247 });
248
249 check_alive_error_mutex_copy
250 }