Move smtp relay address to the configuration
[stakingWatchdog.git] / src / main.rs
1 /*
2 * API Reference: https://ethereum.github.io/beacon-APIs/
3 */
4
5 use std::{
6 fmt,
7 net::UdpSocket,
8 sync::{Arc, Mutex},
9 thread,
10 time::{self, Duration},
11 };
12
13 use anyhow::Result;
14 use lettre::{
15 message::header::ContentType, transport::smtp::authentication::Credentials, Message,
16 SmtpTransport, Transport,
17 };
18 use serde::Deserialize;
19
20 use crate::config::Config;
21
22 mod config;
23
/// Path of the configuration file handed to `Config::read`.
const FILE_CONF: &str = "config.ron";

/// Target duration of one iteration of the main monitoring loop (10 s).
const CHECK_PERIOD: Duration = Duration::from_secs(10);

/// Read timeout applied to the check alive UDP socket (10 s).
const PING_TIMEOUT: Duration = Duration::from_secs(10);

/// Minimum delay between two alert e-mails (2 h).
const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * 60 * 60);

/// Minimum delay between two "No error detected" log lines (15 min).
const STATE_PRINT_PERIOD: Duration = Duration::from_secs(15 * 60);

/// Prefix shared by every beacon node API request.
const BASE_URI: &str = "http://localhost:5052/eth/v1/";
30
31 fn main() -> Result<()> {
32 println!("Staking Watchdog");
33
34 let config = Config::read(FILE_CONF)?;
35
36 println!(
37 "Configuration: {:?}",
38 Config {
39 smtp_password: "*****".to_string(),
40 ..config.clone()
41 }
42 );
43
44 let check_alive_error_mutex = start_check_alive_thread();
45
46 let mut time_last_email_send = time::Instant::now() - EMAIL_RESEND_PERIOD;
47 let mut time_last_state_printed = time::Instant::now() - STATE_PRINT_PERIOD;
48 let mut error_state = false;
49
50 loop {
51 let time_beginning_loop = time::Instant::now();
52
53 if let Err(error) = check_validators(&config.pub_keys)
54 .as_ref()
55 .and(check_alive_error_mutex.lock().unwrap().as_ref())
56 {
57 error_state = true;
58 println!("Error: {:?}", error);
59 if time::Instant::now() - time_last_email_send >= EMAIL_RESEND_PERIOD {
60 // Send e-mail.
61 println!("Sending email...");
62 match send_email(
63 "Watchdog: Staking error",
64 &format!("Error: {}", error),
65 &config.smtp_relay_address,
66 &config.smtp_login,
67 &config.smtp_password,
68 ) {
69 Err(email_error) => println!("Error sending email: {:?}", email_error),
70 _ => {
71 println!("Email successfully sent");
72 time_last_email_send = time::Instant::now();
73 }
74 }
75 }
76 } else {
77 if error_state {
78 error_state = false;
79 println!("End of erroneous state");
80 }
81
82 if time::Instant::now() - time_last_state_printed >= STATE_PRINT_PERIOD {
83 println!("No error detected");
84 time_last_state_printed = time::Instant::now();
85 }
86 }
87
88 let elapsed = time::Instant::now() - time_beginning_loop;
89
90 if elapsed < CHECK_PERIOD {
91 let to_wait = CHECK_PERIOD - elapsed;
92 thread::sleep(to_wait);
93 }
94 }
95 }
96
/// Every failure the watchdog can detect and report.
#[derive(Debug)]
enum CheckError {
    /// The health request to the beacon node could not be sent; holds the error text.
    HttpError(String),
    /// The health endpoint answered 206.
    NotSync,
    /// The health endpoint answered 400.
    InvalidSyncStatus,
    /// The health endpoint answered 503.
    NodeHavingIssues,
    /// The health endpoint answered with a status code that is not handled explicitly.
    UnknownCodeFromHealthCheck(u16),
    /// An error converted from `reqwest`, e.g. a response body that could not be decoded.
    ReqwestError(reqwest::Error),
    /// The request for a validator failed or returned a status other than 200.
    ValidatorError { pub_key: String, message: String },
    /// The validator was found but its status is not `active_ongoing`.
    ValidatorStatusError { pub_key: String, message: String },
    /// No valid check alive ping was received by the UDP listener.
    CheckAlive,
}
109
110 impl fmt::Display for CheckError {
111 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
112 match self {
113 CheckError::HttpError(text) => {
114 write!(
115 f,
116 "Beacon node health check can't be reached (HTTP error): {}",
117 text
118 )
119 }
120 CheckError::NotSync => {
121 write!(
122 f,
123 "Beacon node health check is syncing (currently not sync)"
124 )
125 }
126 CheckError::InvalidSyncStatus => {
127 write!(f, "Beacon node health check returns an invalid status code")
128 }
129 CheckError::NodeHavingIssues => {
130 write!(
131 f,
132 "Beacon node health check is not initilized or having issue"
133 )
134 }
135 CheckError::UnknownCodeFromHealthCheck(code) => {
136 write!(
137 f,
138 "Beacon node health check returns an unknown code: {}",
139 code
140 )
141 }
142 CheckError::ReqwestError(error) => {
143 write!(f, "Error from reqwest: {}", error)
144 }
145 CheckError::ValidatorError { pub_key, message } => {
146 write!(f, "Validator '{}' returns an error: {}", pub_key, message)
147 }
148 CheckError::ValidatorStatusError { pub_key, message } => {
149 write!(
150 f,
151 "Validator '{}' returns a status error: {}",
152 pub_key, message
153 )
154 }
155 CheckError::CheckAlive => {
156 write!(f, "Check alive ping hasn't been received")
157 }
158 }
159 }
160 }
161
162 impl From<reqwest::Error> for CheckError {
163 fn from(value: reqwest::Error) -> Self {
164 CheckError::ReqwestError(value)
165 }
166 }
167
/// JSON body decoded from a 200 response to a validator state request.
#[derive(Deserialize, Debug)]
struct JsonValidatorState {
    /// Content of the `data` object of the response.
    data: JsonValidatorStateData,
}
172
/// The part of the validator state response the watchdog reads.
#[derive(Deserialize, Debug)]
struct JsonValidatorStateData {
    /// Validator status; anything other than `active_ongoing` is reported as an error.
    status: String,
}
177
/// JSON body decoded from a validator response whose status is not 200.
#[derive(Deserialize, Debug)]
struct JsonError {
    /// Deserialized but not read anywhere in this file.
    code: u16,
    /// Error text, copied into the reported `CheckError::ValidatorError`.
    message: String,
}
183
184 fn check_validators(pub_keys: &[String]) -> std::result::Result<(), CheckError> {
185 let url = BASE_URI;
186 let client = reqwest::blocking::Client::new();
187
188 let request_health = client
189 .get(format!("{url}node/health"))
190 .header("accept", "application/json");
191 match request_health.send() {
192 Ok(resp) => {
193 // println!("{resp:?}"); // For debug.
194 match resp.status().as_u16() {
195 200 => (),
196 206 => return Err(CheckError::NotSync),
197 400 => return Err(CheckError::InvalidSyncStatus),
198 503 => return Err(CheckError::NodeHavingIssues),
199 code => return Err(CheckError::UnknownCodeFromHealthCheck(code)),
200 }
201 }
202 Err(error) => {
203 println!("{error:?}");
204 return Err(CheckError::HttpError(error.to_string()));
205 }
206 }
207
208 for pub_key in pub_keys {
209 let request = client
210 .get(format!("{url}beacon/states/head/validators/0x{pub_key}"))
211 .header("accept", "application/json");
212 match request.send() {
213 Ok(resp) => {
214 // println!("{resp:?}"); // For debug.
215 match resp.status().as_u16() {
216 200 => {
217 let json: JsonValidatorState = resp.json()?;
218 if json.data.status != "active_ongoing" {
219 return Err(CheckError::ValidatorStatusError {
220 pub_key: pub_key.clone(),
221 message: format!("Status: {}", json.data.status),
222 });
223 }
224 }
225 code => {
226 let json: JsonError = resp.json()?;
227 return Err(CheckError::ValidatorError {
228 pub_key: pub_key.clone(),
229 message: format!(
230 "Http error code: {}, message: {}",
231 code, json.message
232 ),
233 });
234 }
235 }
236 }
237 Err(error) => {
238 return Err(CheckError::ValidatorError {
239 pub_key: pub_key.clone(),
240 message: error.to_string(),
241 });
242 }
243 }
244 }
245
246 Ok(())
247 }
248
249 fn send_email(
250 title: &str,
251 body: &str,
252 smtp_relay_address: &str,
253 login: &str,
254 pass: &str,
255 ) -> Result<()> {
256 let email = Message::builder()
257 .message_id(None)
258 .from("Staking Watchdog <redmine@d-lan.net>".parse()?)
259 .to("Greg Burri <greg.burri@gmail.com>".parse()?)
260 .subject(title)
261 .header(ContentType::TEXT_PLAIN)
262 .body(body.to_string())?;
263
264 let creds = Credentials::new(login.to_string(), pass.to_string());
265
266 // Open a remote connection to gmail
267 let mailer = SmtpTransport::relay(smtp_relay_address)?
268 .credentials(creds)
269 .build();
270
271 // Send the email
272 let response = mailer.send(&email)?;
273
274 println!("{:?}", response);
275
276 Ok(())
277 }
278
279 fn start_check_alive_thread() -> Arc<Mutex<std::result::Result<(), CheckError>>> {
280 let check_alive_error_mutex: Arc<Mutex<std::result::Result<(), CheckError>>> =
281 Arc::new(Mutex::new(Ok(())));
282 let check_alive_error_mutex_copy = check_alive_error_mutex.clone();
283
284 let _thread_check_alive_handle = thread::spawn(move || {
285 let socket = UdpSocket::bind("0.0.0.0:8739").unwrap();
286 socket.set_read_timeout(Some(PING_TIMEOUT)).unwrap();
287
288 let mut buffer = [0u8; 8];
289
290 loop {
291 match socket.recv_from(&mut buffer) {
292 Ok((size, src)) => {
293 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
294 if size == 8 {
295 *check_alive_error = Ok(());
296 socket.send_to(&buffer, &src).unwrap();
297 } else {
298 *check_alive_error = Err(CheckError::CheckAlive);
299 }
300 }
301 Err(_error) => {
302 let mut check_alive_error = check_alive_error_mutex.lock().unwrap();
303 *check_alive_error = Err(CheckError::CheckAlive);
304 }
305 }
306 }
307 });
308
309 check_alive_error_mutex_copy
310 }