Files
pezkuwi-subquery/scripts/python_scripts/telegram_notifications.py
T
pezkuwichain 0812cf9e7a Initial commit: Pezkuwi SubQuery indexer
- pezkuwi.yaml: Relay chain staking indexer (rewards, slashes, pools, transfers, era info)
- pezkuwi-assethub.yaml: Asset Hub indexer (NominationPools, asset transfers)
- GraphQL schema for staking data entities
- Handler mappings from Nova SubQuery base
2026-02-12 23:55:13 +03:00

39 lines
1.6 KiB
Python

import os
import asyncio
import telegram
from subquery_api import SubQueryProject, DeploymentInstance
from singleton import Singleton
class TelegramNotifications(metaclass=Singleton):
notify_message_title = "⚠️ SubQuery projects error ⚠️"
notify_projects_message = []
def __init__(self) -> None:
self.token = os.getenv("TELEGRAM_BOT_TOKEN")
self.chat_id = os.getenv("TELEGRAM_CHAT_ID")
async def send_telegram_message(self, message):
bot = telegram.Bot(token=self.token)
await bot.send_message(chat_id=self.chat_id, text=message, parse_mode="MarkdownV2")
def send_notification(self):
if len(self.notify_projects_message) != 0:
notification_message = self.notify_message_title
for project_message in self.notify_projects_message:
notification_message += project_message
shielded_message = notification_message.replace('-', '\-')
asyncio.run(self.send_telegram_message(shielded_message))
else:
pass
def add_row_in_telegram_notification(self, project: SubQueryProject, instance: DeploymentInstance):
notify_project_name = project.name.title()
self.notify_projects_message.append(
f"\n\n*{notify_project_name}* Indexer is unhealthy\!\nProject URL: [Link to project](https://managedservice.subquery.network/orgs/nova-wallet/projects/{instance.project_key.split('/')[1]}/deployments?slot={instance.type})\nExplorer URL: [Link to explorer](https://explorer.subquery.network/subquery/{instance.project_key})\nEnvironment: {instance.type.capitalize()}"
)