fix: stale activeStakers — era detection fallback + cleanup on era change

- handleBlock now checks activeEra every block as fallback when
  StakersElected event is dropped due to pruned state patch
- Clear all existing ActiveStaker entries before saving fresh data
  from current era exposure set (prevents stale nominators)
- Re-save pool stash accounts after cleanup
- Robust exposure parsing (handle Option and non-Option types)
- Better error logging with stack traces for APY computation failures
This commit is contained in:
2026-03-10 00:11:50 +03:00
parent 95e6e49d90
commit 23b6227772
+118 -27
View File
@@ -13,6 +13,7 @@ import {
} from "./constants";
let poolStakersInitialized = false;
let lastProcessedEra = -1;
/**
* Derive the bonded (stash) account for a nomination pool.
@@ -57,39 +58,60 @@ function derivePoolStash(poolId: number): string {
* - networkId: AH genesis
*/
export async function handleBlock(block: SubstrateBlock): Promise<void> {
if (poolStakersInitialized) return;
poolStakersInitialized = true;
if (!poolStakersInitialized) {
poolStakersInitialized = true;
logger.info("Initializing pool stash accounts from live chain state...");
logger.info("Initializing pool stash accounts from live chain state...");
const pools = await api.query.nominationPools.bondedPools.entries();
let count = 0;
const pools = await api.query.nominationPools.bondedPools.entries();
let count = 0;
for (const [key, poolOpt] of pools) {
const pool = poolOpt as Option<any>;
if (pool.isNone) continue;
for (const [key, poolOpt] of pools) {
const pool = poolOpt as Option<any>;
if (pool.isNone) continue;
const unwrapped = pool.unwrap();
if (unwrapped.points.toBigInt() === BigInt(0)) continue;
const unwrapped = pool.unwrap();
if (unwrapped.points.toBigInt() === BigInt(0)) continue;
const poolId = (key.args[0] as any).toNumber();
const stashAddress = derivePoolStash(poolId);
const poolId = (key.args[0] as any).toNumber();
const stashAddress = derivePoolStash(poolId);
const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${stashAddress}`;
const staker = ActiveStaker.create({
id: stakerId,
networkId: PEZKUWI_ASSET_HUB_GENESIS,
stakingType: STAKING_TYPE_RELAYCHAIN,
address: stashAddress,
});
await staker.save();
count++;
const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${stashAddress}`;
const staker = ActiveStaker.create({
id: stakerId,
networkId: PEZKUWI_ASSET_HUB_GENESIS,
stakingType: STAKING_TYPE_RELAYCHAIN,
address: stashAddress,
});
await staker.save();
count++;
}
logger.info(`Initialized ${count} pool stash accounts as active stakers`);
// Also compute and save APY on first block
await computeAndSaveAPY();
return;
}
logger.info(`Initialized ${count} pool stash accounts as active stakers`);
// Also compute and save APY on first block
await computeAndSaveAPY();
// Check for era changes every block — this is the fallback mechanism.
// The StakersElected event handler may miss era changes if events are dropped
// due to pruned state (pruned-state-fallback patch returns empty events).
if (api.query.staking && api.query.staking.activeEra) {
try {
const activeEraOpt = (await api.query.staking.activeEra()) as Option<any>;
if (!activeEraOpt.isNone) {
const currentEra = activeEraOpt.unwrap().index.toNumber();
if (lastProcessedEra >= 0 && currentEra > lastProcessedEra) {
logger.info(`Era change detected in handleBlock: ${lastProcessedEra} -> ${currentEra}, refreshing active stakers`);
await computeAndSaveAPY();
}
lastProcessedEra = currentEra;
}
} catch (e) {
// Don't crash the block handler for era detection failures
}
}
}
/**
@@ -181,7 +203,10 @@ async function computeAndSaveAPY(): Promise<void> {
try {
await _computeAndSaveAPYInner();
} catch (e) {
logger.warn(`APY computation failed: ${e}`);
logger.error(`APY computation FAILED (active stakers NOT updated): ${e}`);
if (e instanceof Error && e.stack) {
logger.error(`Stack trace: ${e.stack}`);
}
}
}
@@ -205,7 +230,14 @@ async function _computeAndSaveAPYInner(): Promise<void> {
for (const [key, exp] of overviews) {
const [, validatorId] = key.args;
const exposure = (exp as Option<any>).unwrap();
let exposure: any;
try {
const asOpt = exp as Option<any>;
if (asOpt.isNone) continue;
exposure = asOpt.unwrap();
} catch {
exposure = exp as any;
}
const total = exposure.total.toBigInt();
totalStaked += total;
validatorAddresses.push(validatorId.toString());
@@ -261,6 +293,21 @@ async function _computeAndSaveAPYInner(): Promise<void> {
maxAPY,
}).save();
// Remove ALL existing active stakers before refreshing with current era data.
// This prevents stale entries from nominators who are no longer in the exposure set.
try {
const existingStakers = await ActiveStaker.getByNetworkId(PEZKUWI_ASSET_HUB_GENESIS);
if (existingStakers && existingStakers.length > 0) {
for (const staker of existingStakers) {
await ActiveStaker.remove(staker.id);
}
logger.info(`Cleared ${existingStakers.length} stale active stakers for era ${currentEra}`);
}
} catch (e) {
logger.warn(`Failed to clear stale active stakers: ${e}`);
// Continue anyway — new stakers will be upserted via create().save()
}
// Save validators as active stakers on AH
for (const address of validatorAddresses) {
const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${address}`;
@@ -301,6 +348,31 @@ async function _computeAndSaveAPYInner(): Promise<void> {
}).save();
}
// Re-save pool stash accounts that may have been cleared above
try {
const pools = await api.query.nominationPools.bondedPools.entries();
let poolCount = 0;
for (const [key, poolOpt] of pools) {
const pool = poolOpt as Option<any>;
if (pool.isNone) continue;
const unwrapped = pool.unwrap();
if (unwrapped.points.toBigInt() === BigInt(0)) continue;
const poolId = (key.args[0] as any).toNumber();
const stashAddress = derivePoolStash(poolId);
const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${stashAddress}`;
await ActiveStaker.create({
id: stakerId,
networkId: PEZKUWI_ASSET_HUB_GENESIS,
stakingType: STAKING_TYPE_RELAYCHAIN,
address: stashAddress,
}).save();
poolCount++;
}
logger.info(`Re-saved ${poolCount} pool stash accounts`);
} catch (e) {
logger.warn(`Failed to re-save pool stashes: ${e}`);
}
logger.info(
`AH APY: ${(maxAPY * 100).toFixed(2)}% from ${
validators.length
@@ -318,12 +390,31 @@ async function _computeAndSaveAPYInner(): Promise<void> {
export async function handleAHStakersElected(
event: SubstrateEvent,
): Promise<void> {
logger.info("StakersElected event received, refreshing active stakers");
await computeAndSaveAPY();
// Update lastProcessedEra so handleBlock won't redundantly re-process
if (api.query.staking && api.query.staking.activeEra) {
try {
const activeEraOpt = (await api.query.staking.activeEra()) as Option<any>;
if (!activeEraOpt.isNone) {
lastProcessedEra = activeEraOpt.unwrap().index.toNumber();
}
} catch {}
}
}
/**
* Handle staking.StakingElection on Asset Hub (old format)
*/
export async function handleAHNewEra(event: SubstrateEvent): Promise<void> {
logger.info("StakingElection event received, refreshing active stakers");
await computeAndSaveAPY();
if (api.query.staking && api.query.staking.activeEra) {
try {
const activeEraOpt = (await api.query.staking.activeEra()) as Option<any>;
if (!activeEraOpt.isNone) {
lastProcessedEra = activeEraOpt.unwrap().index.toNumber();
}
} catch {}
}
}