// service.rs
  1. // Copyright 2019 Joystream Contributors
  2. // This file is part of Joystream node.
  3. // Joystream node is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation, either version 3 of the License, or
  6. // (at your option) any later version.
  7. // Joystream node is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU General Public License for more details.
  11. // You should have received a copy of the GNU General Public License
  12. // along with Joystream node. If not, see <http://www.gnu.org/licenses/>.
  13. #![warn(unused_extern_crates)]
  14. // Clippy linter warning.
  15. #![allow(clippy::type_complexity)] // disable it because this is foreign code and can be changed any time
  16. // Clippy linter warning.
  17. #![allow(clippy::redundant_closure_call)] // disable it because of the substrate lib design
  18. //! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
  19. use client_db::Backend;
  20. use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
  21. use inherents::InherentDataProviders;
  22. use network::{construct_simple_protocol, NetworkService};
  23. use node_runtime::{self, opaque::Block, GenesisConfig, RuntimeApi};
  24. use offchain::OffchainWorkers;
  25. use primitives::Blake2Hasher;
  26. use runtime_primitives::traits::Block as BlockT;
  27. use std::sync::Arc;
  28. use substrate_client::{Client, LocalCallExecutor, LongestChain};
  29. pub use substrate_executor::{native_executor_instance, NativeExecutor};
  30. use substrate_service::{
  31. error::Error as ServiceError, AbstractService, Configuration, NetworkStatus, Service,
  32. ServiceBuilder,
  33. };
  34. use transaction_pool::{self, txpool::Pool as TransactionPool};
  35. construct_simple_protocol! {
  36. /// Demo protocol attachment for substrate.
  37. pub struct NodeProtocol where Block = Block { }
  38. }
  39. // Declare an instance of the native executor named `Executor`. Include the wasm binary as the
  40. // equivalent wasm code.
  41. native_executor_instance!(
  42. pub Executor,
  43. node_runtime::api::dispatch,
  44. node_runtime::native_version
  45. );
  46. /// Starts a `ServiceBuilder` for a full service.
  47. ///
  48. /// Use this macro if you don't actually need the full service, but just the builder in order to
  49. /// be able to perform chain operations.
  50. #[macro_export]
  51. macro_rules! new_full_start {
  52. ($config:expr) => {{
  53. // type RpcExtension = jsonrpc_core::IoHandler<substrate_rpc::Metadata>;
  54. let mut import_setup = None;
  55. let inherent_data_providers = inherents::InherentDataProviders::new();
  56. let builder = substrate_service::ServiceBuilder::new_full::<
  57. node_runtime::opaque::Block,
  58. node_runtime::RuntimeApi,
  59. crate::service::Executor,
  60. >($config)?
  61. .with_select_chain(|_config, backend| {
  62. Ok(substrate_client::LongestChain::new(backend.clone()))
  63. })?
  64. .with_transaction_pool(|config, client| {
  65. Ok(transaction_pool::txpool::Pool::new(
  66. config,
  67. transaction_pool::FullChainApi::new(client),
  68. ))
  69. })?
  70. .with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
  71. let select_chain = select_chain
  72. .take()
  73. .ok_or_else(|| substrate_service::Error::SelectChainRequired)?;
  74. let (grandpa_block_import, grandpa_link) =
  75. grandpa::block_import::<_, _, _, node_runtime::RuntimeApi, _>(
  76. client.clone(),
  77. &*client,
  78. select_chain,
  79. )?;
  80. let justification_import = grandpa_block_import.clone();
  81. let (block_import, babe_link) = babe::block_import(
  82. babe::Config::get_or_compute(&*client)?,
  83. grandpa_block_import,
  84. client.clone(),
  85. client.clone(),
  86. )?;
  87. let import_queue = babe::import_queue(
  88. babe_link.clone(),
  89. block_import.clone(),
  90. Some(Box::new(justification_import)),
  91. None,
  92. client.clone(),
  93. client,
  94. inherent_data_providers.clone(),
  95. )?;
  96. import_setup = Some((block_import, grandpa_link, babe_link));
  97. Ok(import_queue)
  98. })?;
  99. // We don't have any custom rpc commands...
  100. // .with_rpc_extensions(|client, pool| -> RpcExtension {
  101. // node_rpc::create(client, pool)
  102. // })?;
  103. (builder, import_setup, inherent_data_providers)
  104. }};
  105. }
  106. /// Creates a full service from the configuration.
  107. ///
  108. /// We need to use a macro because the test suit doesn't work with an opaque service. It expects
  109. /// concrete types instead.
  110. macro_rules! new_full {
  111. ($config:expr, $with_startup_data: expr) => {{
  112. use futures::sync::mpsc;
  113. use network::DhtEvent;
  114. let (
  115. is_authority,
  116. force_authoring,
  117. name,
  118. disable_grandpa
  119. ) = (
  120. $config.roles.is_authority(),
  121. $config.force_authoring,
  122. $config.name.clone(),
  123. $config.disable_grandpa
  124. );
  125. // sentry nodes announce themselves as authorities to the network
  126. // and should run the same protocols authorities do, but it should
  127. // never actively participate in any consensus process.
  128. let participates_in_consensus = is_authority && !$config.sentry_mode;
  129. let (builder, mut import_setup, inherent_data_providers) = new_full_start!($config);
  130. // Dht event channel from the network to the authority discovery module. Use bounded channel to ensure
  131. // back-pressure. Authority discovery is triggering one event per authority within the current authority set.
  132. // This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to
  133. // 10 000.
  134. let (dht_event_tx, _dht_event_rx) =
  135. mpsc::channel::<DhtEvent>(10_000);
  136. let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))?
  137. .with_finality_proof_provider(|client, backend|
  138. Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _)
  139. )?
  140. .with_dht_event_tx(dht_event_tx)?
  141. .build()?;
  142. let (block_import, grandpa_link, babe_link) = import_setup.take()
  143. .expect("Link Half and Block Import are present for Full Services or setup failed before. qed");
  144. ($with_startup_data)(&block_import, &babe_link);
  145. if participates_in_consensus {
  146. let proposer = substrate_basic_authorship::ProposerFactory {
  147. client: service.client(),
  148. transaction_pool: service.transaction_pool(),
  149. };
  150. let client = service.client();
  151. let select_chain = service.select_chain()
  152. .ok_or(substrate_service::Error::SelectChainRequired)?;
  153. let babe_config = babe::BabeParams {
  154. keystore: service.keystore(),
  155. client,
  156. select_chain,
  157. env: proposer,
  158. block_import,
  159. sync_oracle: service.network(),
  160. inherent_data_providers: inherent_data_providers.clone(),
  161. force_authoring,
  162. babe_link,
  163. };
  164. let babe = babe::start_babe(babe_config)?;
  165. service.spawn_essential_task(babe);
  166. }
  167. // if the node isn't actively participating in consensus then it doesn't
  168. // need a keystore, regardless of which protocol we use below.
  169. let keystore = if participates_in_consensus {
  170. Some(service.keystore())
  171. } else {
  172. None
  173. };
  174. let config = grandpa::Config {
  175. // FIXME #1578 make this available through chainspec
  176. gossip_duration: std::time::Duration::from_millis(333),
  177. justification_period: 512,
  178. name: Some(name),
  179. observer_enabled: true,
  180. keystore,
  181. is_authority,
  182. };
  183. match (is_authority, disable_grandpa) {
  184. (false, false) => {
  185. // start the lightweight GRANDPA observer
  186. service.spawn_task(Box::new(grandpa::run_grandpa_observer(
  187. config,
  188. grandpa_link,
  189. service.network(),
  190. service.on_exit(),
  191. )?));
  192. },
  193. (true, false) => {
  194. // start the full GRANDPA voter
  195. let grandpa_config = grandpa::GrandpaParams {
  196. config,
  197. link: grandpa_link,
  198. network: service.network(),
  199. inherent_data_providers: inherent_data_providers.clone(),
  200. on_exit: service.on_exit(),
  201. telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
  202. voting_rule: grandpa::VotingRulesBuilder::default().build(),
  203. };
  204. // the GRANDPA voter task is considered infallible, i.e.
  205. // if it fails we take down the service with it.
  206. service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?);
  207. },
  208. (_, true) => {
  209. grandpa::setup_disabled_grandpa(
  210. service.client(),
  211. &inherent_data_providers,
  212. service.network(),
  213. )?;
  214. },
  215. }
  216. Ok((service, inherent_data_providers))
  217. }};
  218. ($config:expr) => {{
  219. new_full!($config, |_, _| {})
  220. }}
  221. }
  222. #[allow(dead_code)]
  223. type ConcreteBlock = node_runtime::opaque::Block;
  224. #[allow(dead_code)]
  225. type ConcreteClient = Client<
  226. Backend<ConcreteBlock>,
  227. LocalCallExecutor<Backend<ConcreteBlock>, NativeExecutor<Executor>>,
  228. ConcreteBlock,
  229. node_runtime::RuntimeApi,
  230. >;
  231. #[allow(dead_code)]
  232. type ConcreteBackend = Backend<ConcreteBlock>;
  233. /// A specialized configuration object for setting up the node..
  234. pub type NodeConfiguration<C> =
  235. Configuration<C, GenesisConfig /*, crate::chain_spec::Extensions*/>;
  236. /// Builds a new service for a full client.
  237. pub fn new_full<C: Send + Default + 'static>(config: NodeConfiguration<C>)
  238. -> Result<
  239. Service<
  240. ConcreteBlock,
  241. ConcreteClient,
  242. LongestChain<ConcreteBackend, ConcreteBlock>,
  243. NetworkStatus<ConcreteBlock>,
  244. NetworkService<ConcreteBlock, crate::service::NodeProtocol, <ConcreteBlock as BlockT>::Hash>,
  245. TransactionPool<transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>>,
  246. OffchainWorkers<
  247. ConcreteClient,
  248. <ConcreteBackend as substrate_client::backend::Backend<Block, Blake2Hasher>>::OffchainStorage,
  249. ConcreteBlock,
  250. >
  251. >,
  252. ServiceError,
  253. >
  254. {
  255. new_full!(config).map(|(service, _)| service)
  256. }
  257. /// Builds a new service for a light client.
  258. pub fn new_light<C: Send + Default + 'static>(
  259. config: NodeConfiguration<C>,
  260. ) -> Result<impl AbstractService, ServiceError> {
  261. // type RpcExtension = jsonrpc_core::IoHandler<substrate_rpc::Metadata>;
  262. let inherent_data_providers = InherentDataProviders::new();
  263. let service = ServiceBuilder::new_light::<Block, RuntimeApi, Executor>(config)?
  264. .with_select_chain(|_config, backend| Ok(LongestChain::new(backend.clone())))?
  265. .with_transaction_pool(|config, client| {
  266. Ok(TransactionPool::new(
  267. config,
  268. transaction_pool::FullChainApi::new(client),
  269. ))
  270. })?
  271. .with_import_queue_and_fprb(
  272. |_config, client, backend, fetcher, _select_chain, _tx_pool| {
  273. let fetch_checker = fetcher
  274. .map(|fetcher| fetcher.checker().clone())
  275. .ok_or_else(|| {
  276. "Trying to start light import queue without active fetch checker"
  277. })?;
  278. let grandpa_block_import = grandpa::light_block_import::<_, _, _, RuntimeApi>(
  279. client.clone(),
  280. backend,
  281. &*client,
  282. Arc::new(fetch_checker),
  283. )?;
  284. let finality_proof_import = grandpa_block_import.clone();
  285. let finality_proof_request_builder =
  286. finality_proof_import.create_finality_proof_request_builder();
  287. let (babe_block_import, babe_link) = babe::block_import(
  288. babe::Config::get_or_compute(&*client)?,
  289. grandpa_block_import,
  290. client.clone(),
  291. client.clone(),
  292. )?;
  293. let import_queue = babe::import_queue(
  294. babe_link,
  295. babe_block_import,
  296. None,
  297. Some(Box::new(finality_proof_import)),
  298. client.clone(),
  299. client,
  300. inherent_data_providers.clone(),
  301. )?;
  302. Ok((import_queue, finality_proof_request_builder))
  303. },
  304. )?
  305. .with_network_protocol(|_| Ok(NodeProtocol::new()))?
  306. .with_finality_proof_provider(|client, backend| {
  307. Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _)
  308. })?
  309. // We don't have any custom rpc extensions
  310. // .with_rpc_extensions(|client, pool| -> RpcExtension {
  311. // node_rpc::create(client, pool)
  312. // })?
  313. .build()?;
  314. Ok(service)
  315. }