Rebrand: polkadot → pezkuwi, substrate → bizinikiwi, kusama → dicle

This commit is contained in:
2026-01-07 02:29:40 +03:00
commit d5f038faea
1383 changed files with 1088018 additions and 0 deletions
@@ -0,0 +1,325 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { HealthChecker, SmoldotHealth } from './types.js';
import { stringify } from '@pezkuwi/util';
interface JSONRequest {
id: string;
jsonrpc: '2.0',
method: string;
params: unknown[];
}
/*
* Creates a new health checker.
*
* The role of the health checker is to report to the user the health of a smoldot chain.
*
* In order to use it, start by creating a health checker, and call `setSendJsonRpc` to set the
* way to send a JSON-RPC request to a chain. The health checker is disabled by default. Use
* `start()` in order to start the health checks. The `start()` function must be passed a callback called
* when an update to the health of the node is available.
*
* In order to send a JSON-RPC request to the chain, you **must** use the `sendJsonRpc` function
* of the health checker. The health checker rewrites the `id` of the requests it receives.
*
* When the chain send a JSON-RPC response, it must be passed to `responsePassThrough()`. This
* function intercepts the responses destined to the requests that have been emitted by the health
* checker and returns `null`. If the response doesn't concern the health checker, the response is
* simply returned by the function.
*
* # How it works
*
* The health checker periodically calls the `system_health` JSON-RPC call in order to determine
* the health of the chain.
*
* In addition to this, as long as the health check reports that `isSyncing` is `true`, the
* health checker also maintains a subscription to new best blocks using `chain_subscribeNewHeads`.
* Whenever a new block is notified, a health check is performed immediately in order to determine
* whether `isSyncing` has changed to `false`.
*
* Thanks to this subscription, the latency of the report of the switch from `isSyncing: true` to
* `isSyncing: false` is very low.
*
*/
export function healthChecker (): HealthChecker {
// `null` if health checker is not started.
let checker: null | InnerChecker = null;
let sendJsonRpc: null | ((request: string) => void) = null;
return {
responsePassThrough: (jsonRpcResponse) => {
if (checker === null) {
return jsonRpcResponse;
}
return checker.responsePassThrough(jsonRpcResponse);
},
sendJsonRpc: (request) => {
if (!sendJsonRpc) {
throw new Error('setSendJsonRpc must be called before sending requests');
}
if (checker === null) {
sendJsonRpc(request);
} else {
checker.sendJsonRpc(request);
}
},
setSendJsonRpc: (cb) => {
sendJsonRpc = cb;
},
start: (healthCallback) => {
if (checker !== null) {
throw new Error("Can't start the health checker multiple times in parallel");
} else if (!sendJsonRpc) {
throw new Error('setSendJsonRpc must be called before starting the health checks');
}
checker = new InnerChecker(healthCallback, sendJsonRpc);
checker.update(true);
},
stop: () => {
if (checker === null) {
return;
} // Already stopped.
checker.destroy();
checker = null;
}
};
}
class InnerChecker {
#healthCallback: (health: SmoldotHealth) => void;
#currentHealthCheckId: string | null = null;
#currentHealthTimeout: ReturnType<typeof setTimeout> | null = null;
#currentSubunsubRequestId: string | null = null;
#currentSubscriptionId: string | null = null;
#requestToSmoldot: (request: JSONRequest) => void;
#isSyncing = false;
#nextRequestId = 0;
constructor (healthCallback: (health: SmoldotHealth) => void, requestToSmoldot: (request: string) => void) {
this.#healthCallback = healthCallback;
this.#requestToSmoldot = (request: JSONRequest) => requestToSmoldot(stringify(request));
}
sendJsonRpc = (request: string): void => {
// Replace the `id` in the request to prefix the request ID with `extern:`.
let parsedRequest: JSONRequest;
try {
parsedRequest = JSON.parse(request) as JSONRequest;
} catch {
return;
}
if (parsedRequest.id) {
const newId = 'extern:' + stringify(parsedRequest.id);
parsedRequest.id = newId;
}
this.#requestToSmoldot(parsedRequest);
};
responsePassThrough = (jsonRpcResponse: string): string | null => {
let parsedResponse: {id: string, result?: SmoldotHealth, params?: { subscription: string }};
try {
parsedResponse = JSON.parse(jsonRpcResponse) as { id: string, result?: SmoldotHealth };
} catch {
return jsonRpcResponse;
}
// Check whether response is a response to `system_health`.
if (parsedResponse.id && this.#currentHealthCheckId === parsedResponse.id) {
this.#currentHealthCheckId = null;
// Check whether query was successful. It is possible for queries to fail for
// various reasons, such as the client being overloaded.
if (!parsedResponse.result) {
this.update(false);
return null;
}
this.#healthCallback(parsedResponse.result);
this.#isSyncing = parsedResponse.result.isSyncing;
this.update(false);
return null;
}
// Check whether response is a response to the subscription or unsubscription.
if (
parsedResponse.id &&
this.#currentSubunsubRequestId === parsedResponse.id
) {
this.#currentSubunsubRequestId = null;
// Check whether query was successful. It is possible for queries to fail for
// various reasons, such as the client being overloaded.
if (!parsedResponse.result) {
this.update(false);
return null;
}
if (this.#currentSubscriptionId) {
this.#currentSubscriptionId = null;
} else {
this.#currentSubscriptionId = parsedResponse.result as unknown as string;
}
this.update(false);
return null;
}
// Check whether response is a notification to a subscription.
if (
parsedResponse.params &&
this.#currentSubscriptionId &&
parsedResponse.params.subscription === this.#currentSubscriptionId
) {
// Note that after a successful subscription, a notification containing
// the current best block is always returned. Considering that a
// subscription is performed in response to a health check, calling
// `startHealthCheck()` here will lead to a second health check.
// It might seem redundant to perform two health checks in a quick
// succession, but doing so doesn't lead to any problem, and it is
// actually possible for the health to have changed in between as the
// current best block might have been updated during the subscription
// request.
this.update(true);
return null;
}
// Response doesn't concern us.
if (parsedResponse.id) {
const id: string = parsedResponse.id;
// Need to remove the `extern:` prefix.
if (!id.startsWith('extern:')) {
throw new Error('State inconsistency in health checker');
}
const newId = JSON.parse(id.slice('extern:'.length)) as string;
parsedResponse.id = newId;
}
return stringify(parsedResponse);
};
update = (startNow: boolean): void => {
// If `startNow`, clear `#currentHealthTimeout` so that it is set below.
if (startNow && this.#currentHealthTimeout) {
clearTimeout(this.#currentHealthTimeout);
this.#currentHealthTimeout = null;
}
if (!this.#currentHealthTimeout) {
const startHealthRequest = () => {
this.#currentHealthTimeout = null;
// No matter what, don't start a health request if there is already one in progress.
// This is sane to do because receiving a response to a health request calls `update()`.
if (this.#currentHealthCheckId) {
return;
}
// Actual request starting.
this.#currentHealthCheckId = `health-checker:${this.#nextRequestId}`;
this.#nextRequestId += 1;
this.#requestToSmoldot({
id: this.#currentHealthCheckId,
jsonrpc: '2.0',
method: 'system_health',
params: []
});
};
if (startNow) {
startHealthRequest();
} else {
this.#currentHealthTimeout = setTimeout(startHealthRequest, 1000);
}
}
if (
this.#isSyncing &&
!this.#currentSubscriptionId &&
!this.#currentSubunsubRequestId
) {
this.startSubscription();
}
if (
!this.#isSyncing &&
this.#currentSubscriptionId &&
!this.#currentSubunsubRequestId
) {
this.endSubscription();
}
};
startSubscription = (): void => {
if (this.#currentSubunsubRequestId || this.#currentSubscriptionId) {
throw new Error('Internal error in health checker');
}
this.#currentSubunsubRequestId = `health-checker:${this.#nextRequestId}`;
this.#nextRequestId += 1;
this.#requestToSmoldot({
id: this.#currentSubunsubRequestId,
jsonrpc: '2.0',
method: 'chain_subscribeNewHeads',
params: []
});
};
endSubscription = (): void => {
if (this.#currentSubunsubRequestId || !this.#currentSubscriptionId) {
throw new Error('Internal error in health checker');
}
this.#currentSubunsubRequestId = `health-checker:${this.#nextRequestId}`;
this.#nextRequestId += 1;
this.#requestToSmoldot({
id: this.#currentSubunsubRequestId,
jsonrpc: '2.0',
method: 'chain_unsubscribeNewHeads',
params: [this.#currentSubscriptionId]
});
};
destroy = (): void => {
if (this.#currentHealthTimeout) {
clearTimeout(this.#currentHealthTimeout);
this.#currentHealthTimeout = null;
}
};
}
export class HealthCheckError extends Error {
readonly #cause: unknown;
getCause (): unknown {
return this.#cause;
}
constructor (response: unknown, message = 'Got error response asking for system health') {
super(message);
this.#cause = response;
}
}
@@ -0,0 +1,671 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type * as Sc from '@bizinikiwi/connect';
import type { HealthChecker, SmoldotHealth } from './types.js';
import { stringify } from '@pezkuwi/util';
import { ScProvider } from './index.js';
interface MockChain extends Sc.Chain {
_spec: () => string;
_recevedRequests: () => string[];
_isTerminated: () => boolean;
_triggerCallback: (response: string | object) => void;
_setTerminateInterceptor: (fn: () => void) => void;
_setSendJsonRpcInterceptor: (fn: (rpc: string) => void) => void;
_getLatestRequest: () => string;
}
interface MockedHealthChecker extends HealthChecker {
_isActive: () => boolean;
_triggerHealthUpdate: (update: SmoldotHealth) => void;
}
type MockSc = typeof Sc & {
latestChain: () => MockChain;
};
enum WellKnownChain {
pezkuwi = 'pezkuwi',
ksmcc3 = 'ksmcc3',
pezkuwichain_v2_2 = 'pezkuwichain_v2_2',
dicle2 = 'dicle2'
}
const wait = (ms: number) =>
new Promise((resolve) =>
setTimeout(resolve, ms)
);
function healthCheckerMock (): MockedHealthChecker {
let cb: (health: SmoldotHealth) => void = () => undefined;
let sendJsonRpc: (request: string) => void = () => undefined;
let isActive = false;
return {
_isActive: () => isActive,
_triggerHealthUpdate: (update: SmoldotHealth) => {
cb(update);
},
responsePassThrough: (response) => response,
sendJsonRpc: (...args) => sendJsonRpc(...args),
setSendJsonRpc: (cb) => {
sendJsonRpc = cb;
},
start: (x) => {
isActive = true;
cb = x;
},
stop: () => {
isActive = false;
}
};
}
function healthCheckerFactory () {
const _healthCheckers: MockedHealthChecker[] = [];
return {
_healthCheckers,
_latestHealthChecker: () => _healthCheckers.slice(-1)[0],
healthChecker: () => {
const result = healthCheckerMock();
_healthCheckers.push(result);
return result;
}
};
}
function getFakeChain (spec: string): MockChain {
const _receivedRequests: string[] = [];
let _isTerminated = false;
const _pendingResponses: string[] = [];
let _responseResolver: ((value: string) => void) | null = null;
let terminateInterceptor = Function.prototype;
let sendJsonRpcInterceptor = Function.prototype;
const triggerCallback = (response: string | object) => {
const res = typeof response === 'string' ? response : stringify(response);
if (_responseResolver) {
_responseResolver(res);
_responseResolver = null;
} else {
_pendingResponses.push(res);
}
};
const nextJsonRpcResponse = (): Promise<string> => {
if (_pendingResponses.length > 0) {
return Promise.resolve(_pendingResponses.shift()!);
}
return new Promise((resolve) => {
_responseResolver = resolve;
});
};
const jsonRpcResponses: AsyncIterableIterator<string> = {
[Symbol.asyncIterator] () {
return this;
},
async next () {
if (_isTerminated) {
return { done: true, value: undefined };
}
const value = await nextJsonRpcResponse();
return { done: false, value };
}
};
return {
_getLatestRequest: () => _receivedRequests[_receivedRequests.length - 1],
_isTerminated: () => _isTerminated,
_recevedRequests: () => _receivedRequests,
_setSendJsonRpcInterceptor: (fn) => {
sendJsonRpcInterceptor = fn;
},
_setTerminateInterceptor: (fn) => {
terminateInterceptor = fn;
},
_spec: () => spec,
_triggerCallback: triggerCallback,
addChain: (chainSpec) =>
Promise.resolve(getFakeChain(chainSpec)),
remove: () => {
terminateInterceptor();
_isTerminated = true;
},
sendJsonRpc: (rpc) => {
sendJsonRpcInterceptor(rpc);
_receivedRequests.push(rpc);
},
nextJsonRpcResponse,
jsonRpcResponses
};
}
function getFakeClient () {
const chains: MockChain[] = [];
let addChainInterceptor: Promise<void> = Promise.resolve();
let addWellKnownChainInterceptor: Promise<void> = Promise.resolve();
return {
_chains: () => chains,
_setAddChainInterceptor: (interceptor: Promise<void>) => {
addChainInterceptor = interceptor;
},
_setAddWellKnownChainInterceptor: (interceptor: Promise<void>) => {
addWellKnownChainInterceptor = interceptor;
},
addChain: (chainSpec: string): Promise<MockChain> =>
addChainInterceptor.then(() => {
const result = getFakeChain(chainSpec);
chains.push(result);
return result;
}),
addWellKnownChain: (
wellKnownChain: string
): Promise<MockChain> =>
addWellKnownChainInterceptor.then(() => {
const result = getFakeChain(wellKnownChain);
chains.push(result);
return result;
})
};
}
function connectorFactory (): MockSc {
const clients: ReturnType<typeof getFakeClient>[] = [];
const latestClient = () => clients[clients.length - 1];
return {
WellKnownChain,
_clients: () => clients,
createScClient: () => {
const result = getFakeClient();
clients.push(result);
return result;
},
latestChain: () =>
latestClient()._chains()[latestClient()._chains().length - 1],
latestClient
} as unknown as MockSc;
}
function setChainSyncyingStatus (isSyncing: boolean): void {
getCurrentHealthChecker()._triggerHealthUpdate({
isSyncing,
peers: 1,
shouldHavePeers: true
});
}
let mockSc: MockSc;
let mockedHealthChecker: ReturnType<typeof healthCheckerFactory>;
const getCurrentHealthChecker = () => mockedHealthChecker._latestHealthChecker();
describe('ScProvider', () => {
beforeAll(() => {
mockSc = connectorFactory();
mockedHealthChecker = healthCheckerFactory();
});
describe('on', () => {
it('emits `connected` as soon as the chain is not syncing', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const onConnected = jest.fn();
provider.on('connected', onConnected);
expect(onConnected).not.toHaveBeenCalled();
setChainSyncyingStatus(false);
expect(onConnected).toHaveBeenCalled();
});
it('stops receiving notifications after unsubscribing', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const onConnected = jest.fn();
provider.on('connected', onConnected)();
expect(onConnected).not.toHaveBeenCalled();
setChainSyncyingStatus(false);
expect(onConnected).not.toHaveBeenCalled();
});
it('synchronously emits connected if the Provider is already `connected`', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
setChainSyncyingStatus(false);
const onConnected = jest.fn();
provider.on('connected', onConnected);
expect(onConnected).toHaveBeenCalled();
});
it('emits `disconnected` once the chain goes back to syncing', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
setChainSyncyingStatus(false);
const onConnected = jest.fn();
const onDisconnected = jest.fn();
provider.on('connected', onConnected);
provider.on('disconnected', onDisconnected);
expect(onConnected).toHaveBeenCalled();
expect(onDisconnected).not.toHaveBeenCalled();
onConnected.mockReset();
setChainSyncyingStatus(true);
expect(onConnected).not.toHaveBeenCalled();
expect(onDisconnected).toHaveBeenCalled();
});
});
describe('hasSubscriptions', () => {
it('supports subscriptions', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
expect(provider.hasSubscriptions).toBe(true);
});
});
describe('clone', () => {
it('can not be clonned', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
expect(() => provider.clone()).toThrow();
});
});
describe('connect', () => {
it('does not create a new chain when trying to re-connect while the current chain is syncing', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
await provider.connect(undefined, mockedHealthChecker.healthChecker);
expect(chain).toBe(mockSc.latestChain());
});
it('throws when trying to connect on an already connected Provider', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
setChainSyncyingStatus(false);
await expect(
provider.connect(undefined, mockedHealthChecker.healthChecker)
).rejects.toThrow(/Already connected/);
});
});
describe('disconnect', () => {
it('removes the chain and cleans up', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
await provider.disconnect();
expect(chain._isTerminated()).toBe(true);
});
// eslint-disable-next-line jest/expect-expect
it('does not throw when disconnecting on an already disconnected Provider', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
await provider.disconnect();
await provider.disconnect();
});
});
describe('send', () => {
it('throws when trying to send a request while the Provider is not connected', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
await expect(provider.send('', [])).rejects.toThrow();
});
it('receives responses to its requests', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
setChainSyncyingStatus(false);
const responsePromise = provider.send<unknown>('getData', ['foo']);
await wait(0);
expect(chain._getLatestRequest()).toEqual(
'{"id":1,"jsonrpc":"2.0","method":"getData","params":["foo"]}'
);
const result = { foo: 'foo' };
chain._triggerCallback({
id: 1,
jsonrpc: '2.0',
result
});
const response = await responsePromise;
expect(response).toEqual(result);
});
it("rejects when the response can't be deserialized", async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
setChainSyncyingStatus(false);
setTimeout(() => {
chain._triggerCallback({
id: 1,
jsonrpc: '2.0'
});
}, 0);
await expect(provider.send('getData', ['foo'])).rejects.toThrow();
});
it('rejects when the smoldot chain has crashed', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
setChainSyncyingStatus(false);
await wait(0);
chain._setSendJsonRpcInterceptor(() => {
throw new Error('boom!');
});
await expect(
provider.send('getData', ['foo'])
).rejects.toThrow(/Disconnected/);
expect(provider.isConnected).toBe(false);
});
});
describe('subscribe', () => {
it('subscribes and recives messages until it unsubscribes', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
setChainSyncyingStatus(false);
const unsubscribeToken = 'unsubscribeToken';
setTimeout(() => {
chain._triggerCallback({
id: 1,
jsonrpc: '2.0',
result: unsubscribeToken
});
}, 0);
const cb = jest.fn();
const token = await provider.subscribe(
'foo',
'chain_subscribeNewHeads',
['baz'],
cb
);
expect(token).toBe(unsubscribeToken);
expect(cb).not.toHaveBeenCalled();
chain._triggerCallback({
jsonrpc: '2.0',
method: 'foo',
params: {
result: 1,
subscription: token
}
});
expect(cb).toHaveBeenCalledTimes(1);
expect(cb).toHaveBeenLastCalledWith(null, 1);
chain._triggerCallback({
jsonrpc: '2.0',
method: 'foo',
params: {
result: 2,
subscription: token
}
});
expect(cb).toHaveBeenCalledTimes(2);
expect(cb).toHaveBeenLastCalledWith(null, 2);
provider
.unsubscribe('foo', 'chain_unsubscribeNewHeads', unsubscribeToken)
.catch(console.error);
chain._triggerCallback({
jsonrpc: '2.0',
method: 'foo',
params: {
result: 3,
subscription: token
}
});
expect(cb).toHaveBeenCalledTimes(2);
expect(cb).toHaveBeenLastCalledWith(null, 2);
});
it('ignores subscription messages that were received before the subscription token', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
setChainSyncyingStatus(false);
const unsubscribeToken = 'unsubscribeToken';
chain._triggerCallback({
jsonrpc: '2.0',
method: 'foo',
params: {
result: 1,
subscription: unsubscribeToken
}
});
setTimeout(() => {
chain._triggerCallback({
id: 1,
jsonrpc: '2.0',
result: unsubscribeToken
});
}, 0);
const cb = jest.fn();
const token = await provider.subscribe(
'foo',
'chain_subscribeNewHeads',
['baz'],
cb
);
expect(token).toBe(unsubscribeToken);
expect(cb).not.toHaveBeenCalled();
});
it('emits the error when the message has an error', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
setChainSyncyingStatus(false);
await wait(0);
const unsubscribeToken = 'unsubscribeToken';
setTimeout(() => {
chain._triggerCallback({
id: 1,
jsonrpc: '2.0',
result: unsubscribeToken
});
}, 0);
const cb = jest.fn();
const token = await provider.subscribe(
'foo',
'chain_subscribeNewHeads',
['baz'],
cb
);
chain._triggerCallback({
jsonrpc: '2.0',
method: 'foo',
params: {
error: 'boom',
subscription: unsubscribeToken
}
});
expect(token).toBe(unsubscribeToken);
expect(cb).toHaveBeenCalledTimes(1);
expect(cb).toHaveBeenLastCalledWith(expect.any(Error), undefined);
});
it('errors when subscribing to an unsupported method', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
setChainSyncyingStatus(false);
await wait(0);
await expect(
provider.subscribe('foo', 'bar', ['baz'], () => undefined)
).rejects.toThrow(/Unsupported subscribe method: bar/);
});
});
describe('unsubscribe', () => {
it('rejects when trying to unsubscribe from un unexisting subscription', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
setChainSyncyingStatus(false);
await expect(
provider.unsubscribe('', '', '')
).rejects.toThrow(/Unable to find active subscription/);
});
});
it('cleans up the stale subscriptions once it reconnects', async () => {
const provider = new ScProvider(mockSc, '');
await provider.connect(undefined, mockedHealthChecker.healthChecker);
const chain = mockSc.latestChain();
// setting the syncing status of the chain to fals so that the Provider
// gets `connected`
setChainSyncyingStatus(false);
// while connected we create a subscription
const unsubscribeToken = 'unsubscribeToken';
setTimeout(() => {
chain._triggerCallback({
id: 1,
jsonrpc: '2.0',
result: unsubscribeToken
});
}, 0);
const cb = jest.fn();
const token = await provider.subscribe(
'foo',
'chain_subscribeNewHeads',
['baz'],
cb
);
// setting the syncing status of the chain to fals so that the Provider
// gets `disconnected`
setChainSyncyingStatus(true);
// let's wait some time in order to ensure that the stale unsubscription
// messages are not sent until the chain syncing status changes back to false
await wait(200);
// before we let the healthChecker know that the chain is no longer syncing,
// let's make sure that the chain has received the correct request, and
// most importantly that it has not received a request for unsubscribing
// from the stale subscription, since that request should happen once the
// chain is no longer syncing
expect(chain._recevedRequests()).toEqual([
'{"id":1,"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":["baz"]}'
]);
// lets change the sync status back to false
setChainSyncyingStatus(false);
// let's wait one tick to ensure that the microtasks got processed
await wait(0);
// let's make sure that we have now sent the request for killing the
// stale subscription
expect(chain._recevedRequests()).toEqual([
'{"id":1,"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":["baz"]}',
`{"id":2,"jsonrpc":"2.0","method":"chain_unsubscribeNewHeads","params":["${token}"]}`,
'{"id":3,"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":["baz"]}'
]);
});
});
@@ -0,0 +1,427 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type * as ScType from '@bizinikiwi/connect';
import type { JsonRpcResponse, ProviderInterface, ProviderInterfaceCallback, ProviderInterfaceEmitCb, ProviderInterfaceEmitted } from '../types.js';
import { EventEmitter } from 'eventemitter3';
import { isError, isFunction, isObject, logger, noop, objectSpread } from '@pezkuwi/util';
import { RpcCoder } from '../coder/index.js';
import { healthChecker } from './Health.js';
type ResponseCallback = (response: string | Error) => void;
// We define the interface with items we use - this means that we don't really
// need to be passed a full `import * as Sc from '@ubstrate/connect'`, but can
// also make do with a { WellKnownChain, createScClient } interface
interface BizinikiwiConnect {
WellKnownChain: typeof ScType['WellKnownChain'];
createScClient: typeof ScType['createScClient'];
}
const l = logger('api-bizinikiwi-connect');
// These methods have been taken from:
// https://github.com/pezkuwichain/smoldot/blob/17425040ddda47d539556eeaf62b88c4240d1d42/src/json_rpc/methods.rs#L338-L462
// It's important to take into account that smoldot is adding support to the new
// json-rpc-interface https://pezkuwichain.github.io/json-rpc-interface-spec/
// However, at the moment this list only includes methods that belong to the "old" API
const subscriptionUnsubscriptionMethods = new Map<string, string>([
['author_submitAndWatchExtrinsic', 'author_unwatchExtrinsic'],
['chain_subscribeAllHeads', 'chain_unsubscribeAllHeads'],
['chain_subscribeFinalizedHeads', 'chain_unsubscribeFinalizedHeads'],
['chain_subscribeFinalisedHeads', 'chain_subscribeFinalisedHeads'],
['chain_subscribeNewHeads', 'chain_unsubscribeNewHeads'],
['chain_subscribeNewHead', 'chain_unsubscribeNewHead'],
['chain_subscribeRuntimeVersion', 'chain_unsubscribeRuntimeVersion'],
['subscribe_newHead', 'unsubscribe_newHead'],
['state_subscribeRuntimeVersion', 'state_unsubscribeRuntimeVersion'],
['state_subscribeStorage', 'state_unsubscribeStorage']
]);
const scClients = new WeakMap<ScProvider, ScType.ScClient>();
interface ActiveSubs {
type: string,
method: string,
params: any[],
callback: ProviderInterfaceCallback
}
export class ScProvider implements ProviderInterface {
readonly #Sc: BizinikiwiConnect;
readonly #coder: RpcCoder = new RpcCoder();
readonly #spec: string | ScType.WellKnownChain;
readonly #sharedSandbox?: ScProvider | undefined;
readonly #subscriptions = new Map<string, [ResponseCallback, { unsubscribeMethod: string; id: string | number }]>();
readonly #resubscribeMethods = new Map<string, ActiveSubs>();
readonly #requests = new Map<number, ResponseCallback>();
readonly #wellKnownChains: Set<ScType.WellKnownChain>;
readonly #eventemitter: EventEmitter = new EventEmitter();
#chain: Promise<ScType.Chain> | null = null;
#isChainReady = false;
public constructor (Sc: BizinikiwiConnect, spec: string | ScType.WellKnownChain, sharedSandbox?: ScProvider) {
if (!isObject(Sc) || !isObject(Sc.WellKnownChain) || !isFunction(Sc.createScClient)) {
throw new Error('Expected an @bizinikiwi/connect interface as first parameter to ScProvider');
}
this.#Sc = Sc;
this.#spec = spec;
this.#sharedSandbox = sharedSandbox;
this.#wellKnownChains = new Set(Object.values(Sc.WellKnownChain));
}
public get hasSubscriptions (): boolean {
// Indicates that subscriptions are supported
return !!true;
}
public get isClonable (): boolean {
return !!false;
}
public get isConnected (): boolean {
return !!this.#chain && this.#isChainReady;
}
public clone (): ProviderInterface {
throw new Error('clone() is not supported.');
}
// Config details can be found in @bizinikiwi/connect repo following the link:
// https://github.com/pezkuwichain/bizinikiwi-connect/blob/main/packages/connect/src/connector/index.ts
async connect (config?: ScType.Config, checkerFactory = healthChecker): Promise<void> {
if (this.isConnected) {
throw new Error('Already connected!');
}
// it could happen that after emitting `disconnected` due to the fact that
// smoldot is syncing, the consumer tries to reconnect after a certain amount
// of time... In which case we want to make sure that we don't create a new
// chain.
if (this.#chain) {
await this.#chain;
return;
}
if (this.#sharedSandbox && !this.#sharedSandbox.isConnected) {
await this.#sharedSandbox.connect();
}
const client = this.#sharedSandbox
? scClients.get(this.#sharedSandbox)
: this.#Sc.createScClient(config);
if (!client) {
throw new Error('Unknown ScProvider!');
}
scClients.set(this, client);
const hc = checkerFactory();
const onResponse = (res: string): void => {
const hcRes = hc.responsePassThrough(res);
if (!hcRes) {
return;
}
const response = JSON.parse(hcRes) as JsonRpcResponse<string>;
let decodedResponse: string | Error;
try {
decodedResponse = this.#coder.decodeResponse(response);
} catch (e) {
decodedResponse = e as Error;
}
// It's not a subscription message, but rather a standar RPC response
if (response.params?.subscription === undefined || !response.method) {
return this.#requests.get(response.id)?.(decodedResponse);
}
// We are dealing with a subscription message
const subscriptionId = `${response.method}::${response.params.subscription}`;
const callback = this.#subscriptions.get(subscriptionId)?.[0];
callback?.(decodedResponse);
};
const addChain = this.#sharedSandbox
? (async (...args) => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const source = this.#sharedSandbox!;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return (await source.#chain)!.addChain(...args);
}) as ScType.AddChain
: this.#wellKnownChains.has(this.#spec as ScType.WellKnownChain)
? client.addWellKnownChain
: client.addChain;
this.#chain = addChain(this.#spec as ScType.WellKnownChain).then((chain) => {
hc.setSendJsonRpc(chain.sendJsonRpc);
// Start async response processing loop
// This replaces the callback-based API from older @substrate/connect versions
(async () => {
try {
for await (const res of chain.jsonRpcResponses) {
onResponse(res);
}
} catch {
// Chain was removed or connection closed - this is expected
}
})();
this.#isChainReady = false;
const cleanup = () => {
// If there are any callbacks left, we have to reject/error them.
// Otherwise, that would cause a memory leak.
const disconnectionError = new Error('Disconnected');
this.#requests.forEach((cb) => cb(disconnectionError));
this.#subscriptions.forEach(([cb]) => cb(disconnectionError));
this.#subscriptions.clear();
};
const staleSubscriptions: {
unsubscribeMethod: string
id: number | string
}[] = [];
const killStaleSubscriptions = () => {
if (staleSubscriptions.length === 0) {
return;
}
const stale = staleSubscriptions.pop();
if (!stale) {
throw new Error('Unable to get stale subscription');
}
const { id, unsubscribeMethod } = stale;
Promise
.race([
this.send(unsubscribeMethod, [id]).catch(noop),
new Promise((resolve) => setTimeout(resolve, 500))
])
.then(killStaleSubscriptions)
.catch(noop);
};
hc.start((health) => {
const isReady =
!health.isSyncing && (health.peers > 0 || !health.shouldHavePeers);
// if it's the same as before, then nothing has changed and we are done
if (this.#isChainReady === isReady) {
return;
}
this.#isChainReady = isReady;
if (!isReady) {
// If we've reached this point, that means that the chain used to be "ready"
// and now we are about to emit `disconnected`.
//
// This will cause the PezkuwiJs API think that the connection is
// actually dead. In reality the smoldot chain is not dead, of course.
// However, we have to cleanup all the existing callbacks because when
// the smoldot chain stops syncing, then we will emit `connected` and
// the PezkuwiJs API will try to re-create the previous
// subscriptions and requests. Although, now is not a good moment
// to be sending unsubscription messages to the smoldot chain, we
// should wait until is no longer syncing to send the unsubscription
// messages from the stale subscriptions of the previous connection.
//
// That's why -before we perform the cleanup of `this.#subscriptions`-
// we keep the necessary information that we will need later on to
// kill the stale subscriptions.
[...this.#subscriptions.values()].forEach((s) => {
staleSubscriptions.push(s[1]);
});
cleanup();
this.#eventemitter.emit('disconnected');
} else {
killStaleSubscriptions();
this.#eventemitter.emit('connected');
if (this.#resubscribeMethods.size) {
this.#resubscribe();
}
}
});
return objectSpread({}, chain, {
remove: () => {
hc.stop();
chain.remove();
cleanup();
},
sendJsonRpc: hc.sendJsonRpc.bind(hc)
});
});
try {
await this.#chain;
} catch (e) {
this.#chain = null;
this.#eventemitter.emit('error', e);
throw e;
}
}
#resubscribe = (): void => {
const promises: any[] = [];
this.#resubscribeMethods.forEach((subDetails: ActiveSubs): void => {
// only re-create subscriptions which are not in author (only area where
// transactions are created, i.e. submissions such as 'author_submitAndWatchExtrinsic'
// are not included (and will not be re-broadcast)
if (subDetails.type.startsWith('author_')) {
return;
}
try {
const promise = new Promise<void>((resolve) => {
this.subscribe(subDetails.type, subDetails.method, subDetails.params, subDetails.callback).catch((error) => console.log(error));
resolve();
});
promises.push(promise);
} catch (error) {
l.error(error);
}
});
Promise.all(promises).catch((err) => l.log(err));
};
async disconnect (): Promise<void> {
if (!this.#chain) {
return;
}
const chain = await this.#chain;
this.#chain = null;
this.#isChainReady = false;
try {
chain.remove();
} catch (_) {}
this.#eventemitter.emit('disconnected');
}
public on (type: ProviderInterfaceEmitted, sub: ProviderInterfaceEmitCb): () => void {
// It's possible. Although, quite unlikely, that by the time that pezkuwi
// subscribes to the `connected` event, the Provider is already connected.
// In that case, we must emit to let the consumer know that we are connected.
if (type === 'connected' && this.isConnected) {
sub();
}
this.#eventemitter.on(type, sub);
return (): void => {
this.#eventemitter.removeListener(type, sub);
};
}
public async send<T = any> (method: string, params: unknown[]): Promise<T> {
if (!this.isConnected || !this.#chain) {
throw new Error('Provider is not connected');
}
const chain = await this.#chain;
const [id, json] = this.#coder.encodeJson(method, params);
const result = new Promise<T>((resolve, reject): void => {
this.#requests.set(id, (response) => {
(isError(response) ? reject : resolve)(response as unknown as T);
});
try {
chain.sendJsonRpc(json);
} catch (e) {
this.#chain = null;
try {
chain.remove();
} catch (_) {}
this.#eventemitter.emit('error', e);
}
});
try {
return await result;
} finally {
// let's ensure that once the Promise is resolved/rejected, then we remove
// remove its entry from the internal #requests
this.#requests.delete(id);
}
}
public async subscribe (type: string, method: string, params: any[], callback: ProviderInterfaceCallback): Promise<number | string> {
if (!subscriptionUnsubscriptionMethods.has(method)) {
throw new Error(`Unsupported subscribe method: ${method}`);
}
const id = await this.send<number | string>(method, params);
const subscriptionId = `${type}::${id}`;
const cb = (response: Error | string) => {
if (response instanceof Error) {
callback(response, undefined);
} else {
callback(null, response);
}
};
const unsubscribeMethod = subscriptionUnsubscriptionMethods.get(method);
if (!unsubscribeMethod) {
throw new Error('Invalid unsubscribe method found');
}
this.#resubscribeMethods.set(subscriptionId, { callback, method, params, type });
this.#subscriptions.set(subscriptionId, [cb, { id, unsubscribeMethod }]);
return id;
}
public unsubscribe (type: string, method: string, id: number | string): Promise<boolean> {
if (!this.isConnected) {
throw new Error('Provider is not connected');
}
const subscriptionId = `${type}::${id}`;
if (!this.#subscriptions.has(subscriptionId)) {
return Promise.reject(
new Error(`Unable to find active subscription=${subscriptionId}`)
);
}
this.#resubscribeMethods.delete(subscriptionId);
this.#subscriptions.delete(subscriptionId);
return this.send(method, [id]);
}
}
@@ -0,0 +1,16 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
export interface SmoldotHealth {
isSyncing: boolean
peers: number
shouldHavePeers: boolean
}
export interface HealthChecker {
setSendJsonRpc(sendRequest: (request: string) => void): void
start(healthCallback: (health: SmoldotHealth) => void): void
stop(): void
sendJsonRpc(request: string): void
responsePassThrough(response: string): string | null
}
+8
View File
@@ -0,0 +1,8 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
export { ScProvider } from './bizinikiwi-connect/index.js';
export { HttpProvider } from './http/index.js';
export { DEFAULT_CAPACITY, LRUCache } from './lru.js';
export { packageInfo } from './packageInfo.js';
export { WsProvider } from './ws/index.js';
@@ -0,0 +1,70 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { JsonRpcResponse } from '../types.js';
import { RpcCoder } from './index.js';
describe('decodeResponse', (): void => {
let coder: RpcCoder;
beforeEach((): void => {
coder = new RpcCoder();
});
it('expects a non-empty input object', (): void => {
expect(
() => coder.decodeResponse(undefined as unknown as JsonRpcResponse<unknown>)
).toThrow(/Invalid jsonrpc/);
});
it('expects a valid jsonrpc field', (): void => {
expect(
() => coder.decodeResponse({} as JsonRpcResponse<unknown>)
).toThrow(/Invalid jsonrpc/);
});
it('expects a valid id field', (): void => {
expect(
() => coder.decodeResponse({ jsonrpc: '2.0' } as JsonRpcResponse<unknown>)
).toThrow(/Invalid id/);
});
it('expects a valid result field', (): void => {
expect(
() => coder.decodeResponse({ id: 1, jsonrpc: '2.0' } as JsonRpcResponse<unknown>)
).toThrow(/No result/);
});
it('throws any error found', (): void => {
expect(
() => coder.decodeResponse({ error: { code: 123, message: 'test error' }, id: 1, jsonrpc: '2.0' } as JsonRpcResponse<unknown>)
).toThrow(/123: test error/);
});
it('throws any error found, with data', (): void => {
expect(
() => coder.decodeResponse({ error: { code: 123, data: 'Error("Some random error description")', message: 'test error' }, id: 1, jsonrpc: '2.0' } as JsonRpcResponse<unknown>)
).toThrow(/123: test error: Some random error description/);
});
it('allows for number subscription ids', (): void => {
expect(
coder.decodeResponse({ id: 1, jsonrpc: '2.0', method: 'test', params: { result: 'test result', subscription: 1 } } as JsonRpcResponse<unknown>)
).toEqual('test result');
});
it('allows for string subscription ids', (): void => {
expect(
coder.decodeResponse({ id: 1, jsonrpc: '2.0', method: 'test', params: { result: 'test result', subscription: 'abc' } } as JsonRpcResponse<unknown>)
).toEqual('test result');
});
it('returns the result', (): void => {
expect(
coder.decodeResponse({ id: 1, jsonrpc: '2.0', result: 'some result' } as JsonRpcResponse<unknown>)
).toEqual('some result');
});
});
@@ -0,0 +1,20 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { RpcCoder } from './index.js';
describe('encodeJson', (): void => {
let coder: RpcCoder;
beforeEach((): void => {
coder = new RpcCoder();
});
it('encodes a valid JsonRPC JSON string', (): void => {
expect(
coder.encodeJson('method', ['params'])
).toEqual([1, '{"id":1,"jsonrpc":"2.0","method":"method","params":["params"]}']);
});
});
@@ -0,0 +1,25 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { RpcCoder } from './index.js';
describe('encodeObject', (): void => {
let coder: RpcCoder;
beforeEach((): void => {
coder = new RpcCoder();
});
it('encodes a valid JsonRPC object', (): void => {
expect(
coder.encodeObject('method', ['a', 'b'])
).toEqual([1, {
id: 1,
jsonrpc: '2.0',
method: 'method',
params: ['a', 'b']
}]);
});
});
@@ -0,0 +1,111 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { isError } from '@pezkuwi/util';
import RpcError from './error.js';
describe('RpcError', (): void => {
describe('constructor', (): void => {
it('constructs an Error that is still an Error', (): void => {
expect(
isError(
new RpcError()
)
).toEqual(true);
});
});
describe('static', (): void => {
it('exposes the .CODES as a static', (): void => {
expect(
Object.keys(RpcError.CODES)
).not.toEqual(0);
});
});
describe('constructor properties', (): void => {
it('sets the .message property', (): void => {
expect(
new RpcError('test message').message
).toEqual('test message');
});
it("sets the .message to '' when not set", (): void => {
expect(
new RpcError().message
).toEqual('');
});
it('sets the .code property', (): void => {
expect(
new RpcError('test message', 1234).code
).toEqual(1234);
});
it('sets the .code to UKNOWN when not set', (): void => {
expect(
new RpcError('test message').code
).toEqual(RpcError.CODES.UNKNOWN);
});
it('sets the .data property', (): void => {
const data = 'here';
expect(
new RpcError('test message', 1234, data).data
).toEqual(data);
});
it('sets the .data property to generic value', (): void => {
const data = { custom: 'value' } as const;
expect(
new RpcError('test message', 1234, data).data
).toEqual(data);
});
});
describe('stack traces', (): void => {
// eslint-disable-next-line @typescript-eslint/ban-types
let captureStackTrace: (targetObject: Record<string, any>, constructorOpt?: Function | undefined) => void;
beforeEach((): void => {
captureStackTrace = Error.captureStackTrace;
Error.captureStackTrace = function (error): void {
Object.defineProperty(error, 'stack', {
configurable: true,
get: function getStack (): string {
const value = 'some stack returned';
Object.defineProperty(this, 'stack', { value });
return value;
}
});
};
});
afterEach((): void => {
Error.captureStackTrace = captureStackTrace;
});
it('captures via captureStackTrace when available', (): void => {
expect(
new RpcError().stack
).toEqual('some stack returned');
});
it('captures via stack when captureStackTrace not available', (): void => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
Error.captureStackTrace = null as any;
expect(
new RpcError().stack.length
).not.toEqual(0);
});
});
});
+66
View File
@@ -0,0 +1,66 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { RpcErrorInterface } from '../types.js';
import { isFunction } from '@pezkuwi/util';
const UNKNOWN = -99999;
function extend<Data, K extends keyof RpcError<Data>> (that: RpcError<Data>, name: K, value: RpcError<Data>[K]): void {
Object.defineProperty(that, name, {
configurable: true,
enumerable: false,
value
});
}
/**
* @name RpcError
* @summary Extension to the basic JS Error.
* @description
* The built-in JavaScript Error class is extended by adding a code to allow for Error categorization. In addition to the normal `stack`, `message`, the numeric `code` and `data` (any types) parameters are available on the object.
* @example
* <BR>
*
* ```javascript
* const { RpcError } from '@pezkuwi/util');
*
* throw new RpcError('some message', RpcError.CODES.METHOD_NOT_FOUND); // => error.code = -32601
* ```
*/
export default class RpcError<T = never> extends Error implements RpcErrorInterface<T> {
public code!: number;
public data?: T;
public override message!: string;
public override name!: string;
public override stack!: string;
public constructor (message = '', code: number = UNKNOWN, data?: T) {
super();
extend(this, 'message', String(message));
extend(this, 'name', this.constructor.name);
extend(this, 'data', data);
extend(this, 'code', code);
if (isFunction(Error.captureStackTrace)) {
Error.captureStackTrace(this, this.constructor);
} else {
const { stack } = new Error(message);
stack && extend(this, 'stack', stack);
}
}
public static CODES = {
ASSERT: -90009,
INVALID_JSONRPC: -99998,
METHOD_NOT_FOUND: -32601, // Rust client
UNKNOWN
};
}
+88
View File
@@ -0,0 +1,88 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { JsonRpcRequest, JsonRpcResponse, JsonRpcResponseBaseError } from '../types.js';
import { isNumber, isString, isUndefined, stringify } from '@pezkuwi/util';
import RpcError from './error.js';
function formatErrorData (data?: string | number): string {
if (isUndefined(data)) {
return '';
}
const formatted = `: ${isString(data)
? data.replace(/Error\("/g, '').replace(/\("/g, '(').replace(/"\)/g, ')').replace(/\(/g, ', ').replace(/\)/g, '')
: stringify(data)}`;
// We need some sort of cut-off here since these can be very large and
// very nested, pick a number and trim the result display to it
return formatted.length <= 256
? formatted
: `${formatted.substring(0, 255)}`;
}
function checkError (error?: JsonRpcResponseBaseError): void {
if (error) {
const { code, data, message } = error;
throw new RpcError(`${code}: ${message}${formatErrorData(data)}`, code, data);
}
}
/** @internal */
export class RpcCoder {
#id = 0;
public decodeResponse <T> (response?: JsonRpcResponse<T>): T {
if (!response || response.jsonrpc !== '2.0') {
throw new Error('Invalid jsonrpc field in decoded object');
}
const isSubscription = !isUndefined(response.params) && !isUndefined(response.method);
if (
!isNumber(response.id) &&
(
!isSubscription || (
!isNumber(response.params.subscription) &&
!isString(response.params.subscription)
)
)
) {
throw new Error('Invalid id field in decoded object');
}
checkError(response.error);
if (response.result === undefined && !isSubscription) {
throw new Error('No result found in jsonrpc response');
}
if (isSubscription) {
checkError(response.params.error);
return response.params.result;
}
return response.result;
}
public encodeJson (method: string, params: unknown[]): [number, string] {
const [id, data] = this.encodeObject(method, params);
return [id, stringify(data)];
}
public encodeObject (method: string, params: unknown[]): [number, JsonRpcRequest] {
const id = ++this.#id;
return [id, {
id,
jsonrpc: '2.0',
method,
params
}];
}
}
+10
View File
@@ -0,0 +1,10 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
const HTTP_URL = 'http://127.0.0.1:9933';
const WS_URL = 'ws://127.0.0.1:9944';
export default {
HTTP_URL,
WS_URL
};
@@ -0,0 +1,72 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { TEST_HTTP_URL } from '../mock/mockHttp.js';
import { HttpProvider } from './index.js';
describe('Http', (): void => {
let http: HttpProvider;
beforeEach((): void => {
http = new HttpProvider(TEST_HTTP_URL);
});
it('requires an http:// prefixed endpoint', (): void => {
expect(
() => new HttpProvider('ws://')
).toThrow(/with 'http/);
});
it('allows https:// endpoints', (): void => {
expect(
() => new HttpProvider('https://')
).not.toThrow();
});
it('allows custom headers', (): void => {
expect(
() => new HttpProvider('https://', { foo: 'bar' })
).not.toThrow();
});
it('should throw error on negative cache capacity or TTL', () => {
expect(() =>
new HttpProvider(TEST_HTTP_URL, {}, -5, 30000)
).toThrow(/'capacity' must be a non-negative integer/);
expect(() =>
new HttpProvider(TEST_HTTP_URL, {}, 1024, -1000)
).toThrow(/'ttl' must be between 0 and 1800000 ms or null to disable/);
});
it('allow clone', (): void => {
const clone = http.clone();
/* eslint-disable */
expect((clone as any)['#endpoint']).toEqual((http as any)['#endpoint']);
expect((clone as any)['#headers']).toEqual((http as any)['#headers']);
/* eslint-enable */
});
it('always returns isConnected true', (): void => {
expect(http.isConnected).toEqual(true);
});
it('does not (yet) support subscribe', async (): Promise<void> => {
await http.subscribe('', '', [], (cb): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect(cb).toEqual(expect.anything());
}).catch((error): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/does not have subscriptions/);
});
});
it('does not (yet) support unsubscribe', async (): Promise<void> => {
await http.unsubscribe('', '', 0).catch((error): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/does not have subscriptions/);
});
});
});
+238
View File
@@ -0,0 +1,238 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type RpcError from '../coder/error.js';
import type { JsonRpcResponse, ProviderInterface, ProviderInterfaceCallback, ProviderInterfaceEmitCb, ProviderInterfaceEmitted, ProviderStats } from '../types.js';
import { logger, noop, stringify } from '@pezkuwi/util';
import { fetch } from '@pezkuwi/x-fetch';
import { RpcCoder } from '../coder/index.js';
import defaults from '../defaults.js';
import { DEFAULT_CAPACITY, DEFAULT_TTL, LRUCache } from '../lru.js';
const ERROR_SUBSCRIBE = 'HTTP Provider does not have subscriptions, use WebSockets instead';
const l = logger('api-http');
/**
* # @pezkuwi/rpc-provider
*
* @name HttpProvider
*
* @description The HTTP Provider allows sending requests using HTTP to a HTTP RPC server TCP port. It does not support subscriptions so you won't be able to listen to events such as new blocks or balance changes. It is usually preferable using the [[WsProvider]].
*
* @example
* <BR>
*
* ```javascript
* import Api from '@pezkuwi/api/promise';
* import { HttpProvider } from '@pezkuwi/rpc-provider';
*
* const provider = new HttpProvider('http://127.0.0.1:9933');
* const api = new Api(provider);
* ```
*
* @see [[WsProvider]]
*/
export class HttpProvider implements ProviderInterface {
readonly #callCache: LRUCache;
readonly #cacheCapacity: number;
readonly #coder: RpcCoder;
readonly #endpoint: string;
readonly #headers: Record<string, string>;
readonly #stats: ProviderStats;
readonly #ttl: number | null | undefined;
/**
* @param {string} endpoint The endpoint url starting with http://
* @param {Record<string, string>} headers The headers provided to the underlying Http Endpoint
* @param {number} [cacheCapacity] Custom size of the HttpProvider LRUCache. Defaults to `DEFAULT_CAPACITY` (1024)
* @param {number} [cacheTtl] Custom TTL of the HttpProvider LRUCache. Determines how long an object can live in the cache. Defaults to `DEFAULT_TTL` (30000)
*/
constructor (endpoint: string = defaults.HTTP_URL, headers: Record<string, string> = {}, cacheCapacity?: number, cacheTtl?: number | null) {
if (!/^(https|http):\/\//.test(endpoint)) {
throw new Error(`Endpoint should start with 'http://' or 'https://', received '${endpoint}'`);
}
this.#coder = new RpcCoder();
this.#endpoint = endpoint;
this.#headers = headers;
this.#cacheCapacity = cacheCapacity === 0 ? 0 : cacheCapacity || DEFAULT_CAPACITY;
const ttl = cacheTtl === undefined ? DEFAULT_TTL : cacheTtl;
this.#callCache = new LRUCache(cacheCapacity === 0 ? 0 : cacheCapacity || DEFAULT_CAPACITY, ttl);
this.#ttl = cacheTtl;
this.#stats = {
active: { requests: 0, subscriptions: 0 },
total: { bytesRecv: 0, bytesSent: 0, cached: 0, errors: 0, requests: 0, subscriptions: 0, timeout: 0 }
};
}
/**
* @summary `true` when this provider supports subscriptions
*/
public get hasSubscriptions (): boolean {
return !!false;
}
/**
* @description Returns a clone of the object
*/
public clone (): HttpProvider {
return new HttpProvider(this.#endpoint, this.#headers);
}
/**
* @description Manually connect from the connection
*/
public async connect (): Promise<void> {
// noop
}
/**
* @description Manually disconnect from the connection
*/
public async disconnect (): Promise<void> {
// noop
}
/**
* @description Returns the connection stats
*/
public get stats (): ProviderStats {
return this.#stats;
}
/**
* @description Returns the connection stats
*/
public get ttl (): number | null | undefined {
return this.#ttl;
}
/**
* @summary `true` when this provider supports clone()
*/
public get isClonable (): boolean {
return !!true;
}
/**
* @summary Whether the node is connected or not.
* @return {boolean} true if connected
*/
public get isConnected (): boolean {
return !!true;
}
/**
* @summary Events are not supported with the HttpProvider, see [[WsProvider]].
* @description HTTP Provider does not have 'on' emitters. WebSockets should be used instead.
*/
public on (_type: ProviderInterfaceEmitted, _sub: ProviderInterfaceEmitCb): () => void {
l.error('HTTP Provider does not have \'on\' emitters, use WebSockets instead');
return noop;
}
/**
* @summary Send HTTP POST Request with Body to configured HTTP Endpoint.
*/
public async send <T> (method: string, params: unknown[], isCacheable?: boolean): Promise<T> {
this.#stats.total.requests++;
const [, body] = this.#coder.encodeJson(method, params);
if (this.#cacheCapacity === 0) {
return this.#send(body);
}
const cacheKey = isCacheable ? `${method}::${stringify(params)}` : '';
let resultPromise: Promise<T> | null = isCacheable
? this.#callCache.get(cacheKey)
: null;
if (!resultPromise) {
resultPromise = this.#send(body);
if (isCacheable) {
this.#callCache.set(cacheKey, resultPromise);
}
} else {
this.#stats.total.cached++;
}
return resultPromise;
}
async #send <T> (body: string): Promise<T> {
this.#stats.active.requests++;
this.#stats.total.bytesSent += body.length;
try {
const response = await fetch(this.#endpoint, {
body,
headers: {
Accept: 'application/json',
'Content-Length': `${body.length}`,
'Content-Type': 'application/json',
...this.#headers
},
method: 'POST'
});
if (!response.ok) {
throw new Error(`[${response.status}]: ${response.statusText}`);
}
const result = await response.text();
this.#stats.total.bytesRecv += result.length;
const decoded = this.#coder.decodeResponse(JSON.parse(result) as JsonRpcResponse<T>);
this.#stats.active.requests--;
return decoded;
} catch (e) {
this.#stats.active.requests--;
this.#stats.total.errors++;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const { method, params } = JSON.parse(body);
const rpcError: RpcError = e as RpcError;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const failedRequest = `\nFailed HTTP Request: ${JSON.stringify({ method, params })}`;
// Provide HTTP Request alongside the error
rpcError.message = `${rpcError.message}${failedRequest}`;
throw rpcError;
}
}
/**
* @summary Subscriptions are not supported with the HttpProvider, see [[WsProvider]].
*/
// eslint-disable-next-line @typescript-eslint/require-await
public async subscribe (_types: string, _method: string, _params: unknown[], _cb: ProviderInterfaceCallback): Promise<number> {
l.error(ERROR_SUBSCRIBE);
throw new Error(ERROR_SUBSCRIBE);
}
/**
* @summary Subscriptions are not supported with the HttpProvider, see [[WsProvider]].
*/
// eslint-disable-next-line @typescript-eslint/require-await
public async unsubscribe (_type: string, _method: string, _id: number): Promise<boolean> {
l.error(ERROR_SUBSCRIBE);
throw new Error(ERROR_SUBSCRIBE);
}
}
@@ -0,0 +1,61 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { Mock } from '../mock/types.js';
import { mockHttp, TEST_HTTP_URL } from '../mock/mockHttp.js';
import { HttpProvider } from './index.js';
// Does not work with Node 18 (native fetch)
// See https://github.com/nock/nock/issues/2397
// eslint-disable-next-line jest/no-disabled-tests
describe.skip('send', (): void => {
let http: HttpProvider;
let mock: Mock;
beforeEach((): void => {
http = new HttpProvider(TEST_HTTP_URL);
});
afterEach(async () => {
if (mock) {
await mock.done();
}
});
it('passes the body through correctly', (): Promise<void> => {
mock = mockHttp([{
method: 'test_body',
reply: {
result: 'ok'
}
}]);
return http
.send('test_body', ['param'])
.then((): void => {
expect(mock.body['test_body']).toEqual({
id: 1,
jsonrpc: '2.0',
method: 'test_body',
params: ['param']
});
});
});
it('throws error when !response.ok', async (): Promise<any> => {
mock = mockHttp([{
code: 500,
method: 'test_error'
}]);
return http
.send('test_error', [])
.catch((error): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/\[500\]/);
});
});
});
+11
View File
@@ -0,0 +1,11 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { Logger } from '@pezkuwi/util/types';
import type { RpcCoder } from '../coder/index.js';
export interface HttpState {
coder: RpcCoder;
endpoint: string;
l: Logger;
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import './packageDetect.js';
export * from './bundle.js';
+74
View File
@@ -0,0 +1,74 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { LRUCache } from './lru.js';
describe('LRUCache', (): void => {
let lru: LRUCache | undefined;
beforeEach((): void => {
lru = new LRUCache(4, 500);
});
it('allows getting of items below capacity', (): void => {
const keys = ['1', '2', '3', '4'];
keys.forEach((k) => lru?.set(k, `${k}${k}${k}`));
const lruKeys = lru?.keys();
expect(lruKeys?.join(', ')).toBe(keys.reverse().join(', '));
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);
keys.forEach((k) => expect(lru?.get(k)).toEqual(`${k}${k}${k}`));
});
it('drops items when at capacity', (): void => {
const keys = ['1', '2', '3', '4', '5', '6'];
keys.forEach((k) => lru?.set(k, `${k}${k}${k}`));
expect(lru?.keys().join(', ')).toEqual(keys.slice(2).reverse().join(', '));
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);
keys.slice(2).forEach((k) => expect(lru?.get(k)).toEqual(`${k}${k}${k}`));
});
it('adjusts the order as they are used', (): void => {
const keys = ['1', '2', '3', '4', '5'];
keys.forEach((k) => lru?.set(k, `${k}${k}${k}`));
expect(lru?.entries()).toEqual([['5', '555'], ['4', '444'], ['3', '333'], ['2', '222']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);
lru?.get('3');
expect(lru?.entries()).toEqual([['3', '333'], ['5', '555'], ['4', '444'], ['2', '222']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);
lru?.set('4', '4433');
expect(lru?.entries()).toEqual([['4', '4433'], ['3', '333'], ['5', '555'], ['2', '222']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);
lru?.set('6', '666');
expect(lru?.entries()).toEqual([['6', '666'], ['4', '4433'], ['3', '333'], ['5', '555']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);
});
it('evicts items with TTL', (): void => {
const keys = ['1', '2', '3', '4', '5'];
keys.forEach((k) => lru?.set(k, `${k}${k}${k}`));
expect(lru?.entries()).toEqual([['5', '555'], ['4', '444'], ['3', '333'], ['2', '222']]);
setTimeout((): void => {
lru?.get('3');
expect(lru?.entries()).toEqual([['3', '333']]);
}, 800);
});
});
+197
View File
@@ -0,0 +1,197 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
// Assuming all 1.5MB responses, we apply a default allowing for 192MB
// cache space (depending on the historic queries this would vary, metadata
// for Zagros/Pezkuwi/Bizinikiwi falls between 600-750K, 2x for estimate)
export const DEFAULT_CAPACITY = 1024;
export const DEFAULT_TTL = 30000; // 30 seconds
const MAX_TTL = 1800_000; // 30 minutes
// If the user decides to disable the TTL we set the value
// to a very high number (A year = 365 * 24 * 60 * 60 * 1000).
const DISABLED_TTL = 31_536_000_000;
class LRUNode {
readonly key: string;
#expires: number;
#ttl: number;
readonly createdAt: number;
public next: LRUNode;
public prev: LRUNode;
constructor (key: string, ttl: number) {
this.key = key;
this.#ttl = ttl;
this.#expires = Date.now() + ttl;
this.createdAt = Date.now();
this.next = this.prev = this;
}
public refresh (): void {
this.#expires = Date.now() + this.#ttl;
}
public get expiry (): number {
return this.#expires;
}
}
// https://en.wikipedia.org/wiki/Cache_replacement_policies#LRU
export class LRUCache {
readonly capacity: number;
readonly #data = new Map<string, unknown>();
readonly #refs = new Map<string, LRUNode>();
#length = 0;
#head: LRUNode;
#tail: LRUNode;
readonly #ttl: number;
constructor (capacity = DEFAULT_CAPACITY, ttl: number | null = DEFAULT_TTL) {
// Validate capacity
if (!Number.isInteger(capacity) || capacity < 0) {
throw new Error(`LRUCache initialization error: 'capacity' must be a non-negative integer. Received: ${capacity}`);
}
// Validate ttl
if (ttl !== null && (!Number.isFinite(ttl) || ttl < 0 || ttl > MAX_TTL)) {
throw new Error(`LRUCache initialization error: 'ttl' must be between 0 and ${MAX_TTL} ms or null to disable. Received: ${ttl}`);
}
this.capacity = capacity;
ttl ? this.#ttl = ttl : this.#ttl = DISABLED_TTL;
this.#head = this.#tail = new LRUNode('<empty>', this.#ttl);
}
get ttl (): number | null {
return this.#ttl;
}
get length (): number {
return this.#length;
}
get lengthData (): number {
return this.#data.size;
}
get lengthRefs (): number {
return this.#refs.size;
}
entries (): [string, unknown][] {
const keys = this.keys();
const count = keys.length;
const entries = new Array<[string, unknown]>(count);
for (let i = 0; i < count; i++) {
const key = keys[i];
entries[i] = [key, this.#data.get(key)];
}
return entries;
}
keys (): string[] {
const keys: string[] = [];
if (this.#length) {
let curr = this.#head;
while (curr !== this.#tail) {
keys.push(curr.key);
curr = curr.next;
}
keys.push(curr.key);
}
return keys;
}
get <T> (key: string): T | null {
const data = this.#data.get(key);
if (data) {
this.#toHead(key);
// Evict TTL once data is refreshed
this.#evictTTL();
return data as T;
}
this.#evictTTL();
return null;
}
set <T> (key: string, value: T): void {
if (this.#data.has(key)) {
this.#toHead(key);
} else {
const node = new LRUNode(key, this.#ttl);
this.#refs.set(node.key, node);
if (this.length === 0) {
this.#head = this.#tail = node;
} else {
this.#head.prev = node;
node.next = this.#head;
this.#head = node;
}
if (this.#length === this.capacity) {
this.#data.delete(this.#tail.key);
this.#refs.delete(this.#tail.key);
this.#tail = this.#tail.prev;
this.#tail.next = this.#head;
} else {
this.#length += 1;
}
}
// Evict TTL once data is refreshed or added
this.#evictTTL();
this.#data.set(key, value);
}
#evictTTL () {
// Find last node to keep
// traverse map to find the expired nodes
while (this.#tail.expiry && this.#tail.expiry < Date.now() && this.#length > 0) {
this.#refs.delete(this.#tail.key);
this.#data.delete(this.#tail.key);
this.#length -= 1;
this.#tail = this.#tail.prev;
this.#tail.next = this.#head;
}
if (this.#length === 0) {
this.#head = this.#tail = new LRUNode('<empty>', this.#ttl);
}
}
#toHead (key: string): void {
const ref = this.#refs.get(key);
if (ref && ref !== this.#head) {
ref.refresh();
ref.prev.next = ref.next;
ref.next.prev = ref.prev;
ref.next = this.#head;
this.#head.prev = ref;
this.#head = ref;
}
}
}
+259
View File
@@ -0,0 +1,259 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/* eslint-disable camelcase */
import type { Header } from '@pezkuwi/types/interfaces';
import type { Codec, Registry } from '@pezkuwi/types/types';
import type { ProviderInterface, ProviderInterfaceEmitCb, ProviderInterfaceEmitted } from '../types.js';
import type { MockStateDb, MockStateSubscriptionCallback, MockStateSubscriptions } from './types.js';
import { EventEmitter } from 'eventemitter3';
import { createTestKeyring } from '@pezkuwi/keyring/testing';
import { decorateStorage, Metadata } from '@pezkuwi/types';
import jsonrpc from '@pezkuwi/types/interfaces/jsonrpc';
import rpcHeader from '@pezkuwi/types-support/json/Header.004.json' assert { type: 'json' };
import rpcSignedBlock from '@pezkuwi/types-support/json/SignedBlock.004.immortal.json' assert { type: 'json' };
import rpcMetadata from '@pezkuwi/types-support/metadata/static-bizinikiwi';
import { BN, bnToU8a, logger, u8aToHex } from '@pezkuwi/util';
import { randomAsU8a } from '@pezkuwi/util-crypto';
const INTERVAL = 1000;
const SUBSCRIPTIONS: string[] = Array.prototype.concat.apply(
[],
Object.values(jsonrpc).map((section): string[] =>
Object
.values(section)
.filter(({ isSubscription }) => isSubscription)
.map(({ jsonrpc }) => jsonrpc)
.concat('chain_subscribeNewHead')
)
) as string[];
const keyring = createTestKeyring({ type: 'ed25519' });
const l = logger('api-mock');
/**
* A mock provider mainly used for testing.
* @return {ProviderInterface} The mock provider
* @internal
*/
export class MockProvider implements ProviderInterface {
private db: MockStateDb = {};
private emitter = new EventEmitter();
private intervalId?: ReturnType<typeof setInterval> | null;
public isUpdating = true;
private registry: Registry;
private prevNumber = new BN(-1);
private requests: Record<string, (...params: any[]) => unknown> = {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
chain_getBlock: () => this.registry.createType('SignedBlock', rpcSignedBlock.result).toJSON(),
chain_getBlockHash: () => '0x1234000000000000000000000000000000000000000000000000000000000000',
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
chain_getFinalizedHead: () => this.registry.createType('Header', rpcHeader.result).hash,
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
chain_getHeader: () => this.registry.createType('Header', rpcHeader.result).toJSON(),
rpc_methods: () => this.registry.createType('RpcMethods').toJSON(),
state_getKeys: () => [],
state_getKeysPaged: () => [],
state_getMetadata: () => rpcMetadata,
state_getRuntimeVersion: () => this.registry.createType('RuntimeVersion').toHex(),
state_getStorage: (storage: MockStateDb, [key]: string[]) => u8aToHex(storage[key]),
system_chain: () => 'mockChain',
system_health: () => ({}),
system_name: () => 'mockClient',
system_properties: () => ({ ss58Format: 42 }),
system_upgradedToTripleRefCount: () => this.registry.createType('bool', true),
system_version: () => '9.8.7',
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, sort-keys
dev_echo: (_, params: any) => params
};
public subscriptions: MockStateSubscriptions = SUBSCRIPTIONS.reduce((subs, name): MockStateSubscriptions => {
subs[name] = {
callbacks: {},
lastValue: null
};
return subs;
}, ({} as MockStateSubscriptions));
private subscriptionId = 0;
private subscriptionMap: Record<number, string> = {};
constructor (registry: Registry) {
this.registry = registry;
this.init();
}
public get hasSubscriptions (): boolean {
return !!true;
}
public clone (): MockProvider {
throw new Error('Unimplemented');
}
public async connect (): Promise<void> {
// noop
}
// eslint-disable-next-line @typescript-eslint/require-await
public async disconnect (): Promise<void> {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}
public get isClonable (): boolean {
return !!false;
}
public get isConnected (): boolean {
return !!true;
}
public on (type: ProviderInterfaceEmitted, sub: ProviderInterfaceEmitCb): () => void {
this.emitter.on(type, sub);
return (): void => {
this.emitter.removeListener(type, sub);
};
}
// eslint-disable-next-line @typescript-eslint/require-await
public async send <T = any> (method: string, params: unknown[]): Promise<T> {
l.debug(() => ['send', method, params]);
if (!this.requests[method]) {
throw new Error(`provider.send: Invalid method '${method}'`);
}
return this.requests[method](this.db, params) as T;
}
// eslint-disable-next-line @typescript-eslint/require-await
public async subscribe (_type: string, method: string, ...params: unknown[]): Promise<number> {
l.debug(() => ['subscribe', method, params]);
if (!this.subscriptions[method]) {
throw new Error(`provider.subscribe: Invalid method '${method}'`);
}
const callback = params.pop() as MockStateSubscriptionCallback;
const id = ++this.subscriptionId;
this.subscriptions[method].callbacks[id] = callback;
this.subscriptionMap[id] = method;
if (this.subscriptions[method].lastValue !== null) {
callback(null, this.subscriptions[method].lastValue);
}
return id;
}
// eslint-disable-next-line @typescript-eslint/require-await
public async unsubscribe (_type: string, _method: string, id: number): Promise<boolean> {
const sub = this.subscriptionMap[id];
l.debug(() => ['unsubscribe', id, sub]);
if (!sub) {
throw new Error(`Unable to find subscription for ${id}`);
}
delete this.subscriptionMap[id];
delete this.subscriptions[sub].callbacks[id];
return true;
}
private init (): void {
const emitEvents: ProviderInterfaceEmitted[] = ['connected', 'disconnected'];
let emitIndex = 0;
let newHead = this.makeBlockHeader();
let counter = -1;
const metadata = new Metadata(this.registry, rpcMetadata);
this.registry.setMetadata(metadata);
const query = decorateStorage(this.registry, metadata.asLatest, metadata.version);
// Do something every 1 seconds
this.intervalId = setInterval((): void => {
if (!this.isUpdating) {
return;
}
// create a new header (next block)
newHead = this.makeBlockHeader();
// increment the balances and nonce for each account
keyring.getPairs().forEach(({ publicKey }, index): void => {
this.setStateBn(query['system']['account'](publicKey), newHead.number.toBn().addn(index));
});
// set the timestamp for the current block
this.setStateBn(query['timestamp']['now'](), Math.floor(Date.now() / 1000));
this.updateSubs('chain_subscribeNewHead', newHead);
// We emit connected/disconnected at intervals
if (++counter % 2 === 1) {
if (++emitIndex === emitEvents.length) {
emitIndex = 0;
}
this.emitter.emit(emitEvents[emitIndex]);
}
}, INTERVAL);
}
private makeBlockHeader (): Header {
const blockNumber = this.prevNumber.addn(1);
const header = this.registry.createType('Header', {
digest: {
logs: []
},
extrinsicsRoot: randomAsU8a(),
number: blockNumber,
parentHash: blockNumber.isZero()
? new Uint8Array(32)
: bnToU8a(this.prevNumber, { bitLength: 256, isLe: false }),
stateRoot: bnToU8a(blockNumber, { bitLength: 256, isLe: false })
});
this.prevNumber = blockNumber;
return header as unknown as Header;
}
private setStateBn (key: Uint8Array, value: BN | number): void {
this.db[u8aToHex(key)] = bnToU8a(value, { bitLength: 64, isLe: true });
}
private updateSubs (method: string, value: Codec): void {
this.subscriptions[method].lastValue = value;
Object
.values(this.subscriptions[method].callbacks)
.forEach((cb): void => {
try {
cb(null, value.toJSON());
} catch (error) {
l.error(`Error on '${method}' subscription`, error);
}
});
}
}
@@ -0,0 +1,35 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { Mock } from './types.js';
import nock from 'nock';
interface Request {
code?: number;
method: string;
reply?: Record<string, unknown>;
}
interface HttpMock extends Mock {
post: (uri: string) => {
reply: (code: number, handler: (uri: string, body: { id: string }) => unknown) => HttpMock
}
}
export const TEST_HTTP_URL = 'http://localhost:9944';
export function mockHttp (requests: Request[]): Mock {
nock.cleanAll();
return requests.reduce((scope: HttpMock, request: Request) =>
scope
.post('/')
.reply(request.code || 200, (_uri: string, body: { id: string }) => {
scope.body = scope.body || {};
scope.body[request.method] = body;
return Object.assign({ id: body.id, jsonrpc: '2.0' }, request.reply || {}) as unknown;
}),
nock(TEST_HTTP_URL) as unknown as HttpMock);
}
+92
View File
@@ -0,0 +1,92 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import { Server, WebSocket } from 'mock-socket';
import { stringify } from '@pezkuwi/util';
interface Scope {
body: Record<string, Record<string, unknown>>;
requests: number;
server: Server;
done: any;
}
interface ErrorDef {
id: number;
error: {
code: number;
message: string;
};
}
interface ReplyDef {
id: number;
reply: {
result: unknown;
};
}
interface RpcBase {
id: number;
jsonrpc: '2.0';
}
type RpcError = RpcBase & ErrorDef;
type RpcReply = RpcBase & { result: unknown };
export type Request = { method: string } & (ErrorDef | ReplyDef);
global.WebSocket = WebSocket as typeof global.WebSocket;
export const TEST_WS_URL = 'ws://localhost:9955';
// should be JSONRPC def return
function createError ({ error: { code, message }, id }: ErrorDef): RpcError {
return {
error: {
code,
message
},
id,
jsonrpc: '2.0'
};
}
// should be JSONRPC def return
function createReply ({ id, reply: { result } }: ReplyDef): RpcReply {
return {
id,
jsonrpc: '2.0',
result
};
}
// scope definition returned
export function mockWs (requests: Request[], wsUrl: string = TEST_WS_URL): Scope {
const server = new Server(wsUrl);
let requestCount = 0;
const scope: Scope = {
body: {},
done: () => new Promise<void>((resolve) => server.stop(resolve)),
requests: 0,
server
};
server.on('connection', (socket): void => {
socket.on('message', (body): void => {
const request = requests[requestCount];
const response = (request as ErrorDef).error
? createError(request as ErrorDef)
: createReply(request as ReplyDef);
scope.body[request.method] = body as unknown as Record<string, unknown>;
requestCount++;
socket.send(stringify(response));
});
});
return scope;
}
+43
View File
@@ -0,0 +1,43 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { ProviderInterfaceEmitted } from '../types.js';
import { TypeRegistry } from '@pezkuwi/types/create';
import { MockProvider } from './index.js';
describe('on', (): void => {
const registry = new TypeRegistry();
let mock: MockProvider;
beforeEach((): void => {
mock = new MockProvider(registry);
});
afterEach(async () => {
await mock.disconnect();
});
// eslint-disable-next-line jest/expect-expect
it('emits both connected and disconnected events', async (): Promise<void> => {
const events: Record<string, boolean> = { connected: false, disconnected: false };
await new Promise<boolean>((resolve) => {
const handler = (type: ProviderInterfaceEmitted): void => {
mock.on(type, (): void => {
events[type] = true;
if (Object.values(events).filter((value): boolean => value).length === 2) {
resolve(true);
}
});
};
handler('connected');
handler('disconnected');
});
});
});
@@ -0,0 +1,38 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { TypeRegistry } from '@pezkuwi/types/create';
import { MockProvider } from './index.js';
describe('send', (): void => {
const registry = new TypeRegistry();
let mock: MockProvider;
beforeEach((): void => {
mock = new MockProvider(registry);
});
afterEach(async () => {
await mock.disconnect();
});
it('fails on non-supported methods', (): Promise<any> => {
return mock
.send('something_invalid', [])
.catch((error): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/Invalid method/);
});
});
it('returns values for mocked requests', (): Promise<void> => {
return mock
.send('system_name', [])
.then((result): void => {
expect(result).toBe('mockClient');
});
});
});
@@ -0,0 +1,81 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { TypeRegistry } from '@pezkuwi/types/create';
import { MockProvider } from './index.js';
describe('subscribe', (): void => {
const registry = new TypeRegistry();
let mock: MockProvider;
beforeEach((): void => {
mock = new MockProvider(registry);
});
afterEach(async () => {
await mock.disconnect();
});
it('fails on unknown methods', async (): Promise<void> => {
await mock
.subscribe('test', 'test_notFound')
.catch((error): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/Invalid method 'test_notFound'/);
});
});
it('returns a subscription id', async (): Promise<void> => {
await mock
.subscribe('chain_newHead', 'chain_subscribeNewHead', (): void => undefined)
.then((id): void => {
expect(id).toEqual(1);
});
});
it('calls back with the last known value', async (): Promise<void> => {
mock.isUpdating = false;
mock.subscriptions.chain_subscribeNewHead.lastValue = 'testValue';
await new Promise<boolean>((resolve) => {
mock.subscribe('chain_newHead', 'chain_subscribeNewHead', (_: any, value: string): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect(value).toEqual('testValue');
resolve(true);
}).catch(console.error);
});
});
// eslint-disable-next-line jest/expect-expect
it('calls back with new headers', async (): Promise<void> => {
await new Promise<boolean>((resolve) => {
mock.subscribe('chain_newHead', 'chain_subscribeNewHead', (_: any, header: { number: number }): void => {
if (header.number === 4) {
resolve(true);
}
}).catch(console.error);
});
});
// eslint-disable-next-line jest/expect-expect
it('handles errors within callbacks gracefully', async (): Promise<void> => {
let hasThrown = false;
await new Promise<boolean>((resolve) => {
mock.subscribe('chain_newHead', 'chain_subscribeNewHead', (_: any, header: { number: number }): void => {
if (!hasThrown) {
hasThrown = true;
throw new Error('testing');
}
if (header.number === 3) {
resolve(true);
}
}).catch(console.error);
});
});
});
+36
View File
@@ -0,0 +1,36 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { Server } from 'mock-socket';
export type Global = typeof globalThis & {
WebSocket: typeof WebSocket;
fetch: any;
}
export interface Mock {
body: Record<string, Record<string, unknown>>;
requests: number;
server: Server;
done: () => Promise<void>;
}
export type MockStateSubscriptionCallback = (error: Error | null, value: any) => void;
export interface MockStateSubscription {
callbacks: Record<number, MockStateSubscriptionCallback>;
lastValue: any;
}
export interface MockStateSubscriptions {
// known
chain_subscribeNewHead: MockStateSubscription;
state_subscribeStorage: MockStateSubscription;
// others
[key: string]: MockStateSubscription;
}
export type MockStateDb = Record<string, Uint8Array>;
export type MockStateRequests = Record<string, (db: MockStateDb, params: any[]) => string>;
@@ -0,0 +1,57 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { TypeRegistry } from '@pezkuwi/types/create';
import { MockProvider } from './index.js';
describe('unsubscribe', (): void => {
const registry = new TypeRegistry();
let mock: MockProvider;
let id: number;
beforeEach((): Promise<void> => {
mock = new MockProvider(registry);
return mock
.subscribe('chain_newHead', 'chain_subscribeNewHead', (): void => undefined)
.then((_id): void => {
id = _id;
});
});
afterEach(async () => {
await mock.disconnect();
});
it('fails on unknown ids', async (): Promise<void> => {
await mock
.unsubscribe('chain_newHead', 'chain_subscribeNewHead', 5)
.catch((error): boolean => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/Unable to find/);
return false;
});
});
// eslint-disable-next-line jest/expect-expect
it('unsubscribes successfully', async (): Promise<void> => {
await mock.unsubscribe('chain_newHead', 'chain_subscribeNewHead', id);
});
it('fails on double unsubscribe', async (): Promise<void> => {
await mock.unsubscribe('chain_newHead', 'chain_subscribeNewHead', id)
.then((): Promise<boolean> =>
mock.unsubscribe('chain_newHead', 'chain_subscribeNewHead', id)
)
.catch((error): boolean => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/Unable to find/);
return false;
});
});
});
+4
View File
@@ -0,0 +1,4 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
export * from './index.js';
@@ -0,0 +1,12 @@
// Copyright 2017-2026 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
// Do not edit, auto-generated by @pezkuwi/dev
// (packageInfo imports will be kept as-is, user-editable)
import { packageInfo as typesInfo } from '@pezkuwi/types/packageInfo';
import { detectPackage } from '@pezkuwi/util';
import { packageInfo } from './packageInfo.js';
detectPackage(packageInfo, null, [typesInfo]);
+6
View File
@@ -0,0 +1,6 @@
// Copyright 2017-2026 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
// Do not edit, auto-generated by @pezkuwi/dev
export const packageInfo = { name: '@pezkuwi/rpc-provider', path: 'auto', type: 'auto', version: '16.5.4' };
+101
View File
@@ -0,0 +1,101 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
export interface JsonRpcObject {
id: number;
jsonrpc: '2.0';
}
export interface JsonRpcRequest extends JsonRpcObject {
method: string;
params: unknown[];
}
export interface JsonRpcResponseBaseError {
code: number;
data?: number | string;
message: string;
}
export interface RpcErrorInterface<T> {
code: number;
data?: T;
message: string;
stack: string;
}
interface JsonRpcResponseSingle<T> {
error?: JsonRpcResponseBaseError;
result: T;
}
interface JsonRpcResponseSubscription<T> {
method?: string;
params: {
error?: JsonRpcResponseBaseError;
result: T;
subscription: number | string;
};
}
export type JsonRpcResponseBase<T> = JsonRpcResponseSingle<T> & JsonRpcResponseSubscription<T>;
export type JsonRpcResponse<T> = JsonRpcObject & JsonRpcResponseBase<T>;
export type ProviderInterfaceCallback = (error: Error | null, result: any) => void;
export type ProviderInterfaceEmitted = 'connected' | 'disconnected' | 'error';
export type ProviderInterfaceEmitCb = (value?: any) => any;
export interface ProviderInterface {
/** true if the provider supports subscriptions (not available for HTTP) */
readonly hasSubscriptions: boolean;
/** true if the clone() functionality is available on the provider */
readonly isClonable: boolean;
/** true if the provider is currently connected (ws/sc has connection logic) */
readonly isConnected: boolean;
/** (optional) stats for the provider with connections/bytes */
readonly stats?: ProviderStats;
/** (optional) stats for the provider with connections/bytes */
readonly ttl?: number | null;
clone (): ProviderInterface;
connect (): Promise<void>;
disconnect (): Promise<void>;
on (type: ProviderInterfaceEmitted, sub: ProviderInterfaceEmitCb): () => void;
send <T = any> (method: string, params: unknown[], isCacheable?: boolean): Promise<T>;
subscribe (type: string, method: string, params: unknown[], cb: ProviderInterfaceCallback): Promise<number | string>;
unsubscribe (type: string, method: string, id: number | string): Promise<boolean>;
}
/** Stats for a specific endpoint */
export interface EndpointStats {
/** The total number of bytes sent */
bytesRecv: number;
/** The total number of bytes received */
bytesSent: number;
/** The number of cached/in-progress requests made */
cached: number;
/** The number of errors found */
errors: number;
/** The number of requests */
requests: number;
/** The number of subscriptions */
subscriptions: number;
/** The number of request timeouts */
timeout: number;
}
/** Overall stats for the provider */
export interface ProviderStats {
/** Details for the active/open requests */
active: {
/** Number of active requests */
requests: number;
/** Number of active subscriptions */
subscriptions: number;
};
/** The total requests that have been made */
total: EndpointStats;
}
@@ -0,0 +1,167 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { Mock } from '../mock/types.js';
import { mockWs } from '../mock/mockWs.js';
import { WsProvider } from './index.js';
const TEST_WS_URL = 'ws://localhost-connect.spec.ts:9988';
function sleep (ms = 100): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
describe('onConnect', (): void => {
let mocks: Mock[];
let provider: WsProvider | null;
beforeEach((): void => {
mocks = [mockWs([], TEST_WS_URL)];
});
afterEach(async () => {
if (provider) {
await provider.disconnect();
await sleep();
provider = null;
}
await Promise.all(mocks.map((m) => m.done()));
await sleep();
});
it('Does not connect when autoConnect is false', async () => {
provider = new WsProvider(TEST_WS_URL, 0);
await sleep();
expect(provider.isConnected).toBe(false);
await provider.connect();
await sleep();
expect(provider.isConnected).toBe(true);
await provider.disconnect();
await sleep();
expect(provider.isConnected).toBe(false);
});
it('Does connect when autoConnect is true', async () => {
provider = new WsProvider(TEST_WS_URL, 1);
await sleep();
expect(provider.isConnected).toBe(true);
});
it('Creates a new WebSocket instance by calling the connect() method', async () => {
provider = new WsProvider(TEST_WS_URL, false);
expect(provider.isConnected).toBe(false);
expect(mocks[0].server.clients().length).toBe(0);
await provider.connect();
await sleep();
expect(provider.isConnected).toBe(true);
expect(mocks[0].server.clients()).toHaveLength(1);
});
it('Connects to first endpoint when an array is given', async () => {
provider = new WsProvider([TEST_WS_URL], 1);
await sleep();
expect(provider.isConnected).toBe(true);
expect(mocks[0].server.clients()).toHaveLength(1);
});
it('Does not allow connect() on already-connected', async () => {
provider = new WsProvider([TEST_WS_URL], 1);
await sleep();
expect(provider.isConnected).toBe(true);
await expect(
provider.connect()
).rejects.toThrow(/already connected/);
});
it('Connects to the second endpoint when the first is unreachable', async () => {
const endpoints: string[] = ['ws://localhost-unreachable-connect.spec.ts:9956', TEST_WS_URL];
provider = new WsProvider(endpoints, 1);
await sleep();
expect(mocks[0].server.clients()).toHaveLength(1);
expect(provider.isConnected).toBe(true);
});
it('Connects to the second endpoint when the first is dropped', async () => {
const endpoints: string[] = [TEST_WS_URL, 'ws://localhost-connect.spec.ts:9957'];
mocks.push(mockWs([], endpoints[1]));
provider = new WsProvider(endpoints, 1);
await sleep();
// Check that first server is connected
expect(mocks[0].server.clients()).toHaveLength(1);
expect(mocks[1].server.clients()).toHaveLength(0);
// Close connection from first server
mocks[0].server.clients()[0].close();
await sleep();
// Check that second server is connected
expect(mocks[1].server.clients()).toHaveLength(1);
expect(provider.isConnected).toBe(true);
});
it('Round-robin of endpoints on WsProvider', async () => {
const endpoints: string[] = [
TEST_WS_URL,
'ws://localhost-connect.spec.ts:9956',
'ws://localhost-connect.spec.ts:9957',
'ws://invalid-connect.spec.ts:9956',
'ws://localhost-connect.spec.ts:9958'
];
mocks.push(mockWs([], endpoints[1]));
mocks.push(mockWs([], endpoints[2]));
mocks.push(mockWs([], endpoints[4]));
const mockNext = [
mocks[1],
mocks[2],
mocks[3],
mocks[0]
];
provider = new WsProvider(endpoints, 1);
for (let round = 0; round < 2; round++) {
for (let mock = 0; mock < mocks.length; mock++) {
await sleep();
// Wwe are connected, the current mock has the connection and the next doesn't
expect(provider.isConnected).toBe(true);
expect(mocks[mock].server.clients()).toHaveLength(1);
expect(mockNext[mock].server.clients()).toHaveLength(0);
// Close connection from first server
mocks[mock].server.clients()[0].close();
}
}
});
});
+41
View File
@@ -0,0 +1,41 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
// from https://stackoverflow.com/questions/19304157/getting-the-reason-why-websockets-closed-with-close-code-1006
const known: Record<number, string> = {
1000: 'Normal Closure',
1001: 'Going Away',
1002: 'Protocol Error',
1003: 'Unsupported Data',
1004: '(For future)',
1005: 'No Status Received',
1006: 'Abnormal Closure',
1007: 'Invalid frame payload data',
1008: 'Policy Violation',
1009: 'Message too big',
1010: 'Missing Extension',
1011: 'Internal Error',
1012: 'Service Restart',
1013: 'Try Again Later',
1014: 'Bad Gateway',
1015: 'TLS Handshake'
};
export function getWSErrorString (code: number): string {
if (code >= 0 && code <= 999) {
return '(Unused)';
} else if (code >= 1016) {
if (code <= 1999) {
return '(For WebSocket standard)';
} else if (code <= 2999) {
return '(For WebSocket extensions)';
} else if (code <= 3999) {
return '(For libraries and frameworks)';
} else if (code <= 4999) {
return '(For applications)';
}
}
return known[code] || '(Unknown)';
}
@@ -0,0 +1,97 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { Request } from '../mock/mockWs.js';
import type { Mock } from '../mock/types.js';
import { mockWs } from '../mock/mockWs.js';
import { WsProvider } from './index.js';
const TEST_WS_URL = 'ws://localhost-index.spec.ts:9977';
let provider: WsProvider | null;
let mock: Mock;
function createWs (requests: Request[], autoConnect = 1000, headers?: Record<string, string>, timeout?: number): WsProvider {
mock = mockWs(requests, TEST_WS_URL);
provider = new WsProvider(TEST_WS_URL, autoConnect, headers, timeout);
return provider;
}
describe('Ws', (): void => {
afterEach(async () => {
if (mock) {
await mock.done();
}
if (provider) {
await provider.disconnect();
provider = null;
}
});
it('returns the connected state', (): void => {
expect(
createWs([]).isConnected
).toEqual(false);
});
// eslint-disable-next-line jest/expect-expect
it('allows you to initialize the provider with custom headers', () => {
createWs([], 100, { foo: 'bar' });
});
// eslint-disable-next-line jest/expect-expect
it('allows you to set custom timeout value for handlers', () => {
const CUSTOM_TIMEOUT_S = 90;
const CUSTOM_TIMEOUT_MS = CUSTOM_TIMEOUT_S * 1000;
createWs([], 100, { foo: 'bar' }, CUSTOM_TIMEOUT_MS);
});
});
describe('Endpoint Parsing', (): void => {
// eslint-disable-next-line jest/expect-expect
it('Succeeds when WsProvider endpoint is a valid string', () => {
/* eslint-disable no-new */
new WsProvider(TEST_WS_URL, 0);
});
it('should throw error on negative cache capacity or TTL', () => {
expect(() => new WsProvider(TEST_WS_URL, false, {}, undefined, -5, 30000)).toThrow(/'capacity' must be a non-negative integer/);
expect(() => new WsProvider(TEST_WS_URL, false, {}, undefined, 1024, -1000)).toThrow(/'ttl' must be between 0 and 1800000 ms or null to disable/);
});
it('Throws when WsProvider endpoint is an invalid string', () => {
expect(
() => new WsProvider('http://127.0.0.1:9955', 0)
).toThrow(/^Endpoint should start with /);
});
// eslint-disable-next-line jest/expect-expect
it('Succeeds when WsProvider endpoint is a valid array', () => {
const endpoints: string[] = ['ws://127.0.0.1:9955', 'wss://testnet.io:9944', 'ws://mychain.com:9933'];
/* eslint-disable no-new */
new WsProvider(endpoints, 0);
});
it('Throws when WsProvider endpoint is an empty array', () => {
const endpoints: string[] = [];
expect(
() => new WsProvider(endpoints, 0)
).toThrow('WsProvider requires at least one Endpoint');
});
it('Throws when WsProvider endpoint is an invalid array', () => {
const endpoints: string[] = ['ws://127.0.0.1:9955', 'http://bad.co:9944', 'ws://mychain.com:9933'];
expect(
() => new WsProvider(endpoints, 0)
).toThrow(/^Endpoint should start with /);
});
});
+652
View File
@@ -0,0 +1,652 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { Class } from '@pezkuwi/util/types';
import type RpcError from '../coder/error.js';
import type { EndpointStats, JsonRpcResponse, ProviderInterface, ProviderInterfaceCallback, ProviderInterfaceEmitCb, ProviderInterfaceEmitted, ProviderStats } from '../types.js';
import { EventEmitter } from 'eventemitter3';
import { isChildClass, isNull, isUndefined, logger, noop, objectSpread, stringify } from '@pezkuwi/util';
import { xglobal } from '@pezkuwi/x-global';
import { WebSocket } from '@pezkuwi/x-ws';
import { RpcCoder } from '../coder/index.js';
import defaults from '../defaults.js';
import { DEFAULT_CAPACITY, DEFAULT_TTL, LRUCache } from '../lru.js';
import { getWSErrorString } from './errors.js';
interface SubscriptionHandler {
callback: ProviderInterfaceCallback;
type: string;
}
interface WsStateAwaiting {
callback: ProviderInterfaceCallback;
method: string;
params: unknown[];
start: number;
subscription?: SubscriptionHandler | undefined;
}
interface WsStateSubscription extends SubscriptionHandler {
method: string;
params: unknown[];
}
const ALIASES: Record<string, string> = {
chain_finalisedHead: 'chain_finalizedHead',
chain_subscribeFinalisedHeads: 'chain_subscribeFinalizedHeads',
chain_unsubscribeFinalisedHeads: 'chain_unsubscribeFinalizedHeads'
};
const RETRY_DELAY = 2_500;
const DEFAULT_TIMEOUT_MS = 60 * 1000;
const TIMEOUT_INTERVAL = 5_000;
const l = logger('api-ws');
/** @internal Clears a Record<*> of all keys, optionally with all callback on clear */
function eraseRecord<T> (record: Record<string, T>, cb?: (item: T) => void): void {
Object.keys(record).forEach((key): void => {
if (cb) {
cb(record[key]);
}
delete record[key];
});
}
/** @internal Creates a default/empty stats object */
function defaultEndpointStats (): EndpointStats {
return { bytesRecv: 0, bytesSent: 0, cached: 0, errors: 0, requests: 0, subscriptions: 0, timeout: 0 };
}
/**
* # @pezkuwi/rpc-provider/ws
*
* @name WsProvider
*
* @description The WebSocket Provider allows sending requests using WebSocket to a WebSocket RPC server TCP port. Unlike the [[HttpProvider]], it does support subscriptions and allows listening to events such as new blocks or balance changes.
*
* @example
* <BR>
*
* ```javascript
* import Api from '@pezkuwi/api/promise';
* import { WsProvider } from '@pezkuwi/rpc-provider/ws';
*
* const provider = new WsProvider('ws://127.0.0.1:9944');
* const api = new Api(provider);
* ```
*
* @see [[HttpProvider]]
*/
export class WsProvider implements ProviderInterface {
readonly #callCache: LRUCache;
readonly #coder: RpcCoder;
readonly #endpoints: string[];
readonly #headers: Record<string, string>;
readonly #eventemitter: EventEmitter;
readonly #handlers: Record<string, WsStateAwaiting> = {};
readonly #isReadyPromise: Promise<WsProvider>;
readonly #stats: ProviderStats;
readonly #waitingForId: Record<string, JsonRpcResponse<unknown>> = {};
readonly #cacheCapacity: number;
readonly #ttl: number | null | undefined;
#autoConnectMs: number;
#endpointIndex: number;
#endpointStats: EndpointStats;
#isConnected = false;
#subscriptions: Record<string, WsStateSubscription> = {};
#timeoutId?: ReturnType<typeof setInterval> | null = null;
#websocket: WebSocket | null;
#timeout: number;
/**
* @param {string | string[]} endpoint The endpoint url. Usually `ws://ip:9944` or `wss://ip:9944`, may provide an array of endpoint strings.
* @param {number | false} autoConnectMs Whether to connect automatically or not (default). Provided value is used as a delay between retries.
* @param {Record<string, string>} headers The headers provided to the underlying WebSocket
* @param {number} [timeout] Custom timeout value used per request . Defaults to `DEFAULT_TIMEOUT_MS`
* @param {number} [cacheCapacity] Custom size of the WsProvider LRUCache. Defaults to `DEFAULT_CAPACITY` (1024)
* @param {number} [cacheTtl] Custom TTL of the WsProvider LRUCache. Determines how long an object can live in the cache. Defaults to DEFAULT_TTL` (30000)
*/
constructor (endpoint: string | string[] = defaults.WS_URL, autoConnectMs: number | false = RETRY_DELAY, headers: Record<string, string> = {}, timeout?: number, cacheCapacity?: number, cacheTtl?: number | null) {
const endpoints = Array.isArray(endpoint)
? endpoint
: [endpoint];
if (endpoints.length === 0) {
throw new Error('WsProvider requires at least one Endpoint');
}
endpoints.forEach((endpoint) => {
if (!/^(wss|ws):\/\//.test(endpoint)) {
throw new Error(`Endpoint should start with 'ws://', received '${endpoint}'`);
}
});
const ttl = cacheTtl === undefined ? DEFAULT_TTL : cacheTtl;
this.#callCache = new LRUCache(cacheCapacity === 0 ? 0 : cacheCapacity || DEFAULT_CAPACITY, ttl);
this.#ttl = cacheTtl;
this.#cacheCapacity = cacheCapacity || DEFAULT_CAPACITY;
this.#eventemitter = new EventEmitter();
this.#autoConnectMs = autoConnectMs || 0;
this.#coder = new RpcCoder();
this.#endpointIndex = -1;
this.#endpoints = endpoints;
this.#headers = headers;
this.#websocket = null;
this.#stats = {
active: { requests: 0, subscriptions: 0 },
total: defaultEndpointStats()
};
this.#endpointStats = defaultEndpointStats();
this.#timeout = timeout || DEFAULT_TIMEOUT_MS;
if (autoConnectMs && autoConnectMs > 0) {
this.connectWithRetry().catch(noop);
}
this.#isReadyPromise = new Promise((resolve): void => {
this.#eventemitter.once('connected', (): void => {
resolve(this);
});
});
}
/**
* @summary `true` when this provider supports subscriptions
*/
public get hasSubscriptions (): boolean {
return !!true;
}
/**
* @summary `true` when this provider supports clone()
*/
public get isClonable (): boolean {
return !!true;
}
/**
* @summary Whether the node is connected or not.
* @return {boolean} true if connected
*/
public get isConnected (): boolean {
return this.#isConnected;
}
/**
* @description Promise that resolves the first time we are connected and loaded
*/
public get isReady (): Promise<WsProvider> {
return this.#isReadyPromise;
}
public get endpoint (): string {
return this.#endpoints[this.#endpointIndex];
}
/**
* @description Returns a clone of the object
*/
public clone (): WsProvider {
return new WsProvider(this.#endpoints);
}
protected selectEndpointIndex (endpoints: string[]): number {
return (this.#endpointIndex + 1) % endpoints.length;
}
/**
* @summary Manually connect
* @description The [[WsProvider]] connects automatically by default, however if you decided otherwise, you may
* connect manually using this method.
*/
// eslint-disable-next-line @typescript-eslint/require-await
public async connect (): Promise<void> {
if (this.#websocket) {
throw new Error('WebSocket is already connected');
}
try {
this.#endpointIndex = this.selectEndpointIndex(this.#endpoints);
// the as here is Deno-specific - not available on the globalThis
this.#websocket = typeof xglobal.WebSocket !== 'undefined' && isChildClass(xglobal.WebSocket as unknown as Class<WebSocket>, WebSocket)
? new WebSocket(this.endpoint)
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore - WS may be an instance of ws, which supports options
: new WebSocket(this.endpoint, undefined, {
headers: this.#headers
});
if (this.#websocket) {
this.#websocket.onclose = this.#onSocketClose;
this.#websocket.onerror = this.#onSocketError;
this.#websocket.onmessage = this.#onSocketMessage;
this.#websocket.onopen = this.#onSocketOpen;
}
// timeout any handlers that have not had a response
this.#timeoutId = setInterval(() => this.#timeoutHandlers(), TIMEOUT_INTERVAL);
} catch (error) {
l.error(error);
this.#emit('error', error);
throw error;
}
}
/**
* @description Connect, never throwing an error, but rather forcing a retry
*/
public async connectWithRetry (): Promise<void> {
if (this.#autoConnectMs > 0) {
try {
await this.connect();
} catch {
setTimeout((): void => {
this.connectWithRetry().catch(noop);
}, this.#autoConnectMs);
}
}
}
/**
* @description Manually disconnect from the connection, clearing auto-connect logic
*/
// eslint-disable-next-line @typescript-eslint/require-await
public async disconnect (): Promise<void> {
// switch off autoConnect, we are in manual mode now
this.#autoConnectMs = 0;
try {
if (this.#websocket) {
// 1000 - Normal closure; the connection successfully completed
this.#websocket.close(1000);
}
} catch (error) {
l.error(error);
this.#emit('error', error);
throw error;
}
}
/**
* @description Returns the connection stats
*/
public get stats (): ProviderStats {
return {
active: {
requests: Object.keys(this.#handlers).length,
subscriptions: Object.keys(this.#subscriptions).length
},
total: this.#stats.total
};
}
/**
* @description Returns the connection stats
*/
public get ttl (): number | null | undefined {
return this.#ttl;
}
public get endpointStats (): EndpointStats {
return this.#endpointStats;
}
/**
* @summary Listens on events after having subscribed using the [[subscribe]] function.
* @param {ProviderInterfaceEmitted} type Event
* @param {ProviderInterfaceEmitCb} sub Callback
* @return unsubscribe function
*/
public on (type: ProviderInterfaceEmitted, sub: ProviderInterfaceEmitCb): () => void {
this.#eventemitter.on(type, sub);
return (): void => {
this.#eventemitter.removeListener(type, sub);
};
}
/**
* @summary Send JSON data using WebSockets to configured HTTP Endpoint or queue.
* @param method The RPC methods to execute
* @param params Encoded parameters as applicable for the method
* @param subscription Subscription details (internally used)
*/
public send <T = any> (method: string, params: unknown[], isCacheable?: boolean, subscription?: SubscriptionHandler): Promise<T> {
this.#endpointStats.requests++;
this.#stats.total.requests++;
const [id, body] = this.#coder.encodeJson(method, params);
if (this.#cacheCapacity === 0) {
return this.#send(id, body, method, params, subscription);
}
const cacheKey = isCacheable ? `${method}::${stringify(params)}` : '';
let resultPromise: Promise<T> | null = isCacheable
? this.#callCache.get(cacheKey)
: null;
if (!resultPromise) {
resultPromise = this.#send(id, body, method, params, subscription);
if (isCacheable) {
this.#callCache.set(cacheKey, resultPromise);
}
} else {
this.#endpointStats.cached++;
this.#stats.total.cached++;
}
return resultPromise;
}
async #send <T> (id: number, body: string, method: string, params: unknown[], subscription?: SubscriptionHandler): Promise<T> {
return new Promise<T>((resolve, reject): void => {
try {
if (!this.isConnected || this.#websocket === null) {
throw new Error('WebSocket is not connected');
}
const callback = (error?: Error | null, result?: T): void => {
error
? reject(error)
: resolve(result as T);
};
l.debug(() => ['calling', method, body]);
this.#handlers[id] = {
callback,
method,
params,
start: Date.now(),
subscription
};
const bytesSent = body.length;
this.#endpointStats.bytesSent += bytesSent;
this.#stats.total.bytesSent += bytesSent;
this.#websocket.send(body);
} catch (error) {
this.#endpointStats.errors++;
this.#stats.total.errors++;
const rpcError: RpcError = error as RpcError;
const failedRequest = `\nFailed WS Request: ${JSON.stringify({ method, params })}`;
// Provide WS Request alongside the error
rpcError.message = `${rpcError.message}${failedRequest}`;
reject(rpcError);
}
});
}
/**
* @name subscribe
* @summary Allows subscribing to a specific event.
*
* @example
* <BR>
*
* ```javascript
* const provider = new WsProvider('ws://127.0.0.1:9944');
* const rpc = new Rpc(provider);
*
* rpc.state.subscribeStorage([[storage.system.account, <Address>]], (_, values) => {
* console.log(values)
* }).then((subscriptionId) => {
* console.log('balance changes subscription id: ', subscriptionId)
* })
* ```
*/
public subscribe (type: string, method: string, params: unknown[], callback: ProviderInterfaceCallback): Promise<number | string> {
this.#endpointStats.subscriptions++;
this.#stats.total.subscriptions++;
// subscriptions are not cached, LRU applies to .at(<blockHash>) only
return this.send<number | string>(method, params, false, { callback, type });
}
/**
* @summary Allows unsubscribing to subscriptions made with [[subscribe]].
*/
public async unsubscribe (type: string, method: string, id: number | string): Promise<boolean> {
const subscription = `${type}::${id}`;
// FIXME This now could happen with re-subscriptions. The issue is that with a re-sub
// the assigned id now does not match what the API user originally received. It has
// a slight complication in solving - since we cannot rely on the send id, but rather
// need to find the actual subscription id to map it
if (isUndefined(this.#subscriptions[subscription])) {
l.debug(() => `Unable to find active subscription=${subscription}`);
return false;
}
delete this.#subscriptions[subscription];
try {
return this.isConnected && !isNull(this.#websocket)
? this.send<boolean>(method, [id])
: true;
} catch {
return false;
}
}
#emit = (type: ProviderInterfaceEmitted, ...args: unknown[]): void => {
this.#eventemitter.emit(type, ...args);
};
#onSocketClose = (event: CloseEvent): void => {
const error = new Error(`disconnected from ${this.endpoint}: ${event.code}:: ${event.reason || getWSErrorString(event.code)}`);
if (this.#autoConnectMs > 0) {
l.error(error.message);
}
this.#isConnected = false;
if (this.#websocket) {
this.#websocket.onclose = null;
this.#websocket.onerror = null;
this.#websocket.onmessage = null;
this.#websocket.onopen = null;
this.#websocket = null;
}
if (this.#timeoutId) {
clearInterval(this.#timeoutId);
this.#timeoutId = null;
}
// reject all hanging requests
eraseRecord(this.#handlers, (h) => {
try {
h.callback(error, undefined);
} catch (err) {
// does not throw
l.error(err);
}
});
eraseRecord(this.#waitingForId);
// Reset stats for active endpoint
this.#endpointStats = defaultEndpointStats();
this.#emit('disconnected');
if (this.#autoConnectMs > 0) {
setTimeout((): void => {
this.connectWithRetry().catch(noop);
}, this.#autoConnectMs);
}
};
#onSocketError = (error: Event): void => {
l.debug(() => ['socket error', error]);
this.#emit('error', error);
};
#onSocketMessage = (message: MessageEvent<string>): void => {
l.debug(() => ['received', message.data]);
const bytesRecv = message.data.length;
this.#endpointStats.bytesRecv += bytesRecv;
this.#stats.total.bytesRecv += bytesRecv;
const response = JSON.parse(message.data) as JsonRpcResponse<string>;
return isUndefined(response.method)
? this.#onSocketMessageResult(response)
: this.#onSocketMessageSubscribe(response);
};
#onSocketMessageResult = (response: JsonRpcResponse<string>): void => {
const handler = this.#handlers[response.id];
if (!handler) {
l.debug(() => `Unable to find handler for id=${response.id}`);
return;
}
try {
const { method, params, subscription } = handler;
const result = this.#coder.decodeResponse<string>(response);
// first send the result - in case of subs, we may have an update
// immediately if we have some queued results already
handler.callback(null, result);
if (subscription) {
const subId = `${subscription.type}::${result}`;
this.#subscriptions[subId] = objectSpread({}, subscription, {
method,
params
});
// if we have a result waiting for this subscription already
if (this.#waitingForId[subId]) {
this.#onSocketMessageSubscribe(this.#waitingForId[subId]);
}
}
} catch (error) {
this.#endpointStats.errors++;
this.#stats.total.errors++;
handler.callback(error as Error, undefined);
}
delete this.#handlers[response.id];
};
#onSocketMessageSubscribe = (response: JsonRpcResponse<unknown>): void => {
if (!response.method) {
throw new Error('No method found in JSONRPC response');
}
const method = ALIASES[response.method] || response.method;
const subId = `${method}::${response.params.subscription}`;
const handler = this.#subscriptions[subId];
if (!handler) {
// store the JSON, we could have out-of-order subid coming in
this.#waitingForId[subId] = response;
l.debug(() => `Unable to find handler for subscription=${subId}`);
return;
}
// housekeeping
delete this.#waitingForId[subId];
try {
const result = this.#coder.decodeResponse(response);
handler.callback(null, result);
} catch (error) {
this.#endpointStats.errors++;
this.#stats.total.errors++;
handler.callback(error as Error, undefined);
}
};
#onSocketOpen = (): boolean => {
if (this.#websocket === null) {
throw new Error('WebSocket cannot be null in onOpen');
}
l.debug(() => ['connected to', this.endpoint]);
this.#isConnected = true;
this.#resubscribe();
this.#emit('connected');
return true;
};
#resubscribe = (): void => {
const subscriptions = this.#subscriptions;
this.#subscriptions = {};
Promise.all(Object.keys(subscriptions).map(async (id): Promise<void> => {
const { callback, method, params, type } = subscriptions[id];
// only re-create subscriptions which are not in author (only area where
// transactions are created, i.e. submissions such as 'author_submitAndWatchExtrinsic'
// are not included (and will not be re-broadcast)
if (type.startsWith('author_')) {
return;
}
try {
await this.subscribe(type, method, params, callback);
} catch (error) {
l.error(error);
}
})).catch(l.error);
};
#timeoutHandlers = (): void => {
const now = Date.now();
const ids = Object.keys(this.#handlers);
for (let i = 0, count = ids.length; i < count; i++) {
const handler = this.#handlers[ids[i]];
if ((now - handler.start) > this.#timeout) {
try {
handler.callback(new Error(`No response received from RPC endpoint in ${this.#timeout / 1000}s`), undefined);
} catch {
// ignore
}
this.#endpointStats.timeout++;
this.#stats.total.timeout++;
delete this.#handlers[ids[i]];
}
}
};
}
+126
View File
@@ -0,0 +1,126 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { Request } from '../mock/mockWs.js';
import type { Global, Mock } from '../mock/types.js';
import { mockWs } from '../mock/mockWs.js';
import { WsProvider } from './index.js';
declare const global: Global;
const TEST_WS_URL = 'ws://localhost-send.spec.ts:9965';
let provider: WsProvider | null;
let mock: Mock;
function createMock (requests: Request[]): void {
mock = mockWs(requests, TEST_WS_URL);
}
function createWs (autoConnect = 1000): Promise<WsProvider> {
provider = new WsProvider(TEST_WS_URL, autoConnect);
return provider.isReady;
}
describe('send', (): void => {
let globalWs: typeof WebSocket;
beforeEach((): void => {
globalWs = global.WebSocket;
});
afterEach(async () => {
global.WebSocket = globalWs;
if (mock) {
await mock.done();
}
if (provider) {
await provider.disconnect();
provider = null;
}
});
it('handles internal errors', (): Promise<any> => {
createMock([{
id: 1,
method: 'test_body',
reply: {
result: 'ok'
}
}]);
return createWs().then((ws) =>
ws
.send('test_encoding', [{ error: 'send error' }])
.catch((error): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toEqual('send error');
})
);
});
it('passes the body through correctly', (): Promise<void> => {
createMock([{
id: 1,
method: 'test_body',
reply: {
result: 'ok'
}
}]);
return createWs().then((ws) =>
ws
.send('test_body', ['param'])
.then((): void => {
expect(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(mock.body as any).test_body
).toEqual('{"id":1,"jsonrpc":"2.0","method":"test_body","params":["param"]}');
})
);
});
it('throws error when !response.ok', (): Promise<any> => {
createMock([{
error: {
code: 666,
message: 'error'
},
id: 1,
method: 'something'
}]);
return createWs().then((ws) =>
ws
.send('test_error', [])
.catch((error): void => {
// eslint-disable-next-line jest/no-conditional-expect
expect((error as Error).message).toMatch(/666: error/);
})
);
});
it('adds subscriptions', (): Promise<void> => {
createMock([{
id: 1,
method: 'test_sub',
reply: {
result: 1
}
}]);
return createWs().then((ws) =>
ws
.send('test_sub', [])
.then((id): void => {
expect(id).toEqual(1);
})
);
});
});
@@ -0,0 +1,20 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import { WsProvider } from './index.js';
describe('state', (): void => {
it('requires an ws:// prefixed endpoint', (): void => {
expect(
() => new WsProvider('http://', 0)
).toThrow(/with 'ws/);
});
it('allows wss:// endpoints', (): void => {
expect(
() => new WsProvider('wss://', 0)
).not.toThrow();
});
});
@@ -0,0 +1,68 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { Request } from '../mock/mockWs.js';
import type { Global, Mock } from './../mock/types.js';
import { mockWs } from '../mock/mockWs.js';
import { WsProvider } from './index.js';
declare const global: Global;
const TEST_WS_URL = 'ws://localhost-subscribe.test.ts:9933';
let provider: WsProvider | null;
let mock: Mock;
function createMock (requests: Request[]): void {
mock = mockWs(requests, TEST_WS_URL);
}
function createWs (autoConnect = 1000): Promise<WsProvider> {
provider = new WsProvider(TEST_WS_URL, autoConnect);
return provider.isReady;
}
describe('subscribe', (): void => {
let globalWs: typeof WebSocket;
beforeEach((): void => {
globalWs = global.WebSocket;
});
afterEach(async () => {
global.WebSocket = globalWs;
if (mock) {
await mock.done();
}
if (provider) {
await provider.disconnect();
provider = null;
}
});
it('adds subscriptions', (): Promise<void> => {
createMock([{
id: 1,
method: 'test_sub',
reply: {
result: 1
}
}]);
return createWs().then((ws) =>
ws
.subscribe('type', 'test_sub', [], (cb): void => {
expect(cb).toEqual(expect.anything());
})
.then((id): void => {
expect(id).toEqual(1);
})
);
});
});
@@ -0,0 +1,100 @@
// Copyright 2017-2025 @pezkuwi/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
/// <reference types="@pezkuwi/dev-test/globals.d.ts" />
import type { Request } from '../mock/mockWs.js';
import type { Global, Mock } from './../mock/types.js';
import { mockWs } from '../mock/mockWs.js';
import { WsProvider } from './index.js';
declare const global: Global;
const TEST_WS_URL = 'ws://localhost-unsubscribe.test.ts:9933';
let provider: WsProvider | null;
let mock: Mock;
function createMock (requests: Request[]): void {
mock = mockWs(requests, TEST_WS_URL);
}
function createWs (autoConnect = 1000): Promise<WsProvider> {
provider = new WsProvider(TEST_WS_URL, autoConnect);
return provider.isReady;
}
describe('subscribe', (): void => {
let globalWs: typeof WebSocket;
beforeEach((): void => {
globalWs = global.WebSocket;
});
afterEach(async () => {
global.WebSocket = globalWs;
if (mock) {
await mock.done();
}
if (provider) {
await provider.disconnect();
provider = null;
}
});
it('removes subscriptions', async (): Promise<void> => {
createMock([
{
id: 1,
method: 'subscribe_test',
reply: {
result: 1
}
},
{
id: 2,
method: 'unsubscribe_test',
reply: {
result: true
}
}
]);
await createWs().then((ws) =>
ws
.subscribe('test', 'subscribe_test', [], (cb): void => {
expect(cb).toEqual(expect.anything());
})
.then((id): Promise<boolean> => {
return ws.unsubscribe('test', 'subscribe_test', id);
})
);
});
it('fails when sub not found', (): Promise<void> => {
createMock([{
id: 1,
method: 'subscribe_test',
reply: {
result: 1
}
}]);
return createWs().then((ws) =>
ws
.subscribe('test', 'subscribe_test', [], (cb): void => {
expect(cb).toEqual(expect.anything());
})
.then((): Promise<boolean> => {
return ws.unsubscribe('test', 'subscribe_test', 111);
})
.then((result): void => {
expect(result).toBe(false);
})
);
});
});