43ecb4affcc874719394d0602d89d976375cface
[stakingWatchdog.git] / src / main.rs
1 /*
2 * API Reference: https://ethereum.github.io/beacon-APIs/
3 */
4
5 use std::{
6 fmt,
7 net::UdpSocket,
8 sync::{Arc, Mutex},
9 thread,
10 time::{self, Duration},
11 };
12
13 use anyhow::Result;
14 use lettre::{
15 message::header::ContentType, transport::smtp::authentication::Credentials, Message,
16 SmtpTransport, Transport,
17 };
18 use serde::Deserialize;
19
20 use crate::config::Config;
21
22 mod config;
23
/// Configuration file (RON format) read once at startup.
const FILE_CONF: &str = "config.ron";
/// Target duration of one iteration of the main check loop.
const CHECK_PERIOD: Duration = Duration::from_secs(10);
/// Read timeout of the check-alive UDP socket: no ping within this delay is an error.
const PING_TIMEOUT: Duration = Duration::from_secs(10);
/// Minimum delay between two error e-mails (two hours).
const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * 3600);
/// Minimum delay between two "No error detected" messages (fifteen minutes).
const STATE_PRINT_PERIOD: Duration = Duration::from_secs(60 * 15);
/// Root URL of the local beacon node REST API.
const BASE_URI: &str = "http://localhost:5052/eth/v1/";
30
31 fn main() -> Result<()> {
32 println!("Staking Watchdog");
33
34 let config = Config::read(FILE_CONF)?;
35
36 println!(
37 "Configuration: {:?}",
38 Config {
39 smtp_password: "*****".to_string(),
40 ..config.clone()
41 }
42 );
43
44 let check_alive_error_mutex = start_check_alive_thread();
45
46 let mut time_last_email_send = time::Instant::now() - EMAIL_RESEND_PERIOD;
47 let mut time_last_state_printed = time::Instant::now() - STATE_PRINT_PERIOD;
48 let mut error_state = false;
49
50 loop {
51 let time_beginning_loop = time::Instant::now();
52
53 if let Err(error) = check_validators(&config.pub_keys)
54 .as_ref()
55 .and(check_alive_error_mutex.lock().unwrap().as_ref())
56 {
57 error_state = true;
58 println!("Error: {:?}", error);
59 if time::Instant::now() - time_last_email_send >= EMAIL_RESEND_PERIOD {
60 // Send e-mail.
61 println!("Sending email...");
62 match send_email(
63 "Watchdog: Staking error",
64 &format!("Error: {}", error),
65 &config.smtp_login,
66 &config.smtp_password,
67 ) {
68 Err(email_error) => println!("Error sending email: {:?}", email_error),
69 _ => {
70 println!("Email successfully sent");
71 time_last_email_send = time::Instant::now();
72 }
73 }
74 }
75 } else {
76 if error_state {
77 error_state = false;
78 println!("End of erroneous state");
79 }
80
81 if time::Instant::now() - time_last_state_printed >= STATE_PRINT_PERIOD {
82 println!("No error detected");
83 time_last_state_printed = time::Instant::now();
84 }
85 }
86
87 let elapsed = time::Instant::now() - time_beginning_loop;
88
89 if elapsed < CHECK_PERIOD {
90 let to_wait = CHECK_PERIOD - elapsed;
91 thread::sleep(to_wait);
92 }
93 }
94 }
95
96 #[derive(Debug)]
97 enum CheckError {
98 HttpError(String),
99 NotSync,
100 InvalidSyncStatus,
101 NodeHavingIssues,
102 UnknownCodeFromHealthCheck(u16),
103 ReqwestError(reqwest::Error),
104 ValidatorError { pub_key: String, message: String },
105 ValidatorStatusError { pub_key: String, message: String },
106 CheckAlive,
107 }
108
109 impl fmt::Display for CheckError {
110 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111 match self {
112 CheckError::HttpError(text) => {
113 write!(
114 f,
115 "Beacon node health check can't be reached (HTTP error): {}",
116 text
117 )
118 }
119 CheckError::NotSync => {
120 write!(
121 f,
122 "Beacon node health check is syncing (currently not sync)"
123 )
124 }
125 CheckError::InvalidSyncStatus => {
126 write!(f, "Beacon node health check returns an invalid status code")
127 }
128 CheckError::NodeHavingIssues => {
129 write!(
130 f,
131 "Beacon node health check is not initilized or having issue"
132 )
133 }
134 CheckError::UnknownCodeFromHealthCheck(code) => {
135 write!(
136 f,
137 "Beacon node health check returns an unknown code: {}",
138 code
139 )
140 }
141 CheckError::ReqwestError(error) => {
142 write!(f, "Error from reqwest: {}", error)
143 }
144 CheckError::ValidatorError { pub_key, message } => {
145 write!(f, "Validator '{}' returns an error: {}", pub_key, message)
146 }
147 CheckError::ValidatorStatusError { pub_key, message } => {
148 write!(
149 f,
150 "Validator '{}' returns a status error: {}",
151 pub_key, message
152 )
153 }
154 CheckError::CheckAlive => {
155 write!(f, "Check alive ping hasn't been received")
156 }
157 }
158 }
159 }
160
161 impl From<reqwest::Error> for CheckError {
162 fn from(value: reqwest::Error) -> Self {
163 CheckError::ReqwestError(value)
164 }
165 }
166
167 #[derive(Deserialize, Debug)]
168 struct JsonValidatorState {
169 data: JsonValidatorStateData,
170 }
171
172 #[derive(Deserialize, Debug)]
173 struct JsonValidatorStateData {
174 status: String,
175 }
176
177 #[derive(Deserialize, Debug)]
178 struct JsonError {
179 code: u16,
180 message: String,
181 }
182
183 fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> {
184 let url = BASE_URI;
185 let client = reqwest::blocking::Client::new();
186
187 let request_health = client
188 .get(format!("{url}node/health"))
189 .header("accept", "application/json");
190 match request_health.send() {
191 Ok(resp) => {
192 // println!("{resp:?}"); // For debug.
193 match resp.status().as_u16() {
194 200 => (),
195 206 => return Err(CheckError::NotSync),
196 400 => return Err(CheckError::InvalidSyncStatus),
197 503 => return Err(CheckError::NodeHavingIssues),
198 code => return Err(CheckError::UnknownCodeFromHealthCheck(code)),
199 }
200 }
201 Err(error) => {
202 println!("{error:?}");
203 return Err(CheckError::HttpError(error.to_string()));
204 }
205 }
206
207 for pub_key in pub_keys {
208 let request = client
209 .get(format!("{url}beacon/states/head/validators/0x{pub_key}"))
210 .header("accept", "application/json");
211 match request.send() {
212 Ok(resp) => {
213 // println!("{resp:?}"); // For debug.
214 match resp.status().as_u16() {
215 200 => {
216 let json: JsonValidatorState = resp.json()?;
217 if json.data.status != "active_ongoing" {
218 return Err(CheckError::ValidatorStatusError {
219 pub_key: pub_key.clone(),
220 message: format!("Status: {}", json.data.status),
221 });
222 }
223 }
224 code => {
225 let json: JsonError = resp.json()?;
226 return Err(CheckError::ValidatorError {
227 pub_key: pub_key.clone(),
228 message: format!(
229 "Http error code: {}, message: {}",
230 code, json.message
231 ),
232 });
233 }
234 }
235 }
236 Err(error) => {
237 return Err(CheckError::ValidatorError {
238 pub_key: pub_key.clone(),
239 message: error.to_string(),
240 });
241 }
242 }
243 }
244
245 Ok(())
246 }
247
248 fn send_email(title: &str, body: &str, login: &str, pass: &str) -> Result<()> {
249 let email = Message::builder()
250 .message_id(None)
251 .from("Staking Watchdog <redmine@d-lan.net>".parse()?)
252 .to("Greg Burri <greg.burri@gmail.com>".parse()?)
253 .subject(title)
254 .header(ContentType::TEXT_PLAIN)
255 .body(body.to_string())?;
256
257 let creds = Credentials::new(login.to_string(), pass.to_string());
258
259 // Open a remote connection to gmail
260 let mailer = SmtpTransport::relay("mail.infomaniak.com")?
261 .credentials(creds)
262 .build();
263
264 // Send the email
265 let response = mailer.send(&email)?;
266
267 println!("{:?}", response);
268
269 Ok(())
270 }
271
272 fn start_check_alive_thread() -> Arc<Mutex<std::result::Result<(), CheckError>>> {
273 let check_alive_error_mutex: Arc<Mutex<std::result::Result<(), CheckError>>> =
274 Arc::new(Mutex::new(Ok(())));
275 let check_alive_error_mutex_copy = check_alive_error_mutex.clone();
276
277 let _thread_check_alive_handle = thread::spawn(move || {
278 let socket = UdpSocket::bind("0.0.0.0:8739").unwrap();
279 socket.set_read_timeout(Some(PING_TIMEOUT)).unwrap();
280
281 let mut buffer = [0u8; 8];
282
283 loop {
284 match socket.recv_from(&mut buffer) {
285 Ok((size, src)) => {
286 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
287 if size == 8 {
288 *check_alive_error = Ok(());
289 socket.send_to(&buffer, &src).unwrap();
290 } else {
291 *check_alive_error = Err(CheckError::CheckAlive);
292 }
293 }
294 Err(_error) => {
295 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
296 *check_alive_error = Err(CheckError::CheckAlive);
297 }
298 }
299 }
300 });
301
302 check_alive_error_mutex_copy
303 }