newTxs and pendingTxs
Name: newTxs , pendingTxs
Options
Key
Description
Values
include
Fields to include in the transaction stream. The subscription plan determines the list of available fields.
tx_hash,tx_contents[Default: all]
duplicates
Whether or not to include transactions
already published in the feed.
True,False[Default]
include_from_blockchain
Whether or not to include transactions received first from the connected blockchain node. (Mainly used for testing)
True[Default],False
filters
You can specify filters in SQL-Like format to only receive certain transactions.
Users can customize the filters.
blockchain_network
Blockchain network name. Use with Cloud-API when working with BSC
Mainnet [Default, Ethereum Mainnet], BSC-Mainnet 
The BDN supports subscribing to two transaction streams:
newTxsis a stream of all new transactions as they are propagated in the BDN.pendingTxsis a stream of all new transactions as they enter the Ethereum/BSC transaction pool.
For expedience, all transactions received through the BDN are immediately published to the newTxs feed. By design, the Gateway/Cloud-API do not perform the same detail of transaction validation that the Ethereum nodes do, and cannot completely guarantee that all transactions propagated are valid (e.g. the Gateway/Cloud-API do not check for double spends). Therefore, these transactions have had basic validations done (e.g. checksums and other sanity checks) but may not be accepted into the TxPool.
The Gateway/Cloud-API can then leverage the Ethereum nodes for further validation of the transaction (e.g. check that it will be accepted into the TxPool), and publish results to the pendingTxs feed. Users planning to use pendingTxs with a Gateway feed should enable validation against their local Ethereum node.
It is expected that newTxs stream will perform faster than pendingTxs. The performance difference can be significant (10-100ms). Users interested in timely transaction information could find newTxs stream more appealing, while those who rely on strict correctness of the transactions data should utilize the pendingTxs stream.
The newTxs stream can send transactions that have been (a) previously confirmed hours or days prior, or (b) replaced by a higher priced transaction with the same nonce. It is recommended that users with latency sensitive applications track the latest nonce for each account and use it to filter out stale messages.
The fields allowed in the include section, which is currently available via Websocket only, depends on the user's subscription plan:
Plan
Available Fields
Introductory
tx_hash
Professional and above
tx_hash, tx_contents.chain_id, tx_contents.input, tx_contents.v, tx_contents.r, tx_contents.s, tx_contents.type, tx_contents.to, tx_contents.value, tx_contents.nonce, tx_contents.gas, tx_contents.gas_price , tx_contents.max_priority_fee_per_gas,tx_contents.max_fee_per_gas, tx_contents.max_fee_per_blob_gas, tx_contents.blob_versioned_hashes,tx_contents.yParity, local_region ,raw_tx
Deprecated:
tx_contents.from : requires --tx-include-sender-in-feed if you are running Local Gateway. See Startup Arguments.
The transaction feed will publish the data for each transaction in a separate message. Users that are only interested in a subset of transactions can utilize the feed's filtering options.
The transaction feeds are available via both Websocket and gRPC.
Examples - Websocket
Requests (Cloud-API) 
Notes:
Follow the examples below based on your subscription plan. We suggest you to use "try...catch" to better handle exceptions caused by potential disconnections. The examples below contain the minimum code required for all transactions stream subscriptions.
See Cloud-API IPs to work directly with IP (
wss://<IP>/ws)Enterprise plan users can choose to work directly with
wss://<region>.<network name>.blxrbdn.com/ws(e.g.wss://virginia.eth.blxrbdn.com/wsfor ETH)for the best performance.
Non-Enterprise plan users should use
wss://api.blxrbdn.com/ws.
## ETH Example
wscat -c wss://virginia.eth.blxrbdn.com/ws --header "Authorization: <YOUR-AUTHORIZATION-HEADER>"
> {"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]}
< ......
## BSC Example
wscat -c wss://virginia.bsc.blxrbdn.com/ws --header "Authorization: <YOUR-AUTHORIZATION-HEADER>"
> {"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"], "blockchain_network": "BSC-Mainnet"}]}
< ......
var fs = require('fs');
const WebSocket = require('ws');
// Enterprise users can follow line 5-16
const ws = new WebSocket(
  'wss://virginia.eth.blxrbdn.com/ws', // for ETH
  // use 'wss://virginia.bsc.blxrbdn.com/ws',     //for BSC
  {
    headers: { 
      "Authorization" : <YOUR-AUTHORIZATION-HEADER> 
    },
    // Add the following line if you work with IP instead of DNS
    // rejectUnauthorized: false,
  }
);
//  Non Enterprise users should follow line 19-27
//  const ws = new WebSocket(
//    "wss://api.blxrbdn.com/ws", 
//    {
//      headers: { 
//        "Authorization" : <YOUR-AUTHORIZATION-HEADER> 
//      },
//      rejectUnauthorized: false,
//    }
//  );
function proceed() {
    // ETH Example
    ws.send(`{"jsonrpc": "2.0", "id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]}`);
    // BSC Example (only available at endpoint wss://<region>.bsc.blxrbdn.com/ws)
    // ws.send(`{"jsonrpc": "2.0", "id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"], "blockchain_network": "BSC-Mainnet"}]}`);
    
}
function handle(nextNotification) {
    console.log(nextNotification.toString()); // or process it generally
}
ws.on('open', proceed);
ws.on('message', handle);
import asyncio, json, ssl, websockets
async def main():
    uri = "wss://virginia.eth.blxrbdn.com/ws"
    # Introductory tier users should follow
    # uri="wss://api.blxrbdn.com/ws"
    headers_auth_key = "YOUR_AUTHORIZATION_HEADER"
    
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    async with websockets.connect(
            uri,
            extra_headers={"Authorization": headers_auth_key},
            ssl=ssl_context
        ) as websocket:
        
        # ETH Example
        subscribe_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "subscribe",
            "params": ["newTxs", {"include": ["tx_hash"]}]
        }
        
        await websocket.send(json.dumps(subscribe_request))
        response = await websocket.recv()
        subscription_id = json.loads(response)["result"]
        print(f"Subscribed successfully with subscription_id {subscription_id}")
        
        while True:
            try:
                next_notification = await websocket.recv()
                print(next_notification) # or process it generally
            except websockets.exceptions.ConnectionClosedError:
                print("Connection closed unexpectedly")
if __name__ == '__main__':
    asyncio.run(main())package main
import (
	"crypto/tls"
	"fmt"
	"github.com/gorilla/websocket"
	"net/http"
)
func main() {
	dialer := websocket.DefaultDialer
	// Add the following lines if you work with IP instead of DNS
	// tlsConfig := &tls.Config{
	// 	Certificates:       []tls.Certificate{cert},
	// 	InsecureSkipVerify: true,
	// }
	// dialer.TLSClientConfig = tlsConfig
	// Enterprise users can follow line 20
	wsSubscriber, _, err := dialer.Dial("wss://virginia.eth.blxrbdn.com/ws", http.Header{"Authorization": []string{<YOUR-AUTHORIZATION-HEADER>}})
	// Non Enterprise users can follow line 23
	// wsSubscriber, _, err := dialer.Dial("wss://api.blxrbdn.com/ws", http.Header{"Authorization": []string{<YOUR-AUTHORIZATION-HEADER>}})
	if err != nil {
		fmt.Println(err)
		return
	}
	// ETH Example
	subRequest := `{"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]}`
	// BSC Example (only available at endpoint wss://<region>.bsc.blxrbdn.com/ws)
	// subRequest := `{"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"], "blockchain_network": "BSC-Mainnet"}]}`
	err = wsSubscriber.WriteMessage(websocket.TextMessage, []byte(subRequest))
	if err != nil {
		fmt.Println(err)
		return
	}
	for {
		_, nextNotification, err := wsSubscriber.ReadMessage()
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println(string(nextNotification)) // or process it generally
	}
}
Requests (Gateway-API)
Notes:
We assume that the Gateway IP is
127.0.0.1with default ws port28333in the examples below. By default, the WebSocket endpoint isws://127.0.0.1:28333/wsfor Go Gateway.For Go Gateway, the authentication header is always required for ws connection.
wscat -c ws://127.0.0.1:28333/ws --header "Authorization: <YOUR-AUTHORIZATION-HEADER>"
> {"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]}
< ......
const WebSocket = require('ws');
const ws = new WebSocket(
  'ws://127.0.0.1:28333/ws', 
  {
    headers: { 
      "Authorization" : <YOUR-AUTHORIZATION-HEADER> 
    },
  }
);
function proceed() {
    ws.send(`{"jsonrpc": "2.0", "id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]}`);
}
function handle(nextNotification) {
    console.log(nextNotification.toString()); // or process it generally
}
ws.on('open', proceed);
ws.on('message', handle);
import asyncio, json, websockets
ws_uri = "ws://127.0.0.1:28333/ws"
headers_auth_key = "YOUR_AUTHORIZATION_HEADER>"
async def main():
    while True:
        try:
            async with websockets.connect(
                ws_uri, 
                header=["Authorization:{}".format(auth_key)],            
                sslopt={"cert_reqs": ssl.CERT_NONE},
            ) as websocket:
                subscribe_request = {
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "subscribe",
                    "params": ["newTxs", {"include": ["tx_hash"]}]
                }
                await websocket.send(json.dumps(subscribe_request))
                response = await websocket.recv()
                subscription_id = json.loads(response)["result"]
                
                while True:
                    next_notification = await websocket.recv()
                    print(next_notification)  # or process it generally
        except Exception as e:
            print(f"Connection broken to feed, {str(e)}, retrying.")
            await asyncio.sleep(5)
if __name__ == '__main__':
    asyncio.run(main())package main
import (
	"fmt"
	"github.com/gorilla/websocket"
)
func main() {
	dialer := websocket.DefaultDialer
	wsSubscriber, _, err := dialer.Dial("ws://127.0.0.1:28333/ws", http.Header{"Authorization": []string{<YOUR-AUTHORIZATION-HEADER>}})
	
	if err != nil {
		fmt.Println(err)
		return
	}
	subRequest := `{"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]}`
	err = wsSubscriber.WriteMessage(websocket.TextMessage, []byte(subRequest))
	if err != nil {
		fmt.Println(err)
		return
	}
	for {
		_, nextNotification, err := wsSubscriber.ReadMessage()
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println(string(nextNotification)) // or process it generally
	}
}
Response (Tx Event)
# Examples for Professional and Enterprise plan:
# Type 0 legacy transaction:
<<< {
	"jsonrpc":"2.0",
	"id":null,
	"method":"subscribe",
	"params":{
		"subscription":"414f2873-a7b0-451c-aefa-4e9280f25ce7",
			"result":{
				"txHash":"0x277...2ae",
				"txContents":{
					"from":"0xcfc...bf2",
					"gas":"0x8caf",
					"gasPrice":"0x8d8f9fc00",
					"hash":"0x277...2ae",
					"input":"0x2e1...000",
					"nonce":"0x1eb",
					"value":"0x0",
					"v":"0x26",
					"r":"0xbf7...742",
					"s":"0x249...346",
					"type":"0x0",
					"to":"0xc02...cc2"
				},
				"localRegion":true
			}
		}
	}
 # Type 2 dynamic fee transaction:
 <<< {
	"jsonrpc":"2.0",
	"id":null,
	"method":"subscribe",
	"params":{
		"subscription":"414f2873-a7b0-451c-aefa-4e9280f25ce7",
			"result":{
				"txHash":"0x03...da0",
				"txContents":{
					"from":"0x001...9e8",
					"gas":"0xd6d8",
					"gasPrice":null,
					"hash":"0x03...da0",
					"input":"0x",
					"nonce":"0x2e6c02",
					"value":"0x8087c960bae00",
					"v":"0x1",
					"r":"0xc44...431",
					"s":"0x4bd...858",
					"yParity": "0x1",
					"type":"0x2",
					"to":"0xdea...aac",
					"chainId":"0x1",
					"accessList":[],
					"blobVersionedHashes": [],
					"maxPriorityFeePerGas":"0x3b9aca00",
					"maxFeePerGas":"0xba43b7400"
				},
				"localRegion":true
			}
		}
	}
# Type 3 blob transaction
<<< {
	"method": "subscribe",
	"params": {
		"subscription": "a9daaa94-3712-49e4-8615-6e1fa03d1ed6",
		"result": {
			"txHash": "0xc9...c8b",
			"txContents": {
				"accessList": [],
				"blobVersionedHashes": [
					"0x01...a7",
					"0x01...3c",
					"0x01...2c",
					"0x01...ce",
					"0x01...e9"
				],
				"chainId": "0x4268",
				"from": "0x96...d3e",
				"gas": "0x5208",
				"gasPrice": null,
				"hash": "0xc...c8b",
				"input": "0x",
				"maxFeePerBlobGas": "0xdf8475800",
				"maxFeePerGas": "0x8bb2c97000",
				"maxPriorityFeePerGas": "0x165a0bc00",
				"nonce": "0x3969",
				"r": "0xcbd8c81ebe0cfb85ee5d...e277281ec2c1317ae4",
				"s": "0xb1f33cac1924...9d94224aa3615a3",
				"to": "0x184a8b4...6fa73ce7",
				"type": "0x3",
				"v": "0x1",
				"value": "0x0",
				"yParity": "0x1"
			},
			"localRegion": true,
			"time": "2024-03-05 13:00:44.137835"
		}
	},
	"jsonrpc": "2.0"
}Examples - gRPC
Subscribing to Gateway Stream in Go using gRPC (ex: PendingTxs stream):
package main
import (
	"context"
	"fmt"
	"time"
	pb "github.com/bloXroute-Labs/gateway/v2/protobuf"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)
type blxrCredentials struct {
	authorization string
}
func (bc blxrCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"authorization": bc.authorization,
	}, nil
}
func (bc blxrCredentials) RequireTransportSecurity() bool {
	return false
}
func main() {
	// gRPC server default values
	gatewayHostIP := "localhost"
	gatewayGRPCPort := 5001
	
	// this will use localhost CA to verify the certificate
	creds := credentials.NewClientTLSFromCert(nil, "")
	// Open gRPC connection to Gateway.
	conn, _ := grpc.Dial(
		fmt.Sprintf("%v:%v", gatewayHostIP, gatewayGRPCPort),
		grpc.WithTransportCredentials(creds),
		grpc.WithPerRPCCredentials(blxrCredentials{authorization: "<YOUR-AUTHORIZATION-HEADER>"}),
	)
	// Use the Gateway client connection interface.
	client := pb.NewGatewayClient(conn)
	// create context and defer cancel of context
	callContext, cancel := context.WithTimeout(context.Background(), 24*time.Hour)
	defer cancel()
	// Create a subscription using the stream-specific method and request.
	stream, _ := client.PendingTxs(callContext, &pb.TxsRequest{Filters: ""})
	for {
		subscriptionNotification, err := stream.Recv()
		if err == nil {
			fmt.Println(subscriptionNotification) // or process it generally
		}
	}
}
Response (Tx Event)
{
    "tx": [
        {
            "from": "rmuf7AnbsJ...fUPVcONV9uc=",
            "local_region": true,
            "time": "1709652345242830000",
            "raw_tx": "AviPgkJogwkGG4Q7msoAhDuayvyCVAiU/.....TUNrTkSgO+tvr58mEdB+vViw+rXeY9NL9QEdf/O7Ndw9gOyf0Mc="
        }
    ]
}
Last updated