Skip to content

Latest commit

 

History

History
508 lines (446 loc) · 18.2 KB

波卡链源码分析之二启动流程.md

File metadata and controls

508 lines (446 loc) · 18.2 KB

波卡链源码分析之二启动流程

一、启动命令端


RUST语言使用cargo创建工程有一个好的地方,库是lib.rs标记,主程序main.rs标记,所以相对别的语言明显好找启动点。波卡链的主程序在polkadot/polkadot/src/main.rs中,打开这个文件:
#![warn(missing_docs)] //忽略丢失文档

extern crate polkadot_cli as cli;

#[macro_use]
extern crate error_chain;

quick_main!(run);

fn run() -> cli::error::Result<()> {
	cli::run(::std::env::args())
}

代码很简单,不过风格似乎在哪里见过?是不是和以太坊等的启动代码有些类似,只是这个更单薄一些。可能大家对RUST不太了解,所以在分析波卡链时同时对RUST一些知识进行说明。
quick_main!(run);这个是一个宏,能够产生main主函数,它在error_chain这个库中,所以上面要导入这个库。它除了能够产生主函数,还可以调用其下的函数并返回相关的错误结果Result<()>,这里当然就是run这个函数了。#[macro_use]是属性设置,加上!是代表包含的自身属性设置,不加代表下一个项目的属性设置。这里是说明可以把宏应用到当前的作用域中。
所以,可以进入run函数(cli/src/lib.rs):
pub fn run<I, T>(args: I) -> error::Result<()> where
	I: IntoIterator<Item = T>,
	T: Into<std::ffi::OsString> + Clone,
{
	//这个是主角,一个异步IO的栈,网络库
	let core = reactor::Core::new().expect("tokio::Core could not be created");

	let yaml = load_yaml!("./cli.yml");//polkadot启动的配置参数在yml文件中,加载之
	//RUST的SWITCH
	let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) {
		Ok(m) => m,
		Err(ref e) if e.kind == clap::ErrorKind::VersionDisplayed => return Ok(()),
		Err(ref e) if e.kind == clap::ErrorKind::HelpDisplayed || e.kind == clap::ErrorKind::VersionDisplayed => {
			let _ = clap::App::from_yaml(yaml).print_long_help();//类似default
			return Ok(());//如果是非链环境(帮助之类),则返回。
		}
		Err(e) => return Err(e.into()),
	};

	// TODO [ToDr] Split parameters parsing from actual execution.
	let log_pattern = matches.value_of("log").unwrap_or("");
	init_logger(log_pattern);
	fdlimit::raise_fd_limit();

	let mut config = service::Configuration::default();

	let base_path = matches.value_of("base-path")
		.map(|x| Path::new(x).to_owned())
		.unwrap_or_else(default_base_path);

  //keystroe的参数处理
	config.keystore_path = matches.value_of("keystore")
		.map(|x| Path::new(x).to_owned())
		.unwrap_or_else(|| keystore_path(&base_path))
		.to_string_lossy()
		.into();

	config.database_path = db_path(&base_path).to_string_lossy().into();

   //角色处理,四种角色,其使用的是network中的Role:pub use network::Role;
	let mut role = service::Role::FULL;
	if matches.is_present("collator") {
		info!("Starting collator.");
		role = service::Role::COLLATOR;
	} else if matches.is_present("validator") {
		info!("Starting validator.");
		role = service::Role::VALIDATOR;
	} else if matches.is_present("light") {
		info!("Starting light.");
		role = service::Role::LIGHT;
	}

	match matches.value_of("chain") {
		Some("dev") => config.chain_spec = ChainSpec::Development,
		Some("local") => config.chain_spec = ChainSpec::LocalTestnet,
		Some("poc-1") => config.chain_spec = ChainSpec::PoC1Testnet,
		None => (),
		Some(unknown) => panic!("Invalid chain name: {}", unknown),
	}
	info!("Chain specification: {}", match config.chain_spec {
		ChainSpec::Development => "Development",
		ChainSpec::LocalTestnet => "Local Testnet",
		ChainSpec::PoC1Testnet => "PoC-1 Testnet",
	});

	config.roles = role;
	{
		//配置启动节点和相关网络参数
		config.network.boot_nodes = matches
			.values_of("bootnodes")
			.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
		config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
		config.network.net_config_path = config.network.config_path.clone();

		let port = match matches.value_of("port") {
			Some(port) => port.parse().expect("Invalid p2p port value specified."),
			None => 30333,
		};
		config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
		config.network.public_address = None;
		config.network.client_version = format!("parity-polkadot/{}", crate_version!());
		config.network.use_secret = match matches.value_of("node-key").map(|s| s.parse()) {
			Some(Ok(secret)) => Some(secret),
			Some(Err(err)) => return Err(format!("Error parsing node key: {}", err).into()),
			None => None,
		};
	}

	config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();

 //根据角色来创建全节点或者轻量级节点
	match role == service::Role::LIGHT {
		true => run_until_exit(core, service::new_light(config.clone())?, &matches, config),
		false => run_until_exit(core, service::new_full(config.clone())?, &matches, config),
	}
}

这个函数其实没啥,类似于所有的区块链启动,对参数进行解析,并启动相关的操作,一步步分析别急。其实最关键提最后两行,也就是注释中说明的轻节点和全节点。
先看一下这个函数:
//rust的模板模板编程,WHERE类似于C#的限定,最后一个参数是一个函数指针
fn run_until_exit<B, E>(mut core: reactor::Core, service: service::Service<B, E>, matches: &clap::ArgMatches, config: service::Configuration) -> error::Result<()>
	where
		B: client::backend::Backend + Send + Sync + 'static,//静态的生命周期
		E: client::CallExecutor + Send + Sync + 'static,
		client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>
{
	let exit = {
		// can't use signal directly here because CtrlC takes only `Fn`.退出控制
		let (exit_send, exit) = mpsc::channel(1);
		//移动语义
		ctrlc::CtrlC::set_handler(move || {
			exit_send.clone().send(()).wait().expect("Error sending exit notification");
		});

    //推导返回值,可以使用return,但不推荐
		exit
	};

  //启动钓鱼服务
	informant::start(&service, core.handle());

  //创建RPC和相关的处理句柄
	let _rpc_servers = {
		let http_address = parse_address("127.0.0.1:9933", "rpc-port", matches)?;//问号表示可以有0~1个匹配
		let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?;

    //||是闭包,其后为闭包表达式
		let handler = || {
			let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
			let pool = RpcTransactionPool {
				inner: service.transaction_pool(),
				network: service.network(),
			};
			//创建rpc_handler,代码分析见后面的RPC
			rpc::rpc_handler(service.client(), chain, pool, Configuration(config.clone()))
		};
		//处理元组,自动推导
		(
			//|address|是一个管道,用在闭包中。启动两个服务
			start_server(http_address, |address| rpc::start_http(address, handler())),
			start_server(ws_address, |address| rpc::start_ws(address, handler())),
		)
	};

  //运行异步IO,启动网络通信
	core.run(exit.into_future()).expect("Error running informant event loop");
	Ok(())
}

再看一下启动的服务:
fn start_server<T, F>(mut address: SocketAddr, start: F) -> Result<T, io::Error> where
	F: Fn(&SocketAddr) -> Result<T, io::Error>,
{
	start(&address)
		.or_else(|e| match e.kind() {
			io::ErrorKind::AddrInUse |
			io::ErrorKind::PermissionDenied => {
				warn!("Unable to bind server to {}. Trying random port.", address);
				address.set_port(0);
				start(&address)
			},
			_ => Err(e),//一定要有默认分支
		})
}

一个泛型函数,调用F函数指针,也就是前面的rpc::start_http和rpc::start_ws两个函数,编程语言越高级,越脱离人类的形式化感知,不知道是好事还是坏事。or_else是Result的一个结果分支语句。
基本上到这儿,相关的命令就全启动了。然后分析一下具体的细节。

二、相关服务


下面分别来看钓鱼服务和RPC服务:

1、钓鱼服务


polkadot/cli/src/informant.rs
/// Spawn informant on the event loop
pub fn start<B, E>(service: &Service<B, E>, handle: reactor::Handle)
	where
		B: client::backend::Backend + Send + Sync + 'static,//where限定子句,+号就是多重限制(继承)
		E: client::CallExecutor + Send + Sync + 'static,
		client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>
{
	let interval = reactor::Interval::new_at(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS), &handle)
		.expect("Error creating informant timer");

  //调用client(&self)等得到Clone值
	let network = service.network();
	let client = service.client();

	let display_notifications = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
		let sync_status = network.status();

    //if let语义
		if let Ok(best_block) = client.best_block_header() {
			let hash: HeaderHash = best_block.blake2_256().into();
			let status = match (sync_status.sync.state, sync_status.sync.best_seen_block) {
				(SyncState::Idle, _) => "Idle".into(),
				(SyncState::Downloading, None) => "Syncing".into(),
				(SyncState::Downloading, Some(n)) => format!("Syncing, target=#{}", n),
			};
			info!(target: "polkadot", "{} ({} peers), best: #{} ({})", status, sync_status.num_peers, best_block.number, hash)
		} else {
			warn!("Error getting best block information");
		}
		Ok(())
	});

	let client = service.client();//因为闭包,所以这里需要再次获得
	let display_block_import = client.import_notification_stream().for_each(|n| {
		info!(target: "polkadot", "Imported #{} ({})", n.header.number, n.hash);
		Ok(())
	});

	//处理通知和数据块的导入
	handle.spawn(display_notifications);
	handle.spawn(display_block_import);
}

使用 handle.spawn 将函数 Future 绑定到 event loop 上面。只有如此在处理并发连接时不会阻塞 event loop。写过异步I/O 的都会明白 socket 是非阻塞的即不可能通过一次 read 或者 write 就将这个 socket 的数据全部处理完成。因此还会重新将 socket 给注册到 event loop,这样当有新的事件的时候,event loop 会重新调用对应的回调函数。 tokio_core提供了这么一个异步IO的机制,确实比较好用。

2、RPC服务


在上面的创建服务时,没有分析创建两类节点:
polkadot/service/src/lib.rs
/// Creates light client and register protocol with the network service轻节点
pub fn new_light(config: Configuration) -> Result<Service<client::light::Backend, client::RemoteCallExecutor<client::light::Backend, network::OnDemand<network::Service>>>, error::Error> {
	//下划杠代表忽略绑定,类似于GO,MOVE语义加上管道形成一个闭包
	Service::new(move |_, executor, genesis_builder: GenesisBuilder| {
			let client_backend = client::light::new_light_backend();
			let fetch_checker = Arc::new(client::light::new_fetch_checker(client_backend.clone(), executor));
			let fetcher = Arc::new(network::OnDemand::new(fetch_checker));
			let client = client::light::new_light(client_backend, fetcher.clone(), genesis_builder)?;
			Ok((Arc::new(client), Some(fetcher)))
		},
		|client| Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone())),
		|_client, _network, _tx_pool, _keystore| Ok(None),
		config)
}

/// Creates full client and register protocol with the network service全节点多了数据库的相关数据操作
pub fn new_full(config: Configuration) -> Result<Service<client_db::Backend, client::LocalCallExecutor<client_db::Backend, CodeExecutor>>, error::Error> {
	let is_validator = (config.roles & Role::VALIDATOR) == Role::VALIDATOR;
	Service::new(|db_settings, executor, genesis_builder: GenesisBuilder|
		Ok((Arc::new(client_db::new_client(db_settings, executor, genesis_builder)?), None)),
		|client| client,
		|client, network, tx_pool, keystore| {
			if !is_validator {
				return Ok(None);
			}

			// Load the first available key. Code above makes sure it exisis.
			let key = keystore.load(&keystore.contents()?[0], "")?;
			info!("Using authority key {:?}", key.public());
			//见下面推荐牟NEW
			Ok(Some(consensus::Service::new(
				client.clone(),
				client.clone(),
				network.clone(),
				tx_pool.clone(),
				::std::time::Duration::from_millis(4000), // TODO: dynamic
				key,
			)))
		},
		config)
}
/// Creates and register protocol with the network service这里很重要,是真正创建网络服务的地方
fn new<F, G, C, A>(client_creator: F, api_creator: G, consensus_creator: C, mut config: Configuration) -> Result<Self, error::Error>
	where  //RUST的WHERE这么做确实有点丑陋啊
		F: FnOnce(
				client_db::DatabaseSettings,
				CodeExecutor,
				GenesisBuilder,
			) -> Result<(Arc<Client<B, E>>, Option<Arc<network::OnDemand<network::Service>>>), error::Error>,
		G: Fn(
				Arc<Client<B, E>>,
			) -> Arc<A>,
		C: Fn(
				Arc<Client<B, E>>,
				Arc<network::Service>,
				Arc<Mutex<TransactionPool>>,
				&Keystore
			) -> Result<Option<consensus::Service>, error::Error>,
		A: PolkadotApi + Send + Sync + 'static,
{
	use std::sync::Barrier;//同步的内存屏障

	let (signal, exit) = ::exit_future::signal();//事件元组

	// Create client
	let executor = polkadot_executor::Executor::new();

	let mut keystore = Keystore::open(config.keystore_path.into())?;
	for seed in &config.keys {
		keystore.generate_from_seed(seed)?;//产生随机种子
	}

  //由上边的种子生产密钥对
	if keystore.contents()?.is_empty() {
		let key = keystore.generate("")?;
		info!("Generated a new keypair: {:?}", key.public());
	}

	let ChainConfig { genesis_config, boot_nodes } = match config.chain_spec {
		ChainSpec::Development => development_config(),
		ChainSpec::LocalTestnet => local_testnet_config(),
		ChainSpec::PoC1Testnet => poc_1_testnet_config(),
	};
	config.network.boot_nodes.extend(boot_nodes);

	let genesis_builder = GenesisBuilder {
		config: genesis_config,
	};

	let db_settings = client_db::DatabaseSettings {
		cache_size: None,
		path: config.database_path.into(),
	};

  //创建客户端
	let (client, on_demand) = client_creator(db_settings, executor, genesis_builder)?;
	let api = api_creator(client.clone());
	let best_header = client.best_block_header()?;
	info!("Starting Polkadot. Best block is #{}", best_header.number);
	//创建交易池及其适配器
	let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
	let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
		pool: transaction_pool.clone(),
		client: client.clone(),
		api: api.clone(),
	});
	let network_params = network::Params {
		config: network::ProtocolConfig {
			roles: config.roles,
		},
		network_config: config.network,
		chain: client.clone(),
		on_demand: on_demand.clone().map(|d| d as Arc<network::OnDemandService>),
		transaction_pool: transaction_pool_adapter,
	};

  //创建网络通信服务
	let network = network::Service::new(network_params)?;
	let barrier = ::std::sync::Arc::new(Barrier::new(2));
	//引入多线程原子变量控制
	on_demand.map(|on_demand| on_demand.set_service_link(Arc::downgrade(&network)));

	let thread = {
		let client = client.clone();
		let network = network.clone();
		let txpool = transaction_pool.clone();

		let thread_barrier = barrier.clone();
		启动多线程并挂载启动网络等事件函数
		thread::spawn(move || {
			network.start_network();

			thread_barrier.wait();
			let mut core = Core::new().expect("tokio::Core could not be created");
			let events = client.import_notification_stream().for_each(move |notification| {
				network.on_block_imported(notification.hash, &notification.header);//导入区块
				prune_imported(&*client, &*txpool, notification.hash);//处理导入的区块

				Ok(())
			});

			core.handle().spawn(events);//绑定事件
			if let Err(e) = core.run(exit) {
				debug!("Polkadot service event loop shutdown with {:?}", e);
			}
			debug!("Polkadot service shutdown");
		})
	};

	// wait for the network to start up before starting the consensus
	// service.
	barrier.wait();

	// Spin consensus service if configured创建共识创建者
	let consensus_service = consensus_creator(client.clone(), network.clone(), transaction_pool.clone(), &keystore)?;

	Ok(Service {
		thread: Some(thread),
		client: client,
		network: network,
		transaction_pool: transaction_pool,
		signal: Some(signal),
		_consensus: consensus_service,
	})
}

这个还是有一些类似于以太坊,创建节点时把相关的网络服务都创建好。这样最后Ok返回让RUST自动推导结果。
把前面的服务分析明白后,就开始调用真正的RPC中的服务的分析:
substate/rpc-servers/src/lib.rs
/// Construct rpc `IoHandler`
pub fn rpc_handler<S, C, A, Y>(
	state: S,
	chain: C,
	author: A,
	system: Y,
) -> RpcHandler where
	S: apis::state::StateApi,
	C: apis::chain::ChainApi<Metadata=Metadata>,
	A: apis::author::AuthorApi,
	Y: apis::system::SystemApi,
{
	let mut io = pubsub::PubSubHandler::default();
	io.extend_with(state.to_delegate());
	io.extend_with(chain.to_delegate());
	io.extend_with(author.to_delegate());
	io.extend_with(system.to_delegate());
	io
}

/// Start HTTP server listening on given address.
pub fn start_http(
	addr: &std::net::SocketAddr,
	io: RpcHandler,
) -> io::Result<http::Server> {
	http::ServerBuilder::new(io)
		.threads(4)
		.rest_api(http::RestApi::Unsecure)
		.cors(http::DomainsValidation::Disabled)
		.start_http(addr)
}

/// Start WS server listening on given address.
pub fn start_ws(
	addr: &std::net::SocketAddr,
	io: RpcHandler,
) -> io::Result<ws::Server> {
	ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| Metadata::new(context.sender()))
		.start(addr)
		.map_err(|err| match err {
			ws::Error(ws::ErrorKind::Io(io), _) => io,
			ws::Error(ws::ErrorKind::ConnectionClosed, _) => io::ErrorKind::BrokenPipe.into(),
			ws::Error(e, _) => {
				error!("{}", e);
				io::ErrorKind::Other.into()
			}
		})
}

这两个函数里都使用了JSON的库,直接就构建了相关的服务,这两个库的资料目前找不到,所以没办法深入的分析说明。这里分析完成了相关的服务启动,下一次重点分析网络相关具体内容。