Initial commit: Pezkuwi SubQuery indexer

- pezkuwi.yaml: Relay chain staking indexer (rewards, slashes, pools, transfers, era info)
- pezkuwi-assethub.yaml: Asset Hub indexer (NominationPools, asset transfers)
- GraphQL schema for staking data entities
- Handler mappings from Nova SubQuery base
2026-02-12 23:55:13 +03:00
commit 0812cf9e7a
38 changed files with 16433 additions and 0 deletions
@@ -0,0 +1,35 @@
#!/bin/bash
# Remove the generated and copied files from every network folder.
SCRIPT_PATH=$(dirname "$0")
MAIN_DIRECTORY=${SCRIPT_PATH%/*}
folders=($(ls "${MAIN_DIRECTORY}/networks"))
for item in "${folders[@]}"
do
  printf " %s\n" "$item"
  if [ -d "${MAIN_DIRECTORY}/networks/$item/src" ]; then
    rm -r "${MAIN_DIRECTORY}/networks/$item/src"
    rm "${MAIN_DIRECTORY}/networks/$item/tsconfig.json"
    rm "${MAIN_DIRECTORY}/networks/$item/schema.graphql"
    rm "${MAIN_DIRECTORY}/networks/$item/local-runner.sh"
    rm "${MAIN_DIRECTORY}/networks/$item/docker-compose.yml"
  fi
  if [ -d "${MAIN_DIRECTORY}/networks/$item/node_modules" ]; then
    rm -r "${MAIN_DIRECTORY}/networks/$item/node_modules"
  fi
  if [ -d "${MAIN_DIRECTORY}/networks/$item/dist" ]; then
    rm -r "${MAIN_DIRECTORY}/networks/$item/dist"
  fi
  if [ -d "${MAIN_DIRECTORY}/networks/$item/.data" ]; then
    rm -r "${MAIN_DIRECTORY}/networks/$item/.data"
  fi
  if [ -f "${MAIN_DIRECTORY}/networks/$item/yarn.lock" ]; then
    rm "${MAIN_DIRECTORY}/networks/$item/yarn.lock"
  fi
done
printf "Done!\n"
@@ -0,0 +1,18 @@
#!/bin/bash
# Publish every project manifest and collect the resulting IPFS CID files.
# Get a list of YAML files in alphabetical order
yamlFiles=($(ls ../*.yaml | sort))
for file in "${yamlFiles[@]}"; do
  outputFileName=".$(basename "$file" .yaml)-cid"
  # Run codegen, then publish the project to IPFS
  subql codegen -f "$file" && subql publish -f "$file"
  # Move the generated CID file into the ipfs-cids folder
  mv "../$outputFileName" "../ipfs-cids/$outputFileName"
  echo "Command executed for $file. Output file: $outputFileName"
done
echo "All projects published successfully."
@@ -0,0 +1,17 @@
#!/bin/bash
# Copy the shared sources and config files into every network folder.
SCRIPT_PATH=$(dirname "$0")
MAIN_DIRECTORY=${SCRIPT_PATH%/*}
folders=($(ls "${MAIN_DIRECTORY}/networks"))
for item in "${folders[@]}"
do
  printf " %s\n" "$item"
  scp -r "${MAIN_DIRECTORY}/src" "${MAIN_DIRECTORY}/networks/$item"
  scp "${MAIN_DIRECTORY}/tsconfig.json" "${MAIN_DIRECTORY}/networks/$item"
  scp "${MAIN_DIRECTORY}/schema.graphql" "${MAIN_DIRECTORY}/networks/$item"
  scp "${MAIN_DIRECTORY}/local-runner.sh" "${MAIN_DIRECTORY}/networks/$item"
  scp "${MAIN_DIRECTORY}/docker-compose.yml" "${MAIN_DIRECTORY}/networks/$item"
done
printf "Done!\n"
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
import os

from jinja2 import Template

from table_representation import ProjectTableGenerator
from subquery_api import SubQueryDeploymentAPI
from telegram_notifications import TelegramNotifications

token = os.getenv("SUBQUERY_TOKEN")
organisation = "nova-wallet"
nova_network_list = "https://raw.githubusercontent.com/nova-wallet/nova-utils/master/chains/v11/chains_dev.json"

readme = Template("""
Projects' status is updated every 4 hours

SubQuery API data sources are grouped based on the following features:

📚 Operation History - Transfers and Extrinsics for Utility (main) token of the network <br />
✨ Multi-asset transfers - Support for transfer history for tokens from ORML and Assets pallets <br />
🥞 Staking rewards - Rewards history and accumulated total rewards, supports both Staking and ParachainStaking pallets <br />
📈 Staking analytics - Queries for current stake, validators statistics, and stake change history

# List of deployed projects

{{dapps_table}}
""")


def generate_project_table():
    sub_query = SubQueryDeploymentAPI(auth_token=token, org=organisation)
    sub_query.collect_all_project_data()
    table_generator = ProjectTableGenerator(sub_query, nova_network_list)
    return table_generator.generate_table()


if __name__ == '__main__':
    dir_name = 'gh-pages-temp'
    telegram = TelegramNotifications()
    try:
        os.makedirs(dir_name)
        print("Directory", dir_name, "created")
    except FileExistsError:
        print("Directory", dir_name, "already exists")
    with open("./gh-pages-temp/README.md", "w") as f:
        f.write(readme.render(
            dapps_table=generate_project_table()
        ))
    # TODO: Temp remove, waiting for https://app.clickup.com/t/862kc4b47
    # telegram.send_notification()
@@ -0,0 +1,34 @@
anyio==3.6.2
attrs==22.2.0
certifi==2022.12.7
chardet==5.1.0
charset-normalizer==3.0.1
DataProperty==0.55.0
exceptiongroup==1.1.0
h11==0.14.0
httpcore==0.16.3
httpx==0.23.3
idna==3.4
iniconfig==2.0.0
Jinja2==3.1.2
MarkupSafe==2.1.2
mbstrdecoder==1.1.1
packaging==23.0
pathvalidate==2.5.2
pluggy==1.0.0
pytablewriter==0.64.2
pytest==7.2.1
python-dateutil==2.8.2
python-telegram-bot==20.0
pytz==2022.7.1
PyYAML==6.0
requests==2.28.2
rfc3986==1.5.0
six==1.16.0
sniffio==1.3.0
tabledata==1.3.0
tcolorpy==0.1.2
tomli==2.0.1
typepy==1.3.0
urllib3==1.26.14
wget==3.2
@@ -0,0 +1,6 @@
class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
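
A minimal usage sketch (Config is a hypothetical class, not part of this commit): every call to a class with this metaclass returns the cached first instance, which is how TelegramNotifications later in this commit accumulates rows from anywhere in the run.

class Config(metaclass=Singleton):
    def __init__(self):
        self.value = 42  # runs only once, on the first call

assert Config() is Config()  # the second call returns the cached instance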
@@ -0,0 +1,158 @@
from typing import List
from datetime import datetime, timedelta
import re

import requests


class DeploymentInstance():
    def __init__(self, **kwargs) -> None:
        self.id = kwargs['id']
        self.project_key = kwargs['projectKey']
        self.version = kwargs['version']
        self.status = kwargs['status']
        self.type = kwargs['type']
        self.configuration = kwargs['configuration']
        self.sync_status = None  # populated by get_sync_status_for_deployment


class SubQueryProject():
    def __init__(self, **kwargs) -> None:
        self.id = kwargs['id']
        self.key = kwargs['key']
        self.name = kwargs['name']
        self.network = kwargs['network']
        self.metadata = kwargs['metadata']
        self.query_url = kwargs['queryUrl']
        self.deployments: List[DeploymentInstance] = []
        deployments = kwargs.get('deployments')
        if deployments:
            for deployment in deployments:
                self.deployments.append(DeploymentInstance(**deployment))


class SubQueryDeploymentAPI():
    base_url = "https://api.subquery.network"

    def __init__(self, auth_token, org) -> None:
        self.org = org
        self.org_projects: List[SubQueryProject] = []
        self.headers = {
            'authority': 'api.subquery.network',
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8,ru;q=0.7',
            'origin': 'https://managedservice.subquery.network',
            'sec-ch-ua': '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-site',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36',
            'Authorization': f'Bearer {auth_token}',
        }

    def _send_request(self, method, path, payload=None, params=None):
        try:
            response = requests.request(
                method, self.base_url + path, headers=self.headers, data=payload, params=params)
            if response.status_code == 401:
                raise Exception(f"Unauthorised:\n{response}")
            return response
        except Exception as e:
            raise Exception(
                f"Can't send request to: {path} with method: {method} and payload: {payload}\nException: {e}")

    def collect_all_project_data(self) -> List[SubQueryProject]:
        self.get_all_projects_for_organisation()
        print(f"Organisation: {self.org}\nHas {len(self.org_projects)} projects")
        print("Started fetching deployments.")
        for project in self.org_projects:
            self.get_deployments_for_project(project)
            print(f"Project: {project.network} received: {len(project.deployments)} deployments.")
            for deployment in project.deployments:
                self.get_sync_status_for_deployment(deployment)
                print(f"Deployment for {project.network} status: {deployment.sync_status}, env: {deployment.type}")
        return self.org_projects

    def get_all_projects_for_organisation(self) -> List[SubQueryProject]:
        projects = self._send_request(
            method="GET", path=f"/user/projects?account={self.org}").json()
        self.org_projects = [SubQueryProject(**project) for project in projects]
        return self.org_projects

    def get_sync_status_for_deployment(self, deployment: DeploymentInstance) -> DeploymentInstance:
        if len(self.org_projects) == 0:
            print("org_projects is empty, use get_all_projects_for_organisation first")
        sync_status = self._send_request(
            method="GET",
            path=f"/v3/subqueries/{deployment.project_key}/deployments/{deployment.id}/sync-status"
        ).json()
        if len(sync_status['networks']) == 0:
            deployment.sync_status = None
            return deployment
        deployment.sync_status = sync_status['networks'][0]
        return deployment

    def get_deployments_for_project(self, project: SubQueryProject) -> List[DeploymentInstance]:
        if len(self.org_projects) == 0:
            print("org_projects is empty, use get_all_projects_for_organisation first")
        deployments = self._send_request(
            method="GET",
            path=f"/subqueries/{project.key}/deployments"
        ).json()
        project.deployments = [DeploymentInstance(**deployment) for deployment in deployments]
        return project.deployments

    def find_project_by_parameter(self, parameter_name, parameter_value):
        found_projects = [project for project in self.org_projects
                          if getattr(project, parameter_name) == parameter_value]
        if found_projects:
            print("Project found")
            return found_projects[0]
        print("Project not found.")

    def get_logs(self, project_name: str, sid: str, level: str = 'info', stage: bool = False):
        params = {
            'level': level,
            'stage': str(stage).lower(),
            'sid': sid
        }
        # Pass the log filters as query parameters
        response = self._send_request(
            method="GET", path=f'/v3/subqueries/{project_name}/logs', params=params)
        if response.status_code == 200:
            return response.json()
        response.raise_for_status()

    def parse_logs(self, logs):
        # Find a recent "Target height / Current height" log line and extract both numbers
        for log in logs['result']:
            message = log.get('message')
            timestamp = log.get('timestamp')
            if message and 'Target height' in message and 'Current height' in message:
                log_time = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
                # Only trust heights reported within the last hour
                if datetime.utcnow() - log_time <= timedelta(hours=1):
                    target_height = re.search(r'Target height: ([\d,]+)', message).group(1)
                    current_height = re.search(r'Current height: ([\d,]+)', message).group(1)
                    # Remove commas from the numbers
                    target_height = int(target_height.replace(',', ''))
                    current_height = int(current_height.replace(',', ''))
                    return target_height, current_height
        return None, None
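
A quick check of the height parsing; the payload below is a hypothetical log entry shaped like the managed-service response, not real output:

if __name__ == '__main__':
    sample_logs = {'result': [{
        'message': 'Target height: 18,432,100. Current height: 18,431,950',
        'timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
    }]}
    # Building the client only prepares headers; no request is sent here
    api = SubQueryDeploymentAPI(auth_token='dummy-token', org='nova-wallet')
    print(api.parse_logs(sample_logs))  # -> (18432100, 18431950)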
@@ -0,0 +1,53 @@
import subprocess
import os
import platform
import zipfile

import wget


def get_subquery_cli(subquery_cli_version):
    # Download and unpack the subquery-cli binary matching the current platform
    download_url = "https://github.com/fewensa/subquery-cli/releases/download/v" + subquery_cli_version
    temporary_path = "./temporary"
    current_platform = platform.system()
    if current_platform == "Linux":
        download_url += "/subquery-linux-x86_64.zip"
    elif current_platform == "Darwin":
        download_url += "/subquery-macos-x86_64.zip"
    elif current_platform == "Windows":
        download_url += "/subquery-windows-x86_64.zip"
    else:
        raise ValueError("Can't recognize the operating system")
    try:
        os.makedirs(temporary_path, exist_ok=False)
        wget.download(download_url, out=temporary_path)
        for file in os.listdir(temporary_path):
            with zipfile.ZipFile(temporary_path + '/' + file) as item:
                item.extractall(temporary_path)
    except FileExistsError:
        # The directory already exists, so the CLI was downloaded on a previous run
        pass
    subprocess.call(['chmod', '-R', '777', temporary_path])
    return temporary_path


def use_subquery_cli(subquery_cli_version, *args):
    temporary_path = get_subquery_cli(subquery_cli_version)
    data_from_subquery = subprocess.check_output([temporary_path + '/subquery', *args]).decode()
    return data_from_subquery


if __name__ == "__main__":
    token = os.environ.get('SUBQUERY_TOKEN', '')
    project_key = os.environ.get('PROJECT_KEY', '')
    subquery_cli_version = '0.2.4'
    use_subquery_cli(subquery_cli_version, '--token', token, 'deployment', 'list',
                     '-o', 'json', '--org', 'nova-wallet', '--key', project_key)
@@ -0,0 +1,144 @@
import json

import requests
from pytablewriter import MarkdownTableWriter

from telegram_notifications import TelegramNotifications
from subquery_api import SubQueryDeploymentAPI, SubQueryProject, DeploymentInstance


class ProjectTableGenerator:
    def __init__(self, sub_query: SubQueryDeploymentAPI, nova_network_list_url: str):
        self.sub_query = sub_query
        self.nova_network_list_url = nova_network_list_url

    def generate_table(self):
        writer = MarkdownTableWriter(
            headers=["--", "Network", "Features", "Stage status", "Prod status", "Stage commit", "Prod commit"],
            value_matrix=self.generate_value_matrix_for_table(),
            margin=1
        )
        writer.write_table()  # echo the table to stdout for the CI log
        return writer.dumps()  # return the rendered Markdown for the README template

    def generate_value_matrix_for_table(self):
        network_list = self.generate_network_list()
        returning_array = []
        for network in network_list:
            network_data_array = self.generate_network_data_array(network)
            returning_array.append(network_data_array)
            print(f'{network.get("name").title()} generated!')
        returning_array.sort()
        # Prepend a 1-based row number to every sorted row
        for row_number, network in enumerate(returning_array, start=1):
            network.insert(0, row_number)
        return returning_array

    def generate_network_data_array(self, network: dict):
        network_data_array = []
        subquery_project_data = self.sub_query.find_project_by_parameter('name', network.get('name'))
        network_data_array.append(
            f"[{network.get('name').title()}](https://explorer.subquery.network/subquery/{subquery_project_data.key})"
        )
        prod_status, prod_commit, stage_status, stage_commit = self.generate_progress_status(
            next(filter(lambda project: project.name == network['name'], self.sub_query.org_projects))
        )
        network_data_array.extend([network.get('features'), stage_status, prod_status, stage_commit, prod_commit])
        return network_data_array

    def generate_network_list(self):
        feature_list = []
        chains_list = self._send_http_request(self.nova_network_list_url)
        available_projects = self.sub_query.org_projects
        for project in available_projects:
            prod_genesis = self.get_prod_genesis(project)
            if not prod_genesis:  # Skip undeployed projects
                continue
            project_genesis = self._remove_hex_prefix(prod_genesis)
            chain = next(iter([chain for chain in chains_list if chain.get('chainId') == project_genesis]), None)
            feature_list.append({
                "name": project.name,
                "genesis": project_genesis,
                "features": self.check_features(chain)
            })
        return feature_list

    def get_prod_genesis(self, project):
        try:
            # Return the chain id of the primary (production) deployment
            return next(
                deploy.configuration['chainId'] for deploy in project.deployments if deploy.type == 'primary')
        except (StopIteration, KeyError, TypeError):
            print(f"Network: {project.network} has old deployment, need to redeploy")
            return None

    def generate_progress_status(self, project: SubQueryProject):
        prod, stage = None, None
        for deployment in project.deployments:
            if deployment.type == 'primary':
                prod = deployment
            elif deployment.type == 'stage':
                stage = deployment
            else:
                raise Exception(f"Unknown deployment type: {deployment.type} in project: {project}")
        prod_status, prod_commit = self.fill_status_bar(prod, project)
        stage_status, stage_commit = self.fill_status_bar(stage, project)
        return prod_status, prod_commit, stage_status, stage_commit

    def fill_status_bar(self, instance: DeploymentInstance, project: SubQueryProject):
        if not instance:
            return '![0](https://progress-bar.dev/0?title=N/A)', '-'
        commit = instance.version[0:8]
        if instance.status == 'processing':
            return '![0](https://progress-bar.dev/0?title=Processing...)', commit
        if instance.status == 'error' and self.get_sync_percentage(instance, project) == '0':
            return '![0](https://progress-bar.dev/0?title=Error)', commit
        percent = self.get_sync_percentage(instance, project)
        return f'![{percent}](https://progress-bar.dev/{percent}?title={instance.type.capitalize()})', commit

    def is_sync_status_valid(self, sync_status):
        if sync_status is None:
            return False
        return all(key in sync_status and sync_status[key] is not None for key in ['processingBlock', 'targetBlock'])

    def get_sync_percentage(self, instance: DeploymentInstance, project: SubQueryProject) -> str:
        if not self.is_sync_status_valid(instance.sync_status):
            # Fall back to parsing block heights out of the indexer logs
            logs = self.sub_query.get_logs(project.key, instance.id)
            target_block, processing_block = self.sub_query.parse_logs(logs)
        else:
            processing_block = instance.sync_status.get('processingBlock')
            target_block = instance.sync_status.get('targetBlock')
        telegram = TelegramNotifications()
        if processing_block and target_block:
            if processing_block != -1:
                return str(int((processing_block / target_block) * 100))
            telegram.add_row_in_telegram_notification(project=project, instance=instance)
            return '0'
        telegram.add_row_in_telegram_notification(project=project, instance=instance)
        return '0'

    def check_features(self, chain: dict):
        def has_transfer_history(chain):
            return True

        def has_orml_or_asset(chain):
            return any(asset.get('type') in ['orml', 'statemine'] for asset in chain.get('assets'))

        def has_staking_analytics(chain):
            return chain.get('assets')[0].get('staking') == 'relaychain'

        def has_rewards_history(chain):
            return bool(chain.get('assets')[0].get('staking'))

        feature_criteria = {
            "📚 Operation History": has_transfer_history,
            "✨ Multi assets": has_orml_or_asset,
            "📈 Staking analytics": has_staking_analytics,
            "🥞 Staking rewards": has_rewards_history
        }
        if chain is None:
            # Chains missing from the Nova network list get operation history only
            return list(feature_criteria.keys())[0]
        features = [feature for feature, criteria in feature_criteria.items() if criteria(chain)]
        return '<br />'.join(features)

    def _send_http_request(self, url: str):
        try:
            response = requests.get(url)
        except requests.exceptions.RequestException as e:
            raise SystemExit(e)
        return json.loads(response.text)

    def _remove_hex_prefix(self, hex_string):
        return hex_string[2:]
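
The status cells are progress-bar.dev badges; a standalone sketch of the arithmetic from get_sync_percentage and the Markdown cell built by fill_status_bar (the block heights below are made-up values):

processing_block, target_block = 18_431_950, 18_432_100  # hypothetical heights
percent = str(int((processing_block / target_block) * 100))  # int() floors 99.999... to '99'
cell = f'![{percent}](https://progress-bar.dev/{percent}?title=Primary)'
print(cell)  # rendered as a progress badge inside the README table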
@@ -0,0 +1,38 @@
import os
import asyncio

import telegram

from subquery_api import SubQueryProject, DeploymentInstance
from singleton import Singleton


class TelegramNotifications(metaclass=Singleton):
    notify_message_title = "⚠️ SubQuery projects error ⚠️"
    notify_projects_message = []

    def __init__(self) -> None:
        self.token = os.getenv("TELEGRAM_BOT_TOKEN")
        self.chat_id = os.getenv("TELEGRAM_CHAT_ID")

    async def send_telegram_message(self, message):
        bot = telegram.Bot(token=self.token)
        await bot.send_message(chat_id=self.chat_id, text=message, parse_mode="MarkdownV2")

    def send_notification(self):
        if len(self.notify_projects_message) != 0:
            notification_message = self.notify_message_title
            for project_message in self.notify_projects_message:
                notification_message += project_message
            # Escape '-' so MarkdownV2 does not reject the message
            escaped_message = notification_message.replace('-', '\\-')
            asyncio.run(self.send_telegram_message(escaped_message))

    def add_row_in_telegram_notification(self, project: SubQueryProject, instance: DeploymentInstance):
        notify_project_name = project.name.title()
        self.notify_projects_message.append(
            f"\n\n*{notify_project_name}* Indexer is unhealthy\\!\n"
            f"Project URL: [Link to project](https://managedservice.subquery.network/orgs/nova-wallet/projects/{instance.project_key.split('/')[1]}/deployments?slot={instance.type})\n"
            f"Explorer URL: [Link to explorer](https://explorer.subquery.network/subquery/{instance.project_key})\n"
            f"Environment: {instance.type.capitalize()}"
        )
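
A sketch of the intended flow (the project and deployment values are hypothetical, and TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID must be set in the environment for the send to succeed):

project = SubQueryProject(id=1, key='nova-wallet/polkadot', name='polkadot',
                          network='Polkadot', metadata={}, queryUrl='', deployments=None)
instance = DeploymentInstance(id=42, projectKey='nova-wallet/polkadot', version='abcdef1234567890',
                              status='error', type='primary', configuration={})
TelegramNotifications().add_row_in_telegram_notification(project=project, instance=instance)
TelegramNotifications().send_notification()  # the same singleton instance sends the queued row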
@@ -0,0 +1,31 @@
import json
import os

import pytest

from subquery_cli import use_subquery_cli

subquery_cli_version = '0.2.4'
token = os.environ['SUBQUERY_TOKEN']
project_key = os.environ['PROJECT_KEY']


@pytest.fixture
def get_project_data():
    project_data = json.loads(
        use_subquery_cli(
            subquery_cli_version, '--token', token, 'deployment', 'list',
            '-o', 'json', '--org', 'nova-wallet', '--key', project_key
        ))
    stage_project = next(
        item for item in project_data if item["type"] == "stage")
    return stage_project


def test_project_status(get_project_data):
    assert get_project_data['status'] == 'running'


def test_sync_status(get_project_data):
    sync_status = use_subquery_cli(
        subquery_cli_version, '--token', token, 'deployment', 'sync-status',
        '--id', str(get_project_data['id']), '--key', project_key, '--org', 'nova-wallet')
    # Extract the percentage from output like "... percent: 100.00% ..."
    assertion_value = sync_status.split("percent: ")[1].split('%')[0]
    assert assertion_value == '100.00'
@@ -0,0 +1,69 @@
#!/bin/bash
# Requires bash v4+
#
# Download the subquery-cli binary for your operating system and put it in the root directory:
# https://github.com/fewensa/subquery-cli/releases/
SCRIPT_PATH=$(dirname "$0")
MAIN_DIRECTORY=${SCRIPT_PATH%/*}
SUBQUERY_TOKEN="${SUBQUERY_TOKEN}"
ORGANISATION="nova-wallet"
BASE_DESCRIPTION="Nova SubQuery project is indexing the blockchain and provides a convenient API for fetching operation history & analytics data. It is used by the <a href=\"https://novawallet.io\">Nova Wallet</a>
Feel free to use this API for your app! 💖</br>
<mark>Make sure that you add filters and sorting rules to your queries!</mark></br>
The following API & data sources are supported:
📚 Transfers and extrinsics (transactions). Both or either can be fetched, for example:
<code>query {historyElements{nodes{transfer extrinsic}}}</code>
</br>"
MULTIASSET_DESCRIPTION="✨ Transfer history for additional assets in the network (based on the \"assets\"/\"ORML\" Substrate pallets):
<code>query {historyElements{nodes{assetTransfer}}}</code>
</br>"
STAKING_DESCRIPTION="🥞 Staking rewards history:
<code>query {historyElements{nodes{reward}}}</code>
🎁 Total staking rewards for the desired account:
<code>query {accumulatedRewards{nodes{id amount}}}</code>
</br>"
STAKING_ANALYTICS="📊 Current stake - returns bonded amount:
<code>query {accumulatedStakes{nodes{id amount}}}</code>
👨‍🔧 Validators statistics:
<code>query {eraValidatorInfos{nodes{address era total own others}}}</code>
📈 Stake change history:
<code>query {stakeChanges{nodes{blockNumber extrinsicHash address amount accumulatedAmount type}}}</code>
</br>"
MULTIASSET_PROJECTS=(statemine parallel parallel-heiko westmint moonbeam moonriver astar shiden karura acala bifrost interlay kintsugi)
HAS_STAKING=(polkadot kusama westend moonbeam moonriver)
HAS_STAKING_ANALYTICS=(polkadot kusama westend)
folders=($(ls "${MAIN_DIRECTORY}/networks"))
for item in "${folders[@]}"; do
  DESCRIPTION=${BASE_DESCRIPTION}
  if [[ " ${MULTIASSET_PROJECTS[*]} " =~ " ${item} " ]]; then
    DESCRIPTION+=${MULTIASSET_DESCRIPTION}
  fi
  if [[ " ${HAS_STAKING[*]} " =~ " ${item} " ]]; then
    DESCRIPTION+=${STAKING_DESCRIPTION}
  fi
  if [[ " ${HAS_STAKING_ANALYTICS[*]} " =~ " ${item} " ]]; then
    DESCRIPTION+=${STAKING_ANALYTICS}
  fi
  "$MAIN_DIRECTORY/subquery" --token "${SUBQUERY_TOKEN}" project update --org "${ORGANISATION}" --key "$item" --description "${DESCRIPTION}" --subtitle "Nova Wallet SubQuery project for ${item^} network"
done
echo "Done!"