Skip to content

Commit

Permalink
closes #131
Browse files Browse the repository at this point in the history
Signed-off-by: Valerian Saliou <[email protected]>
  • Loading branch information
valeriansaliou committed Apr 19, 2019
1 parent d96546b commit db71064
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 24 deletions.
6 changes: 5 additions & 1 deletion src/channel/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub enum ChannelCommandError {
NotFound,
QueryError,
InternalError,
ShuttingDown,
PolicyReject(&'static str),
InvalidFormat(&'static str),
InvalidMetaKey((String, String)),
Expand All @@ -49,6 +50,8 @@ pub struct ChannelCommandSearch;
pub struct ChannelCommandIngest;
pub struct ChannelCommandControl;

pub type ChannelCommandResponseArgs = (&'static str, Option<Vec<String>>);

type ChannelResult = Result<Vec<ChannelCommandResponse>, ChannelCommandError>;
type MetaPartsResult<'a> = Result<(&'a str, &'a str), (&'a str, &'a str)>;

Expand Down Expand Up @@ -95,6 +98,7 @@ impl ChannelCommandError {
ChannelCommandError::NotFound => String::from("not_found"),
ChannelCommandError::QueryError => String::from("query_error"),
ChannelCommandError::InternalError => String::from("internal_error"),
ChannelCommandError::ShuttingDown => String::from("shutting_down"),
ChannelCommandError::PolicyReject(reason) => format!("policy_reject({})", reason),
ChannelCommandError::InvalidFormat(format) => format!("invalid_format({})", format),
ChannelCommandError::InvalidMetaKey(ref data) => {
Expand All @@ -108,7 +112,7 @@ impl ChannelCommandError {
}

impl ChannelCommandResponse {
pub fn to_args(&self) -> (&'static str, Option<Vec<String>>) {
pub fn to_args(&self) -> ChannelCommandResponseArgs {
// Convert internal response to channel response arguments; this either gives 'RESPONSE' \
// or 'RESPONSE <value:1> <value:2> <..>' whether there are values or not.
match *self {
Expand Down
10 changes: 10 additions & 0 deletions src/channel/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// License: Mozilla Public License v2.0 (MPL v2.0)

use std::net::TcpListener;
use std::sync::RwLock;
use std::process;
use std::thread;

Expand All @@ -14,6 +15,10 @@ use crate::{APP_CONF, THREAD_NAME_CHANNEL_CLIENT};
pub struct ChannelListenBuilder;
pub struct ChannelListen;

lazy_static! {
pub static ref CHANNEL_AVAILABLE: RwLock<bool> = RwLock::new(true);
}

impl ChannelListenBuilder {
pub fn new() -> ChannelListen {
ChannelListen {}
Expand Down Expand Up @@ -55,4 +60,9 @@ impl ChannelListen {
}
}
}

pub fn teardown() {
// Channel cannot be used anymore
*CHANNEL_AVAILABLE.write().unwrap() = false;
}
}
54 changes: 32 additions & 22 deletions src/channel/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use std::time::Instant;

use super::command::{
ChannelCommandBase, ChannelCommandControl, ChannelCommandError, ChannelCommandIngest,
ChannelCommandResponse, ChannelCommandSearch, COMMANDS_MODE_CONTROL, COMMANDS_MODE_INGEST,
COMMANDS_MODE_SEARCH,
ChannelCommandResponse, ChannelCommandResponseArgs, ChannelCommandSearch,
COMMANDS_MODE_CONTROL, COMMANDS_MODE_INGEST, COMMANDS_MODE_SEARCH,
};
use super::listen::CHANNEL_AVAILABLE;
use super::statistics::{COMMANDS_TOTAL, COMMAND_LATENCY_BEST, COMMAND_LATENCY_WORST};
use crate::LINE_FEED;

Expand Down Expand Up @@ -47,26 +48,35 @@ impl ChannelMessage {

let mut result = ChannelMessageResult::Continue;

// Handle response arguments to issued command
let response_args_groups = match M::handle(&message) {
Ok(resp_groups) => resp_groups
.iter()
.map(|resp| match resp {
ChannelCommandResponse::Ok
| ChannelCommandResponse::Pong
| ChannelCommandResponse::Pending(_)
| ChannelCommandResponse::Result(_)
| ChannelCommandResponse::Event(_, _, _)
| ChannelCommandResponse::Void
| ChannelCommandResponse::Err(_) => resp.to_args(),
ChannelCommandResponse::Ended(_) => {
result = ChannelMessageResult::Close;
resp.to_args()
}
})
.collect(),
Err(reason) => vec![ChannelCommandResponse::Err(reason).to_args()],
};
// Process response for issued command
let response_args_groups: Vec<ChannelCommandResponseArgs>;

if *CHANNEL_AVAILABLE.read().unwrap() != true {
// Server going down, reject command
response_args_groups =
vec![ChannelCommandResponse::Err(ChannelCommandError::ShuttingDown).to_args()];
} else {
// Handle response arguments to issued command
response_args_groups = match M::handle(&message) {
Ok(resp_groups) => resp_groups
.iter()
.map(|resp| match resp {
ChannelCommandResponse::Ok
| ChannelCommandResponse::Pong
| ChannelCommandResponse::Pending(_)
| ChannelCommandResponse::Result(_)
| ChannelCommandResponse::Event(_, _, _)
| ChannelCommandResponse::Void
| ChannelCommandResponse::Err(_) => resp.to_args(),
ChannelCommandResponse::Ended(_) => {
result = ChannelMessageResult::Close;
resp.to_args()
}
})
.collect(),
Err(reason) => vec![ChannelCommandResponse::Err(reason).to_args()],
};
}

// Serve response messages on socket
for response_args in response_args_groups {
Expand Down
5 changes: 4 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use clap::{App, Arg};
use graceful::SignalGuard;
use log::LevelFilter;

use channel::listen::ChannelListenBuilder;
use channel::listen::{ChannelListenBuilder, ChannelListen};
use channel::statistics::ensure_states as ensure_states_channel_statistics;
use config::config::Config;
use config::logger::ConfigLogger;
Expand Down Expand Up @@ -167,6 +167,9 @@ fn main() {
signal_guard.at_exit(move |signal| {
info!("stopping gracefully (got signal: {})", signal);

// Teardown Sonic Channel
ChannelListen::teardown();

// Perform a FST consolidation (ensures all in-memory items are synced on-disk before \
// shutdown; otherwise we would lose all non-consolidated FST changes)
StoreFSTPool::consolidate(true);
Expand Down

0 comments on commit db71064

Please sign in to comment.