Attach to cluster controller through Networking
This PR includes:
- Attachment of the partition processor manager (PPM) to the cluster controller now uses Networking.
- The PPM observes partition processor status through a buffered watch mechanism.
- The PPM can now send control messages (unused in this PR) to processors for future use.
- The PPM collects state information from running processors. This will be used in a follow-up PR to respond to controller requests about partitions.


The cluster controller gRPC service is kept for external tooling integration (CLI, etc.).
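
A minimal sketch of the sender side of the new attach handshake, assuming the controller's GenerationalNodeId has already been resolved from metadata. The attach_to_controller helper is hypothetical; AttachRequest, Networking, and NetworkSender::send are the items introduced or used in this diff.

use anyhow::Result;
use restate_core::network::NetworkSender;
use restate_network::Networking;
use restate_node_protocol::cluster_controller::AttachRequest;
use restate_types::GenerationalNodeId;

// Hypothetical helper: fire an AttachRequest at the cluster controller over
// Networking. The AttachResponse arrives on the PPM's subscribed message
// stream rather than as a direct return value.
async fn attach_to_controller(
    networking: &Networking,
    controller: GenerationalNodeId, // assumed to be known to the caller
) -> Result<()> {
    networking
        .send(controller.into(), &AttachRequest::default())
        .await?;
    Ok(())
}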
AhmedSoliman committed May 21, 2024
1 parent 85f4109 commit 03418f4
Showing 16 changed files with 463 additions and 160 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions crates/cluster-controller/Cargo.toml
@@ -14,6 +14,8 @@ options_schema = ["dep:schemars"]
[dependencies]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-network = { workspace = true }
restate-node-protocol = { workspace = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
103 changes: 97 additions & 6 deletions crates/cluster-controller/src/service.rs
@@ -9,7 +9,20 @@
// by the Apache License, Version 2.0.

use codederror::CodedError;
use restate_core::cancellation_watcher;
use futures::stream::BoxStream;
use futures::StreamExt;

use restate_network::Networking;
use restate_node_protocol::cluster_controller::{
Action, AttachRequest, AttachResponse, RunMode, RunPartition,
};
use restate_node_protocol::common::{KeyRange, RequestId};
use restate_types::partition_table::FixedPartitionTable;

use restate_core::network::{MessageRouterBuilder, NetworkSender};
use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskCenter};
use restate_node_protocol::MessageEnvelope;
use restate_types::{GenerationalNodeId, Version};

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
@@ -18,19 +31,97 @@ pub enum Error {
Error,
}

#[derive(Debug, Default)]
pub struct Service {}
pub struct Service {
metadata: Metadata,
networking: Networking,
incoming_messages: BoxStream<'static, MessageEnvelope<AttachRequest>>,
}

impl Service {
pub fn new(
metadata: Metadata,
networking: Networking,
router_builder: &mut MessageRouterBuilder,
) -> Self {
let incoming_messages = router_builder.subscribe_to_stream(10);
Service {
metadata,
networking,
incoming_messages,
}
}
}

// todo: Replace with proper handle
pub struct ClusterControllerHandle;

impl Service {
pub fn handle(&self) -> ClusterControllerHandle {
ClusterControllerHandle
}

pub async fn run(self) -> anyhow::Result<()> {
let _ = cancellation_watcher().await;
pub async fn run(mut self) -> anyhow::Result<()> {
// Make sure we have a partition table before starting
let _ = self.metadata.wait_for_partition_table(Version::MIN).await?;

let mut shutdown = std::pin::pin!(cancellation_watcher());
let tc = task_center();
loop {
tokio::select! {
Some(message) = self.incoming_messages.next() => {
let (from, message) = message.split();
self.handle_attach_request(&tc, from, message)?;
}
_ = &mut shutdown => {
return Ok(());
}
}
}
}

fn handle_attach_request(
&mut self,
tc: &TaskCenter,
from: GenerationalNodeId,
request: AttachRequest,
) -> Result<(), ShutdownError> {
let partition_table = self
.metadata
.partition_table()
.expect("partition table is loaded before run");
let networking = self.networking.clone();
let response = self.create_attachment_response(&partition_table, from, request.request_id);
tc.spawn(
restate_core::TaskKind::Disposable,
"attachment-response",
None,
async move { Ok(networking.send(from.into(), &response).await?) },
)?;
Ok(())
}

fn create_attachment_response(
&self,
partition_table: &FixedPartitionTable,
_node: GenerationalNodeId,
request_id: RequestId,
) -> AttachResponse {
// Simulating a plan after the initial attachment
let actions = partition_table
.partitioner()
.map(|(partition_id, key_range)| {
Action::RunPartition(RunPartition {
partition_id,
key_range_inclusive: KeyRange {
from: *key_range.start(),
to: *key_range.end(),
},
mode: RunMode::Leader,
})
})
.collect();
AttachResponse {
request_id,
actions,
}
}
}
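
For context, a hedged sketch (not part of this diff) of how the PPM side might consume the actions produced by create_attachment_response above. The apply_attach_response function is hypothetical; Action, RunPartition, and the KeyRange conversion come from this PR.

use std::ops::RangeInclusive;

use restate_node_protocol::cluster_controller::{Action, AttachResponse};
use restate_types::identifiers::PartitionKey;

// Hypothetical consumer: walk the controller's plan and start each partition
// in the requested mode over its inclusive key range.
fn apply_attach_response(response: AttachResponse) {
    for action in response.actions {
        match action {
            Action::RunPartition(run) => {
                // KeyRange converts into RangeInclusive<PartitionKey> via the
                // From impl added in crates/node-protocol/src/common.rs.
                let key_range: RangeInclusive<PartitionKey> = run.key_range_inclusive.into();
                println!(
                    "run partition {:?} as {:?} over keys {:?}",
                    run.partition_id, run.mode, key_range
                );
            }
        }
    }
}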
43 changes: 42 additions & 1 deletion crates/core/src/worker_api/partition_processor_manager.rs
@@ -8,16 +8,57 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::identifiers::PartitionId;
use tokio::sync::{mpsc, oneshot};

use restate_node_protocol::cluster_controller::RunMode;
use restate_types::identifiers::{LeaderEpoch, PartitionId};
use restate_types::logs::Lsn;
use restate_types::time::MillisSinceEpoch;
use restate_types::GenerationalNodeId;

use crate::ShutdownError;

#[derive(Debug)]
pub enum ProcessorsManagerCommand {
GetLivePartitions(oneshot::Sender<Vec<PartitionId>>),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ReplayStatus {
Starting,
Active,
CatchingUp { target_tail_lsn: Lsn },
}

#[derive(Debug, Clone)]
pub struct PartitionProcessorStatus {
pub updated_at: MillisSinceEpoch,
pub planned_mode: RunMode,
pub effective_mode: Option<RunMode>,
pub last_observed_leader_epoch: Option<LeaderEpoch>,
pub last_observed_leader_node: Option<GenerationalNodeId>,
pub last_applied_log_lsn: Option<Lsn>,
pub last_record_applied_at: Option<MillisSinceEpoch>,
pub skipped_records: u64,
pub replay_status: ReplayStatus,
}

impl PartitionProcessorStatus {
pub fn new(planned_mode: RunMode) -> Self {
Self {
updated_at: MillisSinceEpoch::now(),
planned_mode,
effective_mode: None,
last_observed_leader_epoch: None,
last_observed_leader_node: None,
last_applied_log_lsn: None,
last_record_applied_at: None,
skipped_records: 0,
replay_status: ReplayStatus::Starting,
}
}
}

#[derive(Debug, Clone)]
pub struct ProcessorsManagerHandle(mpsc::Sender<ProcessorsManagerCommand>);

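A small illustration of how a running processor might maintain the status struct above. The helper functions and the re-export path restate_core::worker_api are assumptions; the fields and PartitionProcessorStatus::new come from this diff.

use restate_core::worker_api::PartitionProcessorStatus; // assumed re-export path
use restate_node_protocol::cluster_controller::RunMode;
use restate_types::logs::Lsn;
use restate_types::time::MillisSinceEpoch;

// Hypothetical: a processor starts as a follower (the planned mode comes from
// the controller's RunPartition action) ...
fn new_follower_status() -> PartitionProcessorStatus {
    PartitionProcessorStatus::new(RunMode::Follower)
}

// ... and refreshes its status every time it applies a record from the log.
fn record_applied(status: &mut PartitionProcessorStatus, lsn: Lsn) {
    status.last_applied_log_lsn = Some(lsn);
    status.last_record_applied_at = Some(MillisSinceEpoch::now());
    status.updated_at = MillisSinceEpoch::now();
}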
1 change: 1 addition & 0 deletions crates/node-protocol/Cargo.toml
@@ -23,6 +23,7 @@ flexbuffers = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true}
serde_with = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
11 changes: 6 additions & 5 deletions crates/node-protocol/proto/common.proto
@@ -22,16 +22,17 @@ message NodeId {
}

// A generic type for versioned metadata
message Version {
uint32 value = 1;
}
message Version { uint32 value = 1; }

// The target service for a message
// The handle name or type tag of the message. For every target there must be
// exactly one message handler implementation.
enum TargetName {
TargetName_UNKNOWN = 0;
METADATA_MANAGER = 1;
INGRESS = 2;
LOCAL_METADATA_STORE = 3;
LOCAL_METADATA_STORE_CLIENT = 4;
ATTACH_REQUEST = 5;
ATTACH_RESPONSE = 6;
PARTITION_PROCESSOR_MANAGER_REQUESTS = 7;
}

51 changes: 51 additions & 0 deletions crates/node-protocol/src/cluster_controller.rs
@@ -0,0 +1,51 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::identifiers::PartitionId;
use serde::{Deserialize, Serialize};

use crate::common::{KeyRange, RequestId, TargetName};
use crate::define_rpc;

define_rpc! {
@request = AttachRequest,
@response = AttachResponse,
@request_target = TargetName::AttachRequest,
@response_target = TargetName::AttachResponse,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct AttachRequest {
pub request_id: RequestId,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttachResponse {
pub request_id: RequestId,
pub actions: Vec<Action>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
pub enum RunMode {
Leader,
Follower,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Action {
RunPartition(RunPartition),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunPartition {
pub partition_id: PartitionId,
pub key_range_inclusive: KeyRange,
pub mode: RunMode,
}
15 changes: 15 additions & 0 deletions crates/node-protocol/src/common.rs
@@ -8,8 +8,11 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::RangeInclusive;
use std::sync::atomic::AtomicUsize;

use restate_types::identifiers::PartitionKey;

include!(concat!(env!("OUT_DIR"), "/dev.restate.common.rs"));

pub static MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Flexbuffers;
@@ -105,6 +108,18 @@ impl From<restate_types::GenerationalNodeId> for NodeId {
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KeyRange {
pub from: PartitionKey,
pub to: PartitionKey,
}

impl From<KeyRange> for RangeInclusive<PartitionKey> {
fn from(val: KeyRange) -> Self {
RangeInclusive::new(val.from, val.to)
}
}

// write tests for RequestId
#[cfg(test)]
mod tests {
5 changes: 5 additions & 0 deletions crates/node-protocol/src/lib.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod cluster_controller;
pub mod codec;
pub mod common;
mod error;
@@ -50,6 +51,10 @@ impl<M> MessageEnvelope<M> {
pub fn split(self) -> (GenerationalNodeId, M) {
(self.peer, self.body)
}

pub fn body(&self) -> &M {
&self.body
}
}

impl<M: RpcMessage> MessageEnvelope<M> {
10 changes: 3 additions & 7 deletions crates/node-services/proto/cluster_ctrl_svc.proto
@@ -14,13 +14,9 @@ import "common.proto";
package dev.restate.cluster_ctrl;

service ClusterCtrlSvc {
// Attach worker at cluster controller
rpc AttachNode(AttachmentRequest) returns (AttachmentResponse);

rpc GetClusterState(ClusterStateRequest) returns (ClusterStateResponse);
}

message AttachmentRequest {
optional dev.restate.common.NodeId node_id = 1;
}
message ClusterStateRequest {}

message AttachmentResponse {}
message ClusterStateResponse {}
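
Since the gRPC service is kept for external tooling, here is a hedged sketch of a client call against the retained GetClusterState RPC. The module path restate_node_services::cluster_ctrl and the generated client name follow standard tonic conventions and are assumptions, not taken from this diff.

use tonic::transport::Channel;

// Assumed tonic-generated types for the dev.restate.cluster_ctrl package.
use restate_node_services::cluster_ctrl::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_node_services::cluster_ctrl::ClusterStateRequest;

// Hypothetical CLI-style helper: connect and ask the controller for the
// current cluster state.
async fn fetch_cluster_state(address: &'static str) -> anyhow::Result<()> {
    let channel = Channel::from_static(address).connect().await?;
    let mut client = ClusterCtrlSvcClient::new(channel);
    let _state = client.get_cluster_state(ClusterStateRequest {}).await?;
    Ok(())
}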
3 changes: 3 additions & 0 deletions crates/node/src/lib.rs
@@ -145,7 +145,10 @@ impl Node {
let admin_role = if config.has_role(Role::Admin) {
Some(AdminRole::new(
updateable_config.clone(),
metadata.clone(),
networking.clone(),
metadata_manager.writer(),
&mut router_builder,
metadata_store_client.clone(),
)?)
} else {
(Remaining changed files not loaded.)
