Skip to content

Commit

Permalink
初步完成配置中心、服务中心内容与命名空间的联动:
Browse files Browse the repository at this point in the history
1. 当通过api设置一个新配置,其配置命名空间不存在时;会自动创建一个命名值,以方便控制台管理;
2. 当通过api注册一个服务实例,其服务命名空间不存在时;会自动创建一个命名值,以方便控制台管理;
#126
  • Loading branch information
heqingpan committed Oct 11, 2024
1 parent f5a787a commit a38ab5b
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 13 deletions.
37 changes: 35 additions & 2 deletions src/config/config_index.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::common::string_utils::StringUtils;
use crate::config::core::ConfigKey;
use crate::namespace::model::{NamespaceActorReq, WeakNamespaceFromType, WeakNamespaceParam};
use crate::namespace::NamespaceActor;
use actix::Addr;
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use crate::common::string_utils::StringUtils;
use crate::config::core::ConfigKey;
const SYSCONFIG_NAMESPACE: &str = "__INNER_SYSTEM__";

#[derive(Debug, Clone, Default)]
pub struct ConfigQueryParam {
Expand Down Expand Up @@ -118,6 +122,7 @@ impl ConfigIndex {
pub struct TenantIndex {
pub tenant_group: BTreeMap<Arc<String>, ConfigIndex>,
pub size: usize,
pub(crate) namespace_actor: Option<Addr<NamespaceActor>>,
}

impl TenantIndex {
Expand Down Expand Up @@ -151,6 +156,13 @@ impl TenantIndex {
self.size += 1;
result = true;
}
self.notify_namespace_change(
WeakNamespaceParam {
namespace_id: tenant.clone(),
from_type: WeakNamespaceFromType::Config,
},
false,
);
self.tenant_group.insert(tenant, config_index);
}
result
Expand All @@ -170,12 +182,33 @@ impl TenantIndex {
result = true;
}
if group_size == 0 {
self.notify_namespace_change(
WeakNamespaceParam {
namespace_id: tenant.clone(),
from_type: WeakNamespaceFromType::Config,
},
true,
);
self.tenant_group.remove(tenant);
}
}
result
}

fn notify_namespace_change(&self, param: WeakNamespaceParam, is_remove: bool) {
if SYSCONFIG_NAMESPACE == param.namespace_id.as_str() {
//历史系统命名空间跳过
return;
}
if let Some(act) = &self.namespace_actor {
if is_remove {
act.do_send(NamespaceActorReq::RemoveWeak(param));
} else {
act.do_send(NamespaceActorReq::SetWeak(param));
}
}
}

pub fn query_config_page(&self, param: &ConfigQueryParam) -> (usize, Vec<ConfigKey>) {
let mut rlist = vec![];
let mut size = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/config/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::config::model::{
ConfigRaftCmd, ConfigRaftResult, ConfigValueDO, HistoryItem, SetConfigParam,
};
use crate::config::utils::param_utils;
use crate::namespace::NamespaceActor;
use crate::now_millis_i64;
use crate::raft::filestore::model::SnapshotRecordDto;
use crate::raft::filestore::raftsnapshot::{SnapshotWriterActor, SnapshotWriterRequest};
Expand Down Expand Up @@ -397,6 +398,7 @@ pub struct ConfigActor {
pub(crate) subscriber: Subscriber,
pub(crate) tenant_index: TenantIndex,
raft: Option<Weak<NacosRaft>>,
namespace_actor: Option<Addr<NamespaceActor>>,
sequence: SimpleSequence,
}

Expand All @@ -411,6 +413,8 @@ impl Inject for ConfigActor {
) {
let raft: Option<Arc<NacosRaft>> = factory_data.get_bean();
self.raft = raft.map(|e| Arc::downgrade(&e));
self.namespace_actor = factory_data.get_actor();
self.tenant_index.namespace_actor = self.namespace_actor.clone();
if let Some(conn_manage) = factory_data.get_actor() {
self.subscriber.set_conn_manage(conn_manage);
}
Expand All @@ -432,6 +436,7 @@ impl ConfigActor {
listener: ConfigListener::new(),
tenant_index: TenantIndex::new(),
raft: None,
namespace_actor: None,
sequence: SimpleSequence::new(0, 100),
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
init_env();
let rust_log = std::env::var("RUST_LOG").unwrap_or("info".to_owned());
println!("version:{}, RUST_LOG:{}", APP_VERSION, &rust_log);
unsafe {
std::env::set_var("RUST_LOG", &rust_log);
}
std::env::set_var("RUST_LOG", &rust_log);
let sys_config = Arc::new(AppSysConfig::init_from_env());
println!("data dir:{}", sys_config.local_db_dir);
let timezone_fmt = Arc::new(TimeZoneFormatEnv::new(
Expand Down
68 changes: 64 additions & 4 deletions src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use crate::common::string_utils::StringUtils;
use crate::config::core::ConfigActor;
use crate::console::NamespaceUtilsOld;
use crate::namespace::model::{
Namespace, NamespaceDO, NamespaceParam, NamespaceQueryReq, NamespaceQueryResult,
NamespaceRaftReq, NamespaceRaftResult,
Namespace, NamespaceActorReq, NamespaceActorResult, NamespaceDO, NamespaceParam,
NamespaceQueryReq, NamespaceQueryResult, NamespaceRaftReq, NamespaceRaftResult,
WeakNamespaceFromType, FROM_SYSTEM_VALUE, FROM_USER_VALUE,
};
use crate::naming::core::NamingActor;
use crate::raft::filestore::model::SnapshotRecordDto;
Expand Down Expand Up @@ -90,7 +91,7 @@ impl NamespaceActor {
NamespaceParam {
namespace_id: EMPTY_ARC_STRING.clone(),
namespace_name: Some(DEFAULT_NAMESPACE.to_owned()),
r#type: Some("0".to_owned()),
r#type: Some(FROM_SYSTEM_VALUE.to_owned()),
},
false,
false,
Expand Down Expand Up @@ -131,13 +132,56 @@ impl NamespaceActor {
Namespace {
namespace_id: param.namespace_id,
namespace_name: param.namespace_name.unwrap_or_default(),
r#type: param.r#type.unwrap_or("2".to_owned()),
r#type: param.r#type.unwrap_or(FROM_USER_VALUE.to_owned()),
}
};
self.data
.insert(value.namespace_id.clone(), Arc::new(value));
}

fn set_weak_namespace(&mut self, namespace_id: Arc<String>, from_type: WeakNamespaceFromType) {
if let Some(v) = self.data.get(&namespace_id) {
let new_type = from_type.get_merge_value(&v.r#type);
if new_type == &v.r#type {
//类型不变直接跨过
return;
}
let mut new_value = v.as_ref().to_owned();
new_value.r#type = new_type.to_owned();
self.data.insert(namespace_id, Arc::new(new_value));
} else {
self.id_order_list.push(namespace_id.clone());
let value = Namespace {
namespace_id: namespace_id.clone(),
namespace_name: namespace_id.as_str().to_owned(),
r#type: from_type.get_type_value().to_owned(),
};
self.data.insert(namespace_id.clone(), Arc::new(value));
}
}

fn remove_weak_namespace(
&mut self,
namespace_id: Arc<String>,
from_type: WeakNamespaceFromType,
) {
if let Some(v) = self.data.get(&namespace_id) {
if let Some(new_type) = from_type.get_split_value(&v.r#type) {
//更新类型
if new_type == &v.r#type {
//类型不变直接跨过
return;
}
let mut new_value = v.as_ref().to_owned();
new_value.r#type = new_type.to_owned();
self.data.insert(namespace_id, Arc::new(new_value));
} else {
//删除
self.remove_id(&namespace_id);
}
}
}

fn remove_id(&mut self, id: &Arc<String>) {
for (i, item) in self.id_order_list.iter().enumerate() {
if id == item {
Expand Down Expand Up @@ -351,6 +395,22 @@ impl Handler<NamespaceRaftReq> for NamespaceActor {
}
}

impl Handler<NamespaceActorReq> for NamespaceActor {
type Result = anyhow::Result<NamespaceActorResult>;
fn handle(&mut self, msg: NamespaceActorReq, _ctx: &mut Self::Context) -> Self::Result {
match msg {
NamespaceActorReq::SetWeak(param) => {
self.set_weak_namespace(param.namespace_id, param.from_type);
Ok(NamespaceActorResult::None)
}
NamespaceActorReq::RemoveWeak(param) => {
self.remove_weak_namespace(param.namespace_id, param.from_type);
Ok(NamespaceActorResult::None)
}
}
}
}

impl Handler<NamespaceQueryReq> for NamespaceActor {
type Result = anyhow::Result<NamespaceQueryResult>;
fn handle(&mut self, msg: NamespaceQueryReq, _ctx: &mut Self::Context) -> Self::Result {
Expand Down
74 changes: 74 additions & 0 deletions src/namespace/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,68 @@ impl From<Namespace> for NamespaceDO {
}
}

pub(crate) const FROM_SYSTEM_VALUE: &'static str = "0";
pub(crate) const FROM_USER_VALUE: &'static str = "2";
pub(crate) const FROM_CONFIG_VALUE: &'static str = "3";
pub(crate) const FROM_NAMING_VALUE: &'static str = "4";
pub(crate) const FROM_CONFIG_OR_NAMING_VALUE: &'static str = "5";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WeakNamespaceFromType {
Config,
Naming,
}

impl WeakNamespaceFromType {
pub fn get_type_value(&self) -> &str {
match self {
WeakNamespaceFromType::Config => FROM_CONFIG_VALUE,
WeakNamespaceFromType::Naming => FROM_NAMING_VALUE,
}
}

/// 获取合并弱引用类型后的值
pub fn get_merge_value<'a>(&self, v: &'a str) -> &'a str {
if v == FROM_SYSTEM_VALUE || v == FROM_USER_VALUE || self.get_type_value() == v {
v
} else {
FROM_CONFIG_OR_NAMING_VALUE
}
}

/// 获取分离弱引用类型后的值
pub fn get_split_value<'a>(&self, v: &'a str) -> Option<&'a str> {
if v == FROM_CONFIG_OR_NAMING_VALUE {
//合并值需要拆开
match self {
WeakNamespaceFromType::Config => Some(FROM_NAMING_VALUE),
WeakNamespaceFromType::Naming => Some(FROM_CONFIG_VALUE),
}
} else if v != self.get_type_value() {
//非合并值,且与当前类型值不相等,直接返回原值
Some(v)
} else {
//否则移除自身,返回空
None
}
}

/// 判断指定值是否已包含当前类型或已持久化,以确认是否要更新类型值;
pub fn contained_by_value(&self, v: &str) -> bool {
if v == FROM_SYSTEM_VALUE || v == FROM_USER_VALUE || v == FROM_CONFIG_OR_NAMING_VALUE {
true
} else {
self.get_type_value() == v
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WeakNamespaceParam {
pub namespace_id: Arc<String>,
pub from_type: WeakNamespaceFromType,
}

///
/// raft持久化后,向NamespaceActor发起的变更请求
///
Expand All @@ -82,6 +144,18 @@ pub enum NamespaceRaftResult {
None,
}

#[derive(Message, Clone, Debug)]
#[rtype(result = "anyhow::Result<NamespaceActorResult>")]
pub enum NamespaceActorReq {
SetWeak(WeakNamespaceParam),
RemoveWeak(WeakNamespaceParam),
}

#[derive(Clone, Debug)]
pub enum NamespaceActorResult {
None,
}

#[derive(Message, Clone, Debug, Serialize, Deserialize)]
#[rtype(result = "anyhow::Result<NamespaceQueryResult>")]
pub enum NamespaceQueryReq {
Expand Down
5 changes: 5 additions & 0 deletions src/naming/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use std::time::Duration;
use crate::common::constant::EMPTY_ARC_STRING;
use crate::metrics::metrics_key::MetricsKey;
use crate::metrics::model::{MetricsItem, MetricsQuery, MetricsRecord};
use crate::namespace::NamespaceActor;
use actix::prelude::*;

//#[derive(Default)]
Expand All @@ -75,6 +76,7 @@ pub struct NamingActor {
pub(crate) client_instance_set: HashMap<Arc<String>, HashSet<InstanceKey>>,
cluster_node_manage: Option<Addr<InnerNodeManage>>,
cluster_delay_notify: Option<Addr<ClusterInstanceDelayNotifyActor>>,
namespace_actor: Option<Addr<NamespaceActor>>,
current_range: Option<ProcessRange>,
//dal_addr: Addr<ServiceDalActor>,
}
Expand Down Expand Up @@ -104,6 +106,8 @@ impl Inject for NamingActor {
}
self.cluster_node_manage = factory_data.get_actor();
self.cluster_delay_notify = factory_data.get_actor();
self.namespace_actor = factory_data.get_actor();
self.namespace_index.namespace_actor = self.namespace_actor.clone();
log::info!("NamingActor inject complete");
}
}
Expand Down Expand Up @@ -132,6 +136,7 @@ impl NamingActor {
cluster_node_manage: None,
cluster_delay_notify: None,
current_range: None,
namespace_actor: None,
//dal_addr,
}
}
Expand Down
Loading

0 comments on commit a38ab5b

Please sign in to comment.