import json
from decimal import Decimal
from os import PathLike
import mlthon.basics.utils as utils
from mlthon.basics import logging
from mlthon.basics.defs import Side
from mlthon.basics.mldecimal import MLDecimal
from mlthon.basics.price import Price
from mlthon.basics.qty import Qty
from mlthon.basics.rate_tracker import RateTracker, SlicingRateTracker
from mlthon.risk.risk_metric_tracker import RiskMetricTracker
from mlthon.risk.risk_position import RiskPosition
class RiskManager:
    """
    Pre-trade risk gate that validates new and modified orders against configured limits.

    State is kept in a nested dict keyed by exchange id: one submit-rate tracker per
    exchange and one ``RiskMetricTracker`` per (exchange, symbol) pair. Both kinds of
    tracker are created lazily the first time they are requested.
    """

    def __init__(self, max_order_size: Qty, max_long_position: Qty, max_short_position: Qty,
                 max_order_submit_rate: int, submit_rate_window: int, max_open_orders_per_side: int,
                 max_notional_per_side: MLDecimal, max_directional_exposure: MLDecimal):
        """
        :param max_order_size: Maximum allowed order size
        :param max_long_position: Maximum allowed long position
        :param max_short_position: Maximum allowed short position
        :param max_order_submit_rate: Maximum number of orders per rate window
        :param submit_rate_window: Window for maximum orders (passed to the rate tracker as ``window_ms``)
        :param max_open_orders_per_side: Maximum open orders on either side
        :param max_notional_per_side: Maximum notional allowed on either side
        :param max_directional_exposure: Maximum allowed net direction exposure
        """
        self._max_order_size_ = max_order_size
        self._max_long_position_ = max_long_position
        self._max_short_position_ = max_short_position
        self._max_order_submit_rate_ = max_order_submit_rate
        self._submit_rate_window_ = submit_rate_window
        self._max_open_orders_per_side_ = max_open_orders_per_side
        self._max_notional_per_side_ = max_notional_per_side
        self._max_directional_exposure_ = max_directional_exposure
        # Layout: {exchange_id: {"rate_tracker": RateTracker,
        #                        "symbol_metric_tracker": {symbol: RiskMetricTracker}}}
        self._metrics_by_exchange_symbol_ = {}
        self._logger_ = logging.get_logger(self)

    @staticmethod
    def create_risk_manager_from_json(config: dict) -> "RiskManager":
        """
        Build a RiskManager from an already-parsed json config.

        :param config: Json config dict like
        :return: RiskManager object
        :raises ValueError: if config is not a dict or a required key is missing
        """
        if not isinstance(config, dict):
            raise ValueError("Invalid json received. Json should be dict of key value pairs")
        required_keys = ("max_order_size", "max_long_position", "max_short_position", "max_order_submit_rate",
                         "submit_rate_window", "max_open_orders_per_side", "max_notional_per_side",
                         "max_directional_exposure")
        # Report the first missing key, in the order the keys are listed above.
        for key in required_keys:
            if key not in config:
                raise ValueError("Missing {key} from json config".format(key=key))
        valid_params = {
            "max_order_size": Qty.from_json(config["max_order_size"]),
            "max_long_position": Qty.from_json(config["max_long_position"]),
            "max_short_position": Qty.from_json(config["max_short_position"]),
            "max_order_submit_rate": int(config["max_order_submit_rate"]),
            "submit_rate_window": int(config["submit_rate_window"]),
            "max_open_orders_per_side": int(config["max_open_orders_per_side"]),
            "max_notional_per_side": MLDecimal.from_json(config["max_notional_per_side"]),
            "max_directional_exposure": MLDecimal.from_json(config["max_directional_exposure"]),
        }
        return RiskManager(**valid_params)

    @staticmethod
    def create_risk_manager_from_json_file(json_path: PathLike) -> "RiskManager":
        """
        Load a json config file and build a RiskManager from it.

        :param json_path: Json path with dict like config parameters
        :return: RiskManager object
        """
        # JSON text is UTF-8 by specification; do not depend on the platform default encoding.
        # Floats are parsed as Decimal so that limits are not rounded through binary floats.
        with open(json_path, encoding="utf-8") as f:
            config = json.load(f, parse_float=Decimal)
        return RiskManager.create_risk_manager_from_json(config)

    @property
    def max_order_size(self) -> Qty:
        return self._max_order_size_

    @property
    def max_long_position(self) -> Qty:
        return self._max_long_position_

    @property
    def max_short_position(self) -> Qty:
        return self._max_short_position_

    @property
    def max_order_submit_rate(self) -> int:
        return self._max_order_submit_rate_

    @property
    def submit_rate_window(self) -> int:
        return self._submit_rate_window_

    @property
    def max_open_orders_per_side(self) -> int:
        return self._max_open_orders_per_side_

    @property
    def max_notional_per_side(self) -> MLDecimal:
        return self._max_notional_per_side_

    @property
    def max_directional_exposure(self) -> MLDecimal:
        return self._max_directional_exposure_

    def is_valid_new_order_to_send(self, exchange_id: str, symbol: str, qty: Qty, side: Side, price: Price) -> bool:
        """
        Validates if new order added to portfolio exceeds risk metrics

        The order is first registered with the symbol's risk tracker; if any limit is
        breached the registration is cancelled again before returning False.

        :param exchange_id: exchange id
        :param symbol: symbol of new order
        :param qty: quantity of new order
        :param side: side of new order
        :param price: price of new order
        :return: True or False
        """
        risk_position = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=qty, side=side, price=price)
        # Validate order submit rate
        if not self.__is_within_submit_rate(exchange_id):
            return False
        # Validate other risk metrics with the new position tentatively applied
        risk_tracker = self.__get_risk_metric_tracker(exchange_id=exchange_id, symbol=symbol)
        risk_tracker.prepare_new_position(risk_position)
        breach = self.__find_limit_breach(risk_tracker, qty)
        if breach is not None:
            # Undo the tentative position so a refused order leaves no trace in the tracker.
            risk_tracker.cancel_new_position(risk_position)
            self._logger_.warning(breach)
            return False
        return True

    def is_valid_modify_to_send(self, exchange_id: str, symbol: str, qty: Qty, side: Side, price: Price,
                                new_qty: Qty, new_price: Price) -> bool:
        """
        Validates if modifying an existing order exceeds risk metrics

        The modification is first registered with the symbol's risk tracker; if any limit
        is breached it is rolled back via the tracker's ``on_modify_reject`` before
        returning False. The order-size limit is checked against ``new_qty``, the size the
        order would have after the modify.

        :param exchange_id: exchange id
        :param symbol: symbol of order
        :param qty: quantity of order
        :param side: side of order
        :param price: price of order
        :param new_qty: new quantity of order
        :param new_price: new price of order
        :return: True or False
        """
        risk_position_old = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=qty, side=side, price=price)
        risk_position_new = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=new_qty, side=side,
                                         price=new_price)
        # Validate order submit rate
        if not self.__is_within_submit_rate(exchange_id):
            return False
        # Validate other risk metrics with the modification tentatively applied
        risk_tracker = self.__get_risk_metric_tracker(exchange_id=exchange_id, symbol=symbol)
        risk_tracker.prepare_modify_position(risk_position_old, risk_position_new)
        # The size limit applies to the quantity being requested, not the one already resting.
        breach = self.__find_limit_breach(risk_tracker, new_qty)
        if breach is not None:
            risk_tracker.on_modify_reject(risk_position_old, risk_position_new)
            self._logger_.warning(breach)
            return False
        return True

    def on_new_order_reject(self, exchange_id: str, symbol: str, qty: Qty, side: Side, price: Price):
        """
        Update risk profile on new order rejection by notifying the symbol's risk tracker

        :param exchange_id: exchange id
        :param symbol: symbol of rejected order
        :param qty: quantity of rejected order
        :param side: side of rejected order
        :param price: price of rejected order
        """
        risk_tracker = self.__get_risk_metric_tracker(exchange_id, symbol)
        risk_position = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=qty, side=side, price=price)
        risk_tracker.on_new_order_reject(risk_position)

    def on_order_cancelled(self, exchange_id: str, symbol: str, qty: Qty, side: Side, price: Price):
        """
        Update risk profile on order cancellation by notifying the symbol's risk tracker

        :param exchange_id: exchange id
        :param symbol: symbol of cancelled order
        :param qty: quantity of cancelled order
        :param side: side of cancelled order
        :param price: price of cancelled order
        """
        risk_tracker = self.__get_risk_metric_tracker(exchange_id, symbol)
        risk_position = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=qty, side=side, price=price)
        risk_tracker.on_order_cancelled(risk_position)

    def on_order_executed(self, exchange_id: str, symbol: str, qty: Qty, side: Side, price: Price):
        """
        Update risk profile on order execution by notifying the symbol's risk tracker

        :param exchange_id: exchange id
        :param symbol: symbol of executed order
        :param qty: quantity of executed order
        :param side: side of executed order
        :param price: price of executed order
        """
        risk_tracker = self.__get_risk_metric_tracker(exchange_id, symbol)
        risk_position = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=qty, side=side, price=price)
        risk_tracker.on_order_executed(risk_position)

    def on_modify_reject(self, exchange_id: str, symbol: str, qty: Qty, side: Side, price: Price,
                         new_qty: Qty, new_price: Price):
        """
        Update risk profile on modify rejection by notifying the symbol's risk tracker

        :param exchange_id: exchange id
        :param symbol: symbol of order
        :param qty: quantity of order before the modify
        :param side: side of order
        :param price: price of order before the modify
        :param new_qty: new quantity of order
        :param new_price: new price of order
        """
        risk_tracker = self.__get_risk_metric_tracker(exchange_id, symbol)
        risk_position_old = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=qty, side=side, price=price)
        risk_position_new = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=new_qty, side=side,
                                         price=new_price)
        risk_tracker.on_modify_reject(risk_position_old, risk_position_new)

    def on_modify_ack(self, exchange_id: str, symbol: str, qty: Qty, side: Side, price: Price,
                      new_qty: Qty, new_price: Price):
        """
        Update risk profile on modify acknowledgement by notifying the symbol's risk tracker

        :param exchange_id: exchange id
        :param symbol: symbol of order
        :param qty: quantity of order before the modify
        :param side: side of order
        :param price: price of order before the modify
        :param new_qty: new quantity of order
        :param new_price: new price of order
        """
        risk_tracker = self.__get_risk_metric_tracker(exchange_id, symbol)
        risk_position_old = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=qty, side=side, price=price)
        risk_position_new = RiskPosition(exchange_id=exchange_id, symbol=symbol, qty=new_qty, side=side,
                                         price=new_price)
        risk_tracker.on_modify_ack(risk_position_old, risk_position_new)

    def __is_within_submit_rate(self, exchange_id: str) -> bool:
        """Check the exchange's rate tracker at the current timestamp; log a warning when it refuses."""
        rate_tracker = self.__get_rate_tracker(exchange_id=exchange_id)
        if not rate_tracker.is_valid_rate(ts=utils.get_now_ts()):
            self._logger_.warning("Order submit rate is exceeded")
            return False
        return True

    def __find_limit_breach(self, risk_tracker: RiskMetricTracker, qty: Qty):
        """
        Run the limit checks in a fixed order and stop at the first one that fails.

        :param risk_tracker: tracker for the order's symbol, with the candidate position already prepared
        :param qty: quantity to compare against the maximum order size
        :return: warning message for the first breached limit, or None when all checks pass
        """
        # Validate order size
        if not self.__is_valid_order_size(qty):
            return "Order size threshold exceeded"
        # Validate net position
        if not risk_tracker.is_valid_position(self.max_long_position, self.max_short_position):
            return "Total open position threshold exceeded"
        # Validate open orders
        if not risk_tracker.is_valid_open_orders_per_side(self.max_open_orders_per_side):
            return "Total open orders threshold exceeded"
        # Validate notional per side
        if not risk_tracker.is_valid_notional_per_side(self.max_notional_per_side):
            return "Total open notional threshold exceeded"
        # Validate net directional exposure
        if not risk_tracker.is_valid_directional_exposure(self.max_directional_exposure):
            return "Total directional exposure exceeded"
        return None

    def __get_risk_metric_tracker(self, exchange_id: str, symbol: str) -> RiskMetricTracker:
        """Return the RiskMetricTracker for (exchange_id, symbol), creating it on first use."""
        exchange_metrics = self._metrics_by_exchange_symbol_.setdefault(exchange_id, {})
        symbol_trackers = exchange_metrics.setdefault("symbol_metric_tracker", {})
        if symbol not in symbol_trackers:
            symbol_trackers[symbol] = RiskMetricTracker(exchange_id=exchange_id, symbol=symbol)
        return symbol_trackers[symbol]

    def __get_rate_tracker(self, exchange_id: str) -> RateTracker:
        """Return the submit-rate tracker for exchange_id, creating it on first use."""
        exchange_metrics = self._metrics_by_exchange_symbol_.setdefault(exchange_id, {})
        if "rate_tracker" not in exchange_metrics:
            exchange_metrics["rate_tracker"] = SlicingRateTracker(max_rate=self.max_order_submit_rate,
                                                                  window_ms=self.submit_rate_window)
        return exchange_metrics["rate_tracker"]

    def __is_valid_order_size(self, qty: Qty) -> bool:
        """Return whether qty does not exceed the configured maximum order size."""
        return qty <= self.max_order_size