""" ML Tech, Inc. Demo strat docstring.
This strategy serve as an example to help researchers understand the order entry,
order management/tracking, market data collections, etc. Most of the functionalities provided by our trading
environment are used in this strat.
This is a simple one instrument (BTC-USDT linear perpetual in Bybit) market-making strategy. Trading logic is: It
rests at the top bid/offer behind a fixed threshold(_entry_qty_threshold_) size. If the size drops below the
threshold, the order is canceled. If a resting buy(sell) order is filled, a closing sell(buy) order is placed if the
size on the top bid(offer) is at least as much as a fixed threshold(_exit_qty_threshold_) size. If the size of the
bid(offer) drops below the threshold, an aggressive order closing order is sent to close the position.
See more details on the comments below.
"""
import typing
import math
from mlthon.api.istrat_env import IStratEnv
from mlthon.api.istrategy import IStrategy
from mlthon.basics import utils, logging
from mlthon.basics.defs import GtwRejectCode, CancelCode, FeedHandlerStatus, GtwStatus, Exchange, Side, \
ExecInstructions, OrderType, TIF, OrderStatus
from mlthon.basics.price import Price
from mlthon.basics.qty import Qty
from mlthon.instruments.instrument import Instrument
from mlthon.order.books.level_book import LevelBook
from mlthon.order.order import Order
from mlthon.order.order_mgr import OrderMgr
from mlthon.basics.tick_format import TickFormat, TickUnit
[docs]class DemoStrat(IStrategy):
def __init__(self, cfg):
self.log_ = logging.get_logger(self)
self.log_.info("DemoStrat constructor called!")
# use order manager to track the orders within the lifecycle of a strat. See the comments below for details
self._order_mgr_ = OrderMgr("byt")
self.env_ = None # type: typing.Optional[IStratEnv]
self.lvl_book_ = LevelBook()
# used a dictionary to store instrument information. Its key, instrument name, is like byt:perp:BTC-USDT
# (BTC-USDT linear perpetual in Bybit)
self._instruments_by_name_ = {}
self._exch_ = Exchange.Bybit # target exchange in this strat
self._instrument_: Instrument = None
self._instrument_id_ = -1
# self._entry_qty_threshold_ = Qty.from_int(10000)
# self._exit_qty_threshold_ = Qty.from_int(2000)
# # resting qty of opening order
# self._resting_size_ = 10
self._entry_qty_threshold_ = Qty.from_int(40)
self._exit_qty_threshold_ = Qty.from_int(20)
# resting qty of opening order
self._resting_size_ = Qty.from_str("0.001")
# _place_orders_is used to enable/disable trading via telegram
# the strat will send orders to exchange only if this flag is set to True
self._place_orders_ = False
self._net_notional_ = 0.0
self._pnl_ = 0.0 # used to keep track of PNL
# best bid/ask related variables, which is used for trading signal
self._best_bid_ = Price.zero()
self._best_bid_qty_ = Qty.zero()
self._best_ask_ = Price.zero()
self._best_ask_qty_ = Qty.zero()
self._pos_ = Qty.zero()
self._last_order_cancelled_ts_ = math.nan
self._quote_delay_ = 30000 # 30 seconds
[docs] def on_framework_connected(self, env: IStratEnv):
self.log_.info("Now connected to framework")
self.env_ = env
# login is REQUIRED, to let trading environment register this strategy so that we could start to send msg
# back and forward
self.env_.login("demo_strat")
# every 2 seconds (2000 milliseconds), we would update the order (send order, modify order, cancel order, etc.)
self.env_.setup_timer(2000, self.on_recurring_two_secs)
# every 5 seconds, get position information
self.env_.setup_timer(5000, self.on_one_shot_five_secs) #
# Hook command is just manual command which is hooked to a callback func. Once the strategy is running and add
# a hook command, go to the corresponding telegram bot and use MENU, a button with the command name would show
# in there. Click the command, our trading environment will invoke the corresponding callback.
# TODO: may not use telegram in the future
# For example, here we use a "start_trading" hook command. Go to the telegram bot and there'll be a
# "start_trading" button in the MENU. Once clicking on it, basically the strategy will start trading (update
# orders, etc.)
self.env_.add_cmd_hook("start_trading", self.start_trading_command)
self.env_.add_cmd_hook("stop_trading", self.stop_trading_command)
self.env_.add_cmd_hook("get_position", self.get_position_command)
self.env_.add_cmd_hook("get_orders", self.get_orders_command)
self.env_.add_cmd_hook("get_pnl", self.get_pnl_command)
self._place_orders_ = False
[docs] def on_framework_disconnected(self):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_framework_disconnected in IStrategy documentation reference for more information
self.log_.info("Framework is disconnected!!")
[docs] def on_start(self, params: str):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_start in IStrategy documentation reference for more information
self.log_.info("on_start() called with params '" + params + "'")
self.env_.add_cmd_hook("test_command", self.hooked_command_test)
[docs] def on_stop(self, params: str):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_stop in IStrategy documentation reference for more information
self.log_.info("on_stop() called with params '" + params + "'")
self.env_.send_stop_ack() # send to trading system side that the strategy has received the stop msg
[docs] def on_instruments(self, instruments: typing.Iterable[Instrument]):
"""Define the behavior of this strat when receiving all the information of instruments.
A common practice is to keep the instrument inside this strat class. This strat will log the instrument info
here at the same time, to help researchers check all the information."""
self._instruments_by_name_ = {instrument.get_instrument_name(): instrument for instrument in instruments}
# sometimes keeping instruments_by_id is necessary as well, for example:
# self._instruments_by_id_ = {instrument.get_instrument_id(): instrument for instrument in instruments}
# instrument ID is an unique identifier for an instrument. e.g. 30000000003 = byt:perp:BTC-USDT (i.e. BTC-USDT
# linear perpetual in Bybit)
for instrument in instruments:
self.log_.info("fetched instrument: " + str(instrument))
self.log_.info('''
"instrument_type":"{instrument_type}",
"mlt_symbol":"{mlt_symbol}",
"price_tick_rules": {price_tick_rules},
"quantity_tick_rules": {quantity_tick_rules},
"multiplier":"{multiplier}",
"valid_start_ts":{valid_start_ts},
"valid_end_ts":{valid_end_ts},
"instrument_group_id":"{instrument_group_id}",
"base_currency":"{base_currency}",
"quote_currency":"{quote_currency}",
"min_quantity":"{min_quantity}",
"exch_instr_id":"{exch_instr_id}",
"quantification":"{quantification}",
"instrument_id":{instrument_id},
"price_precision":{price_precision},
"quantity_precision":{quantity_precision}
'''.format(instrument_type=instrument.get_instrument_type(),
mlt_symbol=instrument.get_mlt_symbol(),
price_tick_rules=instrument.get_price_tick_rules(),
quantity_tick_rules=instrument.get_quantity_tick_rules(),
multiplier=instrument.get_multiplier(),
valid_start_ts=instrument.get_valid_start_ts(),
valid_end_ts=instrument.get_valid_end_ts(),
instrument_group_id=instrument.get_instrument_group_id(),
base_currency=instrument.get_base_currency(),
quote_currency=instrument.get_quote_currency(),
min_quantity=instrument.get_min_quantity(),
exch_instr_id=instrument.get_exch_instr_id(),
quantification=instrument.get_quantification(),
instrument_id=instrument.get_instrument_id(),
price_precision=instrument.get_price_precision(),
quantity_precision=instrument.get_quantity_precision()))
# The next several lines are specific to this strategy. Don't need to follow them in other strats.
# In this example, we use BTC-USDT in Bybit as the trading target. So in this case, we assure that the first
# instrument sent from our trading system would be BTC-USDT in Bybit. That's why we just use the first
# instrument here. In other cases, BTC-USDT in Bybit may not be the first instrument.
self._instrument_ = list(instruments)[0]
self._instrument_id_ = self._instrument_.get_instrument_id() # use the first instrument (byt:perp:BTC-USDT)
self._exch_ = self._instrument_.get_exchange()
[docs] def on_strat_params_update(self, strat_params: dict):
"""This strat doesn't need any strat params, so just put a log and skip it"""
self.log_.info("on_strat_params_update() is invoked")
# -----------------------------------------------------------------------------------
# ----------------------------- Timer and Hook Callback -----------------------------
# -----------------------------------------------------------------------------------
[docs] def on_recurring_two_secs(self):
self.log_.warning("2 second recurring timer callback!")
self.update_orders_()
return True
[docs] def on_one_shot_five_secs(self):
self.log_.info("5 second one-shot timer callback!")
if self._instrument_:
# If instrument information is received, fetch position info request
self.env_.fetch_position_info(Exchange.Bybit, instrument_id=self._instrument_id_)
return False
[docs] def hooked_command_test(self, params: str):
self.log_.info("Hooked command test callback with params: '%s'" % params)
"""Instrument ID 30000000003: BTC-USDT USDT perpetual in Bybit"""
self.env_.fetch_position_info(Exchange.Bybit, instrument_id=self._instrument_id_)
[docs] def get_pnl_command(self, params: str):
# it would publish the PNL into telegram channel TODO: telegram may be replaced in the future
self.env_.publish_telegram("PnL: " + str(self._pnl_))
[docs] def start_trading_command(self, params: str):
self._place_orders_ = True
self.log_.info("Start trading command sent!!")
self.env_.publish_telegram("Starting Trading!!")
[docs] def stop_trading_command(self, params: str):
self._place_orders_ = False
self.cancel_all_open_orders_("Stop Trading command sent")
self.log_.info("Stop trading command sent!!")
self.env_.publish_telegram("Stop Trading, cancelling all open orders!!")
[docs] def get_position_command(self, params: str):
self.env_.publish_telegram("Position: " + self._pos_.to_str())
self.log_.info("Get position command sent!!")
[docs] def get_orders_command(self, params: str):
self.env_.fetch_orders_info(exchange=self._exch_, instrument_id=self._instrument_id_)
self.log_.info("Get orders command sent!!")
[docs] def send_order_(self, side: Side, price: typing.Union[float, Price], qty: Qty, reason: str, exch: Exchange,
instrument_id: int, exec_instr=ExecInstructions.Unset, ord_type=OrderType.Limit,
tif=TIF.GTC, close_position: bool = False):
"""Function used by strat to send a new order to our trading system. Order manager is used to determine if
order can be sent and to keep track of order through it's lifecycle"""
# format price and qty
price, qty = self.format_price_and_qty_(price=price, qty=qty, side=side)
# use order manager to prepare this new order. order manager would track whether this order is valid and ready
# to be sent. If yes, it would automatically generate an unique ID for the order, so the researcher won't need
# to create or record the ID manually.
order = self._order_mgr_.prepare_new_order(instrument_id=instrument_id, side=side,
price=price, qty=qty, exec_instr=exec_instr,
order_type=OrderType.Limit, tif=TIF.GTC,
nullable_attachment=reason)
if order: # if order is not None, then it's really to be sent
new_order_clid = order.get_client_id()
self.env_.send_new_order(client_id=new_order_clid, exchange=exch, instrument_id=instrument_id,
side=side, price=price, qty=qty, exec_instr=exec_instr, ord_type=ord_type,
time_in_force=tif, close_position=close_position)
self.env_.publish_telegram("New order " + str(new_order_clid) + ": " + side.name
+ " " + qty.to_str() + "@" + price.to_str() + " "
+ str(new_order_clid) + " reason: " + reason)
else:
self.env_.publish_telegram("Not able to send a new_order!!")
[docs] def modify_order_(self, client_id: str, side: Side, price: typing.Union[float, Price], qty: Qty, reason: str,
exch: Exchange):
"""Function used by strat to send a modify order to our trading system. Order manager is used to determine if
order can be modified and to keep track of order through it's lifecycle"""
# format price and qty
price, qty = self.format_price_and_qty_(price=price, qty=qty, side=side)
# use order manager to prepare this modify order. order manager would track whether this order is valid and
# ready to be sent. If yes, it would return True, False otherwise. With the order mgr, researchers won't cause
# trading environment or need to wait for the rejection from exchange if ModifyOrder request is invalid or there
# is any pending status, etc (eg: Modify is invalid on canceled order).
success = self._order_mgr_.prepare_modify_order(client_id=client_id, new_price=price, new_qty=qty)
if success:
self.env_.send_modify_order(exchange=exch, client_ord_id=client_id, new_price=price,
new_qty=qty)
self.env_.publish_telegram("Sent a modify order " + str(client_id) + ": " + side.name
+ " " + qty.to_str() + "@" + price.to_str() + " " + " reason: " + reason)
[docs] def cancel_order_(self, order: Order, instrument_id: int, reason: str = 'cancel_order'):
"""Function used by strat to send a cancel order to our trading system. Order manager is used to determine if
order can be cancelled and to keep track of order through it's lifecycle"""
order_id = order.get_client_id()
# order cannot be cancelled if it has PendingNew, PendingModify or PendingCancel status, so check is needed
if order.is_new_pending() or order.is_modify_pending() or order.is_cancel_pending():
order_state = "PendingNew" if order.is_new_pending() else \
"PendingModify" if order.is_modify_pending() else "PendingCancel"
self.log_.info("Cannot cancel Order: " + str(order_id) + " in " + order_state + " state")
return
# Use order manager to prepare this cancel order. Order manager would track whether the cancel is valid and
# ready to be sent. If yes, it would return True, False otherwise. With the order mgr, researchers won't cause
# trading environment or need to wait for the rejection from exchange if CancelOrder request is invalid or there
# is any pending status, etc.
ready_to_send_cancel = self._order_mgr_.prepare_cancel_order(client_id=order_id)
if ready_to_send_cancel:
self.env_.send_cancel_order(exchange=self._exch_, instrument_id=instrument_id,
client_ord_id=order_id)
self.env_.publish_telegram("Sent a cancel: " + str(order_id) + " reason:" + reason)
[docs] def cancel_all_open_orders_(self, reason='cancel_all'):
# If applied correctly, order manager can return all open orders. This function is used by strat to cancel
# all open orders
orders = self._order_mgr_.get_open_orders()
if orders:
self.log_.info("Cancelling all open orders!!")
for order in orders:
self.cancel_order_(order, instrument_id=self._instrument_id_, reason=reason)
[docs] def cancel_all_open_orders_on_side_(self, side: Side, reason='cancel_all'):
# If applied correctly, order manager can return all open orders. This function is used by strat to cancel
# all open orders on a particular side
orders = self._order_mgr_.get_open_orders_on_side(side=side)
if orders:
self.log_.info("Cancelling all open orders on side " + side.name)
for order in orders:
self.cancel_order_(order, instrument_id=self._instrument_id_, reason=reason)
[docs] def modify_resting_orders_(self, side: Side, price: Price, qty: Qty):
# Function used by strat to modify all resting orders with new price on a side
modify_orders = True
orders = self._order_mgr_.get_open_orders_on_side(side=side)
if orders:
for order in orders:
# send modify order all the resting orders (on the target side but with a different price from target)
if order.get_side() == side and order.get_price() != price:
self.modify_order_(order.get_client_id(), side, price, qty,
reason="Modify resting order", exch=self._exch_)
else:
modify_orders = False
return modify_orders
[docs] def is_market_valid_(self):
return self._best_bid_ < self._best_ask_
[docs] def update_orders_(self):
"""This function implements the trading logic and updates orders as necessary"""
if (not self.is_market_valid_()) or (not self._place_orders_):
self.log_.warning("Invalid Market or trading is disabled !!")
self.cancel_all_open_orders_()
else:
# If order was cancelled because bid/ask qty fell below _entry_qty_threshold_, then this strat waits
# for 30seconds before placing new order. This is just to minimize frequency of order placement in this
# strat if previous orders were cancelled.
now_ts = utils.get_now_ts()
if math.isnan(self._last_order_cancelled_ts_):
self._last_order_cancelled_ts_ = now_ts - (self._quote_delay_ + 1)
if (now_ts < self._last_order_cancelled_ts_ + self._quote_delay_) and self._pos_ == Qty.zero():
return
if self._pos_ == Qty.zero():
# If current position is zero, rest at the top bid/offer behind a fixed threshold(_entry_qty_threshold_)
# size. If there are already existing orders modify them, else place new order
qty = self._resting_size_
if self._best_bid_qty_ > self._entry_qty_threshold_:
price = self._best_bid_
side = Side.Buy
modify_orders = self.modify_resting_orders_(side=side, price=price, qty=qty)
if not modify_orders:
self.send_order_(side=Side.Buy, price=price, qty=qty, reason="Open Position",
exch=self._exch_, instrument_id=self._instrument_id_,
exec_instr=ExecInstructions.PostOnly, close_position=False)
else:
self.cancel_all_open_orders_on_side_(side=Side.Buy)
if self._best_ask_qty_ > self._entry_qty_threshold_:
price = self._best_ask_
side = Side.Sell
modify_orders = self.modify_resting_orders_(side=side, price=price, qty=qty)
if not modify_orders:
self.send_order_(side=Side.Sell, price=price, qty=qty, reason="Open Position",
exch=self._exch_, instrument_id=self._instrument_id_,
exec_instr=ExecInstructions.PostOnly, close_position=False)
else:
self.cancel_all_open_orders_on_side_(side=Side.Sell)
else:
# If a resting buy(sell) order is filled, a closing sell(buy) order is placed if the size on the
# top bid(offer) is at least as much as a fixed threshold(_exit_qty_threshold_) size. If the size of the
# bid(offer) drops below the threshold, an aggressive order closing order is sent to close the position.
qty = abs(self._pos_)
exit_exec_instr = ExecInstructions.Unset
if self._pos_ > Qty.zero():
side = Side.Sell
if self._best_bid_qty_ > self._exit_qty_threshold_:
# Closing sell rest order will be placed
price = self._best_ask_
else:
# Aggressive closing sell order will be placed to close the position
price = self._best_bid_
else:
side = Side.Buy
if self._best_ask_qty_ > self._exit_qty_threshold_:
# Closing buy rest order will be placed
price = self._best_bid_
else:
# Aggressive closing buy order will be placed to close the position
price = self._best_ask_
modify_orders = self.modify_resting_orders_(side=side, price=price, qty=qty)
if not modify_orders:
self.send_order_(side=side, price=price, qty=qty, reason="Close Position",
exch=self._exch_, instrument_id=self._instrument_id_,
exec_instr=exit_exec_instr, close_position=True)
# -----------------------------------------------------------------------------------
# ------------------------------ Order Entry Callbacks ------------------------------
# -----------------------------------------------------------------------------------
[docs] def on_new_order_ack(self, exchange: Exchange, client_id: str, exchange_id: str, instrument_id: int, side: Side,
price: Price, qty: Qty, leaves_qty: Qty, order_type: OrderType, tif: TIF,
exec_instr: ExecInstructions):
# use order manager to check whether this order is sent by the strat
order = self._order_mgr_.get_order_with_client_id(client_id)
if order:
# use order manager to keep track of NewOrderAck for this order, so order can be tracked through out
# it's life cycle using order mgr
self._order_mgr_.apply_new_order_ack(client_id=client_id, exchange_id=exchange_id, price=price,
leaves_qty=qty)
self.log_.info("on_new_order_ack: {ep} {clid} {side} {qty} @ {price}"
.format(ep=exchange.name, side=side.name, clid=client_id, qty=qty, price=price))
[docs] def on_modify_order_ack(self, exchange: Exchange, client_id: str, new_price: Price, new_qty: Qty, leaves_qty: Qty):
# use order manager to check whether this order is sent by the strat
order = self._order_mgr_.get_order_with_client_id(client_id)
if order:
# use order manager to keep track of ModifyOrderAck for this order, so order can be tracked through out
# it's life cycle using order mgr
self._order_mgr_.apply_modify_order_ack(client_id=client_id, leaves_qty=leaves_qty)
self.log_.info("on_modify_order_ack: {ep} {clid} {qty} @ {price}"
.format(ep=exchange.name, clid=client_id, qty=new_qty, price=new_price))
[docs] def on_cancel_all_ack(self, exchange: Exchange, instrument_id: int):
self.log_.info("on_cancel_all_ack: {ep} {instr_id}".format(ep=exchange.name, instr_id=instrument_id))
[docs] def on_cancel_order_ack(self, exchange: Exchange, client_id: str):
# use order manager to keep track of CancelOrderAck for this order, so order can be tracked through out
# it's life cycle using order mgr. Keep track of when order is cancelled
self._order_mgr_.apply_cancel_order_ack(client_id=client_id)
self.log_.info("on_cancel_order_ack: {ep} {clid}".format(ep=exchange.name, clid=client_id))
self._last_order_cancelled_ts_ = utils.get_now_ts()
[docs] def on_new_order_reject(self, exchange: Exchange, client_id: str, reject_code: GtwRejectCode, reject_reason: str):
# use order manager to keep track of NewOrderReject for this order, so order can be tracked through out
# it's life cycle using order mgr
self._order_mgr_.apply_new_order_reject(client_id=client_id)
self.log_.info("on_new_order_reject: {ep} {clid} {rej_code} '{rej_detail}'"
.format(ep=exchange.name, clid=client_id, rej_code=reject_code.name, rej_detail=reject_reason))
[docs] def on_modify_order_reject(self, exchange: Exchange, client_id: str, reject_code: GtwRejectCode,
reject_reason: str):
# use order manager to keep track of ModifyOrderReject for this order, so order can be tracked through out
# it's life cycle using order mgr
self._order_mgr_.apply_modify_order_reject(client_id=client_id)
self.log_.info("on_modify_order_reject: {ep} {clid} {rej_code} '{rej_detail}'"
.format(ep=exchange.name, clid=client_id, rej_code=reject_code.name, rej_detail=reject_reason))
[docs] def on_cancel_all_reject(self, exchange: Exchange, instrument_id: int, reject_code: GtwRejectCode,
reject_reason: str):
self.log_.info("on_cancel_all_reject: {ep} {instr_id} {rej_code} '{rej_detail}'"
.format(ep=exchange.name, instr_id=instrument_id, rej_code=reject_code.name,
rej_detail=reject_reason))
[docs] def on_cancel_order_reject(self, exchange: Exchange, client_id: str, reject_code: GtwRejectCode,
reject_reason: str):
# use order manager to keep track of CancelOrderReject for this order, so order can be tracked through out
# it's life cycle using order mgr
self._order_mgr_.apply_cancel_order_reject(client_id)
self.log_.info("on_cancel_order_reject: {ep} {clid} {rej_code} '{rej_detail}'"
.format(ep=exchange.name, clid=client_id, rej_code=reject_code.name, rej_detail=reject_reason))
[docs] def on_order_execution(self, exchange: Exchange, client_id: str, side: Side, price: Price, fill_qty: Qty,
leaves_qty: Qty, exec_ts: int, recv_ts: int):
if leaves_qty and leaves_qty.is_null():
leaves_qty = None
# use order manager to check whether this order is sent by the strat
order = self._order_mgr_.get_order_with_client_id(client_id)
if order:
# use order manager to keep track of OrderExecution for this order, so order can be tracked through out
# it's life cycle using order mgr. Order manager would return a boolean value here to
# inform whether the order is fully executed or not. If yes, the strat can start to calculate realized PNL
is_fully_executed = self._order_mgr_.apply_order_execution(client_id=client_id, fill_qty=fill_qty,
nullable_leaves_qty=leaves_qty)
if is_fully_executed is not None: # then calculate PNL
qty = fill_qty if side == Side.Buy else -fill_qty
self._pos_ += qty
self.log_.info(
"on_order_execution: {ep} {clid} {side} {qty} @ {price} transac_ts={ex_ts} recv_ts={rx_ts}"
.format(ep=exchange.name, side=side.name, clid=client_id, qty=fill_qty, price=price,
ex_ts=exec_ts, rx_ts=recv_ts))
if price > Price.zero():
self._net_notional_ += qty.to_float() / price.to_float()
if self._pos_ == Qty.zero():
self._pnl_ = round(self._net_notional_, 4)
[docs] def on_order_cancelled(self, exchange: Exchange, client_id: str, unsolicited: bool, engine_ts: int,
recv_ts: int, cancel_code: CancelCode, cancel_reason: str):
# use order manager to keep track of OrderCancelled for this order, so order can be tracked through out
# it's life cycle using order mgr
self._order_mgr_.apply_order_cancelled(client_id=client_id)
self.log_.info("on_order_cancelled: {ep} {clid} {clx_code} '{clx_reason}' transac_ts={ex_ts} recv_ts={rx_ts}"
.format(ep=exchange.name, clid=client_id, clx_code=cancel_code.name, clx_reason=cancel_reason,
ex_ts=engine_ts, rx_ts=recv_ts))
# -----------------------------------------------------------------------------------
# ------------------------------ Account Info Callbacks -----------------------------
# -----------------------------------------------------------------------------------
[docs] def on_order_status(self, exchange: Exchange, instrument_id: int, client_ord_id: str, exch_ord_id: str, side: Side,
price: Price, leaves_qty: Qty, status: OrderStatus, order_type: OrderType, tif: TIF,
exec_instr: ExecInstructions) -> None:
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_order_status in IStrategy documentation reference for more information
self.log_.info("on_order_status: {ep} {clid} {side} {qty} @ {price} status={status}"
.format(ep=exchange.name, side=side.name, clid=client_ord_id, qty=leaves_qty, price=price,
status=status.name))
[docs] def on_orders_info(self, exchange: Exchange, orders: typing.List[Order]):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_orders_info in IStrategy documentation reference for more information
self.log_.info("on_orders_info: {ep} {num_orders} orders"
.format(ep=exchange.name, num_orders=len(orders)))
[docs] def on_wallet_info(self, exchange: Exchange, coin: str, total_balance: Qty, available_balance: Qty):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_wallet_info in IStrategy documentation reference for more information
self.log_.info("on_wallet_info: {ep} {coin} total_balance={tot_bal} available_balance={avail_bal}"
.format(ep=exchange.name, coin=coin, tot_bal=total_balance, avail_bal=available_balance))
[docs] def on_position_info(self, exchange: Exchange, instrument_id: int, position: Qty):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_position_info in IStrategy documentation reference for more information
self.log_.info("on_position_info: {ep} {instr_id} position={pos}"
.format(ep=exchange.name, instr_id=instrument_id, pos=position))
[docs] def on_funding_info(self, exchange: Exchange, instrument_id: int, funding_rate: float, next_funding_ts: int):
# Here we just log the funding info. May need funding rate, funding ts, for the strategy signal, pnl calc,
# etc. Please refer to on_funding_info in IStrategy documentation reference for more information
self.log_.info("on_funding_info: {ep} {instr_id} funding_rate={fund_rate} next_funding_ts={fund_ts}"
.format(ep=exchange.name, instr_id=instrument_id, fund_rate=funding_rate,
fund_ts=next_funding_ts))
[docs] def on_account_info(self, exchange: Exchange, user_id: str):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_account_info in IStrategy documentation reference for more information
self.log_.info("on_account_info: {ep} user_id='{user_id}'".format(ep=exchange, user_id=user_id))
[docs] def on_request_reject(self, exchange: Exchange, reject_code: GtwRejectCode, detail: str, rejected_rqst_type: str):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_request_reject in IStrategy documentation reference for more information
self.log_.info("on_request_reject: {ep} {rej_code} '{rej_reason}' rejected_msg_type={rejed_msg_type}"
.format(ep=exchange.name, rej_code=reject_code.name, rej_reason=detail,
rejed_msg_type=rejected_rqst_type))
# -----------------------------------------------------------------------------------
# ------------------------------- Market Data Callbacks -----------------------------
# -----------------------------------------------------------------------------------
[docs] def on_public_trade(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, qty: Qty, exec_ts: int,
recv_ts: int):
# Latency for public trade is calculated and logged here. May need to utilize the trade info in the strat.
# Please refer to on_public_trade in IStrategy documentation reference for more information
now_ts = utils.get_now_ts()
total_latency = now_ts - exec_ts
net_latency = recv_ts - exec_ts
code_latency = now_ts - recv_ts
self.log_.info("on_public_trade: {ep} {instr_id} {side} {qty} @ {px}"
.format(ep=exchange.name, instr_id=instrument_id, side=side.name, qty=qty, px=price))
self.log_.info("total_latency({tot} ms) = net_latency({net} ms) + code_latency({code} ms)"
.format(tot=total_latency, net=net_latency, code=code_latency))
[docs] def on_add_price_level(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, qty: Qty,
recv_ts: int):
# This strategy just logs upon this callback. May need to utilize this add price level info in the strat.
# Please refer to on_add_price_level in IStrategy documentation reference for more information
self.log_.info("on_add_price_level: {ep} {instr_id} {side} {qty} @ {px}"
.format(ep=exchange.name, instr_id=instrument_id, side=side.name, qty=qty, px=price))
[docs] def on_modify_price_level(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, new_qty: Qty,
recv_ts: int):
# This strategy just logs upon this callback. May need to utilize this modify price level info in the strat.
# Please refer to on_modify_price_level in IStrategy documentation reference for more information
self.log_.info("on_modify_price_level: {ep} {instr_id} {side} {qty} @ {px}"
.format(ep=exchange.name, instr_id=instrument_id, side=side.name, qty=new_qty, px=price))
[docs] def on_delete_price_level(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, recv_ts: int):
# This strategy just logs upon this callback. May need to utilize this delete price level info in the strat.
# Please refer to on_delete_price_level in IStrategy documentation reference for more information
self.log_.info("on_delete_price_level: {ep} {instr_id} {side} @ {px}"
.format(ep=exchange.name, instr_id=instrument_id, side=side.name, px=price))
[docs] def on_add_level(self, exchange: Exchange, instrument_id: int, side: Side, level_id: int, price: Price, qty: Qty,
recv_ts: int):
# This strategy just logs upon this callback. May need to utilize this add level info in the strat.
# Please refer to on_add_level in IStrategy documentation reference for more information
self.log_.info("on_add_level: {ep} {instr_id} {lvl_id} {side} {qty} @ {px}"
.format(ep=exchange.name, instr_id=instrument_id, lvl_id=level_id, side=side.name, qty=qty,
px=price))
[docs] def on_modify_level(self, exchange: Exchange, instrument_id: int, side: Side, level_id: int, new_qty: Qty,
recv_ts: int):
# This strategy just logs upon this callback. May need to utilize this modify level info in the strat.
# Please refer to on_modify_level in IStrategy documentation reference for more information
self.log_.info("on_modify_level: {ep} {instr_id} {lvl_id} {side} {qty}"
.format(ep=exchange.name, instr_id=instrument_id, lvl_id=level_id, side=side.name, qty=new_qty))
[docs] def on_delete_level(self, exchange: Exchange, instrument_id: int, side: Side, level_id: int, recv_ts: int):
# This strategy just logs upon this callback. May need to utilize this delete level info in the strat.
# Please refer to on_delete_level in IStrategy documentation reference for more information
self.log_.info("on_delete_level: {ep} {instr_id} {lvl_id} {side}"
.format(ep=exchange.name, instr_id=instrument_id, lvl_id=level_id, side=side.name))
[docs] def on_best_bid_level_update(self, exchange: Exchange, instrument_id: int, price: Price, qty: Qty, recv_ts: int):
# This strategy utilize best bid info for the trading signal, just log the info if this info is unnecessary
lat = utils.get_now_ts() - recv_ts
self._best_bid_ = price
self._best_bid_qty_ = qty
self.log_.info("on_best_bid_update: {ep} {instr_id} {qty} @ {px} [{lat} ms]"
.format(ep=exchange.name, instr_id=instrument_id, qty=qty, px=price, lat=lat))
self.update_orders_()
[docs] def on_best_ask_level_update(self, exchange: Exchange, instrument_id: int, price: Price, qty: Qty, recv_ts: int):
# tTis strategy utilize best ask info for the trading signal, just log the info if this info is unnecessary
lat = utils.get_now_ts() - recv_ts
self._best_ask_ = price
self._best_ask_qty_ = qty
self.log_.info("on_best_ask_update: {ep} {instr_id} {qty} @ {px} [{lat} ms]"
.format(ep=exchange.name, instr_id=instrument_id, qty=qty, px=price, lat=lat))
self.update_orders_()
# -----------------------------------------------------------------------------------
# --------------------------- Gtw and Feed Status Callbacks -------------------------
# -----------------------------------------------------------------------------------
[docs] def on_gateway_status(self, exchange: Exchange, status: GtwStatus, detail: str):
# This strategy just logs upon this callback. May need to clear your gtw-related internal states here.
# Please refer to on_gateway_status in IStrategy documentation reference for more information
self.log_.info("on_gateway_status: {ep} status={stat} detail='{detail}'"
.format(ep=exchange.name, stat=status.name, detail=detail))
[docs] def on_feed_handler_status(self, exchange: Exchange, status: FeedHandlerStatus, detail: str):
# This strategy clears the book if new feed snap shot is started to rebuild the book when new feed is received.
# Please refer to on_feed_handler_status in IStrategy documentation reference for more information
self.log_.info("on_feed_handler_status: {ep} status={stat} detail='{detail}'"
.format(ep=exchange.name, stat=status.name, detail=detail))
if status == FeedHandlerStatus.SnapshotStart:
self.lvl_book_.clear()
[docs] def on_rate_limit_info(self, exchange: Exchange, rate_limit: int, rate_remaining: int, reset_ts: int):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_rate_limit_info in IStrategy documentation reference for more information
self.log_.info("on_rate_limit_info: {ep} limit={lim} remaining={rem} reset_ts={ts}"
.format(ep=exchange.name, lim=rate_limit, rem=rate_remaining, ts=reset_ts))
[docs] def on_unsupported_op(self, exchange: Exchange, unsupported_msg_type: str):
# This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please
# refer to on_unsupported_op in IStrategy documentation reference for more information
self.log_.info("on_unsupported_op: {ep} unsupported_msg_type={msg_type}"
.format(ep=exchange.name, msg_type=unsupported_msg_type))