Files
pezkuwichain 0812cf9e7a Initial commit: Pezkuwi SubQuery indexer
- pezkuwi.yaml: Relay chain staking indexer (rewards, slashes, pools, transfers, era info)
- pezkuwi-assethub.yaml: Asset Hub indexer (NominationPools, asset transfers)
- GraphQL schema for staking data entities
- Handler mappings from Nova SubQuery base
2026-02-12 23:55:13 +03:00

145 lines
6.6 KiB
Python

import json
import requests
from pytablewriter import MarkdownTableWriter
from telegram_notifications import TelegramNotifications
from subquery_api import SubQueryDeploymentAPI, SubQueryProject, DeploymentInstance
class ProjectTableGenerator:
    """Builds a Markdown status table of SubQuery deployments for Nova networks.

    Combines project/deployment data from the SubQuery managed service with
    Nova's public network list to produce one table row per network, showing
    supported features, sync progress and deployed commits for the stage and
    production (primary) instances.
    """

    def __init__(self, sub_query: 'SubQueryDeploymentAPI', nova_network_list_url: str):
        # API client for the SubQuery managed service.
        self.sub_query = sub_query
        # URL of Nova's chains.json-style network list.
        self.nova_network_list_url = nova_network_list_url

    def generate_table(self):
        """Render the status table to stdout and return the writer instance."""
        writer = MarkdownTableWriter(
            headers=["--", "Network", "Features", "Stage status", "Prod status", "Stage commit", "Prod commit"],
            value_matrix=self.generate_value_matrix_for_table(),
            margin=1
        )
        writer.write_table()
        return writer

    def generate_value_matrix_for_table(self):
        """Build sorted, 1-indexed table rows, one per known network."""
        rows = []
        for network in self.generate_network_list():
            rows.append(self.generate_network_data_array(network))
            print(f'{network.get("name").title()} generated!')
        rows.sort()
        # Prepend a 1-based row number only after sorting, so the numbering
        # follows the final row order.  (Replaces a list comprehension that was
        # used purely for its side effects.)
        for position, row in enumerate(rows, start=1):
            row.insert(0, position)
        return rows

    def generate_network_data_array(self, network: dict):
        """Return one table row (without the leading row number) for *network*."""
        project_data = self.sub_query.find_project_by_parameter('name', network.get('name'))
        row = [
            f"[{network.get('name').title()}](https://explorer.subquery.network/subquery/{project_data.key})"
        ]
        prod_status, prod_commit, stage_status, stage_commit = self.generate_progress_status(
            next(filter(lambda project: project.name == network['name'], self.sub_query.org_projects))
        )
        row.extend([network.get('features'), stage_status, prod_status, stage_commit, prod_commit])
        return row

    def generate_network_list(self):
        """Pair each deployed SubQuery project with its Nova network entry.

        Returns a list of dicts with ``name``, ``genesis`` (un-prefixed hash)
        and ``features`` keys; projects without a primary deployment are skipped.
        """
        networks = []
        chains_list = self._send_http_request(self.nova_network_list_url)
        for project in self.sub_query.org_projects:
            prod_genesis = self.get_prod_genesis(project)
            if not prod_genesis:  # Skip undeployed projects
                continue
            project_genesis = self._remove_hex_prefix(prod_genesis)
            chain = next((chain for chain in chains_list if chain.get('chainId') == project_genesis), None)
            networks.append({
                "name": project.name,
                "genesis": project_genesis,
                "features": self.check_features(chain)
            })
        return networks

    def get_prod_genesis(self, project):
        """Return the '0x'-prefixed genesis hash of the project's primary
        deployment, or None when no usable primary deployment exists.

        NOTE(fix): this previously returned a *list* of chainIds, which broke
        `_remove_hex_prefix` (list slicing) and the chainId comparison in
        `generate_network_list`.
        """
        try:
            return next(
                deploy.configuration['chainId']
                for deploy in project.deployments
                if deploy.type == 'primary'
            )
        except StopIteration:
            # No primary deployment at all.
            return None
        except (KeyError, TypeError, AttributeError):
            # Old-format deployments lack a usable configuration dict.
            print(f"Network: {project.network} has old deployment, need to redeploy")
            return None

    def generate_progress_status(self, project: 'SubQueryProject'):
        """Return (prod_status, prod_commit, stage_status, stage_commit) badges."""
        prod, stage = None, None
        for deployment in project.deployments:
            if deployment.type == 'primary':
                prod = deployment
            elif deployment.type == 'stage':
                stage = deployment
            else:
                raise Exception(f"Unknown deployment type: {deployment.type} in project: {project}")
        prod_status, prod_commit = self.fill_status_bar(prod, project)
        stage_status, stage_commit = self.fill_status_bar(stage, project)
        return prod_status, prod_commit, stage_status, stage_commit

    def fill_status_bar(self, instance: 'DeploymentInstance', project: 'SubQueryProject'):
        """Return (progress-bar badge markdown, short commit hash) for *instance*."""
        if not instance:
            return '![0](https://progress-bar.dev/0?title=N/A)', '-'
        commit = instance.version[0:8]
        if instance.status == 'processing':
            return '![0](https://progress-bar.dev/0?title=Processing...)', commit
        # Compute the percentage once (it may hit the logs API and notify
        # Telegram); the original computed it twice on the error path.
        percent = self.get_sync_percentage(instance, project)
        if instance.status == 'error' and percent == '0':
            return '![0](https://progress-bar.dev/0?title=Error)', commit
        return f'![{percent}](https://progress-bar.dev/{percent}?title={instance.type.capitalize()})', commit

    def is_sync_status_valid(self, sync_status):
        """True when *sync_status* has non-None 'processingBlock' and 'targetBlock'."""
        if sync_status is None:
            return False
        return all(key in sync_status and sync_status[key] is not None for key in ['processingBlock', 'targetBlock'])

    def get_sync_percentage(self, instance: 'DeploymentInstance', project: 'SubQueryProject') -> str:
        """Return sync progress as an integer-percentage string ('0'..'100').

        Falls back to parsing deployment logs when the sync-status payload is
        incomplete; reports broken instances via Telegram and returns '0'.
        """
        if not self.is_sync_status_valid(instance.sync_status):
            logs = self.sub_query.get_logs(project.key, instance.id)
            target_block, processing_block = self.sub_query.parse_logs(logs)
        else:
            processing_block = instance.sync_status.get('processingBlock')
            target_block = instance.sync_status.get('targetBlock')
        # NOTE: processing_block == 0 is treated as "unknown" (truthiness check
        # preserved from the original behavior); -1 is an explicit sentinel.
        if processing_block and target_block and processing_block != -1:
            return str(int((processing_block / target_block) * 100))
        # Only construct the notifier when there is actually something to report
        # (the original built it unconditionally, even on the success path).
        telegram = TelegramNotifications()
        telegram.add_row_in_telegram_notification(project=project, instance=instance)
        return '0'

    def check_features(self, chain: dict):
        """Return a '<br />'-joined list of Nova feature labels for *chain*.

        When *chain* is None (network unknown to Nova's list) only the
        always-available operation-history feature is reported.
        """
        def has_transfer_history(chain):
            # Transfer history is available for every indexed network.
            return True

        def has_orml_or_asset(chain):
            return any(asset.get('type') in ['orml', 'statemine'] for asset in chain.get('assets'))

        def has_staking_analytics(chain):
            return chain.get('assets')[0].get('staking') == 'relaychain'

        def has_rewards_history(chain):
            return bool(chain.get('assets')[0].get('staking'))

        # Renamed from `dict` to avoid shadowing the builtin.
        criteria_by_feature = {
            "📚 Operation History": has_transfer_history,
            "✨ Multi assets": has_orml_or_asset,
            "📈 Staking analytics": has_staking_analytics,
            "🥞 Staking rewards": has_rewards_history
        }
        if chain is None:
            return next(iter(criteria_by_feature))
        features = [feature for feature, criteria in criteria_by_feature.items() if criteria(chain)]
        return '<br />'.join(features)

    def _send_http_request(self, url: str, timeout: float = 30):
        """GET *url* and return the parsed JSON body; exit the script on I/O errors.

        A default *timeout* keeps the generator from hanging forever on a
        stalled connection (requests waits indefinitely unless one is passed).
        """
        try:
            response = requests.get(url, timeout=timeout)
        except requests.exceptions.RequestException as e:
            raise SystemExit(e)
        return json.loads(response.text)

    def _remove_hex_prefix(self, hex_string):
        """Strip a leading '0x' from *hex_string*; leave unprefixed input intact."""
        return hex_string[2:] if hex_string.startswith('0x') else hex_string