Source code for mlthon.strats.demo_strat

""" ML Tech, Inc. Demo strat docstring.

This strategy serve as an example to help researchers understand the order entry,
order management/tracking, market data collections, etc. Most of the functionalities provided by our trading 
environment are used in this strat. 

This is a simple one instrument (BTC-USDT linear perpetual in Bybit) market-making strategy. Trading logic is: It
rests at the top bid/offer behind a fixed threshold(_entry_qty_threshold_) size. If the size drops below the
threshold, the order is canceled. If a resting buy(sell) order is filled, a closing sell(buy) order is placed if the
size on the top bid(offer) is at least as much as a fixed threshold(_exit_qty_threshold_) size. If the size of the
bid(offer) drops below the threshold, an aggressive order closing order is sent to close the position.

See more details on the comments below.

"""

import typing
import math

from mlthon.api.istrat_env import IStratEnv
from mlthon.api.istrategy import IStrategy
from mlthon.basics import utils, logging
from mlthon.basics.defs import GtwRejectCode, CancelCode, FeedHandlerStatus, GtwStatus, Exchange, Side, \
    ExecInstructions, OrderType, TIF, OrderStatus
from mlthon.basics.price import Price
from mlthon.basics.qty import Qty
from mlthon.instruments.instrument import Instrument
from mlthon.order.books.level_book import LevelBook
from mlthon.order.order import Order
from mlthon.order.order_mgr import OrderMgr
from mlthon.basics.tick_format import TickFormat, TickUnit


[docs]class DemoStrat(IStrategy): def __init__(self, cfg): self.log_ = logging.get_logger(self) self.log_.info("DemoStrat constructor called!") # use order manager to track the orders within the lifecycle of a strat. See the comments below for details self._order_mgr_ = OrderMgr("byt") self.env_ = None # type: typing.Optional[IStratEnv] self.lvl_book_ = LevelBook() # used a dictionary to store instrument information. Its key, instrument name, is like byt:perp:BTC-USDT # (BTC-USDT linear perpetual in Bybit) self._instruments_by_name_ = {} self._exch_ = Exchange.Bybit # target exchange in this strat self._instrument_: Instrument = None self._instrument_id_ = -1 # self._entry_qty_threshold_ = Qty.from_int(10000) # self._exit_qty_threshold_ = Qty.from_int(2000) # # resting qty of opening order # self._resting_size_ = 10 self._entry_qty_threshold_ = Qty.from_int(40) self._exit_qty_threshold_ = Qty.from_int(20) # resting qty of opening order self._resting_size_ = Qty.from_str("0.001") # _place_orders_is used to enable/disable trading via telegram # the strat will send orders to exchange only if this flag is set to True self._place_orders_ = False self._net_notional_ = 0.0 self._pnl_ = 0.0 # used to keep track of PNL # best bid/ask related variables, which is used for trading signal self._best_bid_ = Price.zero() self._best_bid_qty_ = Qty.zero() self._best_ask_ = Price.zero() self._best_ask_qty_ = Qty.zero() self._pos_ = Qty.zero() self._last_order_cancelled_ts_ = math.nan self._quote_delay_ = 30000 # 30 seconds
[docs] def on_framework_connected(self, env: IStratEnv): self.log_.info("Now connected to framework") self.env_ = env # login is REQUIRED, to let trading environment register this strategy so that we could start to send msg # back and forward self.env_.login("demo_strat") # every 2 seconds (2000 milliseconds), we would update the order (send order, modify order, cancel order, etc.) self.env_.setup_timer(2000, self.on_recurring_two_secs) # every 5 seconds, get position information self.env_.setup_timer(5000, self.on_one_shot_five_secs) # # Hook command is just manual command which is hooked to a callback func. Once the strategy is running and add # a hook command, go to the corresponding telegram bot and use MENU, a button with the command name would show # in there. Click the command, our trading environment will invoke the corresponding callback. # TODO: may not use telegram in the future # For example, here we use a "start_trading" hook command. Go to the telegram bot and there'll be a # "start_trading" button in the MENU. Once clicking on it, basically the strategy will start trading (update # orders, etc.) self.env_.add_cmd_hook("start_trading", self.start_trading_command) self.env_.add_cmd_hook("stop_trading", self.stop_trading_command) self.env_.add_cmd_hook("get_position", self.get_position_command) self.env_.add_cmd_hook("get_orders", self.get_orders_command) self.env_.add_cmd_hook("get_pnl", self.get_pnl_command) self._place_orders_ = False
[docs] def on_framework_disconnected(self): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_framework_disconnected in IStrategy documentation reference for more information self.log_.info("Framework is disconnected!!")
[docs] def on_start(self, params: str): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_start in IStrategy documentation reference for more information self.log_.info("on_start() called with params '" + params + "'") self.env_.add_cmd_hook("test_command", self.hooked_command_test)
[docs] def on_stop(self, params: str): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_stop in IStrategy documentation reference for more information self.log_.info("on_stop() called with params '" + params + "'") self.env_.send_stop_ack() # send to trading system side that the strategy has received the stop msg
[docs] def on_instruments(self, instruments: typing.Iterable[Instrument]): """Define the behavior of this strat when receiving all the information of instruments. A common practice is to keep the instrument inside this strat class. This strat will log the instrument info here at the same time, to help researchers check all the information.""" self._instruments_by_name_ = {instrument.get_instrument_name(): instrument for instrument in instruments} # sometimes keeping instruments_by_id is necessary as well, for example: # self._instruments_by_id_ = {instrument.get_instrument_id(): instrument for instrument in instruments} # instrument ID is an unique identifier for an instrument. e.g. 30000000003 = byt:perp:BTC-USDT (i.e. BTC-USDT # linear perpetual in Bybit) for instrument in instruments: self.log_.info("fetched instrument: " + str(instrument)) self.log_.info(''' "instrument_type":"{instrument_type}", "mlt_symbol":"{mlt_symbol}", "price_tick_rules": {price_tick_rules}, "quantity_tick_rules": {quantity_tick_rules}, "multiplier":"{multiplier}", "valid_start_ts":{valid_start_ts}, "valid_end_ts":{valid_end_ts}, "instrument_group_id":"{instrument_group_id}", "base_currency":"{base_currency}", "quote_currency":"{quote_currency}", "min_quantity":"{min_quantity}", "exch_instr_id":"{exch_instr_id}", "quantification":"{quantification}", "instrument_id":{instrument_id}, "price_precision":{price_precision}, "quantity_precision":{quantity_precision} '''.format(instrument_type=instrument.get_instrument_type(), mlt_symbol=instrument.get_mlt_symbol(), price_tick_rules=instrument.get_price_tick_rules(), quantity_tick_rules=instrument.get_quantity_tick_rules(), multiplier=instrument.get_multiplier(), valid_start_ts=instrument.get_valid_start_ts(), valid_end_ts=instrument.get_valid_end_ts(), instrument_group_id=instrument.get_instrument_group_id(), base_currency=instrument.get_base_currency(), quote_currency=instrument.get_quote_currency(), min_quantity=instrument.get_min_quantity(), exch_instr_id=instrument.get_exch_instr_id(), quantification=instrument.get_quantification(), instrument_id=instrument.get_instrument_id(), price_precision=instrument.get_price_precision(), quantity_precision=instrument.get_quantity_precision())) # The next several lines are specific to this strategy. Don't need to follow them in other strats. # In this example, we use BTC-USDT in Bybit as the trading target. So in this case, we assure that the first # instrument sent from our trading system would be BTC-USDT in Bybit. That's why we just use the first # instrument here. In other cases, BTC-USDT in Bybit may not be the first instrument. self._instrument_ = list(instruments)[0] self._instrument_id_ = self._instrument_.get_instrument_id() # use the first instrument (byt:perp:BTC-USDT) self._exch_ = self._instrument_.get_exchange()
[docs] def on_strat_params_update(self, strat_params: dict): """This strat doesn't need any strat params, so just put a log and skip it""" self.log_.info("on_strat_params_update() is invoked")
# ----------------------------------------------------------------------------------- # ----------------------------- Timer and Hook Callback ----------------------------- # -----------------------------------------------------------------------------------
[docs] def on_recurring_two_secs(self): self.log_.warning("2 second recurring timer callback!") self.update_orders_() return True
[docs] def on_one_shot_five_secs(self): self.log_.info("5 second one-shot timer callback!") if self._instrument_: # If instrument information is received, fetch position info request self.env_.fetch_position_info(Exchange.Bybit, instrument_id=self._instrument_id_) return False
[docs] def hooked_command_test(self, params: str): self.log_.info("Hooked command test callback with params: '%s'" % params) """Instrument ID 30000000003: BTC-USDT USDT perpetual in Bybit""" self.env_.fetch_position_info(Exchange.Bybit, instrument_id=self._instrument_id_)
[docs] def get_pnl_command(self, params: str): # it would publish the PNL into telegram channel TODO: telegram may be replaced in the future self.env_.publish_telegram("PnL: " + str(self._pnl_))
[docs] def start_trading_command(self, params: str): self._place_orders_ = True self.log_.info("Start trading command sent!!") self.env_.publish_telegram("Starting Trading!!")
[docs] def stop_trading_command(self, params: str): self._place_orders_ = False self.cancel_all_open_orders_("Stop Trading command sent") self.log_.info("Stop trading command sent!!") self.env_.publish_telegram("Stop Trading, cancelling all open orders!!")
[docs] def get_position_command(self, params: str): self.env_.publish_telegram("Position: " + self._pos_.to_str()) self.log_.info("Get position command sent!!")
[docs] def get_orders_command(self, params: str): self.env_.fetch_orders_info(exchange=self._exch_, instrument_id=self._instrument_id_) self.log_.info("Get orders command sent!!")
[docs] def format_price_and_qty_(self, price: typing.Union[float, Price], qty: Qty, side: Side): """This function would round the price and qty to the nearest tick up/down (depending on the side) so that new/modify/cancel order could be sent with valid price and qty. """ # qty is ticked down with size of one tick equal to min quantity here. Note that min quantity is used here # for simplicity. instrument.get_quantity_tick_rules should be used for detailed quantity tick rules. # Please refer to get_quantity_tick_rules in Instrument documentation reference for more information. qty_tick_size = self._instrument_.get_min_quantity().to_float() qty = TickFormat.qty_down_from_float(qty.to_float(), tick_size=qty_tick_size, tick_unit=TickUnit.Lot) # Rounding price to the nearest tick up/down using default tick size from instrument information. Tick sizes in # instrument information are normalized to cents scale. For simplicity, default tick size is used here. # Please refer to get_price_tick_rules in Instrument documentation reference for more information. price_tick_size = float(self._instrument_.get_price_tick_rules()["default"]) * 100 if side == Side.Buy: price = Price.from_str(str(price)) if isinstance(price, float) else price price = TickFormat.price_up_from_float(price.to_float(), tick_size=price_tick_size, tick_unit=TickUnit.Cent) else: price = Price.from_str(str(price)) if isinstance(price, float) else price price = TickFormat.price_down_from_float(price.to_float(), tick_size=price_tick_size, tick_unit=TickUnit.Cent) return price, qty
[docs] def send_order_(self, side: Side, price: typing.Union[float, Price], qty: Qty, reason: str, exch: Exchange, instrument_id: int, exec_instr=ExecInstructions.Unset, ord_type=OrderType.Limit, tif=TIF.GTC, close_position: bool = False): """Function used by strat to send a new order to our trading system. Order manager is used to determine if order can be sent and to keep track of order through it's lifecycle""" # format price and qty price, qty = self.format_price_and_qty_(price=price, qty=qty, side=side) # use order manager to prepare this new order. order manager would track whether this order is valid and ready # to be sent. If yes, it would automatically generate an unique ID for the order, so the researcher won't need # to create or record the ID manually. order = self._order_mgr_.prepare_new_order(instrument_id=instrument_id, side=side, price=price, qty=qty, exec_instr=exec_instr, order_type=OrderType.Limit, tif=TIF.GTC, nullable_attachment=reason) if order: # if order is not None, then it's really to be sent new_order_clid = order.get_client_id() self.env_.send_new_order(client_id=new_order_clid, exchange=exch, instrument_id=instrument_id, side=side, price=price, qty=qty, exec_instr=exec_instr, ord_type=ord_type, time_in_force=tif, close_position=close_position) self.env_.publish_telegram("New order " + str(new_order_clid) + ": " + side.name + " " + qty.to_str() + "@" + price.to_str() + " " + str(new_order_clid) + " reason: " + reason) else: self.env_.publish_telegram("Not able to send a new_order!!")
[docs] def modify_order_(self, client_id: str, side: Side, price: typing.Union[float, Price], qty: Qty, reason: str, exch: Exchange): """Function used by strat to send a modify order to our trading system. Order manager is used to determine if order can be modified and to keep track of order through it's lifecycle""" # format price and qty price, qty = self.format_price_and_qty_(price=price, qty=qty, side=side) # use order manager to prepare this modify order. order manager would track whether this order is valid and # ready to be sent. If yes, it would return True, False otherwise. With the order mgr, researchers won't cause # trading environment or need to wait for the rejection from exchange if ModifyOrder request is invalid or there # is any pending status, etc (eg: Modify is invalid on canceled order). success = self._order_mgr_.prepare_modify_order(client_id=client_id, new_price=price, new_qty=qty) if success: self.env_.send_modify_order(exchange=exch, client_ord_id=client_id, new_price=price, new_qty=qty) self.env_.publish_telegram("Sent a modify order " + str(client_id) + ": " + side.name + " " + qty.to_str() + "@" + price.to_str() + " " + " reason: " + reason)
[docs] def cancel_order_(self, order: Order, instrument_id: int, reason: str = 'cancel_order'): """Function used by strat to send a cancel order to our trading system. Order manager is used to determine if order can be cancelled and to keep track of order through it's lifecycle""" order_id = order.get_client_id() # order cannot be cancelled if it has PendingNew, PendingModify or PendingCancel status, so check is needed if order.is_new_pending() or order.is_modify_pending() or order.is_cancel_pending(): order_state = "PendingNew" if order.is_new_pending() else \ "PendingModify" if order.is_modify_pending() else "PendingCancel" self.log_.info("Cannot cancel Order: " + str(order_id) + " in " + order_state + " state") return # Use order manager to prepare this cancel order. Order manager would track whether the cancel is valid and # ready to be sent. If yes, it would return True, False otherwise. With the order mgr, researchers won't cause # trading environment or need to wait for the rejection from exchange if CancelOrder request is invalid or there # is any pending status, etc. ready_to_send_cancel = self._order_mgr_.prepare_cancel_order(client_id=order_id) if ready_to_send_cancel: self.env_.send_cancel_order(exchange=self._exch_, instrument_id=instrument_id, client_ord_id=order_id) self.env_.publish_telegram("Sent a cancel: " + str(order_id) + " reason:" + reason)
[docs] def cancel_all_open_orders_(self, reason='cancel_all'): # If applied correctly, order manager can return all open orders. This function is used by strat to cancel # all open orders orders = self._order_mgr_.get_open_orders() if orders: self.log_.info("Cancelling all open orders!!") for order in orders: self.cancel_order_(order, instrument_id=self._instrument_id_, reason=reason)
[docs] def cancel_all_open_orders_on_side_(self, side: Side, reason='cancel_all'): # If applied correctly, order manager can return all open orders. This function is used by strat to cancel # all open orders on a particular side orders = self._order_mgr_.get_open_orders_on_side(side=side) if orders: self.log_.info("Cancelling all open orders on side " + side.name) for order in orders: self.cancel_order_(order, instrument_id=self._instrument_id_, reason=reason)
[docs] def modify_resting_orders_(self, side: Side, price: Price, qty: Qty): # Function used by strat to modify all resting orders with new price on a side modify_orders = True orders = self._order_mgr_.get_open_orders_on_side(side=side) if orders: for order in orders: # send modify order all the resting orders (on the target side but with a different price from target) if order.get_side() == side and order.get_price() != price: self.modify_order_(order.get_client_id(), side, price, qty, reason="Modify resting order", exch=self._exch_) else: modify_orders = False return modify_orders
[docs] def is_market_valid_(self): return self._best_bid_ < self._best_ask_
[docs] def update_orders_(self): """This function implements the trading logic and updates orders as necessary""" if (not self.is_market_valid_()) or (not self._place_orders_): self.log_.warning("Invalid Market or trading is disabled !!") self.cancel_all_open_orders_() else: # If order was cancelled because bid/ask qty fell below _entry_qty_threshold_, then this strat waits # for 30seconds before placing new order. This is just to minimize frequency of order placement in this # strat if previous orders were cancelled. now_ts = utils.get_now_ts() if math.isnan(self._last_order_cancelled_ts_): self._last_order_cancelled_ts_ = now_ts - (self._quote_delay_ + 1) if (now_ts < self._last_order_cancelled_ts_ + self._quote_delay_) and self._pos_ == Qty.zero(): return if self._pos_ == Qty.zero(): # If current position is zero, rest at the top bid/offer behind a fixed threshold(_entry_qty_threshold_) # size. If there are already existing orders modify them, else place new order qty = self._resting_size_ if self._best_bid_qty_ > self._entry_qty_threshold_: price = self._best_bid_ side = Side.Buy modify_orders = self.modify_resting_orders_(side=side, price=price, qty=qty) if not modify_orders: self.send_order_(side=Side.Buy, price=price, qty=qty, reason="Open Position", exch=self._exch_, instrument_id=self._instrument_id_, exec_instr=ExecInstructions.PostOnly, close_position=False) else: self.cancel_all_open_orders_on_side_(side=Side.Buy) if self._best_ask_qty_ > self._entry_qty_threshold_: price = self._best_ask_ side = Side.Sell modify_orders = self.modify_resting_orders_(side=side, price=price, qty=qty) if not modify_orders: self.send_order_(side=Side.Sell, price=price, qty=qty, reason="Open Position", exch=self._exch_, instrument_id=self._instrument_id_, exec_instr=ExecInstructions.PostOnly, close_position=False) else: self.cancel_all_open_orders_on_side_(side=Side.Sell) else: # If a resting buy(sell) order is filled, a closing sell(buy) order is placed if the size on the # top bid(offer) is at least as much as a fixed threshold(_exit_qty_threshold_) size. If the size of the # bid(offer) drops below the threshold, an aggressive order closing order is sent to close the position. qty = abs(self._pos_) exit_exec_instr = ExecInstructions.Unset if self._pos_ > Qty.zero(): side = Side.Sell if self._best_bid_qty_ > self._exit_qty_threshold_: # Closing sell rest order will be placed price = self._best_ask_ else: # Aggressive closing sell order will be placed to close the position price = self._best_bid_ else: side = Side.Buy if self._best_ask_qty_ > self._exit_qty_threshold_: # Closing buy rest order will be placed price = self._best_bid_ else: # Aggressive closing buy order will be placed to close the position price = self._best_ask_ modify_orders = self.modify_resting_orders_(side=side, price=price, qty=qty) if not modify_orders: self.send_order_(side=side, price=price, qty=qty, reason="Close Position", exch=self._exch_, instrument_id=self._instrument_id_, exec_instr=exit_exec_instr, close_position=True)
# ----------------------------------------------------------------------------------- # ------------------------------ Order Entry Callbacks ------------------------------ # -----------------------------------------------------------------------------------
[docs] def on_new_order_ack(self, exchange: Exchange, client_id: str, exchange_id: str, instrument_id: int, side: Side, price: Price, qty: Qty, leaves_qty: Qty, order_type: OrderType, tif: TIF, exec_instr: ExecInstructions): # use order manager to check whether this order is sent by the strat order = self._order_mgr_.get_order_with_client_id(client_id) if order: # use order manager to keep track of NewOrderAck for this order, so order can be tracked through out # it's life cycle using order mgr self._order_mgr_.apply_new_order_ack(client_id=client_id, exchange_id=exchange_id, price=price, leaves_qty=qty) self.log_.info("on_new_order_ack: {ep} {clid} {side} {qty} @ {price}" .format(ep=exchange.name, side=side.name, clid=client_id, qty=qty, price=price))
[docs] def on_modify_order_ack(self, exchange: Exchange, client_id: str, new_price: Price, new_qty: Qty, leaves_qty: Qty): # use order manager to check whether this order is sent by the strat order = self._order_mgr_.get_order_with_client_id(client_id) if order: # use order manager to keep track of ModifyOrderAck for this order, so order can be tracked through out # it's life cycle using order mgr self._order_mgr_.apply_modify_order_ack(client_id=client_id, leaves_qty=leaves_qty) self.log_.info("on_modify_order_ack: {ep} {clid} {qty} @ {price}" .format(ep=exchange.name, clid=client_id, qty=new_qty, price=new_price))
[docs] def on_cancel_all_ack(self, exchange: Exchange, instrument_id: int): self.log_.info("on_cancel_all_ack: {ep} {instr_id}".format(ep=exchange.name, instr_id=instrument_id))
[docs] def on_cancel_order_ack(self, exchange: Exchange, client_id: str): # use order manager to keep track of CancelOrderAck for this order, so order can be tracked through out # it's life cycle using order mgr. Keep track of when order is cancelled self._order_mgr_.apply_cancel_order_ack(client_id=client_id) self.log_.info("on_cancel_order_ack: {ep} {clid}".format(ep=exchange.name, clid=client_id)) self._last_order_cancelled_ts_ = utils.get_now_ts()
[docs] def on_new_order_reject(self, exchange: Exchange, client_id: str, reject_code: GtwRejectCode, reject_reason: str): # use order manager to keep track of NewOrderReject for this order, so order can be tracked through out # it's life cycle using order mgr self._order_mgr_.apply_new_order_reject(client_id=client_id) self.log_.info("on_new_order_reject: {ep} {clid} {rej_code} '{rej_detail}'" .format(ep=exchange.name, clid=client_id, rej_code=reject_code.name, rej_detail=reject_reason))
[docs] def on_modify_order_reject(self, exchange: Exchange, client_id: str, reject_code: GtwRejectCode, reject_reason: str): # use order manager to keep track of ModifyOrderReject for this order, so order can be tracked through out # it's life cycle using order mgr self._order_mgr_.apply_modify_order_reject(client_id=client_id) self.log_.info("on_modify_order_reject: {ep} {clid} {rej_code} '{rej_detail}'" .format(ep=exchange.name, clid=client_id, rej_code=reject_code.name, rej_detail=reject_reason))
[docs] def on_cancel_all_reject(self, exchange: Exchange, instrument_id: int, reject_code: GtwRejectCode, reject_reason: str): self.log_.info("on_cancel_all_reject: {ep} {instr_id} {rej_code} '{rej_detail}'" .format(ep=exchange.name, instr_id=instrument_id, rej_code=reject_code.name, rej_detail=reject_reason))
[docs] def on_cancel_order_reject(self, exchange: Exchange, client_id: str, reject_code: GtwRejectCode, reject_reason: str): # use order manager to keep track of CancelOrderReject for this order, so order can be tracked through out # it's life cycle using order mgr self._order_mgr_.apply_cancel_order_reject(client_id) self.log_.info("on_cancel_order_reject: {ep} {clid} {rej_code} '{rej_detail}'" .format(ep=exchange.name, clid=client_id, rej_code=reject_code.name, rej_detail=reject_reason))
[docs] def on_order_execution(self, exchange: Exchange, client_id: str, side: Side, price: Price, fill_qty: Qty, leaves_qty: Qty, exec_ts: int, recv_ts: int): if leaves_qty and leaves_qty.is_null(): leaves_qty = None # use order manager to check whether this order is sent by the strat order = self._order_mgr_.get_order_with_client_id(client_id) if order: # use order manager to keep track of OrderExecution for this order, so order can be tracked through out # it's life cycle using order mgr. Order manager would return a boolean value here to # inform whether the order is fully executed or not. If yes, the strat can start to calculate realized PNL is_fully_executed = self._order_mgr_.apply_order_execution(client_id=client_id, fill_qty=fill_qty, nullable_leaves_qty=leaves_qty) if is_fully_executed is not None: # then calculate PNL qty = fill_qty if side == Side.Buy else -fill_qty self._pos_ += qty self.log_.info( "on_order_execution: {ep} {clid} {side} {qty} @ {price} transac_ts={ex_ts} recv_ts={rx_ts}" .format(ep=exchange.name, side=side.name, clid=client_id, qty=fill_qty, price=price, ex_ts=exec_ts, rx_ts=recv_ts)) if price > Price.zero(): self._net_notional_ += qty.to_float() / price.to_float() if self._pos_ == Qty.zero(): self._pnl_ = round(self._net_notional_, 4)
[docs] def on_order_cancelled(self, exchange: Exchange, client_id: str, unsolicited: bool, engine_ts: int, recv_ts: int, cancel_code: CancelCode, cancel_reason: str): # use order manager to keep track of OrderCancelled for this order, so order can be tracked through out # it's life cycle using order mgr self._order_mgr_.apply_order_cancelled(client_id=client_id) self.log_.info("on_order_cancelled: {ep} {clid} {clx_code} '{clx_reason}' transac_ts={ex_ts} recv_ts={rx_ts}" .format(ep=exchange.name, clid=client_id, clx_code=cancel_code.name, clx_reason=cancel_reason, ex_ts=engine_ts, rx_ts=recv_ts))
# ----------------------------------------------------------------------------------- # ------------------------------ Account Info Callbacks ----------------------------- # -----------------------------------------------------------------------------------
[docs] def on_order_status(self, exchange: Exchange, instrument_id: int, client_ord_id: str, exch_ord_id: str, side: Side, price: Price, leaves_qty: Qty, status: OrderStatus, order_type: OrderType, tif: TIF, exec_instr: ExecInstructions) -> None: # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_order_status in IStrategy documentation reference for more information self.log_.info("on_order_status: {ep} {clid} {side} {qty} @ {price} status={status}" .format(ep=exchange.name, side=side.name, clid=client_ord_id, qty=leaves_qty, price=price, status=status.name))
[docs] def on_orders_info(self, exchange: Exchange, orders: typing.List[Order]): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_orders_info in IStrategy documentation reference for more information self.log_.info("on_orders_info: {ep} {num_orders} orders" .format(ep=exchange.name, num_orders=len(orders)))
[docs] def on_wallet_info(self, exchange: Exchange, coin: str, total_balance: Qty, available_balance: Qty): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_wallet_info in IStrategy documentation reference for more information self.log_.info("on_wallet_info: {ep} {coin} total_balance={tot_bal} available_balance={avail_bal}" .format(ep=exchange.name, coin=coin, tot_bal=total_balance, avail_bal=available_balance))
[docs] def on_position_info(self, exchange: Exchange, instrument_id: int, position: Qty): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_position_info in IStrategy documentation reference for more information self.log_.info("on_position_info: {ep} {instr_id} position={pos}" .format(ep=exchange.name, instr_id=instrument_id, pos=position))
[docs] def on_funding_info(self, exchange: Exchange, instrument_id: int, funding_rate: float, next_funding_ts: int): # Here we just log the funding info. May need funding rate, funding ts, for the strategy signal, pnl calc, # etc. Please refer to on_funding_info in IStrategy documentation reference for more information self.log_.info("on_funding_info: {ep} {instr_id} funding_rate={fund_rate} next_funding_ts={fund_ts}" .format(ep=exchange.name, instr_id=instrument_id, fund_rate=funding_rate, fund_ts=next_funding_ts))
[docs] def on_account_info(self, exchange: Exchange, user_id: str): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_account_info in IStrategy documentation reference for more information self.log_.info("on_account_info: {ep} user_id='{user_id}'".format(ep=exchange, user_id=user_id))
[docs] def on_request_reject(self, exchange: Exchange, reject_code: GtwRejectCode, detail: str, rejected_rqst_type: str): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_request_reject in IStrategy documentation reference for more information self.log_.info("on_request_reject: {ep} {rej_code} '{rej_reason}' rejected_msg_type={rejed_msg_type}" .format(ep=exchange.name, rej_code=reject_code.name, rej_reason=detail, rejed_msg_type=rejected_rqst_type))
# ----------------------------------------------------------------------------------- # ------------------------------- Market Data Callbacks ----------------------------- # -----------------------------------------------------------------------------------
[docs] def on_public_trade(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, qty: Qty, exec_ts: int, recv_ts: int): # Latency for public trade is calculated and logged here. May need to utilize the trade info in the strat. # Please refer to on_public_trade in IStrategy documentation reference for more information now_ts = utils.get_now_ts() total_latency = now_ts - exec_ts net_latency = recv_ts - exec_ts code_latency = now_ts - recv_ts self.log_.info("on_public_trade: {ep} {instr_id} {side} {qty} @ {px}" .format(ep=exchange.name, instr_id=instrument_id, side=side.name, qty=qty, px=price)) self.log_.info("total_latency({tot} ms) = net_latency({net} ms) + code_latency({code} ms)" .format(tot=total_latency, net=net_latency, code=code_latency))
[docs] def on_add_price_level(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, qty: Qty, recv_ts: int): # This strategy just logs upon this callback. May need to utilize this add price level info in the strat. # Please refer to on_add_price_level in IStrategy documentation reference for more information self.log_.info("on_add_price_level: {ep} {instr_id} {side} {qty} @ {px}" .format(ep=exchange.name, instr_id=instrument_id, side=side.name, qty=qty, px=price))
[docs] def on_modify_price_level(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, new_qty: Qty, recv_ts: int): # This strategy just logs upon this callback. May need to utilize this modify price level info in the strat. # Please refer to on_modify_price_level in IStrategy documentation reference for more information self.log_.info("on_modify_price_level: {ep} {instr_id} {side} {qty} @ {px}" .format(ep=exchange.name, instr_id=instrument_id, side=side.name, qty=new_qty, px=price))
[docs] def on_delete_price_level(self, exchange: Exchange, instrument_id: int, side: Side, price: Price, recv_ts: int): # This strategy just logs upon this callback. May need to utilize this delete price level info in the strat. # Please refer to on_delete_price_level in IStrategy documentation reference for more information self.log_.info("on_delete_price_level: {ep} {instr_id} {side} @ {px}" .format(ep=exchange.name, instr_id=instrument_id, side=side.name, px=price))
[docs] def on_add_level(self, exchange: Exchange, instrument_id: int, side: Side, level_id: int, price: Price, qty: Qty, recv_ts: int): # This strategy just logs upon this callback. May need to utilize this add level info in the strat. # Please refer to on_add_level in IStrategy documentation reference for more information self.log_.info("on_add_level: {ep} {instr_id} {lvl_id} {side} {qty} @ {px}" .format(ep=exchange.name, instr_id=instrument_id, lvl_id=level_id, side=side.name, qty=qty, px=price))
[docs] def on_modify_level(self, exchange: Exchange, instrument_id: int, side: Side, level_id: int, new_qty: Qty, recv_ts: int): # This strategy just logs upon this callback. May need to utilize this modify level info in the strat. # Please refer to on_modify_level in IStrategy documentation reference for more information self.log_.info("on_modify_level: {ep} {instr_id} {lvl_id} {side} {qty}" .format(ep=exchange.name, instr_id=instrument_id, lvl_id=level_id, side=side.name, qty=new_qty))
[docs] def on_delete_level(self, exchange: Exchange, instrument_id: int, side: Side, level_id: int, recv_ts: int): # This strategy just logs upon this callback. May need to utilize this delete level info in the strat. # Please refer to on_delete_level in IStrategy documentation reference for more information self.log_.info("on_delete_level: {ep} {instr_id} {lvl_id} {side}" .format(ep=exchange.name, instr_id=instrument_id, lvl_id=level_id, side=side.name))
[docs] def on_best_bid_level_update(self, exchange: Exchange, instrument_id: int, price: Price, qty: Qty, recv_ts: int): # This strategy utilize best bid info for the trading signal, just log the info if this info is unnecessary lat = utils.get_now_ts() - recv_ts self._best_bid_ = price self._best_bid_qty_ = qty self.log_.info("on_best_bid_update: {ep} {instr_id} {qty} @ {px} [{lat} ms]" .format(ep=exchange.name, instr_id=instrument_id, qty=qty, px=price, lat=lat)) self.update_orders_()
[docs] def on_best_ask_level_update(self, exchange: Exchange, instrument_id: int, price: Price, qty: Qty, recv_ts: int): # tTis strategy utilize best ask info for the trading signal, just log the info if this info is unnecessary lat = utils.get_now_ts() - recv_ts self._best_ask_ = price self._best_ask_qty_ = qty self.log_.info("on_best_ask_update: {ep} {instr_id} {qty} @ {px} [{lat} ms]" .format(ep=exchange.name, instr_id=instrument_id, qty=qty, px=price, lat=lat)) self.update_orders_()
# ----------------------------------------------------------------------------------- # --------------------------- Gtw and Feed Status Callbacks ------------------------- # -----------------------------------------------------------------------------------
[docs] def on_gateway_status(self, exchange: Exchange, status: GtwStatus, detail: str): # This strategy just logs upon this callback. May need to clear your gtw-related internal states here. # Please refer to on_gateway_status in IStrategy documentation reference for more information self.log_.info("on_gateway_status: {ep} status={stat} detail='{detail}'" .format(ep=exchange.name, stat=status.name, detail=detail))
[docs] def on_feed_handler_status(self, exchange: Exchange, status: FeedHandlerStatus, detail: str): # This strategy clears the book if new feed snap shot is started to rebuild the book when new feed is received. # Please refer to on_feed_handler_status in IStrategy documentation reference for more information self.log_.info("on_feed_handler_status: {ep} status={stat} detail='{detail}'" .format(ep=exchange.name, stat=status.name, detail=detail)) if status == FeedHandlerStatus.SnapshotStart: self.lvl_book_.clear()
[docs] def on_rate_limit_info(self, exchange: Exchange, rate_limit: int, rate_remaining: int, reset_ts: int): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_rate_limit_info in IStrategy documentation reference for more information self.log_.info("on_rate_limit_info: {ep} limit={lim} remaining={rem} reset_ts={ts}" .format(ep=exchange.name, lim=rate_limit, rem=rate_remaining, ts=reset_ts))
[docs] def on_unsupported_op(self, exchange: Exchange, unsupported_msg_type: str): # This strategy just logs upon this callback. Strategy specific code should be coded here if necessary. Please # refer to on_unsupported_op in IStrategy documentation reference for more information self.log_.info("on_unsupported_op: {ep} unsupported_msg_type={msg_type}" .format(ep=exchange.name, msg_type=unsupported_msg_type))