Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding polling for new heads. #2160

Merged
merged 5 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"acceptancetest:htsprecompilev1": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@htsprecompilev1' --exit",
"acceptancetest:release": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@release' --exit",
"acceptancetest:ws": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@web-socket' --exit",
"acceptancetest:ws_newheads": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@web-socket-newheads' --exit",
"acceptancetest:precompile-calls": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@precompile-calls' --exit",
"acceptancetest:cache-service": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@cache-service' --exit",
"acceptancetest:rpc_api_schema_conformity": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-conformity' --exit",
Expand Down
6 changes: 5 additions & 1 deletion packages/relay/src/lib/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Hedera JSON RPC Relay
*
* Copyright (C) 2023 Hedera Hashgraph, LLC
* Copyright (C) 2023-2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,10 @@ export class Poller {
filters?.topics || null,
);

poll.lastPolled = this.latestBlock;
} else if (event === 'newHeads' && process.env.WS_NEW_HEADS_ENABLED === 'true') {
data = await this.eth.getBlockByNumber('latest', true);
data.jsonrpc = '2.0';
poll.lastPolled = this.latestBlock;
} else {
this.logger.error(`${LOGGER_PREFIX} Polling for unsupported event: ${event}. Tag: ${poll.tag}`);
Expand Down
21 changes: 0 additions & 21 deletions packages/server/tests/acceptance/ws/subscribe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,27 +356,6 @@ describe('@web-socket Acceptance Tests', async function () {
await new Promise((resolve) => setTimeout(resolve, 500));
});

it('Expect Unsupported Method Error message when subscribing for newHeads method', async function () {
const webSocket = new WebSocket(WS_RELAY_URL);
let response = {};
webSocket.on('message', function incoming(data) {
response = JSON.parse(data);
});
webSocket.on('open', function open() {
webSocket.send('{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}');
});

// wait 500ms to expect the message
await new Promise((resolve) => setTimeout(resolve, 500));

expect(response.error.code).to.eq(predefined.UNSUPPORTED_METHOD.code);
expect(response.error.name).to.eq(predefined.UNSUPPORTED_METHOD.name);
expect(response.error.message).to.eq(predefined.UNSUPPORTED_METHOD.message);

// close the connection
webSocket.close();
});

it('Expect Unsupported Method Error message when subscribing for newPendingTransactions method', async function () {
const webSocket = new WebSocket(WS_RELAY_URL);
let response = {};
Expand Down
153 changes: 153 additions & 0 deletions packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*-
*
* Hedera JSON RPC Relay
*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// external resources
import { solidity } from 'ethereum-waffle';
import chai, { expect } from 'chai';
import WebSocket from 'ws';
chai.use(solidity);

import { ethers } from 'ethers';
import Assertions from '../../helpers/assertions';
import { predefined } from '@hashgraph/json-rpc-relay';
const WS_RELAY_URL = `${process.env.WS_RELAY_URL}`;

const createSubscription = (ws, subscriptionId) => {
return new Promise((resolve, reject) => {
ws.on('message', function incoming(data) {
const response = JSON.parse(data);
if (response.id === subscriptionId && response.result) {
console.log(`Subscription ${subscriptionId} successful with ID: ${response.result}`);
resolve(response.result); // Resolve with the subscription ID
} else if (response.method === 'eth_subscription') {
console.log(`Subscription ${subscriptionId} received block:`, response.params.result);
// You can add more logic here to handle incoming blocks
}
});

ws.on('open', function open() {
ws.send(
JSON.stringify({
id: subscriptionId,
jsonrpc: '2.0',
method: 'eth_subscribe',
params: ['newHeads'],
}),
);
});

ws.on('error', (error) => {
reject(error);
});
});
};

describe('@web-socket Acceptance Tests', async function () {
this.timeout(240 * 1000); // 240 seconds

let server;

let wsProvider;
let originalWsNewHeadsEnabledValue, originalWsSubcriptionLimitValue;

before(async () => {
const { socketServer } = global;
server = socketServer;

// cache original ENV values
originalWsNewHeadsEnabledValue = process.env.WS_NEW_HEADS_ENABLED;
originalWsSubcriptionLimitValue = process.env.WS_SUBSCRIPTION_LIMIT;
});

beforeEach(async () => {
process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue;

process.env.WS_SUBSCRIPTION_LIMIT = '10';

wsProvider = await new ethers.WebSocketProvider(WS_RELAY_URL);
await new Promise((resolve) => setTimeout(resolve, 1000));
if (server) expect(server._connections).to.equal(1);
});

afterEach(async () => {
if (wsProvider) {
await wsProvider.destroy();
await new Promise((resolve) => setTimeout(resolve, 1000));
}
if (server) expect(server._connections).to.equal(0);
process.env.WS_SUBSCRIPTION_LIMIT = originalWsSubcriptionLimitValue;
});

describe('Configuration', async function () {
it('Should return unsupported method when WS_NEW_HEADS_ENABLED is set to false', async function () {
const webSocket = new WebSocket(WS_RELAY_URL);
process.env.WS_NEW_HEADS_ENABLED = 'false';
let response = '';
const messagePromise = new Promise((resolve, reject) => {
webSocket.on('message', function incoming(data) {
try {
const response = JSON.parse(data);
expect(response).to.have.property('error');
expect(response.error).to.have.property('code');
expect(response.error.code).to.equal(-32601);
expect(response.error).to.have.property('message');
expect(response.error.message).to.equal('Unsupported JSON-RPC method');
expect(response.error).to.have.property('name');
expect(response.error.name).to.equal('Method not found');
resolve();
} catch (error) {
reject(error);
}
response = data;
});
webSocket.on('open', function open() {
// send the request for newHeads
webSocket.send('{"id":1,"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"]}');
});
webSocket.on('error', (error) => {
reject(error); // Reject the promise on WebSocket error
});
});
await messagePromise;

webSocket.close();
process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue;
});

it('Does not allow more subscriptions per connection than the specified limit with newHeads', async function () {
process.env.WS_SUBSCRIPTION_LIMIT = '2';
process.env.WS_NEW_HEADS_ENABLED = 'true';
// Create different subscriptions
for (let i = 0; i < 3; i++) {
if (i === 2) {
const expectedError = predefined.MAX_SUBSCRIPTIONS;
await Assertions.assertPredefinedRpcError(expectedError, wsProvider.send, true, wsProvider, [
'eth_subscribe',
['newHeads'],
]);
} else {
await wsProvider.send('eth_subscribe', ['newHeads']);
}
}

await new Promise((resolve) => setTimeout(resolve, 500));
});
});
});
1 change: 1 addition & 0 deletions packages/server/tests/localAcceptance.env
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ DEBUG_API_ENABLED=true
SEND_RAW_TRANSACTION_SIZE_LIMIT=131072
BATCH_REQUESTS_ENABLED=true
TEST_GAS_PRICE_DEVIATION=0.2
WS_NEW_HEADS_ENABLED=false
2 changes: 2 additions & 0 deletions packages/server/tests/previewnetAcceptance.env
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ SUBSCRIPTIONS_ENABLED=true
FILTER_API_ENABLED=true
DEBUG_API_ENABLED=true
TEST_GAS_PRICE_DEVIATION=0.2
WS_NEW_HEADS_ENABLED=true

Loading
Loading