2 * API Reference: https://ethereum.github.io/beacon-APIs/
10 time
::{self, Duration
},
15 message
::header
::ContentType
, transport
::smtp
::authentication
::Credentials
, Message
,
16 SmtpTransport
, Transport
,
18 use serde
::Deserialize
;
20 use crate::config
::Config
;
/// Path of the RON configuration file handed to `Config::read` at startup.
const FILE_CONF: &str = "config.ron";

/// Target duration of one iteration of the main check loop; the loop sleeps
/// for whatever is left of this period once the checks have run.
const CHECK_PERIOD: Duration = Duration::from_secs(10); // 10 s.

/// Read timeout applied to the UDP socket of the check-alive thread.
const PING_TIMEOUT: Duration = Duration::from_secs(10); // 10 s.

/// Threshold the successive-error counter is compared against (`>=`) before
/// an alert email is considered.
const NUMBER_SUCCESSIVE_ERRORS_SEND_EMAIL: i32 = 2; // Send an email after 2 successive errors.

/// Minimum delay between two alert emails.
const EMAIL_RESEND_PERIOD: Duration = Duration::from_secs(2 * 60 * 60); // 2 h.

/// Minimum delay between two "No error detected" status lines.
const STATE_PRINT_PERIOD: Duration = Duration::from_secs(15 * 60); // 15 min.

/// Base URI of the local beacon node REST API.
// NOTE(review): presumably the prefix bound to `url` for the requests built in
// `check_validators` — confirm against the full file.
const BASE_URI: &str = "http://localhost:5052/eth/v1/";
32 fn main() -> Result
<()> {
33 println!("Staking Watchdog");
35 let config
= Config
::read(FILE_CONF
)?
;
38 "Configuration: {:?}",
40 smtp_password
: "*****".to_string(),
45 let check_alive_error_mutex
= start_check_alive_thread();
47 let mut time_last_email_send
= time
::Instant
::now() - EMAIL_RESEND_PERIOD
;
48 let mut time_last_state_printed
= time
::Instant
::now() - STATE_PRINT_PERIOD
;
49 let mut error_count
= 0; // Number of successive errors.
52 let time_beginning_loop
= time
::Instant
::now();
54 if let Err(error
) = check_validators(&config
.pub_keys
)
56 .and(check_alive_error_mutex
.lock().unwrap().as_ref())
59 println!("Error (error_count={}): {:?}", error_count
, error
);
60 if error_count
>= NUMBER_SUCCESSIVE_ERRORS_SEND_EMAIL
61 && time
::Instant
::now() - time_last_email_send
>= EMAIL_RESEND_PERIOD
64 println!("Sending email...");
66 "Watchdog: Staking error",
67 &format!("Error: {}", error
),
68 &config
.smtp_relay_address
,
70 &config
.smtp_password
,
72 Err(email_error
) => println!("Error sending email: {:?}", email_error
),
74 println!("Email successfully sent");
75 time_last_email_send
= time
::Instant
::now();
82 println!("End of erroneous state");
85 if time
::Instant
::now() - time_last_state_printed
>= STATE_PRINT_PERIOD
{
86 println!("No error detected");
87 time_last_state_printed
= time
::Instant
::now();
91 let elapsed
= time
::Instant
::now() - time_beginning_loop
;
93 if elapsed
< CHECK_PERIOD
{
94 let to_wait
= CHECK_PERIOD
- elapsed
;
95 thread
::sleep(to_wait
);
106 UnknownCodeFromHealthCheck(u16),
107 ReqwestError(reqwest
::Error
),
108 ValidatorError
{ pub_key
: String
, message
: String
},
109 ValidatorStatusError
{ pub_key
: String
, message
: String
},
113 impl fmt
::Display
for CheckError
{
114 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
116 CheckError
::HttpError(text
) => {
119 "Beacon node health check can't be reached (HTTP error): {}",
123 CheckError
::NotSync
=> {
126 "Beacon node health check is syncing (currently not sync)"
129 CheckError
::InvalidSyncStatus
=> {
130 write!(f
, "Beacon node health check returns an invalid status code")
132 CheckError
::NodeHavingIssues
=> {
135 "Beacon node health check is not initilized or having issue"
138 CheckError
::UnknownCodeFromHealthCheck(code
) => {
141 "Beacon node health check returns an unknown code: {}",
145 CheckError
::ReqwestError(error
) => {
146 write!(f
, "Error from reqwest: {}", error
)
148 CheckError
::ValidatorError
{ pub_key
, message
} => {
149 write!(f
, "Validator '{}' returns an error: {}", pub_key
, message
)
151 CheckError
::ValidatorStatusError
{ pub_key
, message
} => {
154 "Validator '{}' returns a status error: {}",
158 CheckError
::CheckAlive
=> {
159 write!(f
, "Check alive ping hasn't been received")
165 impl From
<reqwest
::Error
> for CheckError
{
166 fn from(value
: reqwest
::Error
) -> Self {
167 CheckError
::ReqwestError(value
)
171 #[derive(Deserialize, Debug)]
172 struct JsonValidatorState
{
173 data
: JsonValidatorStateData
,
176 #[derive(Deserialize, Debug)]
177 struct JsonValidatorStateData
{
181 #[derive(Deserialize, Debug)]
187 fn check_validators(pub_keys
: &[String
]) -> std
::result
::Result
<(), CheckError
> {
189 let client
= reqwest
::blocking
::Client
::new();
191 let request_health
= client
192 .get(format!("{url}node/health"))
193 .header("accept", "application/json");
194 match request_health
.send() {
196 // println!("{resp:?}"); // For debug.
197 match resp
.status().as_u16() {
199 206 => return Err(CheckError
::NotSync
),
200 400 => return Err(CheckError
::InvalidSyncStatus
),
201 503 => return Err(CheckError
::NodeHavingIssues
),
202 code
=> return Err(CheckError
::UnknownCodeFromHealthCheck(code
)),
206 println!("{error:?}");
207 return Err(CheckError
::HttpError(error
.to_string()));
211 for pub_key
in pub_keys
{
213 .get(format!("{url}beacon/states/head/validators/0x{pub_key}"))
214 .header("accept", "application/json");
215 match request
.send() {
217 // println!("{resp:?}"); // For debug.
218 match resp
.status().as_u16() {
220 let json
: JsonValidatorState
= resp
.json()?
;
221 if json
.data
.status
!= "active_ongoing" {
222 return Err(CheckError
::ValidatorStatusError
{
223 pub_key
: pub_key
.clone(),
224 message
: format!("Status: {}", json
.data
.status
),
229 let json
: JsonError
= resp
.json()?
;
230 return Err(CheckError
::ValidatorError
{
231 pub_key
: pub_key
.clone(),
233 "Http error code: {}, message: {}",
241 return Err(CheckError
::ValidatorError
{
242 pub_key
: pub_key
.clone(),
243 message
: error
.to_string(),
255 smtp_relay_address
: &str,
259 let email
= Message
::builder()
261 .from("Staking Watchdog <redmine@d-lan.net>".parse()?
)
262 .to("Greg Burri <greg.burri@gmail.com>".parse()?
)
264 .header(ContentType
::TEXT_PLAIN
)
265 .body(body
.to_string())?
;
267 let creds
= Credentials
::new(login
.to_string(), pass
.to_string());
269 let mailer
= SmtpTransport
::relay(smtp_relay_address
)?
273 let response
= mailer
.send(&email
)?
;
275 println!("{:?}", response
);
280 fn start_check_alive_thread() -> Arc
<Mutex
<std
::result
::Result
<(), CheckError
>>> {
281 let check_alive_error_mutex
: Arc
<Mutex
<std
::result
::Result
<(), CheckError
>>> =
282 Arc
::new(Mutex
::new(Ok(())));
283 let check_alive_error_mutex_copy
= check_alive_error_mutex
.clone();
285 let _thread_check_alive_handle
= thread
::spawn(move || {
286 let socket
= UdpSocket
::bind("0.0.0.0:8739").unwrap();
287 socket
.set_read_timeout(Some(PING_TIMEOUT
)).unwrap();
289 let mut buffer
= [0u8; 8];
292 match socket
.recv_from(&mut buffer
) {
294 let mut check_alive_error
= check_alive_error_mutex
.lock().unwrap();
296 *check_alive_error
= Ok(());
297 socket
.send_to(&buffer
, &src
).unwrap();
299 *check_alive_error
= Err(CheckError
::CheckAlive
);
303 let mut check_alive_error
= check_alive_error_mutex
.lock().unwrap();
304 *check_alive_error
= Err(CheckError
::CheckAlive
);
310 check_alive_error_mutex_copy