In order to unsubscribe from a stream, you may create a context using context.WithTimeout() and pass the returned context into the subscribe request method and call the returned cancel function with defer.
Examples
Subscribing to Gateway Stream in Go using gRPC (ex: NewTxs stream):
package main
import (
"context"
"fmt"
"time"
pb "github.com/bloXroute-Labs/gateway/v2/protobuf"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type blxrCredentials struct {
authorization string
}
func (bc blxrCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": bc.authorization,
}, nil
}
func (bc blxrCredentials) RequireTransportSecurity() bool {
return false
}
func main() {
// gRPC server default values
gatewayHostIP := "localhost"
gatewayGRPCPort := 5001
// this will use localhost CA to verify the certificate
creds := credentials.NewClientTLSFromCert(nil, "")
// Open gRPC connection to Gateway.
conn, _ := grpc.Dial(
fmt.Sprintf("%v:%v", gatewayHostIP, gatewayGRPCPort),
grpc.WithTransportCredentials(creds),
grpc.WithPerRPCCredentials(blxrCredentials{authorization: "<YOUR-AUTHORIZATION-HEADER>"}),
// for the best networking performance between gateway and a client
// we recommend to use following dial configuration:
grpc.WithWriteBufferSize(0),
grpc.WithInitialConnWindowSize(128*1024),
)
// Use the Gateway client connection interface.
client := pb.NewGatewayClient(conn)
// create context and defer cancel of context
callContext, cancel := context.WithTimeout(context.Background(), 24*time.Hour)
defer cancel()
// Create a subscription using the stream-specific method and request.
stream, _ := client.NewTxs(callContext, &pb.TxsRequest{Filters: "", Includes: ""})
for {
subscriptionNotification, err := stream.Recv()
if err == nil {
fmt.Println(subscriptionNotification) // or process it generally
}
}
}