Websocket
Handling the Notification
To process a new notification, call the function get_next_subscription_notification_by_id
, passing in the subscription_id returned from the subscribe
command as a parameter.
Examples
Handling the transaction notification from a Gateway Stream in Python (line 8 handles the notification):
import asyncio, json, ssl, websockets
async def main():
try:
ws_uri = "ws://127.0.0.1:28333/ws",
auth_key = "YOUR_AUTHORIZATION_HEADER"
async with websockets.connect(
ws_uri,
header=["Authorization:{}".format(auth_key)],
) as websocket:
subscription_request = json.dumps({"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]})
await websocket.send(subscription_request)
response = await websocket.recv()
subscription_id = json.loads(response)['result']
while True:
response = await websocket.recv()
response = json.loads(response)
if response['id'] == subscription_id:
print(response) # or process it generally
except Exception as e:
print(f'Connection failed, Reason: {e}')
if __name__ == '__main__':
asyncio.run(main())
Handling the transaction notification from a Cloud-API Stream in Python (version 3.7 or higher):
import asyncio, json, ssl, websockets
async def main():
try:
auth_key = "YOUR_AUTHORIZATION_HEADER"
async with websockets.connect(
'wss://virginia.eth.blxrbdn.com/ws',
header=["Authorization:{}".format(auth_key)],
sslopt={"cert_reqs": ssl.CERT_NONE},
) as websocket:
subscription_request = json.dumps({"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]})
await websocket.send(subscription_request)
subscription_id = None
while True:
response = await websocket.recv()
response_json = json.loads(response)
if response_json.get('id') == 1:
subscription_id = response_json.get('result')
else:
print(response_json) # or process it generally
except Exception as e:
print(f'Connection failed, Reason: {e}')
finally:
if subscription_id is not None:
try:
unsubscription_request = json.dumps({"id": 2, "method": "unsubscribe", "params": [subscription_id]})
await websocket.send(unsubscription_request)
except Exception as e:
print(f'Unsubscription failed, Reason: {e}')
if __name__ == '__main__':
asyncio.run(main())
Handling the transaction notification from a Cloud-API Stream in Python (version 3.7 or higher):
import asyncio, json, ssl, websockets
async def main():
try:
auth_key = "YOUR_AUTHORIZATION_HEADER"
async with websockets.connect(
'wss://api.blxrbdn.com/ws',
header=["Authorization:{}".format(auth_key)],
sslopt={"cert_reqs": ssl.CERT_NONE},
) as websocket:
subscription_request = json.dumps({"id": 1, "method": "subscribe", "params": ["newTxs", {"include": ["tx_hash"]}]})
await websocket.send(subscription_request)
subscription_id = None
while True:
response = await websocket.recv()
response_json = json.loads(response)
if response_json.get('id') == 1:
subscription_id = response_json.get('result')
else:
print(response_json) # or process it generally
except Exception as e:
print(f'Connection failed, Reason: {e}')
finally:
if subscription_id is not None:
try:
unsubscription_request = json.dumps({"id": 2, "method": "unsubscribe", "params": [subscription_id]})
await websocket.send(unsubscription_request)
except Exception as e:
print(f'Unsubscription failed, Reason: {e}')
if __name__ == '__main__':
asyncio.run(main())
Last updated