In my project, I need to be able to subscribe and unsubscribe from different indicators but something weird keeps happening. When trying to subscribe to an indicator I get an HTTP error but I still receive data through the webSocket, obviously I also can’t unsubscribe from it and the webSocket gets stuck with this indicator. Even when I close the session and open the websocket again I still receive the previous data.
This is my code:
import requests
import threading, websocket, json
import time
import os
from dotenv import load_dotenv
load_dotenv()
IRONBEAM_USERNAME = os.getenv("IRONBEAM_USERNAME")
IRONBEAM_PASSWORD = os.getenv("IRONBEAM_PASSWORD")
IRONBEAM_API_KEY = os.getenv("IRONBEAM_API_KEY")
class Stream:
def getStreamId(self): #Gets streamID, called when object is created
url = "https://demo.ironbeamapi.com/v2" + "/stream/create"
url_token = f"{url}?token={self.token["token"]}"
try:
response = requests.get(url_token)
response.raise_for_status()
data = response.json()
streamId = data["streamId"]
print(f"{self.name}: Sucesfully got streamId -> {streamId}")
return streamId
except requests.HTTPError as e:
print(f"{self.name}: Failed to get streamId -> {e} , {e.response.text}")
def closeStream(self):
#Log.logMessage(f"{self.name}: Closing stream")
# asks the loop to stop
self.ws.keep_running = False
# closes the socket
self.ws.close()
# wait up to 5s for the thread to die
if self.thread and self.thread.is_alive():
self.thread.join(timeout=5)
print(f"{self.name}: Stream thread joined")
else:
print(f"{self.name}: Stream thred closed -> {self.thread} {self.thread.is_alive()}")
infoTypeI = ["quotes", "depths", "trades"]
infoTypeII = ["tickBars", "tradeBars", "timeBars", "volumeBars"]
streamFields = {"quotes": "q", "depths": "d", "trades": "tr", "tickBars": "tc", "tradeBars": "tb", "timeBars": "ti", "volumeBars": "vb"}
def __init__(self, token, name):
self.token = token
self.name = name
self.streamId = None
self.symbol = None
self.ids = {"quotes": [], "depths": [], "trades": [], "tickBars": [], "tradeBars": [], "timeBars": [], "volumeBars": []}
self.ws = None
self.thread: threading.Thread = None
#self.wsOpen = False
self.streamId = self.getStreamId()
self.open_stream()
def make_on_message(self):
def on_message(ws, msg):
data = json.loads(msg)
print("data:", data)
for field in self.streamFields.keys():
field_indicator = self.streamFields[field]
if field_indicator in data: #if the indicator exists in the received data
if isinstance(data[field_indicator], list) and len(data[field_indicator]) > 0: # if the field for that specific indicator is not empty
id = data[self.streamFields[field]][0]["i"]
if id not in self.ids[field]:
self.ids[field].append(data[self.streamFields[field]][0]["i"])
#self.ids[field] = data[self.streamFields[field]][0]["i"]
print(f"{self.name}: Sucesfully got IndicatorId {field} -> {self.ids[field]}")
return on_message
def open_stream(self):
endpoint = f"/stream/{self.streamId}"
url = f"wss://demo.ironbeamapi.com/v2{endpoint}"
url_token = f"{url}?token={self.token["token"]}"
ws = websocket.WebSocketApp(
url_token,
#on_open = self.make_on_open(),
on_message = self.make_on_message()
)
self.ws = ws
self.thread = threading.Thread(target=ws.run_forever, daemon=True) # daemon=True → exits automatically when the main program ends
self.thread.start()
def subscribe(self):
symbol = "XCME:NQ.M25"
info = "volumeBars"
period = 1
barType = "MINUTE"
loadSize = 10
prefix_endpoint = "subscribe_"
endpoint = f"/indicator/{self.streamId}/volumeBars/subscribe"
url = f"https://demo.ironbeamapi.com/v2{endpoint}"
headers = {
"Authorization": f"Bearer {self.token['token']}",
"Content-Type": "application/json"
}
payload = {
"symbol": symbol,
"period": period,
"barType": barType,
"loadSize": loadSize
}
data = None
resp = None
try:
resp = requests.post(url, json=payload, headers=headers)
resp.raise_for_status()
data = resp.json()
print(f"{self.name}: Sucesfully subscribed to {info} -> {data}")
except requests.HTTPError as e:
print(f"{self.name}: Failed to subscribe to {info} -> {e}, {e.response.text}, {resp}, {data}")
def unsubscribe(self):
info = "volumeBars"
if(len(self.ids[info]) <= 0):
return
for id in self.ids[info]:
endpoint = f"/indicator/{self.streamId}/unsubscribe/{id}"
url = f"https://demo.ironbeamapi.com/v2{endpoint}"
headers = {
"Authorization": f"Bearer {self.token['token']}",
"Content-Type": "application/json"
}
try:
resp = requests.delete(url, headers=headers)
resp.raise_for_status()
data = resp.json()
print(f"{self.name}: Sucesfully unsubscribed from {id} -> {data}")
except requests.HTTPError as e:
print(f"{self.name}: Failed to unsubscribe from {id} -> {e}, {e.response.text}")
def authorize(username, password, apiKey): #Authorizes user to use the API
endpoint = "/auth"
headers = {"Content-Type": "application/json"}
payload = {
"username": username,
"password": password,
"apiKey": apiKey
}
url = "https://demo.ironbeamapi.com/v2" + endpoint
try:
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
print("Succesfully authorized user")
data = response.json()
token = {
"token": data["token"],
"Id": username,
}
return token
except requests.HTTPError as e:
print(f"Failed to authorize user at {url}: ", e, e.response.text)
raise e
def logout(token): #Logs out user, this should be the last step
endpoint = "/logout"
url = "https://demo.ironbeamapi.com/v2" + endpoint
url_token = f"{url}?token={token["token"]}"
try:
response = requests.post(url_token)
response.raise_for_status()
print("Succesfully logged out user")
except requests.HTTPError as e:
print(f"Failed to logout user -> {e}, {e.response.text}")
if __name__ == '__main__':
symbol = "XCME:NQ.M25"
token = authorize(IRONBEAM_USERNAME, IRONBEAM_PASSWORD, IRONBEAM_API_KEY)
stream = Stream(token=token, name=symbol)
stream.open_stream()
time.sleep(10) #makes sure stream is open
stream.subscribe()
time.sleep(10) #make sure subscription is finished
stream.unsubscribe()
time.sleep(5) #make sure unsubscription is finished
stream.closeStream()
logout(token)
And these are the errors/messages I get:
"
Succesfully authorized user
XCME:NQ.M25: Sucesfully got streamId → 32f76cf1-416b-4e19-a674-fc8c6f309c8e
data: {‘p’: {‘ping’: ‘ping’}}
data: {‘p’: {‘ping’: ‘ping’}}
data: {‘tb’: , ‘tc’: , ‘ti’: , ‘vb’: [{‘t’: 1748000711322, ‘o’: 20944.25, ‘h’: 20950.75, ‘l’: 20944.25, ‘c’: 20950.75, ‘v’: 705, ‘tc’: 557, ‘d’: 705, ‘i’: ‘VolumeBars_638835991255904134’} … ], ‘r’: {‘status’: 0, ‘message’: ‘OK’}}
XCME:NQ.M25: Sucesfully got IndicatorId volumeBars → [‘VolumeBars_638835991255904134’]
XCME:NQ.M25: Failed to subscribe to volumeBars → 400 Client Error: Bad Request for url: https://demo.ironbeamapi.com/v2/indicator/32f76cf1-416b-4e19-a674-fc8c6f309c8e/volumeBars/subscribe, {“additionalProperties”:{},“error1”:“Can’t subscribe to volume bars”,“status”:“ERROR”,“message”:“Error”}, <Response [400]>, None
data: {‘p’: {‘ping’: ‘ping’}}
XCME:NQ.M25: Failed to unsubscribe from VolumeBars_638835991255904134 → 400 Client Error: Bad Request for url: https://demo.ironbeamapi.com/v2/indicator/32f76cf1-416b-4e19-a674-fc8c6f309c8e/unsubscribe/VolumeBars_638835991255904134, {“additionalProperties”:{},“error1”:“Can’t unsubscribe”,“status”:“ERROR”,“message”:“Error”}
XCME:NQ.M25: Stream thred closed → <Thread(Thread-2 (run_forever), stopped daemon 16224)> False
Succesfully logged out user
"
Any help would be greatly appreciated