From 65a45b943956cf011fb9e0d63eea646e314807af Mon Sep 17 00:00:00 2001 From: ebadiere Date: Wed, 28 Feb 2024 18:44:08 -0700 Subject: [PATCH 1/6] feat: adding polling for new heads. Signed-off-by: ebadiere --- package.json | 3 +- packages/relay/src/lib/poller.ts | 6 +- .../tests/acceptance/ws/subscribe.spec.ts | 21 -- .../acceptance/ws/subscribeNewHeads.spec.ts | 99 +++++++++ packages/server/tests/localAcceptance.env | 1 + .../server/tests/previewnetAcceptance.env | 2 + packages/ws-server/src/webSocketServer.ts | 189 ++++++++++++------ 7 files changed, 234 insertions(+), 87 deletions(-) create mode 100644 packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts diff --git a/package.json b/package.json index 980661b7ca..c3fbb260a9 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "acceptancetest:htsprecompilev1": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@htsprecompilev1' --exit", "acceptancetest:release": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@release' --exit", "acceptancetest:ws": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@web-socket' --exit", + "acceptancetest:ws_newheads": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@web-socket-newheads' --exit", "acceptancetest:precompile-calls": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@precompile-calls' --exit", "acceptancetest:cache-service": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@cache-service' --exit", "acceptancetest:rpc_api_schema_conformity": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-conformity' --exit", @@ -72,7 +73,7 @@ "keyv-file": "^0.3.0", "koa-cors": "^0.0.16", "koa-websocket": "^7.0.0", - "lerna": "^8.1.1", + "lerna": "^8.0.2", "pino": "^7.11.0", "pino-pretty": "^7.6.1", "prom-client": "^14.0.1", diff --git a/packages/relay/src/lib/poller.ts b/packages/relay/src/lib/poller.ts index 66be4babc6..aed66446b6 100644 --- a/packages/relay/src/lib/poller.ts +++ b/packages/relay/src/lib/poller.ts @@ -2,7 +2,7 @@ * * Hedera JSON RPC Relay * - * Copyright (C) 2023 Hedera Hashgraph, LLC + * Copyright (C) 2023-2024 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,10 @@ export class Poller { filters?.topics || null, ); + poll.lastPolled = this.latestBlock; + } else if (event === 'newHeads' && process.env.WS_NEW_HEADS_ENABLED === 'true') { + data = await this.eth.getBlockByNumber('latest', true); + data.jsonrpc = '2.0'; poll.lastPolled = this.latestBlock; } else { this.logger.error(`${LOGGER_PREFIX} Polling for unsupported event: ${event}. Tag: ${poll.tag}`); diff --git a/packages/server/tests/acceptance/ws/subscribe.spec.ts b/packages/server/tests/acceptance/ws/subscribe.spec.ts index 34b4f87f60..20dcad13bd 100644 --- a/packages/server/tests/acceptance/ws/subscribe.spec.ts +++ b/packages/server/tests/acceptance/ws/subscribe.spec.ts @@ -356,27 +356,6 @@ describe('@web-socket Acceptance Tests', async function () { await new Promise((resolve) => setTimeout(resolve, 500)); }); - it('Expect Unsupported Method Error message when subscribing for newHeads method', async function () { - const webSocket = new WebSocket(WS_RELAY_URL); - let response = {}; - webSocket.on('message', function incoming(data) { - response = JSON.parse(data); - }); - webSocket.on('open', function open() { - webSocket.send('{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}'); - }); - - // wait 500ms to expect the message - await new Promise((resolve) => setTimeout(resolve, 500)); - - expect(response.error.code).to.eq(predefined.UNSUPPORTED_METHOD.code); - expect(response.error.name).to.eq(predefined.UNSUPPORTED_METHOD.name); - expect(response.error.message).to.eq(predefined.UNSUPPORTED_METHOD.message); - - // close the connection - webSocket.close(); - }); - it('Expect Unsupported Method Error message when subscribing for newPendingTransactions method', async function () { const webSocket = new WebSocket(WS_RELAY_URL); let response = {}; diff --git a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts new file mode 100644 index 0000000000..2a9684346c --- /dev/null +++ b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts @@ -0,0 +1,99 @@ +/*- + * + * Hedera JSON RPC Relay + * + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// external resources +import { solidity } from 'ethereum-waffle'; +import chai, { expect } from 'chai'; +import WebSocket from 'ws'; +chai.use(solidity); + +import { ethers } from 'ethers'; +const WS_RELAY_URL = `${process.env.WS_RELAY_URL}`; + +describe('@web-socket-newheads Acceptance Tests', async function () { + this.timeout(240 * 1000); // 240 seconds + + let server; + + let wsProvider; + let originalWsNewHeadsEnabledValue; + + before(async () => { + const { socketServer } = global; + server = socketServer; + + // cache original ENV values + originalWsNewHeadsEnabledValue = process.env.WS_NEW_HEADS_ENABLED; + }); + + beforeEach(async () => { + // restore original ENV value + process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue; + + wsProvider = await new ethers.WebSocketProvider(WS_RELAY_URL); + await new Promise((resolve) => setTimeout(resolve, 1000)); + if (server) expect(server._connections).to.equal(1); + }); + + afterEach(async () => { + if (wsProvider) { + await wsProvider.destroy(); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + if (server) expect(server._connections).to.equal(0); + }); + + describe('Configuration', async function () { + it('When WS_NEW_HEADS_ENABLED is set to false ', async function () { + const webSocket = new WebSocket(WS_RELAY_URL); + process.env.WS_NEW_HEADS_ENABLED = 'false'; + let response = ''; + const messagePromise = new Promise((resolve, reject) => { + webSocket.on('message', function incoming(data) { + try { + const response = JSON.parse(data); + expect(response).to.have.property('error'); + expect(response.error).to.have.property('code'); + expect(response.error.code).to.equal(-32601); + expect(response.error).to.have.property('message'); + expect(response.error.message).to.equal('Unsupported JSON-RPC method'); + expect(response.error).to.have.property('name'); + expect(response.error.name).to.equal('Method not found'); + resolve(); + } catch (error) { + reject(error); + } + response = data; + }); + webSocket.on('open', function open() { + // send the request for newHeads + webSocket.send('{"id":1,"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"]}'); + }); + webSocket.on('error', (error) => { + reject(error); // Reject the promise on WebSocket error + }); + }); + await messagePromise; + + webSocket.close(); + process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue; + }); + }); +}); diff --git a/packages/server/tests/localAcceptance.env b/packages/server/tests/localAcceptance.env index 991b6671bc..1a5c43009e 100644 --- a/packages/server/tests/localAcceptance.env +++ b/packages/server/tests/localAcceptance.env @@ -20,3 +20,4 @@ DEBUG_API_ENABLED=true SEND_RAW_TRANSACTION_SIZE_LIMIT=131072 BATCH_REQUESTS_ENABLED=true TEST_GAS_PRICE_DEVIATION=0.2 +WS_NEW_HEADS_ENABLED=true diff --git a/packages/server/tests/previewnetAcceptance.env b/packages/server/tests/previewnetAcceptance.env index 076772d63e..f58f3db13d 100644 --- a/packages/server/tests/previewnetAcceptance.env +++ b/packages/server/tests/previewnetAcceptance.env @@ -15,3 +15,5 @@ SUBSCRIPTIONS_ENABLED=true FILTER_API_ENABLED=true DEBUG_API_ENABLED=true TEST_GAS_PRICE_DEVIATION=0.2 +WS_NEW_HEADS_ENABLED=true + diff --git a/packages/ws-server/src/webSocketServer.ts b/packages/ws-server/src/webSocketServer.ts index b0e1c3f86c..6405f78f5b 100644 --- a/packages/ws-server/src/webSocketServer.ts +++ b/packages/ws-server/src/webSocketServer.ts @@ -1,8 +1,8 @@ -/*- +/* - * * Hedera JSON RPC Relay * - * Copyright (C) 2023 Hedera Hashgraph, LLC + * Copyright (C) 2023-2024 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,12 @@ */ import dotenv from 'dotenv'; import path from 'path'; -dotenv.config({ path: path.resolve(__dirname, '../../../.env') }); import Koa from 'koa'; import jsonResp from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/RpcResponse'; import KoaJsonRpc from '@hashgraph/json-rpc-server/dist/koaJsonRpc'; import websockify from 'koa-websocket'; -import { Relay, RelayImpl, predefined, JsonRpcError } from '@hashgraph/json-rpc-relay'; +import { type Relay, RelayImpl, predefined, JsonRpcError } from '@hashgraph/json-rpc-relay'; import { Registry, Counter } from 'prom-client'; import pino from 'pino'; @@ -34,6 +33,8 @@ import { formatRequestIdMessage } from '@hashgraph/json-rpc-relay/dist/formatter import { EthSubscribeLogsParamsObject } from '@hashgraph/json-rpc-server/dist/validator'; import { v4 as uuid } from 'uuid'; import constants from '@hashgraph/json-rpc-relay/dist/lib/constants'; +import { log } from 'console'; +dotenv.config({ path: path.resolve(__dirname, '../../../.env') }); const mainLogger = pino({ name: 'hedera-json-rpc-relay', @@ -100,7 +101,7 @@ async function validateIsContractOrTokenAddress(address, requestId) { if (!isContractOrToken) { throw new JsonRpcError( predefined.INVALID_PARAMETER( - `filters.address`, + 'filters.address', `${address} is not a valid contract or token type or does not exists`, ), requestId, @@ -175,71 +176,81 @@ app.ws.use(async (ctx) => { methodsCounter.labels(method).inc(); methodsCounterByIp.labels(ctx.request.ip, method).inc(); - if (method === constants.METHODS.ETH_SUBSCRIBE) { - if (limiter.validateSubscriptionLimit(ctx)) { - const event = params[0]; - const filters = params[1]; - let subscriptionId; - - if (event === constants.SUBSCRIBE_EVENTS.LOGS) { - try { - await validateSubscribeEthLogsParams(filters, requestIdPrefix); - } catch (error) { - logger.error( - error, - `${connectionIdPrefix} ${requestIdPrefix} Encountered error on ${ - ctx.websocket.id - }, method: ${method}, params: ${JSON.stringify(params)}`, - ); - response = jsonResp(request.id, error, undefined); - ctx.websocket.send(JSON.stringify(response)); - return; - } + // Early return if subscription limit is exceeded + if (method === constants.METHODS.ETH_SUBSCRIBE && !limiter.validateSubscriptionLimit(ctx)) { + response = jsonResp(request.id, predefined.MAX_SUBSCRIPTIONS, undefined); + ctx.websocket.send(JSON.stringify(response)); + return; + } - if (!getMultipleAddressesEnabled() && Array.isArray(filters.address) && filters.address.length > 1) { - response = jsonResp( - request.id, - predefined.INVALID_PARAMETER('filters.address', 'Only one contract address is allowed'), - undefined, - ); - } else { - subscriptionId = relay.subs()?.subscribe(ctx.websocket, event, filters); + try { + switch (method) { + case constants.METHODS.ETH_SUBSCRIBE: { + const event = params[0]; + const filters = params[1]; + let subscriptionId; + + switch (event) { + case constants.SUBSCRIBE_EVENTS.LOGS: + ({ response, subscriptionId } = await handleEthSubscribeLogs( + filters, + requestIdPrefix, + response, + request, + subscriptionId, + ctx, + event, + )); + break; + + case constants.SUBSCRIBE_EVENTS.NEW_HEADS: + ({ response, subscriptionId } = handleEthSubscribeNewHeads( + response, + subscriptionId, + filters, + request, + ctx, + event, + )); + break; + case constants.SUBSCRIBE_EVENTS.NEW_PENDING_TRANSACTIONS: + response = jsonResp(request.id, predefined.UNSUPPORTED_METHOD, undefined); + break; + + default: + response = jsonResp(request.id, predefined.UNSUPPORTED_METHOD, undefined); } - } else if (event === constants.SUBSCRIBE_EVENTS.NEW_HEADS) { - response = jsonResp(request.id, predefined.UNSUPPORTED_METHOD, undefined); - } else if (event === constants.SUBSCRIBE_EVENTS.NEW_PENDING_TRANSACTIONS) { - response = jsonResp(request.id, predefined.UNSUPPORTED_METHOD, undefined); - } else { - response = jsonResp(request.id, predefined.UNSUPPORTED_METHOD, undefined); - } - limiter.incrementSubs(ctx); - - if (subscriptionId) { - response = jsonResp(request.id, null, subscriptionId); + limiter.incrementSubs(ctx); + response = response ?? (subscriptionId ? jsonResp(request.id, null, subscriptionId) : undefined); + break; } - } else { - response = jsonResp(request.id, predefined.MAX_SUBSCRIPTIONS, undefined); - } - } else if (method === constants.METHODS.ETH_UNSUBSCRIBE) { - const subId = params[0]; - const unsubbedCount = relay.subs()?.unsubscribe(ctx.websocket, subId); - const success = unsubbedCount !== 0; - if (success) { - limiter.decrementSubs(ctx, unsubbedCount); + case constants.METHODS.ETH_UNSUBSCRIBE: { + const subId = params[0]; + const unsubbedCount = relay.subs()?.unsubscribe(ctx.websocket, subId); + limiter.decrementSubs(ctx, unsubbedCount); + response = jsonResp(request.id, null, unsubbedCount !== 0); + break; + } + case constants.METHODS.ETH_CHAIN_ID: + response = jsonResp(request.id, null, CHAIN_ID); + break; + default: + response = jsonResp(request.id, DEFAULT_ERROR, null); } - - response = jsonResp(request.id, null, success); + } catch (error) { + logger.error( + error, + `${connectionIdPrefix} ${requestIdPrefix} Encountered error on ${ + ctx.websocket.id + }, method: ${method}, params: ${JSON.stringify(params)}`, + ); + response = jsonResp(request.id, error, undefined); } - // Clients want to know the chainId after connecting - else if (method === constants.METHODS.ETH_CHAIN_ID) { - response = jsonResp(request.id, null, CHAIN_ID); - } else { - response = jsonResp(request.id, DEFAULT_ERROR, null); + if (response) { + ctx.websocket.send(JSON.stringify(response)); } - - ctx.websocket.send(JSON.stringify(response)); }); if (pingInterval > 0) { @@ -269,7 +280,7 @@ httpApp.use(async (ctx, next) => { */ try { const result = relay.eth().chainId(); - if (result.indexOf('0x12') >= 0) { + if (result.includes('0x12')) { ctx.status = 200; ctx.body = 'OK'; } else { @@ -281,7 +292,7 @@ httpApp.use(async (ctx, next) => { throw e; } } else { - return next(); + return await next(); } }); @@ -298,3 +309,53 @@ process.on('uncaughtException', (err) => { }); export { app, httpApp }; +function handleEthSubscribeNewHeads( + response: any, + subscriptionId: any, + filters: any, + request: any, + ctx: any, + event: any, +) { + if (process.env.WS_NEW_HEADS_ENABLED === 'true') { + ({ response, subscriptionId } = subscribeToNewHeads(filters, response, request, subscriptionId, ctx, event)); + } else { + response = jsonResp(request.id, predefined.UNSUPPORTED_METHOD, undefined); + } + return { response, subscriptionId }; +} + +async function handleEthSubscribeLogs( + filters: any, + requestIdPrefix: string, + response: any, + request: any, + subscriptionId: any, + ctx: any, + event: any, +) { + await validateSubscribeEthLogsParams(filters, requestIdPrefix); + if (!getMultipleAddressesEnabled() && Array.isArray(filters.address) && filters.address.length > 1) { + response = jsonResp( + request.id, + predefined.INVALID_PARAMETER('filters.address', 'Only one contract address is allowed'), + undefined, + ); + } else { + subscriptionId = relay.subs()?.subscribe(ctx.websocket, event, filters); + } + return { response, subscriptionId }; +} + +function subscribeToNewHeads(filters: any, response: any, request: any, subscriptionId: any, ctx: any, event: any) { + if (filters !== undefined) { + response = jsonResp( + request.id, + predefined.INVALID_PARAMETER('filters', 'Filters should be undefined for newHeads event'), + undefined, + ); + } + subscriptionId = relay.subs()?.subscribe(ctx.websocket, event, 'newHeads'); + logger.info(`Subscribed to newHeads, subscriptionId: ${subscriptionId}`); + return { response, subscriptionId }; +} From 326b69a2396616a53d5c60ace963f40f93385254 Mon Sep 17 00:00:00 2001 From: ebadiere Date: Thu, 29 Feb 2024 14:34:58 -0700 Subject: [PATCH 2/6] fix: Test fix. Isolated the WS_SUBSCRIPTION_LIMIT setting for the test. Signed-off-by: ebadiere --- .../tests/acceptance/ws/subscribeNewHeads.spec.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts index 2a9684346c..cbd147c658 100644 --- a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts +++ b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts @@ -27,13 +27,13 @@ chai.use(solidity); import { ethers } from 'ethers'; const WS_RELAY_URL = `${process.env.WS_RELAY_URL}`; -describe('@web-socket-newheads Acceptance Tests', async function () { +describe('@web-socket Acceptance Tests', async function () { this.timeout(240 * 1000); // 240 seconds let server; let wsProvider; - let originalWsNewHeadsEnabledValue; + let originalWsNewHeadsEnabledValue, originalWsSubcriptionLimitValue; before(async () => { const { socketServer } = global; @@ -41,12 +41,14 @@ describe('@web-socket-newheads Acceptance Tests', async function () { // cache original ENV values originalWsNewHeadsEnabledValue = process.env.WS_NEW_HEADS_ENABLED; + originalWsSubcriptionLimitValue = process.env.WS_SUBSCRIPTION_LIMIT; }); beforeEach(async () => { - // restore original ENV value process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue; + process.env.WS_SUBSCRIPTION_LIMIT = '10'; + wsProvider = await new ethers.WebSocketProvider(WS_RELAY_URL); await new Promise((resolve) => setTimeout(resolve, 1000)); if (server) expect(server._connections).to.equal(1); @@ -58,10 +60,11 @@ describe('@web-socket-newheads Acceptance Tests', async function () { await new Promise((resolve) => setTimeout(resolve, 1000)); } if (server) expect(server._connections).to.equal(0); + process.env.WS_SUBSCRIPTION_LIMIT = originalWsSubcriptionLimitValue; }); describe('Configuration', async function () { - it('When WS_NEW_HEADS_ENABLED is set to false ', async function () { + it('When WS_NEW_HEADS_ENABLED is set to false it should return unsupported method', async function () { const webSocket = new WebSocket(WS_RELAY_URL); process.env.WS_NEW_HEADS_ENABLED = 'false'; let response = ''; From c27648847fb29581e1b2c6a38fa7e520bc8f2eb4 Mon Sep 17 00:00:00 2001 From: ebadiere Date: Fri, 1 Mar 2024 08:34:58 -0700 Subject: [PATCH 3/6] fix: Fixed lerna version and set WS_NEW_HEADS_ENABLED for localAccpentance to false. Signed-off-by: ebadiere --- package.json | 2 +- packages/server/tests/localAcceptance.env | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index c3fbb260a9..a1d6cded5b 100644 --- a/package.json +++ b/package.json @@ -73,7 +73,7 @@ "keyv-file": "^0.3.0", "koa-cors": "^0.0.16", "koa-websocket": "^7.0.0", - "lerna": "^8.0.2", + "lerna": "^8.1.1", "pino": "^7.11.0", "pino-pretty": "^7.6.1", "prom-client": "^14.0.1", diff --git a/packages/server/tests/localAcceptance.env b/packages/server/tests/localAcceptance.env index 1a5c43009e..a981b54647 100644 --- a/packages/server/tests/localAcceptance.env +++ b/packages/server/tests/localAcceptance.env @@ -20,4 +20,4 @@ DEBUG_API_ENABLED=true SEND_RAW_TRANSACTION_SIZE_LIMIT=131072 BATCH_REQUESTS_ENABLED=true TEST_GAS_PRICE_DEVIATION=0.2 -WS_NEW_HEADS_ENABLED=true +WS_NEW_HEADS_ENABLED=false From f404f2484eaefac258c4ff3960c3384a31d94d3d Mon Sep 17 00:00:00 2001 From: ebadiere Date: Fri, 1 Mar 2024 11:29:16 -0700 Subject: [PATCH 4/6] feat: Added test for number of subscriptions. Signed-off-by: ebadiere --- .../acceptance/ws/subscribeNewHeads.spec.ts | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts index cbd147c658..5ba27107fe 100644 --- a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts +++ b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts @@ -25,8 +25,40 @@ import WebSocket from 'ws'; chai.use(solidity); import { ethers } from 'ethers'; +import Assertions from '../../helpers/assertions'; +import { predefined } from '@hashgraph/json-rpc-relay'; const WS_RELAY_URL = `${process.env.WS_RELAY_URL}`; +const createSubscription = (ws, subscriptionId) => { + return new Promise((resolve, reject) => { + ws.on('message', function incoming(data) { + const response = JSON.parse(data); + if (response.id === subscriptionId && response.result) { + console.log(`Subscription ${subscriptionId} successful with ID: ${response.result}`); + resolve(response.result); // Resolve with the subscription ID + } else if (response.method === 'eth_subscription') { + console.log(`Subscription ${subscriptionId} received block:`, response.params.result); + // You can add more logic here to handle incoming blocks + } + }); + + ws.on('open', function open() { + ws.send( + JSON.stringify({ + id: subscriptionId, + jsonrpc: '2.0', + method: 'eth_subscribe', + params: ['newHeads'], + }), + ); + }); + + ws.on('error', (error) => { + reject(error); + }); + }); +}; + describe('@web-socket Acceptance Tests', async function () { this.timeout(240 * 1000); // 240 seconds @@ -64,7 +96,7 @@ describe('@web-socket Acceptance Tests', async function () { }); describe('Configuration', async function () { - it('When WS_NEW_HEADS_ENABLED is set to false it should return unsupported method', async function () { + it('Should return unsupported method when WS_NEW_HEADS_ENABLED is set to false', async function () { const webSocket = new WebSocket(WS_RELAY_URL); process.env.WS_NEW_HEADS_ENABLED = 'false'; let response = ''; @@ -98,5 +130,23 @@ describe('@web-socket Acceptance Tests', async function () { webSocket.close(); process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue; }); + + it('Does not allow more subscriptions per connection than the specified limit with newHeads', async function () { + process.env.WS_SUBSCRIPTION_LIMIT = '2'; + // Create different subscriptions + for (let i = 0; i < 3; i++) { + if (i === 2) { + const expectedError = predefined.MAX_SUBSCRIPTIONS; + await Assertions.assertPredefinedRpcError(expectedError, wsProvider.send, true, wsProvider, [ + 'eth_subscribe', + ['newHeads'], + ]); + } else { + await wsProvider.send('eth_subscribe', ['newHeads']); + } + } + + await new Promise((resolve) => setTimeout(resolve, 500)); + }); }); }); From ae12d3ab875a2c86e3614cad0021710414e217dd Mon Sep 17 00:00:00 2001 From: ebadiere Date: Mon, 4 Mar 2024 12:54:37 -0700 Subject: [PATCH 5/6] feat: Added websocket test for block, fixed some json and the documentation. Signed-off-by: ebadiere --- docs/design/eth-subscribe.md | 3 +- .../relay/src/lib/subscriptionController.ts | 1 + .../acceptance/ws/subscribeNewHeads.spec.ts | 99 +++++++++++++------ 3 files changed, 72 insertions(+), 31 deletions(-) diff --git a/docs/design/eth-subscribe.md b/docs/design/eth-subscribe.md index d80f6bfbfd..719c03fc73 100644 --- a/docs/design/eth-subscribe.md +++ b/docs/design/eth-subscribe.md @@ -286,12 +286,13 @@ Add acceptance tests that follow the structure of the existing polling for logs "extraData": "0xd983010305844765746887676f312e342e328777696e646f7773", "gasLimit": "0x47e7c4", "gasUsed": "0x38658", + "hash":"0x50d2fe6747f21334213a8bc2691f02b6338f265e9f39f12c1f98f247e18f33aa", "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "miner": "0xf8b483dba2c3b7176a3da549ad41a48bb3121069", "nonce": "0x084149998194cc5f", "number": "0x1348c9", "parentHash": "0x7736fab79e05dc611604d22470dadad26f56fe494421b5b333de816ce1f25701", - "receiptRoot": "0x2fab35823ad00c7bb388595cb46652fe7886e00660a01e867824d3dceb1c8d36", + "receiptsRoot": "0x2fab35823ad00c7bb388595cb46652fe7886e00660a01e867824d3dceb1c8d36", "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", "stateRoot": "0xb3346685172db67de536d8765c43c31009d0eb3bd9c501c9be3229203f15f378", "timestamp": "0x56ffeff8", diff --git a/packages/relay/src/lib/subscriptionController.ts b/packages/relay/src/lib/subscriptionController.ts index c859c1121c..12a66c4225 100644 --- a/packages/relay/src/lib/subscriptionController.ts +++ b/packages/relay/src/lib/subscriptionController.ts @@ -171,6 +171,7 @@ export class SubscriptionController { this.resultsSentToSubscribersCounter.labels('sub.subscriptionId', tag).inc(); sub.connection.send( JSON.stringify({ + jsonrpc: '2.0', method: 'eth_subscription', params: subscriptionData, }), diff --git a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts index 5ba27107fe..d1479e942d 100644 --- a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts +++ b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts @@ -29,36 +29,6 @@ import Assertions from '../../helpers/assertions'; import { predefined } from '@hashgraph/json-rpc-relay'; const WS_RELAY_URL = `${process.env.WS_RELAY_URL}`; -const createSubscription = (ws, subscriptionId) => { - return new Promise((resolve, reject) => { - ws.on('message', function incoming(data) { - const response = JSON.parse(data); - if (response.id === subscriptionId && response.result) { - console.log(`Subscription ${subscriptionId} successful with ID: ${response.result}`); - resolve(response.result); // Resolve with the subscription ID - } else if (response.method === 'eth_subscription') { - console.log(`Subscription ${subscriptionId} received block:`, response.params.result); - // You can add more logic here to handle incoming blocks - } - }); - - ws.on('open', function open() { - ws.send( - JSON.stringify({ - id: subscriptionId, - jsonrpc: '2.0', - method: 'eth_subscribe', - params: ['newHeads'], - }), - ); - }); - - ws.on('error', (error) => { - reject(error); - }); - }); -}; - describe('@web-socket Acceptance Tests', async function () { this.timeout(240 * 1000); // 240 seconds @@ -149,4 +119,73 @@ describe('@web-socket Acceptance Tests', async function () { await new Promise((resolve) => setTimeout(resolve, 500)); }); }); + + describe('Subscriptions for newHeads', async function () { + it('should subscribe to newHeads and receive a valid JSON RPC response', (done) => { + const webSocket = new WebSocket(WS_RELAY_URL); + const subscriptionId = 1; + webSocket.on('open', function open() { + webSocket.send( + JSON.stringify({ + id: subscriptionId, + jsonrpc: '2.0', + method: 'eth_subscribe', + params: ['newHeads'], + }), + ); + }); + + let responseCounter = 0; + webSocket.on('message', function incoming(data) { + const response = JSON.parse(data); + console.log(`responseCounter: ${responseCounter}, response: ${JSON.stringify(response)}`); + responseCounter++; + try { + // Validate the structure of the JSON RPC response + // first response is the subscription ID + if (responseCounter === 1) { + expect(response).to.have.property('jsonrpc', '2.0'); + expect(response).to.have.property('id', 1); + expect(response).to.have.property('result'); // Subscription ID + // Ensure the result is a non-empty string (a valid subscription ID) + expect(response.result).to.be.a('string').that.is.not.empty; + } else { + expect(response).to.have.property('jsonrpc', '2.0'); + expect(response).to.have.property('method', 'eth_subscription'); + expect(response).to.have.property('params'); + expect(response.params).to.have.property('result'); + expect(response.params.result).to.have.property('difficulty'); + expect(response.params.result).to.have.property('extraData'); + expect(response.params.result).to.have.property('gasLimit'); + expect(response.params.result).to.have.property('gasUsed'); + expect(response.params.result).to.have.property('logsBloom'); + expect(response.params.result).to.have.property('miner'); + expect(response.params.result).to.have.property('nonce'); + expect(response.params.result).to.have.property('number'); + expect(response.params.result).to.have.property('parentHash'); + expect(response.params.result).to.have.property('receiptsRoot'); + expect(response.params.result).to.have.property('sha3Uncles'); + expect(response.params.result).to.have.property('stateRoot'); + expect(response.params.result).to.have.property('timestamp'); + expect(response.params.result).to.have.property('transactionsRoot'); + expect(response.params.result).to.have.property('hash'); + expect(response.params).to.have.property('subscription'); + } + + if (responseCounter > 1) { + // Test is done + webSocket.close(); + done(); + } + } catch (error) { + webSocket.close(); + done(error); + } + }); + + webSocket.on('error', (error) => { + done(error); + }); + }); + }); }); From 026f07995a19f99642bb7ee0017aa539ddf0111b Mon Sep 17 00:00:00 2001 From: ebadiere Date: Thu, 7 Mar 2024 12:30:55 -0700 Subject: [PATCH 6/6] fix: Test fix, set WS_NEW_HEADS_ENABLED to true. Signed-off-by: ebadiere feat: Added support for "includeTransactions": true Signed-off-by: ebadiere feat: Tes clean up. Signed-off-by: ebadiere --- packages/relay/src/lib/poller.ts | 3 +- .../tests/acceptance/ws/subscribe.spec.ts | 3 +- .../acceptance/ws/subscribeNewHeads.spec.ts | 229 ++++++++++++++---- packages/ws-server/src/webSocketServer.ts | 18 +- 4 files changed, 193 insertions(+), 60 deletions(-) diff --git a/packages/relay/src/lib/poller.ts b/packages/relay/src/lib/poller.ts index aed66446b6..f393cb835d 100644 --- a/packages/relay/src/lib/poller.ts +++ b/packages/relay/src/lib/poller.ts @@ -73,7 +73,8 @@ export class Poller { poll.lastPolled = this.latestBlock; } else if (event === 'newHeads' && process.env.WS_NEW_HEADS_ENABLED === 'true') { - data = await this.eth.getBlockByNumber('latest', true); + data = await this.eth.getBlockByNumber('latest', filters?.includeTransactions ?? false); + // data = await this.eth.getBlockByNumber('latest', true); data.jsonrpc = '2.0'; poll.lastPolled = this.latestBlock; } else { diff --git a/packages/server/tests/acceptance/ws/subscribe.spec.ts b/packages/server/tests/acceptance/ws/subscribe.spec.ts index 20dcad13bd..d1693ae98d 100644 --- a/packages/server/tests/acceptance/ws/subscribe.spec.ts +++ b/packages/server/tests/acceptance/ws/subscribe.spec.ts @@ -2,7 +2,7 @@ * * Hedera JSON RPC Relay * - * Copyright (C) 2023 Hedera Hashgraph, LLC + * Copyright (C) 2023-2024 Hedera Hashgraph, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import assertions from '../../helpers/assertions'; import { AliasAccount } from '../../clients/servicesClient'; import { predefined, WebSocketError } from '../../../../../packages/relay'; import { ethers } from 'ethers'; -import constants from '@hashgraph/json-rpc-relay/dist/lib/constants'; import Assertions from '../../helpers/assertions'; import LogContractJson from '../../contracts/Logs.json'; import Constants from '../../helpers/constants'; diff --git a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts index d1479e942d..992d995832 100644 --- a/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts +++ b/packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts @@ -27,19 +27,130 @@ chai.use(solidity); import { ethers } from 'ethers'; import Assertions from '../../helpers/assertions'; import { predefined } from '@hashgraph/json-rpc-relay'; +import { AliasAccount } from '../../clients/servicesClient'; +import { numberTo0x } from '../../../../../packages/relay/src/formatters'; +import RelayCall from '../../../tests/helpers/constants'; +import { Utils } from '../../helpers/utils'; +import e from 'express'; const WS_RELAY_URL = `${process.env.WS_RELAY_URL}`; +const ethAddressRegex = /^0x[a-fA-F0-9]*$/; + +async function sendTransaction( + ONE_TINYBAR: any, + CHAIN_ID: string | number, + accounts: AliasAccount[], + rpcServer: any, + requestId: any, + mirrorNodeServer: any, +) { + const transaction = { + value: ONE_TINYBAR, + gasLimit: numberTo0x(30000), + chainId: Number(CHAIN_ID), + to: accounts[1].address, + nonce: await rpcServer.getAccountNonce(accounts[0].address, requestId), + maxFeePerGas: await rpcServer.gasPrice(requestId), + }; + + const signedTx = await accounts[0].wallet.signTransaction(transaction); + const transactionHash = await rpcServer.sendRawTransaction(signedTx, requestId); + + await mirrorNodeServer.get(`/contracts/results/${transactionHash}`, requestId); + + return await rpcServer.call(RelayCall.ETH_ENDPOINTS.ETH_GET_TRANSACTION_RECEIPT, [transactionHash], requestId); +} + +function verifyResponse(response: any, done: Mocha.Done, webSocket: any, includeTransactions: boolean) { + if (response?.params?.result?.transactions?.length > 0) { + try { + expect(response).to.have.property('jsonrpc', '2.0'); + expect(response).to.have.property('method', 'eth_subscription'); + expect(response).to.have.property('params'); + expect(response.params).to.have.property('result'); + expect(response.params.result).to.have.property('difficulty'); + expect(response.params.result).to.have.property('extraData'); + expect(response.params.result).to.have.property('gasLimit'); + expect(response.params.result).to.have.property('gasUsed'); + expect(response.params.result).to.have.property('logsBloom'); + expect(response.params.result).to.have.property('miner'); + expect(response.params.result).to.have.property('nonce'); + expect(response.params.result).to.have.property('number'); + expect(response.params.result).to.have.property('parentHash'); + expect(response.params.result).to.have.property('receiptsRoot'); + expect(response.params.result).to.have.property('sha3Uncles'); + expect(response.params.result).to.have.property('stateRoot'); + expect(response.params.result).to.have.property('timestamp'); + expect(response.params.result).to.have.property('transactionsRoot'); + expect(response.params.result).to.have.property('hash'); + expect(response.params).to.have.property('subscription'); + + if (includeTransactions) { + expect(response.params.result).to.have.property('transactions'); + expect(response.params.result.transactions).to.have.lengthOf(1); + expect(response.params.result.transactions[0]).to.have.property('hash'); + expect(response.params.result.transactions[0]).to.have.property('nonce'); + expect(response.params.result.transactions[0]).to.have.property('blockHash'); + expect(response.params.result.transactions[0]).to.have.property('blockNumber'); + expect(response.params.result.transactions[0]).to.have.property('transactionIndex'); + expect(response.params.result.transactions[0]).to.have.property('from'); + expect(response.params.result.transactions[0]).to.have.property('to'); + expect(response.params.result.transactions[0]).to.have.property('value'); + expect(response.params.result.transactions[0]).to.have.property('gas'); + expect(response.params.result.transactions[0]).to.have.property('gasPrice'); + expect(response.params.result.transactions[0]).to.have.property('input'); + expect(response.params.result.transactions[0]).to.have.property('v'); + expect(response.params.result.transactions[0]).to.have.property('r'); + expect(response.params.result.transactions[0]).to.have.property('s'); + expect(response.params.result.transactions[0]).to.have.property('type'); + expect(response.params.result.transactions[0]).to.have.property('maxFeePerGas'); + expect(response.params.result.transactions[0]).to.have.property('maxPriorityFeePerGas'); + expect(response.params.result.transactions[0]).to.have.property('chainId'); + } else { + expect(response.params.result).to.have.property('transactions'); + expect(response.params.result.transactions).to.have.lengthOf(1); + expect(response.params.result.transactions[0]).to.match(ethAddressRegex); + } + done(); + } catch (error) { + webSocket.close(); + done(error); + } + } +} + describe('@web-socket Acceptance Tests', async function () { this.timeout(240 * 1000); // 240 seconds + const accounts: AliasAccount[] = []; + const CHAIN_ID = process.env.CHAIN_ID || 0; + const ONE_TINYBAR = Utils.add0xPrefix(Utils.toHex(ethers.parseUnits('1', 10))); + + const defaultGasPrice = numberTo0x(Assertions.defaultGasPrice); + const defaultGasLimit = numberTo0x(3_000_000); + + const defaultTransaction = { + value: ONE_TINYBAR, + chainId: Number(CHAIN_ID), + maxPriorityFeePerGas: defaultGasPrice, + maxFeePerGas: defaultGasPrice, + gasLimit: defaultGasLimit, + type: 2, + }; - let server; + let mirrorNodeServer, requestId, rpcServer, wsServer; let wsProvider; let originalWsNewHeadsEnabledValue, originalWsSubcriptionLimitValue; before(async () => { - const { socketServer } = global; - server = socketServer; + // @ts-ignore + const { servicesNode, socketServer, mirrorNode, relay, logger } = global; + mirrorNodeServer = mirrorNode; + rpcServer = relay; + wsServer = socketServer; + + accounts[0] = await servicesNode.createAliasAccount(100, relay.provider, requestId); + accounts[1] = await servicesNode.createAliasAccount(5, relay.provider, requestId); // cache original ENV values originalWsNewHeadsEnabledValue = process.env.WS_NEW_HEADS_ENABLED; @@ -53,7 +164,6 @@ describe('@web-socket Acceptance Tests', async function () { wsProvider = await new ethers.WebSocketProvider(WS_RELAY_URL); await new Promise((resolve) => setTimeout(resolve, 1000)); - if (server) expect(server._connections).to.equal(1); }); afterEach(async () => { @@ -61,7 +171,6 @@ describe('@web-socket Acceptance Tests', async function () { await wsProvider.destroy(); await new Promise((resolve) => setTimeout(resolve, 1000)); } - if (server) expect(server._connections).to.equal(0); process.env.WS_SUBSCRIPTION_LIMIT = originalWsSubcriptionLimitValue; }); @@ -103,6 +212,7 @@ describe('@web-socket Acceptance Tests', async function () { it('Does not allow more subscriptions per connection than the specified limit with newHeads', async function () { process.env.WS_SUBSCRIPTION_LIMIT = '2'; + process.env.WS_NEW_HEADS_ENABLED = 'true'; // Create different subscriptions for (let i = 0; i < 3; i++) { if (i === 2) { @@ -121,7 +231,8 @@ describe('@web-socket Acceptance Tests', async function () { }); describe('Subscriptions for newHeads', async function () { - it('should subscribe to newHeads and receive a valid JSON RPC response', (done) => { + it('should subscribe to newHeads, include transactions true, and receive a valid JSON RPC response', (done) => { + process.env.WS_NEW_HEADS_ENABLED = 'true'; const webSocket = new WebSocket(WS_RELAY_URL); const subscriptionId = 1; webSocket.on('open', function open() { @@ -130,61 +241,83 @@ describe('@web-socket Acceptance Tests', async function () { id: subscriptionId, jsonrpc: '2.0', method: 'eth_subscribe', - params: ['newHeads'], + params: ['newHeads', { includeTransactions: true }], }), ); }); let responseCounter = 0; + + sendTransaction(ONE_TINYBAR, CHAIN_ID, accounts, rpcServer, requestId, mirrorNodeServer); webSocket.on('message', function incoming(data) { const response = JSON.parse(data); - console.log(`responseCounter: ${responseCounter}, response: ${JSON.stringify(response)}`); + responseCounter++; - try { - // Validate the structure of the JSON RPC response - // first response is the subscription ID - if (responseCounter === 1) { - expect(response).to.have.property('jsonrpc', '2.0'); - expect(response).to.have.property('id', 1); - expect(response).to.have.property('result'); // Subscription ID - // Ensure the result is a non-empty string (a valid subscription ID) - expect(response.result).to.be.a('string').that.is.not.empty; - } else { - expect(response).to.have.property('jsonrpc', '2.0'); - expect(response).to.have.property('method', 'eth_subscription'); - expect(response).to.have.property('params'); - expect(response.params).to.have.property('result'); - expect(response.params.result).to.have.property('difficulty'); - expect(response.params.result).to.have.property('extraData'); - expect(response.params.result).to.have.property('gasLimit'); - expect(response.params.result).to.have.property('gasUsed'); - expect(response.params.result).to.have.property('logsBloom'); - expect(response.params.result).to.have.property('miner'); - expect(response.params.result).to.have.property('nonce'); - expect(response.params.result).to.have.property('number'); - expect(response.params.result).to.have.property('parentHash'); - expect(response.params.result).to.have.property('receiptsRoot'); - expect(response.params.result).to.have.property('sha3Uncles'); - expect(response.params.result).to.have.property('stateRoot'); - expect(response.params.result).to.have.property('timestamp'); - expect(response.params.result).to.have.property('transactionsRoot'); - expect(response.params.result).to.have.property('hash'); - expect(response.params).to.have.property('subscription'); - } + verifyResponse(response, done, webSocket, true); + if (responseCounter > 1) { + webSocket.close(); + done(); + } + }); + }); - if (responseCounter > 1) { - // Test is done - webSocket.close(); - done(); - } - } catch (error) { + it('should subscribe to newHeads, without the "include transactions", and receive a valid JSON RPC response', (done) => { + process.env.WS_NEW_HEADS_ENABLED = 'true'; + const webSocket = new WebSocket(WS_RELAY_URL); + const subscriptionId = 1; + webSocket.on('open', function open() { + webSocket.send( + JSON.stringify({ + id: subscriptionId, + jsonrpc: '2.0', + method: 'eth_subscribe', + params: ['newHeads'], + }), + ); + }); + + let responseCounter = 0; + + sendTransaction(ONE_TINYBAR, CHAIN_ID, accounts, rpcServer, requestId, mirrorNodeServer); + webSocket.on('message', function incoming(data) { + const response = JSON.parse(data); + + responseCounter++; + verifyResponse(response, done, webSocket, false); + if (responseCounter > 1) { webSocket.close(); - done(error); + done(); } }); + }); - webSocket.on('error', (error) => { - done(error); + it('should subscribe to newHeads, with "include transactions false", and receive a valid JSON RPC response', (done) => { + process.env.WS_NEW_HEADS_ENABLED = 'true'; + const webSocket = new WebSocket(WS_RELAY_URL); + const subscriptionId = 1; + webSocket.on('open', function open() { + webSocket.send( + JSON.stringify({ + id: subscriptionId, + jsonrpc: '2.0', + method: 'eth_subscribe', + params: ['newHeads', { includeTransactions: false }], + }), + ); + }); + + let responseCounter = 0; + + sendTransaction(ONE_TINYBAR, CHAIN_ID, accounts, rpcServer, requestId, mirrorNodeServer); + webSocket.on('message', function incoming(data) { + const response = JSON.parse(data); + + responseCounter++; + verifyResponse(response, done, webSocket, false); + if (responseCounter > 1) { + webSocket.close(); + done(); + } }); }); }); diff --git a/packages/ws-server/src/webSocketServer.ts b/packages/ws-server/src/webSocketServer.ts index 6405f78f5b..da75cbb2c0 100644 --- a/packages/ws-server/src/webSocketServer.ts +++ b/packages/ws-server/src/webSocketServer.ts @@ -347,15 +347,15 @@ async function handleEthSubscribeLogs( return { response, subscriptionId }; } -function subscribeToNewHeads(filters: any, response: any, request: any, subscriptionId: any, ctx: any, event: any) { - if (filters !== undefined) { - response = jsonResp( - request.id, - predefined.INVALID_PARAMETER('filters', 'Filters should be undefined for newHeads event'), - undefined, - ); - } - subscriptionId = relay.subs()?.subscribe(ctx.websocket, event, 'newHeads'); +function subscribeToNewHeads( + filters: any, + response: any, + request: any, + subscriptionId: any, + ctx: any, + event: any, +): { response: any; subscriptionId: any } { + subscriptionId = relay.subs()?.subscribe(ctx.websocket, event, filters); logger.info(`Subscribed to newHeads, subscriptionId: ${subscriptionId}`); return { response, subscriptionId }; }