mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-23 21:21:06 +00:00
Add retry mechanics to pallet-scheduler (#3060)
Fixes #3014 This PR adds retry mechanics to `pallet-scheduler`, as described in the issue above. Users can now set a retry configuration for a task so that, in case its scheduled run fails, it will be retried after a number of blocks, for a specified number of times or until it succeeds. If a retried task runs successfully before running out of retries, its remaining retry counter will be reset to the initial value. If a retried task runs out of retries, it will be removed from the schedule. Tasks which need to be scheduled for a retry are still subject to weight metering and agenda space, same as a regular task. Periodic tasks will have their periodic schedule put on hold while the task is retrying. --------- Signed-off-by: georgepisaltu <george.pisaltu@parity.io> Co-authored-by: command-bot <>
This commit is contained in:
@@ -122,6 +122,17 @@ pub type CallOrHashOf<T> =
|
||||
pub type BoundedCallOf<T> =
|
||||
Bounded<<T as Config>::RuntimeCall, <T as frame_system::Config>::Hashing>;
|
||||
|
||||
/// The configuration of the retry mechanism for a given task along with its current state.
|
||||
#[derive(Clone, Copy, RuntimeDebug, PartialEq, Eq, Encode, Decode, MaxEncodedLen, TypeInfo)]
|
||||
pub struct RetryConfig<Period> {
|
||||
/// Initial amount of retries allowed.
|
||||
total_retries: u8,
|
||||
/// Amount of retries left.
|
||||
remaining: u8,
|
||||
/// Period of time between retry attempts.
|
||||
period: Period,
|
||||
}
|
||||
|
||||
#[cfg_attr(any(feature = "std", test), derive(PartialEq, Eq))]
|
||||
#[derive(Clone, RuntimeDebug, Encode, Decode)]
|
||||
struct ScheduledV1<Call, BlockNumber> {
|
||||
@@ -148,6 +159,26 @@ pub struct Scheduled<Name, Call, BlockNumber, PalletsOrigin, AccountId> {
|
||||
_phantom: PhantomData<AccountId>,
|
||||
}
|
||||
|
||||
impl<Name, Call, BlockNumber, PalletsOrigin, AccountId>
|
||||
Scheduled<Name, Call, BlockNumber, PalletsOrigin, AccountId>
|
||||
where
|
||||
Call: Clone,
|
||||
PalletsOrigin: Clone,
|
||||
{
|
||||
/// Create a new task to be used for retry attempts of the original one. The cloned task will
|
||||
/// have the same `priority`, `call` and `origin`, but will always be non-periodic and unnamed.
|
||||
pub fn as_retry(&self) -> Self {
|
||||
Self {
|
||||
maybe_id: None,
|
||||
priority: self.priority,
|
||||
call: self.call.clone(),
|
||||
maybe_periodic: None,
|
||||
origin: self.origin.clone(),
|
||||
_phantom: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use crate::{Scheduled as ScheduledV3, Scheduled as ScheduledV2};
|
||||
|
||||
pub type ScheduledV2Of<T> = ScheduledV2<
|
||||
@@ -273,6 +304,16 @@ pub mod pallet {
|
||||
ValueQuery,
|
||||
>;
|
||||
|
||||
/// Retry configurations for items to be executed, indexed by task address.
|
||||
#[pallet::storage]
|
||||
pub type Retries<T: Config> = StorageMap<
|
||||
_,
|
||||
Blake2_128Concat,
|
||||
TaskAddress<BlockNumberFor<T>>,
|
||||
RetryConfig<BlockNumberFor<T>>,
|
||||
OptionQuery,
|
||||
>;
|
||||
|
||||
/// Lookup from a name to the block number and index of the task.
|
||||
///
|
||||
/// For v3 -> v4 the previously unbounded identities are Blake2-256 hashed to form the v4
|
||||
@@ -295,10 +336,22 @@ pub mod pallet {
|
||||
id: Option<TaskName>,
|
||||
result: DispatchResult,
|
||||
},
|
||||
/// Set a retry configuration for some task.
|
||||
RetrySet {
|
||||
task: TaskAddress<BlockNumberFor<T>>,
|
||||
id: Option<TaskName>,
|
||||
period: BlockNumberFor<T>,
|
||||
retries: u8,
|
||||
},
|
||||
/// Cancel a retry configuration for some task.
|
||||
RetryCancelled { task: TaskAddress<BlockNumberFor<T>>, id: Option<TaskName> },
|
||||
/// The call for the provided hash was not found so the task has been aborted.
|
||||
CallUnavailable { task: TaskAddress<BlockNumberFor<T>>, id: Option<TaskName> },
|
||||
/// The given task was unable to be renewed since the agenda is full at that block.
|
||||
PeriodicFailed { task: TaskAddress<BlockNumberFor<T>>, id: Option<TaskName> },
|
||||
/// The given task was unable to be retried since the agenda is full at that block or there
|
||||
/// was not enough weight to reschedule it.
|
||||
RetryFailed { task: TaskAddress<BlockNumberFor<T>>, id: Option<TaskName> },
|
||||
/// The given task can never be executed since it is overweight.
|
||||
PermanentlyOverweight { task: TaskAddress<BlockNumberFor<T>>, id: Option<TaskName> },
|
||||
}
|
||||
@@ -440,6 +493,111 @@ pub mod pallet {
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set a retry configuration for a task so that, in case its scheduled run fails, it will
|
||||
/// be retried after `period` blocks, for a total amount of `retries` retries or until it
|
||||
/// succeeds.
|
||||
///
|
||||
/// Tasks which need to be scheduled for a retry are still subject to weight metering and
|
||||
/// agenda space, same as a regular task. If a periodic task fails, it will be scheduled
|
||||
/// normally while the task is retrying.
|
||||
///
|
||||
/// Tasks scheduled as a result of a retry for a periodic task are unnamed, non-periodic
|
||||
/// clones of the original task. Their retry configuration will be derived from the
|
||||
/// original task's configuration, but will have a lower value for `remaining` than the
|
||||
/// original `total_retries`.
|
||||
#[pallet::call_index(6)]
|
||||
#[pallet::weight(<T as Config>::WeightInfo::set_retry())]
|
||||
pub fn set_retry(
|
||||
origin: OriginFor<T>,
|
||||
task: TaskAddress<BlockNumberFor<T>>,
|
||||
retries: u8,
|
||||
period: BlockNumberFor<T>,
|
||||
) -> DispatchResult {
|
||||
T::ScheduleOrigin::ensure_origin(origin.clone())?;
|
||||
let origin = <T as Config>::RuntimeOrigin::from(origin);
|
||||
let (when, index) = task;
|
||||
let agenda = Agenda::<T>::get(when);
|
||||
let scheduled = agenda
|
||||
.get(index as usize)
|
||||
.and_then(Option::as_ref)
|
||||
.ok_or(Error::<T>::NotFound)?;
|
||||
Self::ensure_privilege(origin.caller(), &scheduled.origin)?;
|
||||
Retries::<T>::insert(
|
||||
(when, index),
|
||||
RetryConfig { total_retries: retries, remaining: retries, period },
|
||||
);
|
||||
Self::deposit_event(Event::RetrySet { task, id: None, period, retries });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set a retry configuration for a named task so that, in case its scheduled run fails, it
|
||||
/// will be retried after `period` blocks, for a total amount of `retries` retries or until
|
||||
/// it succeeds.
|
||||
///
|
||||
/// Tasks which need to be scheduled for a retry are still subject to weight metering and
|
||||
/// agenda space, same as a regular task. If a periodic task fails, it will be scheduled
|
||||
/// normally while the task is retrying.
|
||||
///
|
||||
/// Tasks scheduled as a result of a retry for a periodic task are unnamed, non-periodic
|
||||
/// clones of the original task. Their retry configuration will be derived from the
|
||||
/// original task's configuration, but will have a lower value for `remaining` than the
|
||||
/// original `total_retries`.
|
||||
#[pallet::call_index(7)]
|
||||
#[pallet::weight(<T as Config>::WeightInfo::set_retry_named())]
|
||||
pub fn set_retry_named(
|
||||
origin: OriginFor<T>,
|
||||
id: TaskName,
|
||||
retries: u8,
|
||||
period: BlockNumberFor<T>,
|
||||
) -> DispatchResult {
|
||||
T::ScheduleOrigin::ensure_origin(origin.clone())?;
|
||||
let origin = <T as Config>::RuntimeOrigin::from(origin);
|
||||
let (when, agenda_index) = Lookup::<T>::get(&id).ok_or(Error::<T>::NotFound)?;
|
||||
let agenda = Agenda::<T>::get(when);
|
||||
let scheduled = agenda
|
||||
.get(agenda_index as usize)
|
||||
.and_then(Option::as_ref)
|
||||
.ok_or(Error::<T>::NotFound)?;
|
||||
Self::ensure_privilege(origin.caller(), &scheduled.origin)?;
|
||||
Retries::<T>::insert(
|
||||
(when, agenda_index),
|
||||
RetryConfig { total_retries: retries, remaining: retries, period },
|
||||
);
|
||||
Self::deposit_event(Event::RetrySet {
|
||||
task: (when, agenda_index),
|
||||
id: Some(id),
|
||||
period,
|
||||
retries,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes the retry configuration of a task.
|
||||
#[pallet::call_index(8)]
|
||||
#[pallet::weight(<T as Config>::WeightInfo::cancel_retry())]
|
||||
pub fn cancel_retry(
|
||||
origin: OriginFor<T>,
|
||||
task: TaskAddress<BlockNumberFor<T>>,
|
||||
) -> DispatchResult {
|
||||
T::ScheduleOrigin::ensure_origin(origin.clone())?;
|
||||
let origin = <T as Config>::RuntimeOrigin::from(origin);
|
||||
Self::do_cancel_retry(origin.caller(), task)?;
|
||||
Self::deposit_event(Event::RetryCancelled { task, id: None });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Cancel the retry configuration of a named task.
|
||||
#[pallet::call_index(9)]
|
||||
#[pallet::weight(<T as Config>::WeightInfo::cancel_retry_named())]
|
||||
pub fn cancel_retry_named(origin: OriginFor<T>, id: TaskName) -> DispatchResult {
|
||||
T::ScheduleOrigin::ensure_origin(origin.clone())?;
|
||||
let origin = <T as Config>::RuntimeOrigin::from(origin);
|
||||
let task = Lookup::<T>::get(&id).ok_or(Error::<T>::NotFound)?;
|
||||
Self::do_cancel_retry(origin.caller(), task)?;
|
||||
Self::deposit_event(Event::RetryCancelled { task, id: Some(id) });
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -838,12 +996,7 @@ impl<T: Config> Pallet<T> {
|
||||
Ok(None),
|
||||
|s| -> Result<Option<Scheduled<_, _, _, _, _>>, DispatchError> {
|
||||
if let (Some(ref o), Some(ref s)) = (origin, s.borrow()) {
|
||||
if matches!(
|
||||
T::OriginPrivilegeCmp::cmp_privilege(o, &s.origin),
|
||||
Some(Ordering::Less) | None
|
||||
) {
|
||||
return Err(BadOrigin.into())
|
||||
}
|
||||
Self::ensure_privilege(o, &s.origin)?;
|
||||
};
|
||||
Ok(s.take())
|
||||
},
|
||||
@@ -854,6 +1007,7 @@ impl<T: Config> Pallet<T> {
|
||||
if let Some(id) = s.maybe_id {
|
||||
Lookup::<T>::remove(id);
|
||||
}
|
||||
Retries::<T>::remove((when, index));
|
||||
Self::cleanup_agenda(when);
|
||||
Self::deposit_event(Event::Canceled { when, index });
|
||||
Ok(())
|
||||
@@ -931,12 +1085,8 @@ impl<T: Config> Pallet<T> {
|
||||
Agenda::<T>::try_mutate(when, |agenda| -> DispatchResult {
|
||||
if let Some(s) = agenda.get_mut(i) {
|
||||
if let (Some(ref o), Some(ref s)) = (origin, s.borrow()) {
|
||||
if matches!(
|
||||
T::OriginPrivilegeCmp::cmp_privilege(o, &s.origin),
|
||||
Some(Ordering::Less) | None
|
||||
) {
|
||||
return Err(BadOrigin.into())
|
||||
}
|
||||
Self::ensure_privilege(o, &s.origin)?;
|
||||
Retries::<T>::remove((when, index));
|
||||
T::Preimages::drop(&s.call);
|
||||
}
|
||||
*s = None;
|
||||
@@ -973,6 +1123,20 @@ impl<T: Config> Pallet<T> {
|
||||
Self::deposit_event(Event::Canceled { when, index });
|
||||
Self::place_task(new_time, task).map_err(|x| x.0)
|
||||
}
|
||||
|
||||
fn do_cancel_retry(
|
||||
origin: &T::PalletsOrigin,
|
||||
(when, index): TaskAddress<BlockNumberFor<T>>,
|
||||
) -> Result<(), DispatchError> {
|
||||
let agenda = Agenda::<T>::get(when);
|
||||
let scheduled = agenda
|
||||
.get(index as usize)
|
||||
.and_then(Option::as_ref)
|
||||
.ok_or(Error::<T>::NotFound)?;
|
||||
Self::ensure_privilege(origin, &scheduled.origin)?;
|
||||
Retries::<T>::remove((when, index));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
enum ServiceTaskError {
|
||||
@@ -1124,11 +1288,21 @@ impl<T: Config> Pallet<T> {
|
||||
},
|
||||
Err(()) => Err((Overweight, Some(task))),
|
||||
Ok(result) => {
|
||||
let failed = result.is_err();
|
||||
let maybe_retry_config = Retries::<T>::take((when, agenda_index));
|
||||
Self::deposit_event(Event::Dispatched {
|
||||
task: (when, agenda_index),
|
||||
id: task.maybe_id,
|
||||
result,
|
||||
});
|
||||
|
||||
match maybe_retry_config {
|
||||
Some(retry_config) if failed => {
|
||||
Self::schedule_retry(weight, now, when, agenda_index, &task, retry_config);
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
|
||||
if let &Some((period, count)) = &task.maybe_periodic {
|
||||
if count > 1 {
|
||||
task.maybe_periodic = Some((period, count - 1));
|
||||
@@ -1137,7 +1311,10 @@ impl<T: Config> Pallet<T> {
|
||||
}
|
||||
let wake = now.saturating_add(period);
|
||||
match Self::place_task(wake, task) {
|
||||
Ok(_) => {},
|
||||
Ok(new_address) =>
|
||||
if let Some(retry_config) = maybe_retry_config {
|
||||
Retries::<T>::insert(new_address, retry_config);
|
||||
},
|
||||
Err((_, task)) => {
|
||||
// TODO: Leave task in storage somewhere for it to be rescheduled
|
||||
// manually.
|
||||
@@ -1192,6 +1369,70 @@ impl<T: Config> Pallet<T> {
|
||||
let _ = weight.try_consume(call_weight);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Check if a task has a retry configuration in place and, if so, try to reschedule it.
|
||||
///
|
||||
/// Possible causes for failure to schedule a retry for a task:
|
||||
/// - there wasn't enough weight to run the task reschedule logic
|
||||
/// - there was no retry configuration in place
|
||||
/// - there were no more retry attempts left
|
||||
/// - the agenda was full.
|
||||
fn schedule_retry(
|
||||
weight: &mut WeightMeter,
|
||||
now: BlockNumberFor<T>,
|
||||
when: BlockNumberFor<T>,
|
||||
agenda_index: u32,
|
||||
task: &ScheduledOf<T>,
|
||||
retry_config: RetryConfig<BlockNumberFor<T>>,
|
||||
) {
|
||||
if weight
|
||||
.try_consume(T::WeightInfo::schedule_retry(T::MaxScheduledPerBlock::get()))
|
||||
.is_err()
|
||||
{
|
||||
Self::deposit_event(Event::RetryFailed {
|
||||
task: (when, agenda_index),
|
||||
id: task.maybe_id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
let RetryConfig { total_retries, mut remaining, period } = retry_config;
|
||||
remaining = match remaining.checked_sub(1) {
|
||||
Some(n) => n,
|
||||
None => return,
|
||||
};
|
||||
let wake = now.saturating_add(period);
|
||||
match Self::place_task(wake, task.as_retry()) {
|
||||
Ok(address) => {
|
||||
// Reinsert the retry config to the new address of the task after it was
|
||||
// placed.
|
||||
Retries::<T>::insert(address, RetryConfig { total_retries, remaining, period });
|
||||
},
|
||||
Err((_, task)) => {
|
||||
// TODO: Leave task in storage somewhere for it to be
|
||||
// rescheduled manually.
|
||||
T::Preimages::drop(&task.call);
|
||||
Self::deposit_event(Event::RetryFailed {
|
||||
task: (when, agenda_index),
|
||||
id: task.maybe_id,
|
||||
});
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure that `left` has at least the same level of privilege or higher than `right`.
|
||||
///
|
||||
/// Returns an error if `left` has a lower level of privilege or the two cannot be compared.
|
||||
fn ensure_privilege(
|
||||
left: &<T as Config>::PalletsOrigin,
|
||||
right: &<T as Config>::PalletsOrigin,
|
||||
) -> Result<(), DispatchError> {
|
||||
if matches!(T::OriginPrivilegeCmp::cmp_privilege(left, right), Some(Ordering::Less) | None)
|
||||
{
|
||||
return Err(BadOrigin.into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Config> schedule::v2::Anon<BlockNumberFor<T>, <T as Config>::RuntimeCall, T::PalletsOrigin>
|
||||
|
||||
Reference in New Issue
Block a user