60d072403a7b10f82d62987df69ab8b3052e1948
[stakingWatchdog.git] / src / main.rs
1 /*
2 * API Reference: https://ethereum.github.io/beacon-APIs/
3 */
4
5 use std::{
6 fmt,
7 net::UdpSocket,
8 sync::{Arc, Mutex},
9 thread,
10 time::{self, Duration},
11 };
12
13 use anyhow::Result;
14 use lettre::{
15 message::header::ContentType, transport::smtp::authentication::Credentials, Message,
16 SmtpTransport, Transport,
17 };
18 use serde::Deserialize;
19
20 use crate::config::Config;
21
22 mod config;
23
/// Path of the configuration file handed to `Config::read` at start-up.
const FILE_CONF: &str = "config.ron";

/// Target duration of one iteration of the main checking loop (10 s).
const CHECK_PERIOD: Duration = Duration::from_secs(10);

/// Read timeout set on the check-alive UDP socket (10 s).
const PING_TIMEOUT: Duration = Duration::from_secs(10);

/// Number of successive failed checks required before an e-mail is sent.
const NUMBER_SUCCESSIVE_ERRORS_SEND_EMAIL: i32 = 2;

/// Minimum delay between two e-mails (2 h).
const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * 60 * 60);

/// Minimum delay between two "No error detected" log lines (15 min).
const STATE_PRINT_PERIOD: Duration = Duration::from_secs(15 * 60);

/// Common prefix of every beacon node endpoint queried by this program.
const BASE_URI: &str = "http://localhost:5052/eth/v1/";
31
32 fn main() -> Result<()> {
33 println!("Staking Watchdog");
34
35 let config = Config::read(FILE_CONF)?;
36
37 println!(
38 "Configuration: {:?}",
39 Config {
40 smtp_password: "*****".to_string(),
41 ..config.clone()
42 }
43 );
44
45 let check_alive_error_mutex = start_check_alive_thread();
46
47 let mut time_last_email_send = time::Instant::now() - EMAIL_RESEND_PERIOD;
48 let mut time_last_state_printed = time::Instant::now() - STATE_PRINT_PERIOD;
49 let mut error_count = 0; // Number of successive errors.
50
51 loop {
52 let time_beginning_loop = time::Instant::now();
53
54 if let Err(error) = check_validators(&config.pub_keys)
55 .as_ref()
56 .and(check_alive_error_mutex.lock().unwrap().as_ref())
57 {
58 error_count += 1;
59 println!("Error (error_count={}): {:?}", error_count, error);
60 if error_count >= NUMBER_SUCCESSIVE_ERRORS_SEND_EMAIL
61 && time::Instant::now() - time_last_email_send >= EMAIL_RESEND_PERIOD
62 {
63 // Send e-mail.
64 println!("Sending email...");
65 match send_email(
66 "Watchdog: Staking error",
67 &format!("Error: {}", error),
68 &config.smtp_relay_address,
69 &config.smtp_login,
70 &config.smtp_password,
71 ) {
72 Err(email_error) => println!("Error sending email: {:?}", email_error),
73 _ => {
74 println!("Email successfully sent");
75 time_last_email_send = time::Instant::now();
76 }
77 }
78 }
79 } else {
80 if error_count > 0 {
81 error_count = 0;
82 println!("End of erroneous state");
83 }
84
85 if time::Instant::now() - time_last_state_printed >= STATE_PRINT_PERIOD {
86 println!("No error detected");
87 time_last_state_printed = time::Instant::now();
88 }
89 }
90
91 let elapsed = time::Instant::now() - time_beginning_loop;
92
93 if elapsed < CHECK_PERIOD {
94 let to_wait = CHECK_PERIOD - elapsed;
95 thread::sleep(to_wait);
96 }
97 }
98 }
99
100 #[derive(Debug)]
101 enum CheckError {
102 HttpError(String),
103 NotSync,
104 InvalidSyncStatus,
105 NodeHavingIssues,
106 UnknownCodeFromHealthCheck(u16),
107 ReqwestError(reqwest::Error),
108 ValidatorError { pub_key: String, message: String },
109 ValidatorStatusError { pub_key: String, message: String },
110 CheckAlive,
111 }
112
113 impl fmt::Display for CheckError {
114 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
115 match self {
116 CheckError::HttpError(text) => {
117 write!(
118 f,
119 "Beacon node health check can't be reached (HTTP error): {}",
120 text
121 )
122 }
123 CheckError::NotSync => {
124 write!(
125 f,
126 "Beacon node health check is syncing (currently not sync)"
127 )
128 }
129 CheckError::InvalidSyncStatus => {
130 write!(f, "Beacon node health check returns an invalid status code")
131 }
132 CheckError::NodeHavingIssues => {
133 write!(
134 f,
135 "Beacon node health check is not initilized or having issue"
136 )
137 }
138 CheckError::UnknownCodeFromHealthCheck(code) => {
139 write!(
140 f,
141 "Beacon node health check returns an unknown code: {}",
142 code
143 )
144 }
145 CheckError::ReqwestError(error) => {
146 write!(f, "Error from reqwest: {}", error)
147 }
148 CheckError::ValidatorError { pub_key, message } => {
149 write!(f, "Validator '{}' returns an error: {}", pub_key, message)
150 }
151 CheckError::ValidatorStatusError { pub_key, message } => {
152 write!(
153 f,
154 "Validator '{}' returns a status error: {}",
155 pub_key, message
156 )
157 }
158 CheckError::CheckAlive => {
159 write!(f, "Check alive ping hasn't been received")
160 }
161 }
162 }
163 }
164
165 impl From<reqwest::Error> for CheckError {
166 fn from(value: reqwest::Error) -> Self {
167 CheckError::ReqwestError(value)
168 }
169 }
170
171 #[derive(Deserialize, Debug)]
172 struct JsonValidatorState {
173 data: JsonValidatorStateData,
174 }
175
176 #[derive(Deserialize, Debug)]
177 struct JsonValidatorStateData {
178 status: String,
179 }
180
181 #[derive(Deserialize, Debug)]
182 struct JsonError {
183 code: u16,
184 message: String,
185 }
186
187 fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> {
188 let url = BASE_URI;
189 let client = reqwest::blocking::Client::new();
190
191 let request_health = client
192 .get(format!("{url}node/health"))
193 .header("accept", "application/json");
194 match request_health.send() {
195 Ok(resp) => {
196 // println!("{resp:?}"); // For debug.
197 match resp.status().as_u16() {
198 200 => (),
199 206 => return Err(CheckError::NotSync),
200 400 => return Err(CheckError::InvalidSyncStatus),
201 503 => return Err(CheckError::NodeHavingIssues),
202 code => return Err(CheckError::UnknownCodeFromHealthCheck(code)),
203 }
204 }
205 Err(error) => {
206 println!("{error:?}");
207 return Err(CheckError::HttpError(error.to_string()));
208 }
209 }
210
211 for pub_key in pub_keys {
212 let request = client
213 .get(format!("{url}beacon/states/head/validators/0x{pub_key}"))
214 .header("accept", "application/json");
215 match request.send() {
216 Ok(resp) => {
217 // println!("{resp:?}"); // For debug.
218 match resp.status().as_u16() {
219 200 => {
220 let json: JsonValidatorState = resp.json()?;
221 if json.data.status != "active_ongoing" {
222 return Err(CheckError::ValidatorStatusError {
223 pub_key: pub_key.clone(),
224 message: format!("Status: {}", json.data.status),
225 });
226 }
227 }
228 code => {
229 let json: JsonError = resp.json()?;
230 return Err(CheckError::ValidatorError {
231 pub_key: pub_key.clone(),
232 message: format!(
233 "Http error code: {}, message: {}",
234 code, json.message
235 ),
236 });
237 }
238 }
239 }
240 Err(error) => {
241 return Err(CheckError::ValidatorError {
242 pub_key: pub_key.clone(),
243 message: error.to_string(),
244 });
245 }
246 }
247 }
248
249 Ok(())
250 }
251
252 fn send_email(
253 title: &str,
254 body: &str,
255 smtp_relay_address: &str,
256 login: &str,
257 pass: &str,
258 ) -> Result<()> {
259 let email = Message::builder()
260 .message_id(None)
261 .from("Staking Watchdog <redmine@d-lan.net>".parse()?)
262 .to("Greg Burri <greg.burri@gmail.com>".parse()?)
263 .subject(title)
264 .header(ContentType::TEXT_PLAIN)
265 .body(body.to_string())?;
266
267 let creds = Credentials::new(login.to_string(), pass.to_string());
268
269 let mailer = SmtpTransport::relay(smtp_relay_address)?
270 .credentials(creds)
271 .build();
272
273 let response = mailer.send(&email)?;
274
275 println!("{:?}", response);
276
277 Ok(())
278 }
279
280 fn start_check_alive_thread() -> Arc<Mutex<std::result::Result<(), CheckError>>> {
281 let check_alive_error_mutex: Arc<Mutex<std::result::Result<(), CheckError>>> =
282 Arc::new(Mutex::new(Ok(())));
283 let check_alive_error_mutex_copy = check_alive_error_mutex.clone();
284
285 let _thread_check_alive_handle = thread::spawn(move || {
286 let socket = UdpSocket::bind("0.0.0.0:8739").unwrap();
287 socket.set_read_timeout(Some(PING_TIMEOUT)).unwrap();
288
289 let mut buffer = [0u8; 8];
290
291 loop {
292 match socket.recv_from(&mut buffer) {
293 Ok((size, src)) => {
294 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
295 if size == 8 {
296 *check_alive_error = Ok(());
297 socket.send_to(&buffer, &src).unwrap();
298 } else {
299 *check_alive_error = Err(CheckError::CheckAlive);
300 }
301 }
302 Err(_error) => {
303 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
304 *check_alive_error = Err(CheckError::CheckAlive);
305 }
306 }
307 }
308 });
309
310 check_alive_error_mutex_copy
311 }