diff --git a/src/mappings/PoolStakers.ts b/src/mappings/PoolStakers.ts index 8c50c68..26113b1 100644 --- a/src/mappings/PoolStakers.ts +++ b/src/mappings/PoolStakers.ts @@ -13,6 +13,7 @@ import { } from "./constants"; let poolStakersInitialized = false; +let lastProcessedEra = -1; /** * Derive the bonded (stash) account for a nomination pool. @@ -57,39 +58,60 @@ function derivePoolStash(poolId: number): string { * - networkId: AH genesis */ export async function handleBlock(block: SubstrateBlock): Promise { - if (poolStakersInitialized) return; - poolStakersInitialized = true; + if (!poolStakersInitialized) { + poolStakersInitialized = true; - logger.info("Initializing pool stash accounts from live chain state..."); + logger.info("Initializing pool stash accounts from live chain state..."); - const pools = await api.query.nominationPools.bondedPools.entries(); - let count = 0; + const pools = await api.query.nominationPools.bondedPools.entries(); + let count = 0; - for (const [key, poolOpt] of pools) { - const pool = poolOpt as Option; - if (pool.isNone) continue; + for (const [key, poolOpt] of pools) { + const pool = poolOpt as Option; + if (pool.isNone) continue; - const unwrapped = pool.unwrap(); - if (unwrapped.points.toBigInt() === BigInt(0)) continue; + const unwrapped = pool.unwrap(); + if (unwrapped.points.toBigInt() === BigInt(0)) continue; - const poolId = (key.args[0] as any).toNumber(); - const stashAddress = derivePoolStash(poolId); + const poolId = (key.args[0] as any).toNumber(); + const stashAddress = derivePoolStash(poolId); - const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${stashAddress}`; - const staker = ActiveStaker.create({ - id: stakerId, - networkId: PEZKUWI_ASSET_HUB_GENESIS, - stakingType: STAKING_TYPE_RELAYCHAIN, - address: stashAddress, - }); - await staker.save(); - count++; + const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${stashAddress}`; + const staker = ActiveStaker.create({ + id: stakerId, + networkId: PEZKUWI_ASSET_HUB_GENESIS, + stakingType: STAKING_TYPE_RELAYCHAIN, + address: stashAddress, + }); + await staker.save(); + count++; + } + + logger.info(`Initialized ${count} pool stash accounts as active stakers`); + + // Also compute and save APY on first block + await computeAndSaveAPY(); + return; } - logger.info(`Initialized ${count} pool stash accounts as active stakers`); - - // Also compute and save APY on first block - await computeAndSaveAPY(); + // Check for era changes every block — this is the fallback mechanism. + // The StakersElected event handler may miss era changes if events are dropped + // due to pruned state (pruned-state-fallback patch returns empty events). + if (api.query.staking && api.query.staking.activeEra) { + try { + const activeEraOpt = (await api.query.staking.activeEra()) as Option; + if (!activeEraOpt.isNone) { + const currentEra = activeEraOpt.unwrap().index.toNumber(); + if (lastProcessedEra >= 0 && currentEra > lastProcessedEra) { + logger.info(`Era change detected in handleBlock: ${lastProcessedEra} -> ${currentEra}, refreshing active stakers`); + await computeAndSaveAPY(); + } + lastProcessedEra = currentEra; + } + } catch (e) { + // Don't crash the block handler for era detection failures + } + } } /** @@ -181,7 +203,10 @@ async function computeAndSaveAPY(): Promise { try { await _computeAndSaveAPYInner(); } catch (e) { - logger.warn(`APY computation failed: ${e}`); + logger.error(`APY computation FAILED (active stakers NOT updated): ${e}`); + if (e instanceof Error && e.stack) { + logger.error(`Stack trace: ${e.stack}`); + } } } @@ -205,7 +230,14 @@ async function _computeAndSaveAPYInner(): Promise { for (const [key, exp] of overviews) { const [, validatorId] = key.args; - const exposure = (exp as Option).unwrap(); + let exposure: any; + try { + const asOpt = exp as Option; + if (asOpt.isNone) continue; + exposure = asOpt.unwrap(); + } catch { + exposure = exp as any; + } const total = exposure.total.toBigInt(); totalStaked += total; validatorAddresses.push(validatorId.toString()); @@ -261,6 +293,21 @@ async function _computeAndSaveAPYInner(): Promise { maxAPY, }).save(); + // Remove ALL existing active stakers before refreshing with current era data. + // This prevents stale entries from nominators who are no longer in the exposure set. + try { + const existingStakers = await ActiveStaker.getByNetworkId(PEZKUWI_ASSET_HUB_GENESIS); + if (existingStakers && existingStakers.length > 0) { + for (const staker of existingStakers) { + await ActiveStaker.remove(staker.id); + } + logger.info(`Cleared ${existingStakers.length} stale active stakers for era ${currentEra}`); + } + } catch (e) { + logger.warn(`Failed to clear stale active stakers: ${e}`); + // Continue anyway — new stakers will be upserted via create().save() + } + // Save validators as active stakers on AH for (const address of validatorAddresses) { const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${address}`; @@ -301,6 +348,31 @@ async function _computeAndSaveAPYInner(): Promise { }).save(); } + // Re-save pool stash accounts that may have been cleared above + try { + const pools = await api.query.nominationPools.bondedPools.entries(); + let poolCount = 0; + for (const [key, poolOpt] of pools) { + const pool = poolOpt as Option; + if (pool.isNone) continue; + const unwrapped = pool.unwrap(); + if (unwrapped.points.toBigInt() === BigInt(0)) continue; + const poolId = (key.args[0] as any).toNumber(); + const stashAddress = derivePoolStash(poolId); + const stakerId = `${PEZKUWI_ASSET_HUB_GENESIS}-${STAKING_TYPE_RELAYCHAIN}-${stashAddress}`; + await ActiveStaker.create({ + id: stakerId, + networkId: PEZKUWI_ASSET_HUB_GENESIS, + stakingType: STAKING_TYPE_RELAYCHAIN, + address: stashAddress, + }).save(); + poolCount++; + } + logger.info(`Re-saved ${poolCount} pool stash accounts`); + } catch (e) { + logger.warn(`Failed to re-save pool stashes: ${e}`); + } + logger.info( `AH APY: ${(maxAPY * 100).toFixed(2)}% from ${ validators.length @@ -318,12 +390,31 @@ async function _computeAndSaveAPYInner(): Promise { export async function handleAHStakersElected( event: SubstrateEvent, ): Promise { + logger.info("StakersElected event received, refreshing active stakers"); await computeAndSaveAPY(); + // Update lastProcessedEra so handleBlock won't redundantly re-process + if (api.query.staking && api.query.staking.activeEra) { + try { + const activeEraOpt = (await api.query.staking.activeEra()) as Option; + if (!activeEraOpt.isNone) { + lastProcessedEra = activeEraOpt.unwrap().index.toNumber(); + } + } catch {} + } } /** * Handle staking.StakingElection on Asset Hub (old format) */ export async function handleAHNewEra(event: SubstrateEvent): Promise { + logger.info("StakingElection event received, refreshing active stakers"); await computeAndSaveAPY(); + if (api.query.staking && api.query.staking.activeEra) { + try { + const activeEraOpt = (await api.query.staking.activeEra()) as Option; + if (!activeEraOpt.isNone) { + lastProcessedEra = activeEraOpt.unwrap().index.toNumber(); + } + } catch {} + } }