Send an email after a certain number of errors
[stakingWatchdog.git] / src / main.rs
/*
 * API Reference: https://ethereum.github.io/beacon-APIs/
 *
 * TODO:
 * - Check that some processes (nethermind, lighthouse) do not exceed a
 *   percentage of total memory, for example 40 %. A possible sketch is given
 *   right below this comment.
 */
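
/*
 * A possible std-only sketch for the memory TODO above. This is only an
 * illustration under stated assumptions: it is Linux-specific, the helper name
 * and the 40 % threshold are not part of this program, and nothing calls it yet.
 *
 * fn memory_fraction_of(process_name: &str) -> std::io::Result<f64> {
 *     // MemTotal is reported in kB on its line of /proc/meminfo.
 *     let meminfo = std::fs::read_to_string("/proc/meminfo")?;
 *     let total_kb: f64 = meminfo
 *         .lines()
 *         .find(|l| l.starts_with("MemTotal:"))
 *         .and_then(|l| l.split_whitespace().nth(1))
 *         .and_then(|v| v.parse().ok())
 *         .unwrap_or(f64::INFINITY);
 *
 *     // Sum the resident set size (VmRSS, also in kB) of every process whose
 *     // /proc/<pid>/status "Name:" field matches, e.g. "nethermind".
 *     let mut used_kb = 0.0;
 *     for entry in std::fs::read_dir("/proc")? {
 *         let status_path = entry?.path().join("status");
 *         if let Ok(status) = std::fs::read_to_string(&status_path) {
 *             if status.lines().any(|l| l == format!("Name:\t{process_name}")) {
 *                 if let Some(rss_kb) = status
 *                     .lines()
 *                     .find(|l| l.starts_with("VmRSS:"))
 *                     .and_then(|l| l.split_whitespace().nth(1))
 *                     .and_then(|v| v.parse::<f64>().ok())
 *                 {
 *                     used_kb += rss_kb;
 *                 }
 *             }
 *         }
 *     }
 *     // A caller could then compare the result against 0.4 (40 %).
 *     Ok(used_kb / total_kb)
 * }
 */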

use std::{
    fmt,
    net::UdpSocket,
    sync::{Arc, Mutex},
    thread,
    time::{self, Duration},
};

use anyhow::Result;
use lettre::{
    message::header::ContentType, transport::smtp::authentication::Credentials, Message,
    SmtpTransport, Transport,
};
use serde::Deserialize;

use crate::config::Config;

mod config;

const FILE_CONF: &str = "config.ron";
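
// For illustration only: a hedged guess at the shape of `config.ron`, based on
// the fields this file actually reads (`config.rs` may define more):
//
// (
//     pub_keys: ["<validator public key, without the 0x prefix>"],
//     smtp_relay_address: "smtp.example.org",
//     smtp_login: "login",
//     smtp_password: "password",
// )
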
const CHECK_PERIOD: Duration = Duration::from_secs(10); // 10 s.
const PING_TIMEOUT: Duration = Duration::from_secs(10); // 10 s.

const NUMBER_SUCCESSIVE_ERRORS_SEND_EMAIL: i32 = 2; // Send an email after 2 successive errors.

// Send an emergency email after 10 successive errors.
const NUMBER_SUCCESSIVE_ERRORS_SEND_EMERGENCY_EMAIL: i32 = 10;

const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * 60 * 60); // 2 h.
const STATE_PRINT_PERIOD: Duration = Duration::from_secs(15 * 60); // 15 min.
const BASE_URI: &str = "http://localhost:5052/eth/v1/";

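/// Entry point: loads `config.ron`, spawns the UDP check-alive listener, then
/// checks the beacon node and the configured validators every `CHECK_PERIOD`.
/// After `NUMBER_SUCCESSIVE_ERRORS_SEND_EMAIL` successive errors an alert email
/// is sent (throttled by `EMAIL_RESEND_PERIOD`); when the count reaches
/// `NUMBER_SUCCESSIVE_ERRORS_SEND_EMERGENCY_EMAIL`, an emergency email is sent.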
fn main() -> Result<()> {
    println!("Staking Watchdog");

    let config = Config::read(FILE_CONF)?;

    println!(
        "Configuration: {:?}",
        Config {
            smtp_password: "*****".to_string(),
            ..config.clone()
        }
    );

    let check_alive_error_mutex = start_check_alive_thread();

    let mut time_last_email_send = time::Instant::now() - EMAIL_RESEND_PERIOD;
    let mut time_last_state_printed = time::Instant::now() - STATE_PRINT_PERIOD;
    let mut error_count = 0; // Number of successive errors.

    loop {
        let time_beginning_loop = time::Instant::now();

        if let Err(error) = check_validators(&config.pub_keys)
            .as_ref()
            .and(check_alive_error_mutex.lock().unwrap().as_ref())
        {
            error_count += 1;
            println!("Error {:?} (error_count={}): {}", error, error_count, error);
            if error_count >= NUMBER_SUCCESSIVE_ERRORS_SEND_EMAIL
                && time::Instant::now() - time_last_email_send >= EMAIL_RESEND_PERIOD
            {
                // Send an e-mail.
                println!("Sending email...");
                match send_email(
                    "Watchdog: Staking error",
                    &format!("Error: {}", error),
                    &config.smtp_relay_address,
                    &config.smtp_login,
                    &config.smtp_password,
                ) {
                    Err(email_error) => println!("Error sending email: {:?}", email_error),
                    _ => {
                        println!("Email successfully sent");
                        time_last_email_send = time::Instant::now();
                    }
                }
            } else if error_count == NUMBER_SUCCESSIVE_ERRORS_SEND_EMERGENCY_EMAIL {
                // Send an emergency e-mail.
                println!("Sending EMERGENCY email...");
                match send_email(
                    "[EMERGENCY] Watchdog: Staking error",
                    &format!(
                        "[EMERGENCY] Error: {}\n\nNumber of errors: {}",
                        error, error_count
                    ),
                    &config.smtp_relay_address,
                    &config.smtp_login,
                    &config.smtp_password,
                ) {
                    Err(email_error) => println!("Error sending email: {:?}", email_error),
                    _ => {
                        println!("Email successfully sent");
                        time_last_email_send = time::Instant::now();
                    }
                }
            }
        } else {
            if error_count > 0 {
                error_count = 0;
                println!("End of erroneous state");
            }

            if time::Instant::now() - time_last_state_printed >= STATE_PRINT_PERIOD {
                println!("No error detected");
                time_last_state_printed = time::Instant::now();
            }
        }

        let elapsed = time::Instant::now() - time_beginning_loop;

        if elapsed < CHECK_PERIOD {
            let to_wait = CHECK_PERIOD - elapsed;
            thread::sleep(to_wait);
        }
    }
}

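/// Every failure mode the watchdog can detect, either from the beacon node
/// HTTP API or from the UDP check-alive listener.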
#[derive(Debug)]
enum CheckError {
    HttpError(String),
    NotSync,
    InvalidSyncStatus,
    NodeHavingIssues,
    UnknownCodeFromHealthCheck(u16),
    ReqwestError(reqwest::Error),
    ValidatorError { pub_key: String, message: String },
    ValidatorStatusError { pub_key: String, message: String },
    CheckAlive,
}

impl fmt::Display for CheckError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            CheckError::HttpError(text) => {
                write!(
                    f,
                    "Beacon node health check can't be reached (HTTP error): {}",
                    text
                )
            }
            CheckError::NotSync => {
                write!(f, "Beacon node is still syncing (not yet in sync)")
            }
            CheckError::InvalidSyncStatus => {
                write!(f, "Beacon node health check returned an invalid sync status (HTTP 400)")
            }
            CheckError::NodeHavingIssues => {
                write!(f, "Beacon node is not initialized or is having issues")
            }
            CheckError::UnknownCodeFromHealthCheck(code) => {
                write!(
                    f,
                    "Beacon node health check returned an unknown status code: {}",
                    code
                )
            }
            CheckError::ReqwestError(error) => {
                write!(f, "Error from reqwest: {}", error)
            }
            CheckError::ValidatorError { pub_key, message } => {
                write!(f, "Validator '{}' returned an error: {}", pub_key, message)
            }
            CheckError::ValidatorStatusError { pub_key, message } => {
                write!(
                    f,
                    "Validator '{}' returned a status error: {}",
                    pub_key, message
                )
            }
            CheckError::CheckAlive => {
                write!(f, "Check-alive ping hasn't been received")
            }
        }
    }
}

impl From<reqwest::Error> for CheckError {
    fn from(value: reqwest::Error) -> Self {
        CheckError::ReqwestError(value)
    }
}

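// Minimal views of the beacon API JSON responses: only the fields this
// watchdog actually inspects are deserialized.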
#[derive(Deserialize, Debug)]
struct JsonValidatorState {
    data: JsonValidatorStateData,
}

#[derive(Deserialize, Debug)]
struct JsonValidatorStateData {
    status: String,
}

#[derive(Deserialize, Debug)]
struct JsonError {
    code: u16,
    message: String,
}

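/// Queries the beacon node health endpoint, then the state of every configured
/// validator; any validator whose status is not `active_ongoing` is reported
/// as an error.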
fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> {
    let url = BASE_URI;
    let client = reqwest::blocking::Client::new();

    let request_health = client
        .get(format!("{url}node/health"))
        .header("accept", "application/json");
    match request_health.send() {
        Ok(resp) => {
            // println!("{resp:?}"); // For debug.
            match resp.status().as_u16() {
                200 => (),
                206 => return Err(CheckError::NotSync),
                400 => return Err(CheckError::InvalidSyncStatus),
                503 => return Err(CheckError::NodeHavingIssues),
                code => return Err(CheckError::UnknownCodeFromHealthCheck(code)),
            }
        }
        Err(error) => {
            println!("{error:?}");
            return Err(CheckError::HttpError(error.to_string()));
        }
    }

    for pub_key in pub_keys {
        let request = client
            .get(format!("{url}beacon/states/head/validators/0x{pub_key}"))
            .header("accept", "application/json");
        match request.send() {
            Ok(resp) => {
                // println!("{resp:?}"); // For debug.
                match resp.status().as_u16() {
                    200 => {
                        let json: JsonValidatorState = resp.json()?;
                        if json.data.status != "active_ongoing" {
                            return Err(CheckError::ValidatorStatusError {
                                pub_key: pub_key.clone(),
                                message: format!("Status: {}", json.data.status),
                            });
                        }
                    }
                    code => {
                        let json: JsonError = resp.json()?;
                        return Err(CheckError::ValidatorError {
                            pub_key: pub_key.clone(),
                            message: format!(
                                "HTTP error code: {}, message: {}",
                                code, json.message
                            ),
                        });
                    }
                }
            }
            Err(error) => {
                return Err(CheckError::ValidatorError {
                    pub_key: pub_key.clone(),
                    message: error.to_string(),
                });
            }
        }
    }

    Ok(())
}

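/// Sends a plain-text email through the given SMTP relay. Note that the sender
/// and recipient addresses are currently hard-coded below.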
fn send_email(
    title: &str,
    body: &str,
    smtp_relay_address: &str,
    login: &str,
    pass: &str,
) -> Result<()> {
    let email = Message::builder()
        .message_id(None)
        .from("Staking Watchdog <redmine@d-lan.net>".parse()?)
        .to("Greg Burri <greg.burri@gmail.com>".parse()?)
        .subject(title)
        .header(ContentType::TEXT_PLAIN)
        .body(body.to_string())?;

    let creds = Credentials::new(login.to_string(), pass.to_string());

    let mailer = SmtpTransport::relay(smtp_relay_address)?
        .credentials(creds)
        .build();

    let response = mailer.send(&email)?;

    println!("{:?}", response);

    Ok(())
}

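/// Spawns a thread that listens on UDP port 8739. A peer is expected to send an
/// 8-byte ping at least every `PING_TIMEOUT`; a valid ping is echoed back and
/// the shared result is reset to `Ok(())`, while a timeout, receive error or
/// wrong-sized datagram stores `CheckError::CheckAlive`. A client-side sketch
/// is given at the end of this file.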
fn start_check_alive_thread() -> Arc<Mutex<std::result::Result<(), CheckError>>> {
    let check_alive_error_mutex: Arc<Mutex<std::result::Result<(), CheckError>>> =
        Arc::new(Mutex::new(Ok(())));
    let check_alive_error_mutex_copy = check_alive_error_mutex.clone();

    let _thread_check_alive_handle = thread::spawn(move || {
        let socket = UdpSocket::bind("0.0.0.0:8739").unwrap();
        socket.set_read_timeout(Some(PING_TIMEOUT)).unwrap();

        let mut buffer = [0u8; 8];

        loop {
            match socket.recv_from(&mut buffer) {
                Ok((size, src)) => {
                    let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
                    if size == 8 {
                        *check_alive_error = Ok(());
                        socket.send_to(&buffer, &src).unwrap();
                    } else {
                        *check_alive_error = Err(CheckError::CheckAlive);
                    }
                }
                Err(_error) => {
                    let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
                    *check_alive_error = Err(CheckError::CheckAlive);
                }
            }
        }
    });

    check_alive_error_mutex_copy
}
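
/*
 * A minimal sketch of the peer side of the check-alive protocol described
 * above. This is an illustration only (the function name and the 5 s timeout
 * are assumptions), not code shipped with this program:
 *
 * fn ping_watchdog(watchdog_host: &str) -> std::io::Result<()> {
 *     let socket = std::net::UdpSocket::bind("0.0.0.0:0")?;
 *     socket.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
 *     // Any 8-byte payload is accepted by the watchdog.
 *     socket.send_to(&42u64.to_be_bytes(), (watchdog_host, 8739))?;
 *     let mut buffer = [0u8; 8];
 *     socket.recv_from(&mut buffer)?; // The watchdog echoes the payload back.
 *     Ok(())
 * }
 */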