// This file is part of Substrate.

// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # Scheduler
//! A Pallet for scheduling dispatches.
//!
//! - [`Config`]
//! - [`Call`]
//! - [`Pallet`]
//!
//! ## Overview
//!
//! This Pallet exposes capabilities for scheduling dispatches to occur at a
//! specified block number or at a specified period. These scheduled dispatches
//! may be named or anonymous and may be canceled.
//!
//! **NOTE:** The scheduled calls will be dispatched with the default filter
//! for the origin: namely `frame_system::Config::BaseCallFilter` for all origins
//! except root, which will get no filter — and not the filter contained in the
//! origin used to call `fn schedule`.
//!
//! If a call is scheduled using a proxy or any other mechanism which adds a filter,
//! then those filters will not be used when dispatching the scheduled call.
//!
//! ## Interface
//!
//! ### Dispatchable Functions
//!
//! * `schedule` - schedule a dispatch, which may be periodic, to occur at a specified block and
//!   with a specified priority.
//! * `cancel` - cancel a scheduled dispatch, specified by block number and index.
//! * `schedule_named` - augments the `schedule` interface with an additional `Vec` parameter
//!   that can be used for identification.
//! * `cancel_named` - the named complement to the cancel function.

// Ensure we're `no_std` when compiling for Wasm.
#![cfg_attr(not(feature = "std"), no_std)] #[cfg(feature = "runtime-benchmarks")] mod benchmarking; pub mod migration; #[cfg(test)] mod mock; #[cfg(test)] mod tests; pub mod weights; use codec::{Decode, Encode, MaxEncodedLen}; use frame_support::{ dispatch::{ DispatchError, DispatchResult, Dispatchable, GetDispatchInfo, Parameter, RawOrigin, }, ensure, traits::{ schedule::{self, DispatchTime, MaybeHashed}, Bounded, CallerTrait, EnsureOrigin, Get, Hash as PreimageHash, IsType, OriginTrait, PalletInfoAccess, PrivilegeCmp, QueryPreimage, StorageVersion, StorePreimage, }, weights::Weight, }; use frame_system::{self as system}; pub use pallet::*; use scale_info::TypeInfo; use sp_io::hashing::blake2_256; use sp_runtime::{ traits::{BadOrigin, One, Saturating, Zero}, BoundedVec, RuntimeDebug, }; use sp_std::{borrow::Borrow, cmp::Ordering, marker::PhantomData, prelude::*}; pub use weights::WeightInfo; /// Just a simple index for naming period tasks. pub type PeriodicIndex = u32; /// The location of a scheduled task that can be used to remove it. pub type TaskAddress = (BlockNumber, u32); pub type CallOrHashOf = MaybeHashed<::RuntimeCall, ::Hash>; #[cfg_attr(any(feature = "std", test), derive(PartialEq, Eq))] #[derive(Clone, RuntimeDebug, Encode, Decode)] struct ScheduledV1 { maybe_id: Option>, priority: schedule::Priority, call: Call, maybe_periodic: Option>, } /// Information regarding an item to be executed in the future. #[cfg_attr(any(feature = "std", test), derive(PartialEq, Eq))] #[derive(Clone, RuntimeDebug, Encode, Decode, MaxEncodedLen, TypeInfo)] pub struct Scheduled { /// The unique identity for this task, if there is one. maybe_id: Option, /// This task's priority. priority: schedule::Priority, /// The call to be dispatched. call: Call, /// If the call is periodic, then this points to the information concerning that. maybe_periodic: Option>, /// The origin with which to dispatch the call. 
origin: PalletsOrigin, _phantom: PhantomData, } use crate::{Scheduled as ScheduledV3, Scheduled as ScheduledV2}; pub type ScheduledV2Of = ScheduledV2< Vec, ::RuntimeCall, ::BlockNumber, ::PalletsOrigin, ::AccountId, >; pub type ScheduledV3Of = ScheduledV3< Vec, CallOrHashOf, ::BlockNumber, ::PalletsOrigin, ::AccountId, >; pub type ScheduledOf = Scheduled< TaskName, Bounded<::RuntimeCall>, ::BlockNumber, ::PalletsOrigin, ::AccountId, >; struct WeightCounter { used: Weight, limit: Weight, } impl WeightCounter { fn check_accrue(&mut self, w: Weight) -> bool { let test = self.used.saturating_add(w); if test.any_gt(self.limit) { false } else { self.used = test; true } } fn can_accrue(&mut self, w: Weight) -> bool { self.used.saturating_add(w).all_lte(self.limit) } } pub(crate) trait MarginalWeightInfo: WeightInfo { fn service_task(maybe_lookup_len: Option, named: bool, periodic: bool) -> Weight { let base = Self::service_task_base(); let mut total = match maybe_lookup_len { None => base, Some(l) => Self::service_task_fetched(l as u32), }; if named { total.saturating_accrue(Self::service_task_named().saturating_sub(base)); } if periodic { total.saturating_accrue(Self::service_task_periodic().saturating_sub(base)); } total } } impl MarginalWeightInfo for T {} #[frame_support::pallet] pub mod pallet { use super::*; use frame_support::{dispatch::PostDispatchInfo, pallet_prelude::*}; use frame_system::pallet_prelude::*; /// The current storage version. const STORAGE_VERSION: StorageVersion = StorageVersion::new(3); #[pallet::pallet] #[pallet::generate_store(pub(super) trait Store)] #[pallet::storage_version(STORAGE_VERSION)] pub struct Pallet(_); /// `system::Config` should always be included in our implied traits. #[pallet::config] pub trait Config: frame_system::Config { /// The overarching event type. type RuntimeEvent: From> + IsType<::RuntimeEvent>; /// The aggregated origin which the dispatch will take. 
type RuntimeOrigin: OriginTrait + From + IsType<::RuntimeOrigin>; /// The caller origin, overarching type of all pallets origins. type PalletsOrigin: From> + CallerTrait + MaxEncodedLen; /// The aggregated call type. type RuntimeCall: Parameter + Dispatchable< RuntimeOrigin = ::RuntimeOrigin, PostInfo = PostDispatchInfo, > + GetDispatchInfo + From>; /// The maximum weight that may be scheduled per block for any dispatchables. #[pallet::constant] type MaximumWeight: Get; /// Required origin to schedule or cancel calls. type ScheduleOrigin: EnsureOrigin<::RuntimeOrigin>; /// Compare the privileges of origins. /// /// This will be used when canceling a task, to ensure that the origin that tries /// to cancel has greater or equal privileges as the origin that created the scheduled task. /// /// For simplicity the [`EqualPrivilegeOnly`](frame_support::traits::EqualPrivilegeOnly) can /// be used. This will only check if two given origins are equal. type OriginPrivilegeCmp: PrivilegeCmp; /// The maximum number of scheduled calls in the queue for a single block. #[pallet::constant] type MaxScheduledPerBlock: Get; /// Weight information for extrinsics in this pallet. type WeightInfo: WeightInfo; /// The preimage provider with which we look up call hashes to get the call. type Preimages: QueryPreimage + StorePreimage; } #[pallet::storage] pub type IncompleteSince = StorageValue<_, T::BlockNumber>; /// Items to be executed, indexed by the block number that they should be executed on. #[pallet::storage] pub type Agenda = StorageMap< _, Twox64Concat, T::BlockNumber, BoundedVec>, T::MaxScheduledPerBlock>, ValueQuery, >; /// Lookup from a name to the block number and index of the task. /// /// For v3 -> v4 the previously unbounded identities are Blake2-256 hashed to form the v4 /// identities. #[pallet::storage] pub(crate) type Lookup = StorageMap<_, Twox64Concat, TaskName, TaskAddress>; /// Events type. 
#[pallet::event] #[pallet::generate_deposit(pub(super) fn deposit_event)] pub enum Event { /// Scheduled some task. Scheduled { when: T::BlockNumber, index: u32 }, /// Canceled some task. Canceled { when: T::BlockNumber, index: u32 }, /// Dispatched some task. Dispatched { task: TaskAddress, id: Option<[u8; 32]>, result: DispatchResult, }, /// The call for the provided hash was not found so the task has been aborted. CallUnavailable { task: TaskAddress, id: Option<[u8; 32]> }, /// The given task was unable to be renewed since the agenda is full at that block. PeriodicFailed { task: TaskAddress, id: Option<[u8; 32]> }, /// The given task can never be executed since it is overweight. PermanentlyOverweight { task: TaskAddress, id: Option<[u8; 32]> }, } #[pallet::error] pub enum Error { /// Failed to schedule a call FailedToSchedule, /// Cannot find the scheduled call. NotFound, /// Given target block number is in the past. TargetBlockNumberInPast, /// Reschedule failed because it does not change scheduled time. RescheduleNoChange, /// Attempt to use a non-named function on a named task. Named, } #[pallet::hooks] impl Hooks> for Pallet { /// Execute the scheduled calls fn on_initialize(now: T::BlockNumber) -> Weight { let mut weight_counter = WeightCounter { used: Weight::zero(), limit: T::MaximumWeight::get() }; Self::service_agendas(&mut weight_counter, now, u32::max_value()); weight_counter.used } } #[pallet::call] impl Pallet { /// Anonymously schedule a task. #[pallet::weight(::WeightInfo::schedule(T::MaxScheduledPerBlock::get()))] pub fn schedule( origin: OriginFor, when: T::BlockNumber, maybe_periodic: Option>, priority: schedule::Priority, call: Box<::RuntimeCall>, ) -> DispatchResult { T::ScheduleOrigin::ensure_origin(origin.clone())?; let origin = ::RuntimeOrigin::from(origin); Self::do_schedule( DispatchTime::At(when), maybe_periodic, priority, origin.caller().clone(), T::Preimages::bound(*call)?, )?; Ok(()) } /// Cancel an anonymously scheduled task. 
#[pallet::weight(::WeightInfo::cancel(T::MaxScheduledPerBlock::get()))] pub fn cancel(origin: OriginFor, when: T::BlockNumber, index: u32) -> DispatchResult { T::ScheduleOrigin::ensure_origin(origin.clone())?; let origin = ::RuntimeOrigin::from(origin); Self::do_cancel(Some(origin.caller().clone()), (when, index))?; Ok(()) } /// Schedule a named task. #[pallet::weight(::WeightInfo::schedule_named(T::MaxScheduledPerBlock::get()))] pub fn schedule_named( origin: OriginFor, id: TaskName, when: T::BlockNumber, maybe_periodic: Option>, priority: schedule::Priority, call: Box<::RuntimeCall>, ) -> DispatchResult { T::ScheduleOrigin::ensure_origin(origin.clone())?; let origin = ::RuntimeOrigin::from(origin); Self::do_schedule_named( id, DispatchTime::At(when), maybe_periodic, priority, origin.caller().clone(), T::Preimages::bound(*call)?, )?; Ok(()) } /// Cancel a named scheduled task. #[pallet::weight(::WeightInfo::cancel_named(T::MaxScheduledPerBlock::get()))] pub fn cancel_named(origin: OriginFor, id: TaskName) -> DispatchResult { T::ScheduleOrigin::ensure_origin(origin.clone())?; let origin = ::RuntimeOrigin::from(origin); Self::do_cancel_named(Some(origin.caller().clone()), id)?; Ok(()) } /// Anonymously schedule a task after a delay. /// /// # /// Same as [`schedule`]. /// # #[pallet::weight(::WeightInfo::schedule(T::MaxScheduledPerBlock::get()))] pub fn schedule_after( origin: OriginFor, after: T::BlockNumber, maybe_periodic: Option>, priority: schedule::Priority, call: Box<::RuntimeCall>, ) -> DispatchResult { T::ScheduleOrigin::ensure_origin(origin.clone())?; let origin = ::RuntimeOrigin::from(origin); Self::do_schedule( DispatchTime::After(after), maybe_periodic, priority, origin.caller().clone(), T::Preimages::bound(*call)?, )?; Ok(()) } /// Schedule a named task after a delay. /// /// # /// Same as [`schedule_named`](Self::schedule_named). 
/// # #[pallet::weight(::WeightInfo::schedule_named(T::MaxScheduledPerBlock::get()))] pub fn schedule_named_after( origin: OriginFor, id: TaskName, after: T::BlockNumber, maybe_periodic: Option>, priority: schedule::Priority, call: Box<::RuntimeCall>, ) -> DispatchResult { T::ScheduleOrigin::ensure_origin(origin.clone())?; let origin = ::RuntimeOrigin::from(origin); Self::do_schedule_named( id, DispatchTime::After(after), maybe_periodic, priority, origin.caller().clone(), T::Preimages::bound(*call)?, )?; Ok(()) } } } impl> Pallet { /// Migrate storage format from V1 to V4. /// /// Returns the weight consumed by this migration. pub fn migrate_v1_to_v4() -> Weight { use migration::v1 as old; let mut weight = T::DbWeight::get().reads_writes(1, 1); // Delete all undecodable values. // `StorageMap::translate` is not enough since it just skips them and leaves the keys in. let keys = old::Agenda::::iter_keys().collect::>(); for key in keys { weight.saturating_accrue(T::DbWeight::get().reads(1)); if let Err(_) = old::Agenda::::try_get(&key) { weight.saturating_accrue(T::DbWeight::get().writes(1)); old::Agenda::::remove(&key); log::warn!("Deleted undecodable agenda"); } } Agenda::::translate::< Vec::RuntimeCall, T::BlockNumber>>>, _, >(|_, agenda| { Some(BoundedVec::truncate_from( agenda .into_iter() .map(|schedule| { weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1)); schedule.and_then(|schedule| { if let Some(id) = schedule.maybe_id.as_ref() { let name = blake2_256(id); if let Some(item) = old::Lookup::::take(id) { Lookup::::insert(name, item); } weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 2)); } let call = T::Preimages::bound(schedule.call).ok()?; if call.lookup_needed() { weight.saturating_accrue(T::DbWeight::get().reads_writes(0, 1)); } Some(Scheduled { maybe_id: schedule.maybe_id.map(|x| blake2_256(&x[..])), priority: schedule.priority, call, maybe_periodic: schedule.maybe_periodic, origin: system::RawOrigin::Root.into(), _phantom: 
Default::default(), }) }) }) .collect::>(), )) }); #[allow(deprecated)] frame_support::storage::migration::remove_storage_prefix( Self::name().as_bytes(), b"StorageVersion", &[], ); StorageVersion::new(4).put::(); weight + T::DbWeight::get().writes(2) } /// Migrate storage format from V2 to V4. /// /// Returns the weight consumed by this migration. pub fn migrate_v2_to_v4() -> Weight { use migration::v2 as old; let mut weight = T::DbWeight::get().reads_writes(1, 1); // Delete all undecodable values. // `StorageMap::translate` is not enough since it just skips them and leaves the keys in. let keys = old::Agenda::::iter_keys().collect::>(); for key in keys { weight.saturating_accrue(T::DbWeight::get().reads(1)); if let Err(_) = old::Agenda::::try_get(&key) { weight.saturating_accrue(T::DbWeight::get().writes(1)); old::Agenda::::remove(&key); log::warn!("Deleted undecodable agenda"); } } Agenda::::translate::>>, _>(|_, agenda| { Some(BoundedVec::truncate_from( agenda .into_iter() .map(|schedule| { weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1)); schedule.and_then(|schedule| { if let Some(id) = schedule.maybe_id.as_ref() { let name = blake2_256(id); if let Some(item) = old::Lookup::::take(id) { Lookup::::insert(name, item); } weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 2)); } let call = T::Preimages::bound(schedule.call).ok()?; if call.lookup_needed() { weight.saturating_accrue(T::DbWeight::get().reads_writes(0, 1)); } Some(Scheduled { maybe_id: schedule.maybe_id.map(|x| blake2_256(&x[..])), priority: schedule.priority, call, maybe_periodic: schedule.maybe_periodic, origin: schedule.origin, _phantom: Default::default(), }) }) }) .collect::>(), )) }); #[allow(deprecated)] frame_support::storage::migration::remove_storage_prefix( Self::name().as_bytes(), b"StorageVersion", &[], ); StorageVersion::new(4).put::(); weight + T::DbWeight::get().writes(2) } /// Migrate storage format from V3 to V4. 
/// /// Returns the weight consumed by this migration. #[allow(deprecated)] pub fn migrate_v3_to_v4() -> Weight { use migration::v3 as old; let mut weight = T::DbWeight::get().reads_writes(2, 1); // Delete all undecodable values. // `StorageMap::translate` is not enough since it just skips them and leaves the keys in. let blocks = old::Agenda::::iter_keys().collect::>(); for block in blocks { weight.saturating_accrue(T::DbWeight::get().reads(1)); if let Err(_) = old::Agenda::::try_get(&block) { weight.saturating_accrue(T::DbWeight::get().writes(1)); old::Agenda::::remove(&block); log::warn!("Deleted undecodable agenda of block: {:?}", block); } } Agenda::::translate::>>, _>(|block, agenda| { log::info!("Migrating agenda of block: {:?}", &block); Some(BoundedVec::truncate_from( agenda .into_iter() .map(|schedule| { weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1)); schedule .and_then(|schedule| { if let Some(id) = schedule.maybe_id.as_ref() { let name = blake2_256(id); if let Some(item) = old::Lookup::::take(id) { Lookup::::insert(name, item); log::info!("Migrated name for id: {:?}", id); } else { log::error!("No name in Lookup for id: {:?}", &id); } weight.saturating_accrue(T::DbWeight::get().reads_writes(2, 2)); } else { log::info!("Schedule is unnamed"); } let call = match schedule.call { MaybeHashed::Hash(h) => { let bounded = Bounded::from_legacy_hash(h); // Check that the call can be decoded in the new runtime. 
if let Err(err) = T::Preimages::peek::< ::RuntimeCall, >(&bounded) { log::error!( "Dropping undecodable call {}: {:?}", &h, &err ); return None } weight.saturating_accrue(T::DbWeight::get().reads(1)); log::info!("Migrated call by hash, hash: {:?}", h); bounded }, MaybeHashed::Value(v) => { let call = T::Preimages::bound(v) .map_err(|e| { log::error!("Could not bound Call: {:?}", e) }) .ok()?; if call.lookup_needed() { weight.saturating_accrue( T::DbWeight::get().reads_writes(0, 1), ); } log::info!( "Migrated call by value, hash: {:?}", call.hash() ); call }, }; Some(Scheduled { maybe_id: schedule.maybe_id.map(|x| blake2_256(&x[..])), priority: schedule.priority, call, maybe_periodic: schedule.maybe_periodic, origin: schedule.origin, _phantom: Default::default(), }) }) .or_else(|| { log::info!("Schedule in agenda for block {:?} is empty - nothing to do here.", &block); None }) }) .collect::>(), )) }); #[allow(deprecated)] frame_support::storage::migration::remove_storage_prefix( Self::name().as_bytes(), b"StorageVersion", &[], ); StorageVersion::new(4).put::(); weight + T::DbWeight::get().writes(2) } } impl Pallet { /// Helper to migrate scheduler when the pallet origin type has changed. pub fn migrate_origin + codec::Decode>() { Agenda::::translate::< Vec< Option< Scheduled< TaskName, Bounded<::RuntimeCall>, T::BlockNumber, OldOrigin, T::AccountId, >, >, >, _, >(|_, agenda| { Some(BoundedVec::truncate_from( agenda .into_iter() .map(|schedule| { schedule.map(|schedule| Scheduled { maybe_id: schedule.maybe_id, priority: schedule.priority, call: schedule.call, maybe_periodic: schedule.maybe_periodic, origin: schedule.origin.into(), _phantom: Default::default(), }) }) .collect::>(), )) }); } fn resolve_time(when: DispatchTime) -> Result { let now = frame_system::Pallet::::block_number(); let when = match when { DispatchTime::At(x) => x, // The current block has already completed it's scheduled tasks, so // Schedule the task at lest one block after this current block. 
DispatchTime::After(x) => now.saturating_add(x).saturating_add(One::one()), }; if when <= now { return Err(Error::::TargetBlockNumberInPast.into()) } Ok(when) } fn place_task( when: T::BlockNumber, what: ScheduledOf, ) -> Result, (DispatchError, ScheduledOf)> { let maybe_name = what.maybe_id; let index = Self::push_to_agenda(when, what)?; let address = (when, index); if let Some(name) = maybe_name { Lookup::::insert(name, address) } Self::deposit_event(Event::Scheduled { when: address.0, index: address.1 }); Ok(address) } fn push_to_agenda( when: T::BlockNumber, what: ScheduledOf, ) -> Result)> { let mut agenda = Agenda::::get(when); let index = if (agenda.len() as u32) < T::MaxScheduledPerBlock::get() { // will always succeed due to the above check. let _ = agenda.try_push(Some(what)); agenda.len() as u32 - 1 } else { if let Some(hole_index) = agenda.iter().position(|i| i.is_none()) { agenda[hole_index] = Some(what); hole_index as u32 } else { return Err((DispatchError::Exhausted, what)) } }; Agenda::::insert(when, agenda); Ok(index) } fn do_schedule( when: DispatchTime, maybe_periodic: Option>, priority: schedule::Priority, origin: T::PalletsOrigin, call: Bounded<::RuntimeCall>, ) -> Result, DispatchError> { let when = Self::resolve_time(when)?; // sanitize maybe_periodic let maybe_periodic = maybe_periodic .filter(|p| p.1 > 1 && !p.0.is_zero()) // Remove one from the number of repetitions since we will schedule one now. 
.map(|(p, c)| (p, c - 1)); let task = Scheduled { maybe_id: None, priority, call, maybe_periodic, origin, _phantom: PhantomData, }; Self::place_task(when, task).map_err(|x| x.0) } fn do_cancel( origin: Option, (when, index): TaskAddress, ) -> Result<(), DispatchError> { let scheduled = Agenda::::try_mutate(when, |agenda| { agenda.get_mut(index as usize).map_or( Ok(None), |s| -> Result>, DispatchError> { if let (Some(ref o), Some(ref s)) = (origin, s.borrow()) { if matches!( T::OriginPrivilegeCmp::cmp_privilege(o, &s.origin), Some(Ordering::Less) | None ) { return Err(BadOrigin.into()) } }; Ok(s.take()) }, ) })?; if let Some(s) = scheduled { T::Preimages::drop(&s.call); if let Some(id) = s.maybe_id { Lookup::::remove(id); } Self::deposit_event(Event::Canceled { when, index }); Ok(()) } else { return Err(Error::::NotFound.into()) } } fn do_reschedule( (when, index): TaskAddress, new_time: DispatchTime, ) -> Result, DispatchError> { let new_time = Self::resolve_time(new_time)?; if new_time == when { return Err(Error::::RescheduleNoChange.into()) } let task = Agenda::::try_mutate(when, |agenda| { let task = agenda.get_mut(index as usize).ok_or(Error::::NotFound)?; ensure!(!matches!(task, Some(Scheduled { maybe_id: Some(_), .. })), Error::::Named); task.take().ok_or(Error::::NotFound) })?; Self::deposit_event(Event::Canceled { when, index }); Self::place_task(new_time, task).map_err(|x| x.0) } fn do_schedule_named( id: TaskName, when: DispatchTime, maybe_periodic: Option>, priority: schedule::Priority, origin: T::PalletsOrigin, call: Bounded<::RuntimeCall>, ) -> Result, DispatchError> { // ensure id it is unique if Lookup::::contains_key(&id) { return Err(Error::::FailedToSchedule.into()) } let when = Self::resolve_time(when)?; // sanitize maybe_periodic let maybe_periodic = maybe_periodic .filter(|p| p.1 > 1 && !p.0.is_zero()) // Remove one from the number of repetitions since we will schedule one now. 
.map(|(p, c)| (p, c - 1)); let task = Scheduled { maybe_id: Some(id), priority, call, maybe_periodic, origin, _phantom: Default::default(), }; Self::place_task(when, task).map_err(|x| x.0) } fn do_cancel_named(origin: Option, id: TaskName) -> DispatchResult { Lookup::::try_mutate_exists(id, |lookup| -> DispatchResult { if let Some((when, index)) = lookup.take() { let i = index as usize; Agenda::::try_mutate(when, |agenda| -> DispatchResult { if let Some(s) = agenda.get_mut(i) { if let (Some(ref o), Some(ref s)) = (origin, s.borrow()) { if matches!( T::OriginPrivilegeCmp::cmp_privilege(o, &s.origin), Some(Ordering::Less) | None ) { return Err(BadOrigin.into()) } T::Preimages::drop(&s.call); } *s = None; } Ok(()) })?; Self::deposit_event(Event::Canceled { when, index }); Ok(()) } else { return Err(Error::::NotFound.into()) } }) } fn do_reschedule_named( id: TaskName, new_time: DispatchTime, ) -> Result, DispatchError> { let new_time = Self::resolve_time(new_time)?; let lookup = Lookup::::get(id); let (when, index) = lookup.ok_or(Error::::NotFound)?; if new_time == when { return Err(Error::::RescheduleNoChange.into()) } let task = Agenda::::try_mutate(when, |agenda| { let task = agenda.get_mut(index as usize).ok_or(Error::::NotFound)?; task.take().ok_or(Error::::NotFound) })?; Self::deposit_event(Event::Canceled { when, index }); Self::place_task(new_time, task).map_err(|x| x.0) } } enum ServiceTaskError { /// Could not be executed due to missing preimage. Unavailable, /// Could not be executed due to weight limitations. Overweight, } use ServiceTaskError::*; impl Pallet { /// Service up to `max` agendas queue starting from earliest incompletely executed agenda. 
fn service_agendas(weight: &mut WeightCounter, now: T::BlockNumber, max: u32) { if !weight.check_accrue(T::WeightInfo::service_agendas_base()) { return } let mut incomplete_since = now + One::one(); let mut when = IncompleteSince::::take().unwrap_or(now); let mut executed = 0; let max_items = T::MaxScheduledPerBlock::get(); let mut count_down = max; let service_agenda_base_weight = T::WeightInfo::service_agenda_base(max_items); while count_down > 0 && when <= now && weight.can_accrue(service_agenda_base_weight) { if !Self::service_agenda(weight, &mut executed, now, when, u32::max_value()) { incomplete_since = incomplete_since.min(when); } when.saturating_inc(); count_down.saturating_dec(); } incomplete_since = incomplete_since.min(when); if incomplete_since <= now { IncompleteSince::::put(incomplete_since); } } /// Returns `true` if the agenda was fully completed, `false` if it should be revisited at a /// later block. fn service_agenda( weight: &mut WeightCounter, executed: &mut u32, now: T::BlockNumber, when: T::BlockNumber, max: u32, ) -> bool { let mut agenda = Agenda::::get(when); let mut ordered = agenda .iter() .enumerate() .filter_map(|(index, maybe_item)| { maybe_item.as_ref().map(|item| (index as u32, item.priority)) }) .collect::>(); ordered.sort_by_key(|k| k.1); let within_limit = weight.check_accrue(T::WeightInfo::service_agenda_base(ordered.len() as u32)); debug_assert!(within_limit, "weight limit should have been checked in advance"); // Items which we know can be executed and have postponed for execution in a later block. let mut postponed = (ordered.len() as u32).saturating_sub(max); // Items which we don't know can ever be executed. 
let mut dropped = 0; for (agenda_index, _) in ordered.into_iter().take(max as usize) { let task = match agenda[agenda_index as usize].take() { None => continue, Some(t) => t, }; let base_weight = T::WeightInfo::service_task( task.call.lookup_len().map(|x| x as usize), task.maybe_id.is_some(), task.maybe_periodic.is_some(), ); if !weight.can_accrue(base_weight) { postponed += 1; break } let result = Self::service_task(weight, now, when, agenda_index, *executed == 0, task); agenda[agenda_index as usize] = match result { Err((Unavailable, slot)) => { dropped += 1; slot }, Err((Overweight, slot)) => { postponed += 1; slot }, Ok(()) => { *executed += 1; None }, }; } if postponed > 0 || dropped > 0 { Agenda::::insert(when, agenda); } else { Agenda::::remove(when); } postponed == 0 } /// Service (i.e. execute) the given task, being careful not to overflow the `weight` counter. /// /// This involves: /// - removing and potentially replacing the `Lookup` entry for the task. /// - realizing the task's call which can include a preimage lookup. /// - Rescheduling the task for execution in a later agenda if periodic. 
fn service_task( weight: &mut WeightCounter, now: T::BlockNumber, when: T::BlockNumber, agenda_index: u32, is_first: bool, mut task: ScheduledOf, ) -> Result<(), (ServiceTaskError, Option>)> { if let Some(ref id) = task.maybe_id { Lookup::::remove(id); } let (call, lookup_len) = match T::Preimages::peek(&task.call) { Ok(c) => c, Err(_) => return Err((Unavailable, Some(task))), }; weight.check_accrue(T::WeightInfo::service_task( lookup_len.map(|x| x as usize), task.maybe_id.is_some(), task.maybe_periodic.is_some(), )); match Self::execute_dispatch(weight, task.origin.clone(), call) { Err(Unavailable) => { debug_assert!(false, "Checked to exist with `peek`"); Self::deposit_event(Event::CallUnavailable { task: (when, agenda_index), id: task.maybe_id, }); Err((Unavailable, Some(task))) }, Err(Overweight) if is_first => { T::Preimages::drop(&task.call); Self::deposit_event(Event::PermanentlyOverweight { task: (when, agenda_index), id: task.maybe_id, }); Err((Unavailable, Some(task))) }, Err(Overweight) => Err((Overweight, Some(task))), Ok(result) => { Self::deposit_event(Event::Dispatched { task: (when, agenda_index), id: task.maybe_id, result, }); if let &Some((period, count)) = &task.maybe_periodic { if count > 1 { task.maybe_periodic = Some((period, count - 1)); } else { task.maybe_periodic = None; } let wake = now.saturating_add(period); match Self::place_task(wake, task) { Ok(_) => {}, Err((_, task)) => { // TODO: Leave task in storage somewhere for it to be rescheduled // manually. T::Preimages::drop(&task.call); Self::deposit_event(Event::PeriodicFailed { task: (when, agenda_index), id: task.maybe_id, }); }, } } else { T::Preimages::drop(&task.call); } Ok(()) }, } } /// Make a dispatch to the given `call` from the given `origin`, ensuring that the `weight` /// counter does not exceed its limit and that it is counted accurately (e.g. accounted using /// post info if available). 
/// /// NOTE: Only the weight for this function will be counted (origin lookup, dispatch and the /// call itself). fn execute_dispatch( weight: &mut WeightCounter, origin: T::PalletsOrigin, call: ::RuntimeCall, ) -> Result { let base_weight = match origin.as_system_ref() { Some(&RawOrigin::Signed(_)) => T::WeightInfo::execute_dispatch_signed(), _ => T::WeightInfo::execute_dispatch_unsigned(), }; let call_weight = call.get_dispatch_info().weight; // We only allow a scheduled call if it cannot push the weight past the limit. let max_weight = base_weight.saturating_add(call_weight); if !weight.can_accrue(max_weight) { return Err(Overweight) } let dispatch_origin = origin.into(); let (maybe_actual_call_weight, result) = match call.dispatch(dispatch_origin) { Ok(post_info) => (post_info.actual_weight, Ok(())), Err(error_and_info) => (error_and_info.post_info.actual_weight, Err(error_and_info.error)), }; let call_weight = maybe_actual_call_weight.unwrap_or(call_weight); weight.check_accrue(base_weight); weight.check_accrue(call_weight); Ok(result) } } impl> schedule::v2::Anon::RuntimeCall, T::PalletsOrigin> for Pallet { type Address = TaskAddress; type Hash = T::Hash; fn schedule( when: DispatchTime, maybe_periodic: Option>, priority: schedule::Priority, origin: T::PalletsOrigin, call: CallOrHashOf, ) -> Result { let call = call.as_value().ok_or(DispatchError::CannotLookup)?; let call = T::Preimages::bound(call)?.transmute(); Self::do_schedule(when, maybe_periodic, priority, origin, call) } fn cancel((when, index): Self::Address) -> Result<(), ()> { Self::do_cancel(None, (when, index)).map_err(|_| ()) } fn reschedule( address: Self::Address, when: DispatchTime, ) -> Result { Self::do_reschedule(address, when) } fn next_dispatch_time((when, index): Self::Address) -> Result { Agenda::::get(when).get(index as usize).ok_or(()).map(|_| when) } } impl> schedule::v2::Named::RuntimeCall, T::PalletsOrigin> for Pallet { type Address = TaskAddress; type Hash = T::Hash; fn 
schedule_named( id: Vec, when: DispatchTime, maybe_periodic: Option>, priority: schedule::Priority, origin: T::PalletsOrigin, call: CallOrHashOf, ) -> Result { let call = call.as_value().ok_or(())?; let call = T::Preimages::bound(call).map_err(|_| ())?.transmute(); let name = blake2_256(&id[..]); Self::do_schedule_named(name, when, maybe_periodic, priority, origin, call).map_err(|_| ()) } fn cancel_named(id: Vec) -> Result<(), ()> { let name = blake2_256(&id[..]); Self::do_cancel_named(None, name).map_err(|_| ()) } fn reschedule_named( id: Vec, when: DispatchTime, ) -> Result { let name = blake2_256(&id[..]); Self::do_reschedule_named(name, when) } fn next_dispatch_time(id: Vec) -> Result { let name = blake2_256(&id[..]); Lookup::::get(name) .and_then(|(when, index)| Agenda::::get(when).get(index as usize).map(|_| when)) .ok_or(()) } } impl schedule::v3::Anon::RuntimeCall, T::PalletsOrigin> for Pallet { type Address = TaskAddress; fn schedule( when: DispatchTime, maybe_periodic: Option>, priority: schedule::Priority, origin: T::PalletsOrigin, call: Bounded<::RuntimeCall>, ) -> Result { Self::do_schedule(when, maybe_periodic, priority, origin, call) } fn cancel((when, index): Self::Address) -> Result<(), DispatchError> { Self::do_cancel(None, (when, index)).map_err(map_err_to_v3_err::) } fn reschedule( address: Self::Address, when: DispatchTime, ) -> Result { Self::do_reschedule(address, when).map_err(map_err_to_v3_err::) } fn next_dispatch_time((when, index): Self::Address) -> Result { Agenda::::get(when) .get(index as usize) .ok_or(DispatchError::Unavailable) .map(|_| when) } } use schedule::v3::TaskName; impl schedule::v3::Named::RuntimeCall, T::PalletsOrigin> for Pallet { type Address = TaskAddress; fn schedule_named( id: TaskName, when: DispatchTime, maybe_periodic: Option>, priority: schedule::Priority, origin: T::PalletsOrigin, call: Bounded<::RuntimeCall>, ) -> Result { Self::do_schedule_named(id, when, maybe_periodic, priority, origin, call) } fn 
cancel_named(id: TaskName) -> Result<(), DispatchError> { Self::do_cancel_named(None, id).map_err(map_err_to_v3_err::) } fn reschedule_named( id: TaskName, when: DispatchTime, ) -> Result { Self::do_reschedule_named(id, when).map_err(map_err_to_v3_err::) } fn next_dispatch_time(id: TaskName) -> Result { Lookup::::get(id) .and_then(|(when, index)| Agenda::::get(when).get(index as usize).map(|_| when)) .ok_or(DispatchError::Unavailable) } } /// Maps a pallet error to an `schedule::v3` error. fn map_err_to_v3_err(err: DispatchError) -> DispatchError { if err == DispatchError::from(Error::::NotFound) { DispatchError::Unavailable } else { err } }