Skip to content

Commit

Permalink
WIP cluster controller api
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 10, 2024
1 parent a718fcd commit ce0677d
Show file tree
Hide file tree
Showing 15 changed files with 621 additions and 135 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/cluster-controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ options_schema = ["dep:schemars"]
[dependencies]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-network = { workspace = true }
restate-node-protocol = { workspace = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
Expand Down
111 changes: 106 additions & 5 deletions crates/cluster-controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,21 @@
// by the Apache License, Version 2.0.

use codederror::CodedError;
use restate_core::cancellation_watcher;
use futures::stream::BoxStream;
use futures::StreamExt;

use restate_network::Networking;
use restate_node_protocol::common::RequestId;
use restate_node_protocol::worker::{
Action, AttachmentResponse, KeyRange, RunMode, RunPartition, WorkerMessage,
};
use restate_types::partition_table::FixedPartitionTable;

use restate_core::network::NetworkSender;
use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskCenter};
use restate_node_protocol::cluster_controller::ClusterControllerMessage;
use restate_node_protocol::MessageEnvelope;
use restate_types::{GenerationalNodeId, Version};

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
Expand All @@ -18,8 +32,25 @@ pub enum Error {
Error,
}

#[derive(Debug, Default)]
pub struct Service {}
/// State of the cluster controller service.
///
/// Bundles what `run` needs to answer attach requests: node metadata, a
/// networking handle for replies, and the stream of incoming messages.
pub struct Service {
// Used to wait for, and later read, the partition table.
metadata: Metadata,
// Cloned into spawned tasks that send responses back to the requesting node.
networking: Networking,
// Incoming `ClusterControllerMessage` envelopes, drained by `run`.
incoming_messages: BoxStream<'static, MessageEnvelope<ClusterControllerMessage>>,
}

impl Service {
pub fn new(
metadata: Metadata,
networking: Networking,
incoming_messages: BoxStream<'static, MessageEnvelope<ClusterControllerMessage>>,
) -> Self {
Service {
metadata,
networking,
incoming_messages,
}
}
}

// todo: Replace with proper handle
/// Placeholder handle for the cluster controller service.
///
/// Currently a unit struct that carries no state.
pub struct ClusterControllerHandle;
Expand All @@ -29,8 +60,78 @@ impl Service {
ClusterControllerHandle
}

pub async fn run(self) -> anyhow::Result<()> {
let _ = cancellation_watcher().await;
pub async fn run(mut self) -> anyhow::Result<()> {
// Make sure we have partition table before starting
let _ = self.metadata.wait_for_partition_table(Version::MIN).await?;

let mut shutdown = std::pin::pin!(cancellation_watcher());
let tc = task_center();
loop {
tokio::select! {
Some(message) = self.incoming_messages.next() => {
let (from, message) = message.split();
self.handle_network_message(&tc, from, message).await?;
}
_ = &mut shutdown => {
return Ok(());
}
}
}
}

/// Handles a single message received from node `from`.
///
/// For `Attach`, an attachment response is built from the current partition
/// table and sent back to `from` from a separately spawned disposable task,
/// so this method does not wait for the send to complete.
///
/// # Errors
/// Returns the error from `TaskCenter::spawn` if the sending task cannot be
/// spawned.
///
/// # Panics
/// Panics if no partition table is available; `run` waits for one before it
/// starts dispatching messages.
async fn handle_network_message(
    &mut self,
    tc: &TaskCenter,
    from: GenerationalNodeId,
    msg: ClusterControllerMessage,
) -> Result<(), ShutdownError> {
    match msg {
        ClusterControllerMessage::Attach(details) => {
            let table = self
                .metadata
                .partition_table()
                .expect("partition table is loaded before run");
            let sender = self.networking.clone();
            // Build the full reply up front; the spawned task only has to send it.
            let reply = WorkerMessage::AttachmentResponse(self.create_attachment_response(
                &table,
                from,
                details.request_id,
            ));
            tc.spawn(
                restate_core::TaskKind::Disposable,
                "attachment-response",
                None,
                async move { Ok(sender.send(from.into(), &reply).await?) },
            )?;
        }
    }
    Ok(())
}

/// Builds the response to an attach request identified by `request_id`.
///
/// This simulates a plan after the initial attachment: one `RunPartition`
/// action is produced for every `(partition_id, key_range)` pair yielded by
/// the partition table's partitioner, each with `RunMode::Leader`. The
/// `_node` argument is currently unused.
fn create_attachment_response(
    &self,
    partition_table: &FixedPartitionTable,
    _node: GenerationalNodeId,
    request_id: RequestId,
) -> AttachmentResponse {
    AttachmentResponse {
        request_id,
        actions: partition_table
            .partitioner()
            .map(|(partition_id, range)| {
                // Both bounds of the partitioner's range are copied into the wire type.
                let key_range_inclusive = KeyRange {
                    from: *range.start(),
                    to: *range.end(),
                };
                Action::RunPartition(RunPartition {
                    partition_id,
                    key_range_inclusive,
                    mode: RunMode::Leader,
                })
            })
            .collect(),
    }
}
}
42 changes: 41 additions & 1 deletion crates/core/src/worker_api/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::identifiers::PartitionId;
use restate_node_protocol::worker::RunMode;
use restate_types::identifiers::{LeaderEpoch, PartitionId};
use restate_types::logs::Lsn;
use restate_types::time::MillisSinceEpoch;
use restate_types::GenerationalNodeId;
use tokio::sync::{mpsc, oneshot};

use crate::ShutdownError;
Expand All @@ -18,6 +22,42 @@ pub enum ProcessorsManagerCommand {
GetLivePartitions(oneshot::Sender<Vec<PartitionId>>),
}

/// Replay state carried in `PartitionProcessorStatus::replay_status`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ReplayStatus {
// Initial value assigned by `PartitionProcessorStatus::new`.
Starting,
// NOTE(review): presumably means the processor is caught up with its log — confirm.
Active,
// NOTE(review): presumably still replaying towards `target_tail_lsn` — confirm.
CatchingUp { target_tail_lsn: Lsn },
}

/// Status snapshot of a single partition processor.
///
/// A fresh value is created with `PartitionProcessorStatus::new`, which fills
/// in the planned mode and leaves every observation empty.
#[derive(Debug, Clone)]
pub struct PartitionProcessorStatus {
// Set to the current time when the status is constructed.
pub updated_at: MillisSinceEpoch,
// The run mode passed to `new`.
pub planned_mode: RunMode,
// `None` at construction. NOTE(review): presumably the mode actually in effect — confirm.
pub effective_mode: Option<RunMode>,
// `None` at construction.
pub last_observed_leader_epoch: Option<LeaderEpoch>,
// `None` at construction.
pub last_observed_leader_node: Option<GenerationalNodeId>,
// `None` at construction.
pub last_applied_log_lsn: Option<Lsn>,
// `None` at construction.
pub last_record_applied_at: Option<MillisSinceEpoch>,
// Starts at 0.
pub skipped_records: u64,
// Starts as `ReplayStatus::Starting`.
pub replay_status: ReplayStatus,
}

impl PartitionProcessorStatus {
pub fn new(planned_mode: RunMode) -> Self {
Self {
updated_at: MillisSinceEpoch::now(),
planned_mode,
effective_mode: None,
last_observed_leader_epoch: None,
last_observed_leader_node: None,
last_applied_log_lsn: None,
last_record_applied_at: None,
skipped_records: 0,
replay_status: ReplayStatus::Starting,
}
}
}

/// Cloneable handle wrapping an `mpsc::Sender` of `ProcessorsManagerCommand`.
#[derive(Debug, Clone)]
pub struct ProcessorsManagerHandle(mpsc::Sender<ProcessorsManagerCommand>);

Expand Down
2 changes: 2 additions & 0 deletions crates/node-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ flexbuffers = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true}
serde_with = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
ulid = { workspace = true, features = ["serde"] }

[build-dependencies]
prost-build = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/node-protocol/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ enum TargetName {
INGRESS = 2;
LOCAL_METADATA_STORE = 3;
LOCAL_METADATA_STORE_CLIENT = 4;
CLUSTER_CONTROLLER = 5;
WORKER = 6;
}

62 changes: 62 additions & 0 deletions crates/node-protocol/src/cluster_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use bytes::{Buf, BufMut};
use serde::{Deserialize, Serialize};

use crate::codec::{decode_default, encode_default, Targeted, WireDecode, WireEncode};
use crate::common::{ProtocolVersion, RequestId, TargetName};
use crate::CodecError;

/// Messages targeted at the cluster controller (`TargetName::ClusterController`).
///
/// The `IntoStaticStr` derive supplies the string returned by `Targeted::kind`.
#[derive(
Debug,
Clone,
Serialize,
Deserialize,
derive_more::From,
strum_macros::EnumIs,
strum_macros::IntoStaticStr,
)]
pub enum ClusterControllerMessage {
// Attach request; its `AttachementDetails` carries the request id.
Attach(AttachementDetails),
}

impl Targeted for ClusterControllerMessage {
    /// These messages are routed to the cluster controller target.
    const TARGET: TargetName = TargetName::ClusterController;

    /// Returns the static string form of this message, as produced by the
    /// enum's `IntoStaticStr` conversion.
    fn kind(&self) -> &'static str {
        <&'static str>::from(self)
    }
}

/// Wire encoding for `ClusterControllerMessage`; delegates to `encode_default`.
impl WireEncode for ClusterControllerMessage {
// Writes `self` into `buf` for the given `protocol_version`, returning any
// `CodecError` reported by `encode_default`.
fn encode<B: BufMut>(
&self,
buf: &mut B,
protocol_version: ProtocolVersion,
) -> Result<(), CodecError> {
// serialize message into buf
encode_default(self, buf, protocol_version)
}
}

/// Wire decoding for `ClusterControllerMessage`; delegates to `decode_default`.
impl WireDecode for ClusterControllerMessage {
// Reads a message from `buf` for the given `protocol_version`, returning any
// `CodecError` reported by `decode_default`.
fn decode<B: Buf>(buf: &mut B, protocol_version: ProtocolVersion) -> Result<Self, CodecError>
where
Self: Sized,
{
decode_default(buf, protocol_version)
}
}

/// Payload of `ClusterControllerMessage::Attach`.
// NOTE(review): "Attachement" is a misspelling of "Attachment"; the name is
// public and referenced by the message enum, so renaming needs a coordinated change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttachementDetails {
// Identifier of this attach request.
pub request_id: RequestId,
}
26 changes: 26 additions & 0 deletions crates/node-protocol/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,32 @@ include!(concat!(env!("OUT_DIR"), "/dev.restate.common.rs"));
// Both the minimum supported and the current protocol version are presently
// `ProtocolVersion::Flexbuffers`.
pub static MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Flexbuffers;
pub static CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Flexbuffers;

/// Request identifier: a newtype around `ulid::Ulid`.
///
/// It is `Copy`, hashable, ordered, displayable, and serde-serializable via
/// the derives below.
#[derive(
Debug,
derive_more::Display,
PartialEq,
Eq,
Clone,
Copy,
Hash,
PartialOrd,
Ord,
serde::Serialize,
serde::Deserialize,
)]
pub struct RequestId(ulid::Ulid);
impl RequestId {
    /// Creates a request id; equivalent to `RequestId::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for RequestId {
fn default() -> Self {
RequestId(ulid::Ulid::new())
}
}

// Contents of `common_descriptor.bin` from the build's `OUT_DIR`, embedded at
// compile time via `include_bytes!`.
pub const FILE_DESCRIPTOR_SET: &[u8] =
include_bytes!(concat!(env!("OUT_DIR"), "/common_descriptor.bin"));

Expand Down
2 changes: 2 additions & 0 deletions crates/node-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod cluster_controller;
pub mod codec;
pub mod common;
mod error;
pub mod ingress;
pub mod metadata;
pub mod node;
pub mod worker;

// re-exports for convenience
pub use common::CURRENT_PROTOCOL_VERSION;
Expand Down
Loading

0 comments on commit ce0677d

Please sign in to comment.