X-Git-Url: http://git.euphorik.ch/?a=blobdiff_plain;f=src%2Fmain.rs;h=08039fb26eb442810a7783296bae277554d5cf70;hb=8ff916896124ea900ea2c6b66c87e6cc72ffe1eb;hp=fadc30d8f4f432f39e3c9aed6ffadb5a1e00e58d;hpb=bd25470e4d4c104ab661fe6212f56179d9a32b32;p=stakingWatchdog.git diff --git a/src/main.rs b/src/main.rs index fadc30d..08039fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,28 +2,30 @@ * API Reference: https://ethereum.github.io/beacon-APIs/ */ -#![cfg_attr(debug_assertions, allow(unused_variables, unused_imports, dead_code))] - use std::{ - fs, - net::{IpAddr, Ipv4Addr}, + fmt, + net::UdpSocket, + sync::{Arc, Mutex}, thread, time::{self, Duration}, }; -use anyhow::{Context, Result}; -use reqwest::StatusCode; +use anyhow::Result; +use lettre::{ + message::header::ContentType, transport::smtp::authentication::Credentials, Message, + SmtpTransport, Transport, +}; use serde::Deserialize; -use serde_json::{json, Value}; use crate::config::Config; mod config; -// mod error; const FILE_CONF: &str = "config.ron"; -const CHECK_PERIOD: Duration = Duration::from_secs(5); // 5s. -const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(12 * 60 * 60); // 12h. +const CHECK_PERIOD: Duration = Duration::from_secs(10); // 10 s. +const PING_TIMEOUT: Duration = Duration::from_secs(10); // 10 s. +const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * 60 * 60); // 2 h. +const STATE_PRINT_PERIOD: Duration = Duration::from_secs(15 * 60); // 15 min. const BASE_URI: &str = "http://localhost:5052/eth/v1/"; fn main() -> Result<()> { @@ -31,20 +33,55 @@ fn main() -> Result<()> { let config = Config::read(FILE_CONF)?; - println!("Configuration: {:?}", config); + println!( + "Configuration: {:?}", + Config { + smtp_password: "*****".to_string(), + ..config.clone() + } + ); + + let check_alive_error_mutex = start_check_alive_thread(); let mut time_last_email_send = time::Instant::now() - EMAIL_RESEND_PERIOD; + let mut time_last_state_printed = time::Instant::now() - STATE_PRINT_PERIOD; + let mut error_state = false; loop { let time_beginning_loop = time::Instant::now(); - if let Err(error) = check_validators(&config.pub_keys) { + if let Err(error) = check_validators(&config.pub_keys) + .as_ref() + .and(check_alive_error_mutex.lock().unwrap().as_ref()) + { + error_state = true; println!("Error: {:?}", error); if time::Instant::now() - time_last_email_send >= EMAIL_RESEND_PERIOD { // Send e-mail. println!("Sending email..."); + match send_email( + "Watchdog: Staking error", + &format!("Error: {}", error), + &config.smtp_relay_address, + &config.smtp_login, + &config.smtp_password, + ) { + Err(email_error) => println!("Error sending email: {:?}", email_error), + _ => { + println!("Email successfully sent"); + time_last_email_send = time::Instant::now(); + } + } + } + } else { + if error_state { + error_state = false; + println!("End of erroneous state"); + } - time_last_email_send = time::Instant::now(); + if time::Instant::now() - time_last_state_printed >= STATE_PRINT_PERIOD { + println!("No error detected"); + time_last_state_printed = time::Instant::now(); } } @@ -67,6 +104,59 @@ enum CheckError { ReqwestError(reqwest::Error), ValidatorError { pub_key: String, message: String }, ValidatorStatusError { pub_key: String, message: String }, + CheckAlive, +} + +impl fmt::Display for CheckError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + CheckError::HttpError(text) => { + write!( + f, + "Beacon node health check can't be reached (HTTP error): {}", + text + ) + } + CheckError::NotSync => { + write!( + f, + "Beacon node health check is syncing (currently not sync)" + ) + } + CheckError::InvalidSyncStatus => { + write!(f, "Beacon node health check returns an invalid status code") + } + CheckError::NodeHavingIssues => { + write!( + f, + "Beacon node health check is not initilized or having issue" + ) + } + CheckError::UnknownCodeFromHealthCheck(code) => { + write!( + f, + "Beacon node health check returns an unknown code: {}", + code + ) + } + CheckError::ReqwestError(error) => { + write!(f, "Error from reqwest: {}", error) + } + CheckError::ValidatorError { pub_key, message } => { + write!(f, "Validator '{}' returns an error: {}", pub_key, message) + } + CheckError::ValidatorStatusError { pub_key, message } => { + write!( + f, + "Validator '{}' returns a status error: {}", + pub_key, message + ) + } + CheckError::CheckAlive => { + write!(f, "Check alive ping hasn't been received") + } + } + } } impl From for CheckError { @@ -100,7 +190,7 @@ fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> .header("accept", "application/json"); match request_health.send() { Ok(resp) => { - println!("{resp:?}"); + // println!("{resp:?}"); // For debug. match resp.status().as_u16() { 200 => (), 206 => return Err(CheckError::NotSync), @@ -115,19 +205,16 @@ fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> } } - return Err(CheckError::NotSync); - for pub_key in pub_keys { let request = client .get(format!("{url}beacon/states/head/validators/0x{pub_key}")) .header("accept", "application/json"); match request.send() { Ok(resp) => { - println!("{resp:?}"); + // println!("{resp:?}"); // For debug. match resp.status().as_u16() { 200 => { let json: JsonValidatorState = resp.json()?; - // println!("JSON:\n{:?}", json); // For Debug. if json.data.status != "active_ongoing" { return Err(CheckError::ValidatorStatusError { pub_key: pub_key.clone(), @@ -137,7 +224,6 @@ fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> } code => { let json: JsonError = resp.json()?; - // println!("JSON:\n{:?}", json); // For Debug. return Err(CheckError::ValidatorError { pub_key: pub_key.clone(), message: format!( @@ -149,7 +235,6 @@ fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> } } Err(error) => { - println!("{error:?}"); return Err(CheckError::ValidatorError { pub_key: pub_key.clone(), message: error.to_string(), @@ -158,28 +243,66 @@ fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> } } - // match request_builder - // .header("Authorization", format!("Apikey {}", api_key)) - // .send() - // { - // Ok(resp) => { - // if resp.status().is_success() { - // let content = resp.text().unwrap(); - // Ok(serde_json::from_str(&content).unwrap()) - // } else { - // Err(Box::new(Error { - // message: format!("Request unsuccessful to {}: {:#?}", &url, resp), - // })) - // } - // } - // Err(error) => Err(Box::new(Error { - // message: format!("Error during request to {}: {:?}", &url, error), - // })), - // } - - // 1) Check health. - - // 2) Check each validators. + Ok(()) +} + +fn send_email( + title: &str, + body: &str, + smtp_relay_address: &str, + login: &str, + pass: &str, +) -> Result<()> { + let email = Message::builder() + .message_id(None) + .from("Staking Watchdog ".parse()?) + .to("Greg Burri ".parse()?) + .subject(title) + .header(ContentType::TEXT_PLAIN) + .body(body.to_string())?; + + let creds = Credentials::new(login.to_string(), pass.to_string()); + + let mailer = SmtpTransport::relay(smtp_relay_address)? + .credentials(creds) + .build(); + + let response = mailer.send(&email)?; + + println!("{:?}", response); Ok(()) } + +fn start_check_alive_thread() -> Arc>> { + let check_alive_error_mutex: Arc>> = + Arc::new(Mutex::new(Ok(()))); + let check_alive_error_mutex_copy = check_alive_error_mutex.clone(); + + let _thread_check_alive_handle = thread::spawn(move || { + let socket = UdpSocket::bind("0.0.0.0:8739").unwrap(); + socket.set_read_timeout(Some(PING_TIMEOUT)).unwrap(); + + let mut buffer = [0u8; 8]; + + loop { + match socket.recv_from(&mut buffer) { + Ok((size, src)) => { + let mut check_alive_error = check_alive_error_mutex.lock().unwrap(); + if size == 8 { + *check_alive_error = Ok(()); + socket.send_to(&buffer, &src).unwrap(); + } else { + *check_alive_error = Err(CheckError::CheckAlive); + } + } + Err(_error) => { + let mut check_alive_error = check_alive_error_mutex.lock().unwrap(); + *check_alive_error = Err(CheckError::CheckAlive); + } + } + } + }); + + check_alive_error_mutex_copy +}