Subscribing and Unsubscribing from indicators

In my project, I need to be able to subscribe and unsubscribe from different indicators but something weird keeps happening. When trying to subscribe to an indicator I get an HTTP error but I still receive data through the webSocket, obviously I also can’t unsubscribe from it and the webSocket gets stuck with this indicator. Even when I close the session and open the websocket again I still receive the previous data.

This is my code:

import requests
import threading, websocket, json
import time


import os
from dotenv import load_dotenv

load_dotenv()
IRONBEAM_USERNAME = os.getenv("IRONBEAM_USERNAME") 
IRONBEAM_PASSWORD = os.getenv("IRONBEAM_PASSWORD") 
IRONBEAM_API_KEY = os.getenv("IRONBEAM_API_KEY")  

class Stream:
    def getStreamId(self): #Gets streamID, called when object is created
        url = "https://demo.ironbeamapi.com/v2" + "/stream/create"
        url_token = f"{url}?token={self.token["token"]}"

        try:
            response = requests.get(url_token)
            response.raise_for_status()

            data = response.json()
            streamId = data["streamId"]

            print(f"{self.name}: Sucesfully got streamId -> {streamId}")
            return streamId
        except requests.HTTPError as e:
            print(f"{self.name}: Failed to get streamId -> {e} , {e.response.text}")

    def closeStream(self):
        #Log.logMessage(f"{self.name}: Closing stream")

        # asks the loop to stop
        self.ws.keep_running = False  
        # closes the socket
        self.ws.close()

        # wait up to 5s for the thread to die
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=5)
            print(f"{self.name}: Stream thread joined")
        else:
            print(f"{self.name}: Stream thred closed -> {self.thread} {self.thread.is_alive()}")

    
    infoTypeI = ["quotes", "depths", "trades"]
    infoTypeII = ["tickBars", "tradeBars", "timeBars", "volumeBars"]
    streamFields = {"quotes": "q", "depths": "d", "trades": "tr", "tickBars": "tc", "tradeBars": "tb", "timeBars": "ti", "volumeBars": "vb"}


    def __init__(self, token, name):
        self.token = token
        self.name = name


        self.streamId = None
        self.symbol = None
        self.ids = {"quotes": [], "depths": [], "trades": [], "tickBars": [], "tradeBars": [], "timeBars": [], "volumeBars": []}

        self.ws = None
        self.thread: threading.Thread = None
        #self.wsOpen = False
        
        self.streamId = self.getStreamId()
        self.open_stream()

    def make_on_message(self):
        def on_message(ws, msg):
            data = json.loads(msg)

            print("data:", data)

            for field in self.streamFields.keys():
                field_indicator = self.streamFields[field]
                if field_indicator in data: #if the indicator exists in the received data
                    if isinstance(data[field_indicator], list) and len(data[field_indicator]) > 0: # if the field for that specific indicator is not empty
                        id = data[self.streamFields[field]][0]["i"]
                        if id not in self.ids[field]:
                            self.ids[field].append(data[self.streamFields[field]][0]["i"])
                            #self.ids[field] = data[self.streamFields[field]][0]["i"]
                            print(f"{self.name}: Sucesfully got IndicatorId {field} -> {self.ids[field]}")
        return on_message

    def open_stream(self):
        endpoint = f"/stream/{self.streamId}"
        url = f"wss://demo.ironbeamapi.com/v2{endpoint}"
        url_token = f"{url}?token={self.token["token"]}"

        ws = websocket.WebSocketApp(
            url_token, 
            #on_open = self.make_on_open(),
            on_message = self.make_on_message()
        )
        self.ws = ws
        self.thread = threading.Thread(target=ws.run_forever, daemon=True) # daemon=True → exits automatically when the main program ends
        self.thread.start()

    def subscribe(self):
        symbol = "XCME:NQ.M25"
        info = "volumeBars"
        period = 1
        barType = "MINUTE"
        loadSize = 10


        prefix_endpoint = "subscribe_"
        endpoint = f"/indicator/{self.streamId}/volumeBars/subscribe"

        url = f"https://demo.ironbeamapi.com/v2{endpoint}"
        headers = {
            "Authorization": f"Bearer {self.token['token']}",
            "Content-Type": "application/json"
        }

        payload = {
            "symbol": symbol,
            "period": period,
            "barType": barType,
            "loadSize": loadSize
        }
        data = None
        resp = None
        try:
            resp = requests.post(url, json=payload, headers=headers)
            resp.raise_for_status()
            data = resp.json()

            print(f"{self.name}: Sucesfully subscribed to {info} -> {data}")    
        except requests.HTTPError as e:
            print(f"{self.name}: Failed to subscribe to {info} -> {e}, {e.response.text}, {resp}, {data}")    

    def unsubscribe(self):
        info = "volumeBars"
        if(len(self.ids[info]) <= 0):
            return
        for id in self.ids[info]:
            endpoint = f"/indicator/{self.streamId}/unsubscribe/{id}"
            url = f"https://demo.ironbeamapi.com/v2{endpoint}"
            headers = {
                "Authorization": f"Bearer {self.token['token']}",
                "Content-Type": "application/json"
            }

            try:
                resp = requests.delete(url, headers=headers)
                resp.raise_for_status()
                data = resp.json()

                print(f"{self.name}: Sucesfully unsubscribed from {id} -> {data}")
            except requests.HTTPError as e:
                print(f"{self.name}: Failed to unsubscribe from {id} -> {e}, {e.response.text}")
            
def authorize(username, password, apiKey): #Authorizes user to use the API
        endpoint = "/auth"

        headers = {"Content-Type": "application/json"}
        payload = {
            "username": username,
            "password": password,
            "apiKey": apiKey
        }
        url = "https://demo.ironbeamapi.com/v2" + endpoint

        try:
            response = requests.post(url, json=payload, headers=headers)
            response.raise_for_status()
            print("Succesfully authorized user")

            data = response.json()
            token = {
                "token": data["token"],
                "Id": username,
            } 
            return token
        except requests.HTTPError as e:
            print(f"Failed to authorize user at {url}: ", e, e.response.text)
            raise e

def logout(token): #Logs out user, this should be the last step
        endpoint = "/logout"
        url = "https://demo.ironbeamapi.com/v2" + endpoint
        url_token = f"{url}?token={token["token"]}"

        try:
            response = requests.post(url_token)
            response.raise_for_status()
            print("Succesfully logged out user")

        except requests.HTTPError as e:
            print(f"Failed to logout user -> {e}, {e.response.text}")


if __name__ == '__main__':
    symbol = "XCME:NQ.M25"
    token = authorize(IRONBEAM_USERNAME, IRONBEAM_PASSWORD, IRONBEAM_API_KEY)
    stream = Stream(token=token, name=symbol)

    stream.open_stream()

    time.sleep(10) #makes sure stream is open

    stream.subscribe()

    time.sleep(10) #make sure subscription is finished

    stream.unsubscribe() 

    time.sleep(5) #make sure unsubscription is finished

    stream.closeStream() 

    logout(token)

And these are the errors/messages I get:

"
Succesfully authorized user
XCME:NQ.M25: Sucesfully got streamId → 32f76cf1-416b-4e19-a674-fc8c6f309c8e

data: {‘p’: {‘ping’: ‘ping’}}
data: {‘p’: {‘ping’: ‘ping’}}

data: {‘tb’: , ‘tc’: , ‘ti’: , ‘vb’: [{‘t’: 1748000711322, ‘o’: 20944.25, ‘h’: 20950.75, ‘l’: 20944.25, ‘c’: 20950.75, ‘v’: 705, ‘tc’: 557, ‘d’: 705, ‘i’: ‘VolumeBars_638835991255904134’} … ], ‘r’: {‘status’: 0, ‘message’: ‘OK’}}

XCME:NQ.M25: Sucesfully got IndicatorId volumeBars → [‘VolumeBars_638835991255904134’]

XCME:NQ.M25: Failed to subscribe to volumeBars → 400 Client Error: Bad Request for url: https://demo.ironbeamapi.com/v2/indicator/32f76cf1-416b-4e19-a674-fc8c6f309c8e/volumeBars/subscribe, {“additionalProperties”:{},“error1”:“Can’t subscribe to volume bars”,“status”:“ERROR”,“message”:“Error”}, <Response [400]>, None
data: {‘p’: {‘ping’: ‘ping’}}

XCME:NQ.M25: Failed to unsubscribe from VolumeBars_638835991255904134 → 400 Client Error: Bad Request for url: https://demo.ironbeamapi.com/v2/indicator/32f76cf1-416b-4e19-a674-fc8c6f309c8e/unsubscribe/VolumeBars_638835991255904134, {“additionalProperties”:{},“error1”:“Can’t unsubscribe”,“status”:“ERROR”,“message”:“Error”}
XCME:NQ.M25: Stream thred closed → <Thread(Thread-2 (run_forever), stopped daemon 16224)> False
Succesfully logged out user
"

Any help would be greatly appreciated

I have the exact same issue. Also you might notice that when you crate a new stream you will still get the indicators so they are not even in the correct stream.

I still haven’t managed to fix the issue, I would just work around the problem if at least the data was sent at the interval specified. When subscribing to MINUTE bars, new data comes at random intervals, from 10-50 seconds and I’m not sure why that is

It seems that the order that things are done is very important. If you have had sucess at getting streaming quotes, could you please post the order in which you did things. Thanks, Soup

When stream is opened, a very large record is sent, then pings are sent.
The pings can be used to determine if a connection exists.
After the stream sends an open message, send a subscribe message, WITHOUT a subscription
there will be ONLY ping message.
Here is a C++ sample of a subscribe function that subscribes to the volumeBar indicator.
It is only intended to be a sample, and should be modified to meet your needs.

#include <cpr/cpr.h>
#include <nlohmann/json.hpp>
// ixwebsocket
#include <ixwebsocket/IXNetSystem.h>

bool subscribe() {
std::cerr << “in subscribe.\n”;
std::string ws_base = “https://live.ironbeamapi.com/v2/indicator/”;
std::string tickBars_url = “/tickBars/subscribe”;
std::string timeBars_url = “/timeBars/subscribe”;
std::string tradeBar_url = “/tradeBars/subscribe”;
std::string volumeBar_url = “/volumeBars/subscribe”;
std::string subscribe_url=ws_base+streamId+volumeBar_url;
nlohmann::json payload{
{“symbol”, “XCME:MNQ.H26”},
{“period”, “1”},
{“barType”, “MINUTE”},
{“loadSize”,“1” } };
std::string payload_string = payload.dump();

 auto r = cpr::Post(
     cpr::Url{ subscribe_url },
     cpr::Body{ payload_string },
     cpr::Header{ 
         {"Content-Type","application/json"}, 
         {"Authorization", "Bearer " + token} }
     );

 if (r.status_code != 200)
 {
     std::cerr << "[Ironbeam] Subcribe failed HTTP " << r.status_code << ": " << r.text << "\n";
     std::cerr << subscribe_url << std::endl;
     std::cerr << r.raw_header << std::endl;
     return false;
 }
 std::cerr << "[Ironbeam] Subscribe sucessful.\n";
 return true;

}

The resulting streamer message for that subscriptions looks like:
{“tb”:,“tc”:,“ti”:,“vb”:[{“t”:1768485181877,“o”:25899.5,“h”:25900.25,“l”:25895.25,“c”:25896,“v”:936,“tc”:474,“d”:936,“i”:“VolumeBars_639040807287685149”}],“r”:{“status”:0,“message”:“OK”}}

After auto j = nlohmann::json::parse(s, nullptr, false);

jrec looks like:
[J rec] {“r”:{“message”:“OK”,“status”:0},“tb”:,“tc”:,“ti”:,“vb”:[{“c”:25898.5,“d”:945,“h”:25902,“i”:“VolumeBars_639040807287685149”,“l”:25896.25,“o”:25898.75,“t”:1768485360456,“tc”:527,“v”:945}]}