Files
revive-differential-tests/scripts/print_benchmark_metrics_csv.py
T
Omar 78ac7ee381 Fix the Fallback Gas Limiter (#217)
* Add code to disable the fallback gas filler

* Allow benchmarks driver to await tx receipts

* Improve the transaction submission logic

* Update Python Script to process Geth benchmarks
2025-12-04 13:19:48 +00:00

316 lines
10 KiB
Python

"""
Utilities to print benchmark metrics from a report JSON into CSV.
Usage:
python scripts/print_benchmark_metrics_csv.py /absolute/path/to/report.json
The script prints, for each metadata path, case index, and mode combination,
CSV rows aligned to mined blocks with the following columns:
- block_number
- number_of_txs
- tps (transaction_per_second)
- gps (gas_per_second)
- gas_block_fullness
- ref_time (if available)
- max_ref_time (if available)
- proof_size (if available)
- max_proof_size (if available)
- ref_time_block_fullness (if available)
- proof_size_block_fullness (if available)
Important nuance: TPS and GPS arrays have (number_of_blocks - 1) items. The
first block row has no TPS/GPS; the CSV leaves those cells empty for the first
row and aligns subsequent values to their corresponding next block.
"""
from __future__ import annotations
import json
import sys
import csv
from typing import List, Mapping, TypedDict, no_type_check
class EthereumMinedBlockInformation(TypedDict):
"""EVM block information extracted from the report.
Attributes:
block_number: The block height.
block_timestamp: The UNIX timestamp of the block.
mined_gas: Total gas used (mined) in the block.
block_gas_limit: The gas limit of the block.
transaction_hashes: List of transaction hashes included in the block.
"""
block_number: int
block_timestamp: int
mined_gas: int
block_gas_limit: int
transaction_hashes: List[str]
class SubstrateMinedBlockInformation(TypedDict):
"""Substrate-specific block resource usage fields.
Attributes:
ref_time: The consumed ref time in the block.
max_ref_time: The maximum ref time allowed for the block.
proof_size: The consumed proof size in the block.
max_proof_size: The maximum proof size allowed for the block.
"""
ref_time: int
max_ref_time: int
proof_size: int
max_proof_size: int
class MinedBlockInformation(TypedDict):
"""Block-level information for a mined block with both EVM and optional Substrate fields."""
ethereum_block_information: EthereumMinedBlockInformation
substrate_block_information: SubstrateMinedBlockInformation | None
def substrate_block_information_ref_time(
block: SubstrateMinedBlockInformation | None,
) -> int | None:
if block is None:
return None
else:
return block["ref_time"]
def substrate_block_information_max_ref_time(
block: SubstrateMinedBlockInformation | None,
) -> int | None:
if block is None:
return None
else:
return block["max_ref_time"]
def substrate_block_information_proof_size(
block: SubstrateMinedBlockInformation | None,
) -> int | None:
if block is None:
return None
else:
return block["proof_size"]
def substrate_block_information_max_proof_size(
block: SubstrateMinedBlockInformation | None,
) -> int | None:
if block is None:
return None
else:
return block["max_proof_size"]
class Metric(TypedDict):
"""Metric data of integer values keyed by platform identifier.
Attributes:
minimum: Single scalar minimum per platform.
maximum: Single scalar maximum per platform.
mean: Single scalar mean per platform.
median: Single scalar median per platform.
raw: Time-series (or list) of values per platform.
"""
minimum: Mapping[str, int]
maximum: Mapping[str, int]
mean: Mapping[str, int]
median: Mapping[str, int]
raw: Mapping[str, List[int]]
class Metrics(TypedDict):
"""All metrics that may be present for a given execution report.
Note that some metrics are optional and present only for specific platforms
or execution modes.
"""
transaction_per_second: Metric
gas_per_second: Metric
gas_block_fullness: Metric
ref_time_block_fullness: Metric | None
proof_size_block_fullness: Metric | None
@no_type_check
def metrics_raw_item(
metrics: Metrics, name: str, target: str, index: int
) -> int | None:
l: list[int] = metrics.get(name, dict()).get("raw", dict()).get(target, dict())
try:
return l[index]
except:
return None
class ExecutionReport(TypedDict):
"""Execution report for a mode containing mined blocks and metrics.
Attributes:
mined_block_information: Mapping from platform identifier to the list of
mined blocks observed for that platform.
metrics: The computed metrics for the execution.
"""
mined_block_information: Mapping[str, List[MinedBlockInformation]]
metrics: Metrics
class CaseReport(TypedDict):
"""Report for a single case, keyed by mode string."""
mode_execution_reports: Mapping[str, ExecutionReport]
class MetadataFileReport(TypedDict):
"""Report subtree keyed by case indices for a metadata file path."""
case_reports: Mapping[str, CaseReport]
class ReportRoot(TypedDict):
"""Top-level report schema with execution information keyed by metadata path."""
execution_information: Mapping[str, MetadataFileReport]
BlockInformation = TypedDict(
"BlockInformation",
{
"Block Number": int,
"Timestamp": int,
"Datetime": None,
"Transaction Count": int,
"TPS": int | None,
"GPS": int | None,
"Gas Mined": int,
"Block Gas Limit": int,
"Block Fullness Gas": float,
"Ref Time": int | None,
"Max Ref Time": int | None,
"Block Fullness Ref Time": int | None,
"Proof Size": int | None,
"Max Proof Size": int | None,
"Block Fullness Proof Size": int | None,
},
)
"""A typed dictionary used to hold all of the block information"""
def load_report(path: str) -> ReportRoot:
"""Load the report JSON from disk.
Args:
path: Absolute or relative filesystem path to the JSON report file.
Returns:
The parsed report as a typed dictionary structure.
"""
with open(path, "r", encoding="utf-8") as f:
data: ReportRoot = json.load(f)
return data
def main() -> None:
report_path: str = sys.argv[1]
report: ReportRoot = load_report(report_path)
# TODO: Remove this in the future, but for now, the target is fixed.
target: str = sys.argv[2]
csv_writer = csv.writer(sys.stdout)
for _, metadata_file_report in report["execution_information"].items():
for _, case_report in metadata_file_report["case_reports"].items():
for _, execution_report in case_report["mode_execution_reports"].items():
blocks_information: list[MinedBlockInformation] = execution_report[
"mined_block_information"
][target]
resolved_blocks: list[BlockInformation] = []
for i, block_information in enumerate(blocks_information):
mined_gas: int = block_information["ethereum_block_information"][
"mined_gas"
]
block_gas_limit: int = block_information[
"ethereum_block_information"
]["block_gas_limit"]
resolved_blocks.append(
{
"Block Number": block_information[
"ethereum_block_information"
]["block_number"],
"Timestamp": block_information[
"ethereum_block_information"
]["block_timestamp"],
"Datetime": None,
"Transaction Count": len(
block_information["ethereum_block_information"][
"transaction_hashes"
]
),
"TPS": (
None
if i == 0
else execution_report["metrics"][
"transaction_per_second"
]["raw"][target][i - 1]
),
"GPS": (
None
if i == 0
else execution_report["metrics"]["gas_per_second"][
"raw"
][target][i - 1]
),
"Gas Mined": block_information[
"ethereum_block_information"
]["mined_gas"],
"Block Gas Limit": block_information[
"ethereum_block_information"
]["block_gas_limit"],
"Block Fullness Gas": mined_gas / block_gas_limit,
"Ref Time": substrate_block_information_ref_time(
block_information["substrate_block_information"]
),
"Max Ref Time": substrate_block_information_max_ref_time(
block_information["substrate_block_information"]
),
"Block Fullness Ref Time": metrics_raw_item(
execution_report["metrics"],
"ref_time_block_fullness",
target,
i,
),
"Proof Size": substrate_block_information_proof_size(
block_information["substrate_block_information"]
),
"Max Proof Size": substrate_block_information_max_proof_size(
block_information["substrate_block_information"]
),
"Block Fullness Proof Size": metrics_raw_item(
execution_report["metrics"],
"proof_size_block_fullness",
target,
i,
),
}
)
csv_writer = csv.DictWriter(sys.stdout, resolved_blocks[0].keys())
csv_writer.writeheader()
csv_writer.writerows(resolved_blocks)
if __name__ == "__main__":
main()