#!/usr/bin/env python3 """ Backend API Testing for PezkuwiChain Mobile App Tests authentication endpoints with Supabase and blockchain endpoints """ import requests import json import uuid import time from datetime import datetime # Backend URL from frontend/.env BACKEND_URL = "https://kurdish-id.preview.emergentagent.com/api" class BackendTester: def __init__(self): self.session = requests.Session() self.test_results = [] self.user_data = None def log_result(self, test_name, success, message, response_data=None): """Log test result""" result = { "test": test_name, "success": success, "message": message, "timestamp": datetime.now().isoformat(), "response_data": response_data } self.test_results.append(result) status = "āœ… PASS" if success else "āŒ FAIL" print(f"{status} {test_name}: {message}") if response_data and not success: print(f" Response: {json.dumps(response_data, indent=2)}") def test_signup(self): """Test user signup endpoint""" print("\nšŸ” Testing User Signup...") # Generate unique test email with standard format unique_id = str(uuid.uuid4())[:8] test_email = f"test{unique_id}@gmail.com" signup_data = { "email": test_email, "password": "TestPassword123!", "first_name": "Test", "last_name": "User", "phone": "+964750123456", "referral_code": "REF123", "language": "ku" } try: response = self.session.post( f"{BACKEND_URL}/auth/signup", json=signup_data, timeout=30 ) if response.status_code in [200, 201]: data = response.json() # Validate response structure required_fields = ["user_id", "email", "access_token", "refresh_token", "first_name", "last_name"] missing_fields = [field for field in required_fields if field not in data] if missing_fields: self.log_result( "Signup Response Structure", False, f"Missing fields: {missing_fields}", data ) return False # Store user data for subsequent tests self.user_data = { "email": test_email, "password": signup_data["password"], "user_id": data["user_id"], "access_token": data["access_token"], "refresh_token": data["refresh_token"] } self.log_result( "User Signup", True, f"User created successfully with ID: {data['user_id'][:8]}...", {k: v for k, v in data.items() if k not in ["access_token", "refresh_token"]} ) return True else: self.log_result( "User Signup", False, f"HTTP {response.status_code}: {response.text}", {"status_code": response.status_code, "response": response.text} ) return False except requests.exceptions.RequestException as e: self.log_result("User Signup", False, f"Request failed: {str(e)}") return False def test_signin(self): """Test user signin endpoint""" print("\nšŸ”‘ Testing User Signin...") if not self.user_data: self.log_result("User Signin", False, "No user data available (signup must succeed first)") return False signin_data = { "email": self.user_data["email"], "password": self.user_data["password"] } try: response = self.session.post( f"{BACKEND_URL}/auth/signin", json=signin_data, timeout=30 ) if response.status_code == 200: data = response.json() # Validate response structure required_fields = ["user_id", "email", "access_token", "refresh_token", "first_name", "last_name"] missing_fields = [field for field in required_fields if field not in data] if missing_fields: self.log_result( "Signin Response Structure", False, f"Missing fields: {missing_fields}", data ) return False # Verify user_id matches if data["user_id"] != self.user_data["user_id"]: self.log_result( "Signin User ID Match", False, f"User ID mismatch: expected {self.user_data['user_id']}, got {data['user_id']}" ) return False # Update tokens self.user_data["access_token"] = data["access_token"] self.user_data["refresh_token"] = data["refresh_token"] self.log_result( "User Signin", True, f"User signed in successfully: {data['email']}", {k: v for k, v in data.items() if k not in ["access_token", "refresh_token"]} ) return True else: self.log_result( "User Signin", False, f"HTTP {response.status_code}: {response.text}", {"status_code": response.status_code, "response": response.text} ) return False except requests.exceptions.RequestException as e: self.log_result("User Signin", False, f"Request failed: {str(e)}") return False def test_get_user_profile(self): """Test get user profile endpoint""" print("\nšŸ‘¤ Testing Get User Profile...") if not self.user_data: self.log_result("Get User Profile", False, "No user data available (signup must succeed first)") return False try: response = self.session.get( f"{BACKEND_URL}/auth/user/{self.user_data['user_id']}", timeout=30 ) if response.status_code == 200: data = response.json() # Validate profile data expected_fields = ["id", "email", "first_name", "last_name", "phone", "language"] missing_fields = [field for field in expected_fields if field not in data] if missing_fields: self.log_result( "Profile Response Structure", False, f"Missing fields: {missing_fields}", data ) return False # Verify data matches signup if data["email"] != self.user_data["email"]: self.log_result( "Profile Email Match", False, f"Email mismatch: expected {self.user_data['email']}, got {data['email']}" ) return False self.log_result( "Get User Profile", True, f"Profile retrieved successfully for user: {data['email']}", {k: v for k, v in data.items() if k not in ["id"]} ) return True else: self.log_result( "Get User Profile", False, f"HTTP {response.status_code}: {response.text}", {"status_code": response.status_code, "response": response.text} ) return False except requests.exceptions.RequestException as e: self.log_result("Get User Profile", False, f"Request failed: {str(e)}") return False def test_blockchain_balance(self): """Test blockchain balance endpoint""" print("\nā›“ļø Testing Blockchain Balance...") # Test with a sample Polkadot address test_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" balance_data = { "address": test_address } try: response = self.session.post( f"{BACKEND_URL}/blockchain/balance", json=balance_data, timeout=30 ) if response.status_code == 200: data = response.json() # Validate response structure required_fields = ["address", "hez", "pez", "transferrable", "reserved", "timestamp"] missing_fields = [field for field in required_fields if field not in data] if missing_fields: self.log_result( "Balance Response Structure", False, f"Missing fields: {missing_fields}", data ) return False # Verify address matches if data["address"] != test_address: self.log_result( "Balance Address Match", False, f"Address mismatch: expected {test_address}, got {data['address']}" ) return False self.log_result( "Blockchain Balance", True, f"Balance retrieved: HEZ={data['hez']}, PEZ={data['pez']}", data ) return True else: self.log_result( "Blockchain Balance", False, f"HTTP {response.status_code}: {response.text}", {"status_code": response.status_code, "response": response.text} ) return False except requests.exceptions.RequestException as e: self.log_result("Blockchain Balance", False, f"Request failed: {str(e)}") return False def diagnose_supabase_issue(self): """Diagnose the Supabase configuration issue""" print("\nšŸ” Diagnosing Supabase Configuration...") try: # Test direct Supabase connection import os from supabase import create_client # Load backend env from dotenv import load_dotenv from pathlib import Path ROOT_DIR = Path("/app/backend") load_dotenv(ROOT_DIR / '.env') SUPABASE_URL = os.environ.get('SUPABASE_URL') SUPABASE_KEY = os.environ.get('SUPABASE_KEY') supabase = create_client(SUPABASE_URL, SUPABASE_KEY) # Test auth signup unique_id = str(uuid.uuid4())[:8] test_email = f"diagtest{unique_id}@gmail.com" auth_response = supabase.auth.sign_up({ "email": test_email, "password": "TestPassword123!" }) if auth_response.user: self.log_result( "Supabase Auth Signup", True, f"Auth user created: {auth_response.user.id[:8]}..." ) # Test users table insert with minimal data try: user_data = {"id": auth_response.user.id, "email": test_email} result = supabase.table("users").insert(user_data).execute() self.log_result( "Supabase Users Table Insert", True, "Successfully inserted user data" ) except Exception as e: error_msg = str(e) if "row-level security policy" in error_msg: self.log_result( "Supabase RLS Policy Issue", False, "Backend using anon key - needs service role key or RLS policy configuration", {"error": error_msg, "solution": "Use service role key for server-side operations"} ) else: self.log_result( "Supabase Users Table Insert", False, f"Insert failed: {error_msg}" ) else: self.log_result( "Supabase Auth Signup", False, "Auth signup failed" ) except Exception as e: self.log_result( "Supabase Diagnosis", False, f"Diagnosis failed: {str(e)}" ) def run_all_tests(self): """Run all backend tests""" print(f"šŸš€ Starting Backend API Tests") print(f"Backend URL: {BACKEND_URL}") print("=" * 60) # First diagnose the Supabase issue self.diagnose_supabase_issue() # Test authentication flow signup_success = self.test_signup() signin_success = self.test_signin() if signup_success else False profile_success = self.test_get_user_profile() if signup_success else False # Test blockchain endpoint balance_success = self.test_blockchain_balance() # Summary print("\n" + "=" * 60) print("šŸ“Š TEST SUMMARY") print("=" * 60) total_tests = len(self.test_results) passed_tests = sum(1 for result in self.test_results if result["success"]) failed_tests = total_tests - passed_tests print(f"Total Tests: {total_tests}") print(f"Passed: {passed_tests}") print(f"Failed: {failed_tests}") print(f"Success Rate: {(passed_tests/total_tests)*100:.1f}%") if failed_tests > 0: print("\nāŒ FAILED TESTS:") for result in self.test_results: if not result["success"]: print(f" - {result['test']}: {result['message']}") return { "total": total_tests, "passed": passed_tests, "failed": failed_tests, "success_rate": (passed_tests/total_tests)*100, "results": self.test_results } if __name__ == "__main__": tester = BackendTester() results = tester.run_all_tests()