* API Reference: https://ethereum.github.io/beacon-APIs/
*/
-#![cfg_attr(debug_assertions, allow(unused_variables, unused_imports, dead_code))]
-
use std::{
- fs,
- net::{IpAddr, Ipv4Addr},
+ fmt,
+ net::UdpSocket,
+ sync::{Arc, Mutex},
thread,
time::{self, Duration},
};
-use anyhow::{Context, Result};
-use reqwest::StatusCode;
+use anyhow::Result;
+use lettre::{
+ message::header::ContentType, transport::smtp::authentication::Credentials, Message,
+ SmtpTransport, Transport,
+};
use serde::Deserialize;
-use serde_json::{json, Value};
use crate::config::Config;
mod config;
-// mod error;
const FILE_CONF: &str = "config.ron";
-const CHECK_PERIOD: Duration = Duration::from_secs(5); // 5s.
-const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(12 * 60 * 60); // 12h.
+const CHECK_PERIOD: Duration = Duration::from_secs(10); // 10 s.
+const PING_TIMEOUT: Duration = Duration::from_secs(10); // 10 s.
+const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * 60 * 60); // 2 h.
+const STATE_PRINT_PERIOD: Duration = Duration::from_secs(15 * 60); // 15 min.
const BASE_URI: &str = "http://localhost:5052/eth/v1/";
fn main() -> Result<()> {
let config = Config::read(FILE_CONF)?;
- println!("Configuration: {:?}", config);
+ println!(
+ "Configuration: {:?}",
+ Config {
+ smtp_password: "*****".to_string(),
+ ..config.clone()
+ }
+ );
+
+ let check_alive_error_mutex = start_check_alive_thread();
let mut time_last_email_send = time::Instant::now() - EMAIL_RESEND_PERIOD;
+ let mut time_last_state_printed = time::Instant::now() - STATE_PRINT_PERIOD;
+ let mut error_state = false;
loop {
let time_beginning_loop = time::Instant::now();
- if let Err(error) = check_validators(&config.pub_keys) {
+ if let Err(error) = check_validators(&config.pub_keys)
+ .as_ref()
+ .and(check_alive_error_mutex.lock().unwrap().as_ref())
+ {
+ error_state = true;
println!("Error: {:?}", error);
if time::Instant::now() - time_last_email_send >= EMAIL_RESEND_PERIOD {
// Send e-mail.
println!("Sending email...");
+ match send_email(
+ "Watchdog: Staking error",
+ &format!("Error: {}", error),
+ &config.smtp_relay_address,
+ &config.smtp_login,
+ &config.smtp_password,
+ ) {
+ Err(email_error) => println!("Error sending email: {:?}", email_error),
+ _ => {
+ println!("Email successfully sent");
+ time_last_email_send = time::Instant::now();
+ }
+ }
+ }
+ } else {
+ if error_state {
+ error_state = false;
+ println!("End of erroneous state");
+ }
- time_last_email_send = time::Instant::now();
+ if time::Instant::now() - time_last_state_printed >= STATE_PRINT_PERIOD {
+ println!("No error detected");
+ time_last_state_printed = time::Instant::now();
}
}
ReqwestError(reqwest::Error),
ValidatorError { pub_key: String, message: String },
ValidatorStatusError { pub_key: String, message: String },
+ CheckAlive,
+}
+
+impl fmt::Display for CheckError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ CheckError::HttpError(text) => {
+ write!(
+ f,
+ "Beacon node health check can't be reached (HTTP error): {}",
+ text
+ )
+ }
+ CheckError::NotSync => {
+ write!(
+ f,
+ "Beacon node health check is syncing (currently not sync)"
+ )
+ }
+ CheckError::InvalidSyncStatus => {
+ write!(f, "Beacon node health check returns an invalid status code")
+ }
+ CheckError::NodeHavingIssues => {
+ write!(
+ f,
+ "Beacon node health check is not initilized or having issue"
+ )
+ }
+ CheckError::UnknownCodeFromHealthCheck(code) => {
+ write!(
+ f,
+ "Beacon node health check returns an unknown code: {}",
+ code
+ )
+ }
+ CheckError::ReqwestError(error) => {
+ write!(f, "Error from reqwest: {}", error)
+ }
+ CheckError::ValidatorError { pub_key, message } => {
+ write!(f, "Validator '{}' returns an error: {}", pub_key, message)
+ }
+ CheckError::ValidatorStatusError { pub_key, message } => {
+ write!(
+ f,
+ "Validator '{}' returns a status error: {}",
+ pub_key, message
+ )
+ }
+ CheckError::CheckAlive => {
+ write!(f, "Check alive ping hasn't been received")
+ }
+ }
+ }
}
impl From<reqwest::Error> for CheckError {
.header("accept", "application/json");
match request_health.send() {
Ok(resp) => {
- println!("{resp:?}");
+ // println!("{resp:?}"); // For debug.
match resp.status().as_u16() {
200 => (),
206 => return Err(CheckError::NotSync),
}
}
- return Err(CheckError::NotSync);
-
for pub_key in pub_keys {
let request = client
.get(format!("{url}beacon/states/head/validators/0x{pub_key}"))
.header("accept", "application/json");
match request.send() {
Ok(resp) => {
- println!("{resp:?}");
+ // println!("{resp:?}"); // For debug.
match resp.status().as_u16() {
200 => {
let json: JsonValidatorState = resp.json()?;
- // println!("JSON:\n{:?}", json); // For Debug.
if json.data.status != "active_ongoing" {
return Err(CheckError::ValidatorStatusError {
pub_key: pub_key.clone(),
}
code => {
let json: JsonError = resp.json()?;
- // println!("JSON:\n{:?}", json); // For Debug.
return Err(CheckError::ValidatorError {
pub_key: pub_key.clone(),
message: format!(
}
}
Err(error) => {
- println!("{error:?}");
return Err(CheckError::ValidatorError {
pub_key: pub_key.clone(),
message: error.to_string(),
}
}
- // match request_builder
- // .header("Authorization", format!("Apikey {}", api_key))
- // .send()
- // {
- // Ok(resp) => {
- // if resp.status().is_success() {
- // let content = resp.text().unwrap();
- // Ok(serde_json::from_str(&content).unwrap())
- // } else {
- // Err(Box::new(Error {
- // message: format!("Request unsuccessful to {}: {:#?}", &url, resp),
- // }))
- // }
- // }
- // Err(error) => Err(Box::new(Error {
- // message: format!("Error during request to {}: {:?}", &url, error),
- // })),
- // }
-
- // 1) Check health.
-
- // 2) Check each validators.
+ Ok(())
+}
+
+fn send_email(
+ title: &str,
+ body: &str,
+ smtp_relay_address: &str,
+ login: &str,
+ pass: &str,
+) -> Result<()> {
+ let email = Message::builder()
+ .message_id(None)
+ .from("Staking Watchdog <redmine@d-lan.net>".parse()?)
+ .to("Greg Burri <greg.burri@gmail.com>".parse()?)
+ .subject(title)
+ .header(ContentType::TEXT_PLAIN)
+ .body(body.to_string())?;
+
+ let creds = Credentials::new(login.to_string(), pass.to_string());
+
+ let mailer = SmtpTransport::relay(smtp_relay_address)?
+ .credentials(creds)
+ .build();
+
+ let response = mailer.send(&email)?;
+
+ println!("{:?}", response);
Ok(())
}
+
+fn start_check_alive_thread() -> Arc<Mutex<std::result::Result<(), CheckError>>> {
+ let check_alive_error_mutex: Arc<Mutex<std::result::Result<(), CheckError>>> =
+ Arc::new(Mutex::new(Ok(())));
+ let check_alive_error_mutex_copy = check_alive_error_mutex.clone();
+
+ let _thread_check_alive_handle = thread::spawn(move || {
+ let socket = UdpSocket::bind("0.0.0.0:8739").unwrap();
+ socket.set_read_timeout(Some(PING_TIMEOUT)).unwrap();
+
+ let mut buffer = [0u8; 8];
+
+ loop {
+ match socket.recv_from(&mut buffer) {
+ Ok((size, src)) => {
+ let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
+ if size == 8 {
+ *check_alive_error = Ok(());
+ socket.send_to(&buffer, &src).unwrap();
+ } else {
+ *check_alive_error = Err(CheckError::CheckAlive);
+ }
+ }
+ Err(_error) => {
+ let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
+ *check_alive_error = Err(CheckError::CheckAlive);
+ }
+ }
+ }
+ });
+
+ check_alive_error_mutex_copy
+}