mirror of
https://gitlab.com/simple-stock-bots/simple-stock-bot.git
synced 2025-08-02 11:31:35 +00:00
formatting
This commit is contained in:
@@ -1,362 +1,371 @@
|
||||
import datetime as dt
|
||||
import logging
|
||||
import os
|
||||
from collections import OrderedDict
|
||||
from typing import Dict
|
||||
|
||||
import humanize
|
||||
import pandas as pd
|
||||
import pytz
|
||||
import requests as r
|
||||
import schedule
|
||||
|
||||
from common.Symbol import Stock
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MarketData:
|
||||
"""
|
||||
Functions for finding stock market information about symbols from MarkData.app
|
||||
"""
|
||||
|
||||
SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})"
|
||||
|
||||
symbol_list: Dict[str, Dict] = {}
|
||||
charts: Dict[Stock, pd.DataFrame] = {}
|
||||
|
||||
openTime = dt.time(hour=9, minute=30, second=0)
|
||||
marketTimeZone = pytz.timezone("US/Eastern")
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Creates a Symbol Object
|
||||
|
||||
Parameters
|
||||
----------
|
||||
MARKETDATA_TOKEN : str
|
||||
MarketData.app API Token
|
||||
"""
|
||||
|
||||
try:
|
||||
self.MARKETDATA_TOKEN = os.environ["MARKETDATA"]
|
||||
|
||||
if self.MARKETDATA_TOKEN == "TOKEN":
|
||||
self.MARKETDATA_TOKEN = ""
|
||||
except KeyError:
|
||||
self.MARKETDATA_TOKEN = ""
|
||||
log.warning("Starting without an MarketData.app Token will not allow you to get market data!")
|
||||
log.warning("Use this affiliate link so that the bot can stay free:")
|
||||
log.warning("https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=repo")
|
||||
|
||||
if self.MARKETDATA_TOKEN != "":
|
||||
schedule.every().day.do(self.clear_charts)
|
||||
|
||||
self.get_symbol_list()
|
||||
schedule.every().day.do(self.get_symbol_list)
|
||||
|
||||
def get(self, endpoint, params=None, timeout=10, headers=None) -> dict:
|
||||
url = "https://api.marketdata.app/v1/" + endpoint
|
||||
|
||||
if params is None:
|
||||
params = {}
|
||||
|
||||
# set token param if it wasn't passed.
|
||||
params["token"] = self.MARKETDATA_TOKEN
|
||||
|
||||
# Undocumented query variable that ensures bot usage can be
|
||||
# monitored even if someone doesn't make it through an affiliate link.
|
||||
params["application"] = "simplestockbot"
|
||||
|
||||
if headers is None:
|
||||
headers = {}
|
||||
headers = {"User-Agent": "Simple Stock Bot anson@ansonbiggs.com"} | headers
|
||||
|
||||
resp = r.get(url, params=params, timeout=timeout, headers=headers)
|
||||
|
||||
logging.error(resp.headers.items())
|
||||
|
||||
# Make sure API returned a proper status code
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except r.exceptions.HTTPError as e:
|
||||
logging.error(e)
|
||||
return {}
|
||||
|
||||
# Make sure API returned valid JSON
|
||||
try:
|
||||
resp_json = resp.json()
|
||||
|
||||
match resp_json["s"]:
|
||||
case "ok":
|
||||
return resp_json
|
||||
case "no_data":
|
||||
return resp_json
|
||||
case "error":
|
||||
logging.error("MarketData Error:\n" + resp_json["errmsg"])
|
||||
return {}
|
||||
|
||||
except r.exceptions.JSONDecodeError as e:
|
||||
logging.error(e)
|
||||
|
||||
return {}
|
||||
|
||||
def symbol_id(self, symbol: str) -> Dict[str, Dict]:
|
||||
return self.symbol_list.get(symbol.upper(), None)
|
||||
|
||||
def get_symbol_list(self):
|
||||
# Doesn't use `self.get`` since needs are much different
|
||||
sec_resp = r.get(
|
||||
"https://www.sec.gov/files/company_tickers.json",
|
||||
headers={
|
||||
"User-Agent": "Simple Stock Bot anson@ansonbiggs.com",
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Host": "www.sec.gov",
|
||||
},
|
||||
)
|
||||
sec_resp.raise_for_status()
|
||||
sec_data = sec_resp.json()
|
||||
|
||||
for rank, ticker_info in sec_data.items():
|
||||
self.symbol_list[ticker_info["ticker"]] = {
|
||||
"ticker": ticker_info["ticker"],
|
||||
"title": ticker_info["title"],
|
||||
"mkt_cap_rank": rank,
|
||||
}
|
||||
|
||||
def clear_charts(self) -> None:
|
||||
"""
|
||||
Clears cache of chart data.
|
||||
Charts are cached so that only 1 API call per 24 hours is needed since the
|
||||
chart data is expensive and a large download.
|
||||
"""
|
||||
self.charts = {}
|
||||
|
||||
def status(self) -> str:
|
||||
# TODO: At the moment this API is poorly documented, this function likely needs to be revisited later.
|
||||
|
||||
try:
|
||||
status = r.get(
|
||||
"https://stats.uptimerobot.com/api/getMonitorList/6Kv3zIow0A",
|
||||
timeout=5,
|
||||
)
|
||||
status.raise_for_status()
|
||||
except r.HTTPError:
|
||||
return f"API returned an HTTP error code {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
except r.Timeout:
|
||||
return "API timed out before it was able to give status. This is likely due to a surge in usage or a complete outage."
|
||||
|
||||
statusJSON = status.json()
|
||||
|
||||
if statusJSON["status"] == "ok":
|
||||
return (
|
||||
f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
)
|
||||
else:
|
||||
return f"MarketData.app is currently reporting the following status: {statusJSON['status']}"
|
||||
|
||||
def price_reply(self, symbol: Stock) -> str:
|
||||
"""Returns price movement of Stock for the last market day, or after hours.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : Stock
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Formatted markdown
|
||||
"""
|
||||
|
||||
if quoteResp := self.get(f"stocks/quotes/{symbol.symbol}/"):
|
||||
price = round(quoteResp["last"][0], 2)
|
||||
|
||||
try:
|
||||
changePercent = round(quoteResp["changepct"][0], 2)
|
||||
except TypeError:
|
||||
return f"The price of {symbol.name} is ${price}"
|
||||
|
||||
message = f"The current price of {symbol.name} is ${price} and "
|
||||
|
||||
if changePercent > 0.0:
|
||||
message += f"is currently up {changePercent}% for the day."
|
||||
elif changePercent < 0.0:
|
||||
message += f"is currently down {changePercent}% for the day."
|
||||
else:
|
||||
message += "hasn't shown any movement for the day."
|
||||
|
||||
return message
|
||||
else:
|
||||
return f"Getting a quote for {symbol} encountered an error."
|
||||
|
||||
def spark_reply(self, symbol: Stock) -> str:
|
||||
if quoteResp := self.get(f"stocks/quotes/{symbol}/"):
|
||||
try:
|
||||
changePercent = round(quoteResp["changepct"][0], 2)
|
||||
return f"`{symbol.tag}`: {changePercent}%"
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
return f"`{symbol.tag}`"
|
||||
|
||||
def intra_reply(self, symbol: Stock) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
schedule.run_pending()
|
||||
|
||||
try:
|
||||
return self.charts[symbol.id.upper()]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
resolution = "15" # minutes
|
||||
now = dt.datetime.now(self.marketTimeZone)
|
||||
|
||||
if self.openTime < now.time():
|
||||
startTime = now.replace(hour=9, minute=30)
|
||||
else:
|
||||
startTime = now - dt.timedelta(days=1)
|
||||
|
||||
if data := self.get(
|
||||
f"stocks/candles/{resolution}/{symbol}",
|
||||
params={"from": startTime.timestamp(), "to": now.timestamp(), "extended": True},
|
||||
):
|
||||
data.pop("s")
|
||||
df = pd.DataFrame(data)
|
||||
df["t"] = pd.to_datetime(df["t"], unit="s", utc=True)
|
||||
df.set_index("t", inplace=True)
|
||||
|
||||
df.rename(
|
||||
columns={
|
||||
"o": "Open",
|
||||
"h": "High",
|
||||
"l": "Low",
|
||||
"c": "Close",
|
||||
"v": "Volume",
|
||||
},
|
||||
inplace=True,
|
||||
)
|
||||
|
||||
self.charts[symbol.id.upper()] = df
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def chart_reply(self, symbol: Stock) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
schedule.run_pending()
|
||||
|
||||
try:
|
||||
return self.charts[symbol.id.upper()]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
to_date = dt.datetime.today().strftime("%Y-%m-%d")
|
||||
from_date = (dt.datetime.today() - dt.timedelta(days=30)).strftime("%Y-%m-%d")
|
||||
resultion = "daily"
|
||||
|
||||
if data := self.get(
|
||||
f"stocks/candles/{resultion}/{symbol}",
|
||||
params={
|
||||
"from": from_date,
|
||||
"to": to_date,
|
||||
},
|
||||
):
|
||||
data.pop("s")
|
||||
|
||||
df = pd.DataFrame(data)
|
||||
df["t"] = pd.to_datetime(df["t"], unit="s")
|
||||
df.set_index("t", inplace=True)
|
||||
|
||||
df.rename(
|
||||
columns={
|
||||
"o": "Open",
|
||||
"h": "High",
|
||||
"l": "Low",
|
||||
"c": "Close",
|
||||
"v": "Volume",
|
||||
},
|
||||
inplace=True,
|
||||
)
|
||||
|
||||
self.charts[symbol.id.upper()] = df
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def options_reply(self, request: str) -> str:
|
||||
"""Undocumented API Usage!"""
|
||||
|
||||
options_data = self.get(f"options/quotes/{request}")
|
||||
|
||||
for key in options_data.keys():
|
||||
options_data[key] = options_data[key][0]
|
||||
|
||||
options_data["underlying"] = "$" + options_data["underlying"]
|
||||
|
||||
options_data["updated"] = humanize.naturaltime(dt.datetime.now() - dt.datetime.fromtimestamp(options_data["updated"]))
|
||||
|
||||
options_data["expiration"] = humanize.naturaltime(
|
||||
dt.datetime.now() - dt.datetime.fromtimestamp(options_data["expiration"])
|
||||
)
|
||||
|
||||
options_data["firstTraded"] = humanize.naturaltime(
|
||||
dt.datetime.now() - dt.datetime.fromtimestamp(options_data["firstTraded"])
|
||||
)
|
||||
|
||||
rename = {
|
||||
"optionSymbol": "Option Symbol",
|
||||
"underlying": "Underlying",
|
||||
"expiration": "Expiration",
|
||||
"side": "side",
|
||||
"strike": "strike",
|
||||
"firstTraded": "First Traded",
|
||||
"updated": "Last Updated",
|
||||
"bid": "bid",
|
||||
"bidSize": "bidSize",
|
||||
"mid": "mid",
|
||||
"ask": "ask",
|
||||
"askSize": "askSize",
|
||||
"last": "last",
|
||||
"openInterest": "Open Interest",
|
||||
"volume": "Volume",
|
||||
"inTheMoney": "inTheMoney",
|
||||
"intrinsicValue": "Intrinsic Value",
|
||||
"extrinsicValue": "Extrinsic Value",
|
||||
"underlyingPrice": "Underlying Price",
|
||||
"iv": "Implied Volatility",
|
||||
"delta": "delta",
|
||||
"gamma": "gamma",
|
||||
"theta": "theta",
|
||||
"vega": "vega",
|
||||
"rho": "rho",
|
||||
}
|
||||
|
||||
options_cleaned = OrderedDict()
|
||||
for old, new in rename.items():
|
||||
if old in options_data:
|
||||
options_cleaned[new] = options_data[old]
|
||||
|
||||
return options_cleaned
|
||||
import datetime as dt
|
||||
import logging
|
||||
import os
|
||||
from collections import OrderedDict
|
||||
from typing import Dict
|
||||
|
||||
import humanize
|
||||
import pandas as pd
|
||||
import pytz
|
||||
import requests as r
|
||||
import schedule
|
||||
|
||||
|
||||
from common.Symbol import Stock
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MarketData:
|
||||
"""
|
||||
Functions for finding stock market information about symbols from MarkData.app
|
||||
"""
|
||||
|
||||
SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})"
|
||||
|
||||
symbol_list: Dict[str, Dict] = {}
|
||||
charts: Dict[Stock, pd.DataFrame] = {}
|
||||
|
||||
openTime = dt.time(hour=9, minute=30, second=0)
|
||||
marketTimeZone = pytz.timezone("US/Eastern")
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Creates a Symbol Object
|
||||
|
||||
Parameters
|
||||
----------
|
||||
MARKETDATA_TOKEN : str
|
||||
MarketData.app API Token
|
||||
"""
|
||||
|
||||
try:
|
||||
self.MARKETDATA_TOKEN = os.environ["MARKETDATA"]
|
||||
|
||||
if self.MARKETDATA_TOKEN == "TOKEN":
|
||||
self.MARKETDATA_TOKEN = ""
|
||||
except KeyError:
|
||||
self.MARKETDATA_TOKEN = ""
|
||||
log.warning(
|
||||
"Starting without an MarketData.app Token will not allow you to get market data!"
|
||||
)
|
||||
log.warning("Use this affiliate link so that the bot can stay free:")
|
||||
log.warning(
|
||||
"https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=repo"
|
||||
)
|
||||
|
||||
if self.MARKETDATA_TOKEN != "":
|
||||
schedule.every().day.do(self.clear_charts)
|
||||
|
||||
self.get_symbol_list()
|
||||
schedule.every().day.do(self.get_symbol_list)
|
||||
|
||||
def get(self, endpoint, params=None, timeout=10, headers=None) -> dict:
|
||||
url = "https://api.marketdata.app/v1/" + endpoint
|
||||
|
||||
if params is None:
|
||||
params = {}
|
||||
|
||||
# set token param if it wasn't passed.
|
||||
params["token"] = self.MARKETDATA_TOKEN
|
||||
|
||||
# Undocumented query variable that ensures bot usage can be
|
||||
# monitored even if someone doesn't make it through an affiliate link.
|
||||
params["application"] = "simplestockbot"
|
||||
|
||||
if headers is None:
|
||||
headers = {}
|
||||
headers = {"User-Agent": "Simple Stock Bot anson@ansonbiggs.com"} | headers
|
||||
|
||||
resp = r.get(url, params=params, timeout=timeout, headers=headers)
|
||||
|
||||
logging.error(resp.headers.items())
|
||||
|
||||
# Make sure API returned a proper status code
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except r.exceptions.HTTPError as e:
|
||||
logging.error(e)
|
||||
return {}
|
||||
|
||||
# Make sure API returned valid JSON
|
||||
try:
|
||||
resp_json = resp.json()
|
||||
|
||||
match resp_json["s"]:
|
||||
case "ok":
|
||||
return resp_json
|
||||
case "no_data":
|
||||
return resp_json
|
||||
case "error":
|
||||
logging.error("MarketData Error:\n" + resp_json["errmsg"])
|
||||
return {}
|
||||
|
||||
except r.exceptions.JSONDecodeError as e:
|
||||
logging.error(e)
|
||||
|
||||
return {}
|
||||
|
||||
def symbol_id(self, symbol: str) -> Dict[str, Dict]:
|
||||
return self.symbol_list.get(symbol.upper(), None)
|
||||
|
||||
def get_symbol_list(self):
|
||||
# Doesn't use `self.get()` since needs are much different
|
||||
sec_resp = r.get(
|
||||
"https://www.sec.gov/files/company_tickers.json",
|
||||
headers={
|
||||
"User-Agent": "Simple Stock Bot anson@ansonbiggs.com",
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Host": "www.sec.gov",
|
||||
},
|
||||
)
|
||||
sec_resp.raise_for_status()
|
||||
sec_data = sec_resp.json()
|
||||
|
||||
for rank, ticker_info in sec_data.items():
|
||||
self.symbol_list[ticker_info["ticker"]] = {
|
||||
"ticker": ticker_info["ticker"],
|
||||
"title": ticker_info["title"],
|
||||
"mkt_cap_rank": rank,
|
||||
}
|
||||
|
||||
def clear_charts(self) -> None:
|
||||
"""
|
||||
Clears cache of chart data.
|
||||
Charts are cached so that only 1 API call per 24 hours is needed since the
|
||||
chart data is expensive and a large download.
|
||||
"""
|
||||
self.charts = {}
|
||||
|
||||
def status(self) -> str:
|
||||
# TODO: At the moment this API is poorly documented, this function likely needs to be revisited later.
|
||||
|
||||
try:
|
||||
status = r.get(
|
||||
"https://stats.uptimerobot.com/api/getMonitorList/6Kv3zIow0A",
|
||||
timeout=5,
|
||||
)
|
||||
status.raise_for_status()
|
||||
except r.HTTPError:
|
||||
return f"API returned an HTTP error code {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
except r.Timeout:
|
||||
return "API timed out before it was able to give status. This is likely due to a surge in usage or a complete outage."
|
||||
|
||||
statusJSON = status.json()
|
||||
|
||||
if statusJSON["status"] == "ok":
|
||||
return f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
else:
|
||||
return f"MarketData.app is currently reporting the following status: {statusJSON['status']}"
|
||||
|
||||
def price_reply(self, symbol: Stock) -> str:
|
||||
"""Returns price movement of Stock for the last market day, or after hours.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : Stock
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Formatted markdown
|
||||
"""
|
||||
|
||||
if quoteResp := self.get(f"stocks/quotes/{symbol.symbol}/"):
|
||||
price = round(quoteResp["last"][0], 2)
|
||||
|
||||
try:
|
||||
changePercent = round(quoteResp["changepct"][0], 2)
|
||||
except TypeError:
|
||||
return f"The price of {symbol.name} is ${price}"
|
||||
|
||||
message = f"The current price of {symbol.name} is ${price} and "
|
||||
|
||||
if changePercent > 0.0:
|
||||
message += f"is currently up {changePercent}% for the day."
|
||||
elif changePercent < 0.0:
|
||||
message += f"is currently down {changePercent}% for the day."
|
||||
else:
|
||||
message += "hasn't shown any movement for the day."
|
||||
|
||||
return message
|
||||
else:
|
||||
return f"Getting a quote for {symbol} encountered an error."
|
||||
|
||||
def spark_reply(self, symbol: Stock) -> str:
|
||||
if quoteResp := self.get(f"stocks/quotes/{symbol}/"):
|
||||
try:
|
||||
changePercent = round(quoteResp["changepct"][0], 2)
|
||||
return f"`{symbol.tag}`: {changePercent}%"
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
return f"`{symbol.tag}`"
|
||||
|
||||
def intra_reply(self, symbol: Stock) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
schedule.run_pending()
|
||||
|
||||
try:
|
||||
return self.charts[symbol.id.upper()]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
resolution = "15" # minutes
|
||||
now = dt.datetime.now(self.marketTimeZone)
|
||||
|
||||
if self.openTime < now.time():
|
||||
startTime = now.replace(hour=9, minute=30)
|
||||
else:
|
||||
startTime = now - dt.timedelta(days=1)
|
||||
|
||||
if data := self.get(
|
||||
f"stocks/candles/{resolution}/{symbol}",
|
||||
params={
|
||||
"from": startTime.timestamp(),
|
||||
"to": now.timestamp(),
|
||||
"extended": True,
|
||||
},
|
||||
):
|
||||
data.pop("s")
|
||||
df = pd.DataFrame(data)
|
||||
df["t"] = pd.to_datetime(df["t"], unit="s", utc=True)
|
||||
df.set_index("t", inplace=True)
|
||||
|
||||
df.rename(
|
||||
columns={
|
||||
"o": "Open",
|
||||
"h": "High",
|
||||
"l": "Low",
|
||||
"c": "Close",
|
||||
"v": "Volume",
|
||||
},
|
||||
inplace=True,
|
||||
)
|
||||
|
||||
self.charts[symbol.id.upper()] = df
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def chart_reply(self, symbol: Stock) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
schedule.run_pending()
|
||||
|
||||
try:
|
||||
return self.charts[symbol.id.upper()]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
to_date = dt.datetime.today().strftime("%Y-%m-%d")
|
||||
from_date = (dt.datetime.today() - dt.timedelta(days=30)).strftime("%Y-%m-%d")
|
||||
resultion = "daily"
|
||||
|
||||
if data := self.get(
|
||||
f"stocks/candles/{resultion}/{symbol}",
|
||||
params={
|
||||
"from": from_date,
|
||||
"to": to_date,
|
||||
},
|
||||
):
|
||||
data.pop("s")
|
||||
|
||||
df = pd.DataFrame(data)
|
||||
df["t"] = pd.to_datetime(df["t"], unit="s")
|
||||
df.set_index("t", inplace=True)
|
||||
|
||||
df.rename(
|
||||
columns={
|
||||
"o": "Open",
|
||||
"h": "High",
|
||||
"l": "Low",
|
||||
"c": "Close",
|
||||
"v": "Volume",
|
||||
},
|
||||
inplace=True,
|
||||
)
|
||||
|
||||
self.charts[symbol.id.upper()] = df
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def options_reply(self, request: str) -> str:
|
||||
"""Undocumented API Usage!"""
|
||||
|
||||
options_data = self.get(f"options/quotes/{request}")
|
||||
|
||||
for key in options_data.keys():
|
||||
options_data[key] = options_data[key][0]
|
||||
|
||||
options_data["underlying"] = "$" + options_data["underlying"]
|
||||
|
||||
options_data["updated"] = humanize.naturaltime(
|
||||
dt.datetime.now() - dt.datetime.fromtimestamp(options_data["updated"])
|
||||
)
|
||||
|
||||
options_data["expiration"] = humanize.naturaltime(
|
||||
dt.datetime.now() - dt.datetime.fromtimestamp(options_data["expiration"])
|
||||
)
|
||||
|
||||
options_data["firstTraded"] = humanize.naturaltime(
|
||||
dt.datetime.now() - dt.datetime.fromtimestamp(options_data["firstTraded"])
|
||||
)
|
||||
|
||||
rename = {
|
||||
"optionSymbol": "Option Symbol",
|
||||
"underlying": "Underlying",
|
||||
"expiration": "Expiration",
|
||||
"side": "side",
|
||||
"strike": "strike",
|
||||
"firstTraded": "First Traded",
|
||||
"updated": "Last Updated",
|
||||
"bid": "bid",
|
||||
"bidSize": "bidSize",
|
||||
"mid": "mid",
|
||||
"ask": "ask",
|
||||
"askSize": "askSize",
|
||||
"last": "last",
|
||||
"openInterest": "Open Interest",
|
||||
"volume": "Volume",
|
||||
"inTheMoney": "inTheMoney",
|
||||
"intrinsicValue": "Intrinsic Value",
|
||||
"extrinsicValue": "Extrinsic Value",
|
||||
"underlyingPrice": "Underlying Price",
|
||||
"iv": "Implied Volatility",
|
||||
"delta": "delta",
|
||||
"gamma": "gamma",
|
||||
"theta": "theta",
|
||||
"vega": "vega",
|
||||
"rho": "rho",
|
||||
}
|
||||
|
||||
options_cleaned = OrderedDict()
|
||||
for old, new in rename.items():
|
||||
if old in options_data:
|
||||
options_cleaned[new] = options_data[old]
|
||||
|
||||
return options_cleaned
|
||||
|
107
common/Symbol.py
107
common/Symbol.py
@@ -1,52 +1,55 @@
|
||||
import logging
|
||||
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class Symbol:
|
||||
"""
|
||||
symbol: What the user calls it. ie tsla or btc
|
||||
id: What the api expects. ie tsla or bitcoin
|
||||
name: Human readable. ie Tesla or Bitcoin
|
||||
tag: Uppercase tag to call the symbol. ie $TSLA or $$BTC
|
||||
"""
|
||||
|
||||
currency = "usd"
|
||||
pass
|
||||
|
||||
def __init__(self, symbol) -> None:
|
||||
self.symbol = symbol
|
||||
self.id = symbol
|
||||
self.name = symbol
|
||||
self.tag = "$" + symbol
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{self.__class__.__name__} instance of {self.id} at {id(self)}>"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.id
|
||||
|
||||
|
||||
class Stock(Symbol):
|
||||
"""Stock Market Object. Gets data from MarketData"""
|
||||
|
||||
def __init__(self, symbol_info: dict) -> None:
|
||||
self.symbol = symbol_info["ticker"]
|
||||
self.id = symbol_info["ticker"]
|
||||
self.name = symbol_info["title"]
|
||||
self.tag = "$" + symbol_info["ticker"]
|
||||
self.market_cap_rank = symbol_info["mkt_cap_rank"]
|
||||
|
||||
|
||||
class Coin(Symbol):
|
||||
"""Cryptocurrency Object. Gets data from CoinGecko."""
|
||||
|
||||
def __init__(self, symbol: pd.DataFrame) -> None:
|
||||
if len(symbol) > 1:
|
||||
logging.info(f"Crypto with shared id:\n\t{symbol.id}")
|
||||
symbol = symbol.head(1)
|
||||
|
||||
self.symbol = symbol.symbol.values[0]
|
||||
self.id = symbol.id.values[0]
|
||||
self.name = symbol.name.values[0]
|
||||
self.tag = symbol.type_id.values[0].upper()
|
||||
import logging
|
||||
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class Symbol:
|
||||
"""
|
||||
symbol: What the user calls it. ie tsla or btc
|
||||
id: What the api expects. ie tsla or bitcoin
|
||||
name: Human readable. ie Tesla or Bitcoin
|
||||
tag: Uppercase tag to call the symbol. ie $TSLA or $$BTC
|
||||
"""
|
||||
|
||||
currency = "usd"
|
||||
pass
|
||||
|
||||
def __init__(self, symbol) -> None:
|
||||
self.symbol = symbol
|
||||
self.id = symbol
|
||||
self.name = symbol
|
||||
self.tag = "$" + symbol
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<{self.__class__.__name__} instance of {self.id} at {id(self)}>"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.id
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.id)
|
||||
|
||||
|
||||
class Stock(Symbol):
|
||||
"""Stock Market Object. Gets data from MarketData"""
|
||||
|
||||
def __init__(self, symbol_info: dict) -> None:
|
||||
self.symbol = symbol_info["ticker"]
|
||||
self.id = symbol_info["ticker"]
|
||||
self.name = symbol_info["title"]
|
||||
self.tag = "$" + symbol_info["ticker"]
|
||||
self.market_cap_rank = symbol_info["mkt_cap_rank"]
|
||||
|
||||
|
||||
class Coin(Symbol):
|
||||
"""Cryptocurrency Object. Gets data from CoinGecko."""
|
||||
|
||||
def __init__(self, symbol: pd.DataFrame) -> None:
|
||||
if len(symbol) > 1:
|
||||
logging.info(f"Crypto with shared id:\n\t{symbol.id}")
|
||||
symbol = symbol.head(1)
|
||||
|
||||
self.symbol = symbol.symbol.values[0]
|
||||
self.id = symbol.id.values[0]
|
||||
self.name = symbol.name.values[0]
|
||||
self.tag = symbol.type_id.values[0].upper()
|
||||
|
@@ -1,381 +1,388 @@
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
import pandas as pd
|
||||
import requests as r
|
||||
import schedule
|
||||
from markdownify import markdownify
|
||||
|
||||
from common.Symbol import Coin
|
||||
from common.utilities import rate_limited
|
||||
|
||||
import time
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class cg_Crypto:
|
||||
"""
|
||||
Functions for finding crypto info
|
||||
"""
|
||||
|
||||
vs_currency = "usd" # simple/supported_vs_currencies for list of options
|
||||
|
||||
trending_cache: List[str] = []
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.get_symbol_list()
|
||||
schedule.every().day.do(self.get_symbol_list)
|
||||
|
||||
# Coingecko's rate limit is 30 requests per minute.
|
||||
# Since there are two bots sharing the same IP, we allocate half of that limit to each bot.
|
||||
# This results in a rate limit of 15 requests per minute for each bot.
|
||||
# Given this, the rate limit effectively becomes 1 request every 4 seconds for each bot.
|
||||
@rate_limited(0.25)
|
||||
def get(self, endpoint, params: dict = {}, timeout=10) -> dict:
|
||||
url = "https://api.coingecko.com/api/v3" + endpoint
|
||||
resp = r.get(url, params=params, timeout=timeout)
|
||||
# Make sure API returned a proper status code
|
||||
|
||||
if resp.status_code == 429:
|
||||
log.warning(f"CoinGecko returned 429 - Too Many Requests for endpoint: {endpoint}. Sleeping and trying again.")
|
||||
time.sleep(10)
|
||||
return self.get(endpoint=endpoint, params=params, timeout=timeout)
|
||||
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except r.exceptions.HTTPError as e:
|
||||
log.error(e)
|
||||
return {}
|
||||
|
||||
# Make sure API returned valid JSON
|
||||
try:
|
||||
resp_json = resp.json()
|
||||
return resp_json
|
||||
except r.exceptions.JSONDecodeError as e:
|
||||
log.error(e)
|
||||
return {}
|
||||
|
||||
def symbol_id(self, symbol) -> str:
|
||||
try:
|
||||
return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[0]
|
||||
except KeyError:
|
||||
return ""
|
||||
|
||||
def get_symbol_list(self):
|
||||
raw_symbols = self.get("/coins/list")
|
||||
symbols = pd.DataFrame(data=raw_symbols)
|
||||
|
||||
# Removes all binance-peg symbols
|
||||
symbols = symbols[~symbols["id"].str.contains("binance-peg")]
|
||||
|
||||
symbols["description"] = "$$" + symbols["symbol"].str.upper() + ": " + symbols["name"]
|
||||
symbols = symbols[["id", "symbol", "name", "description"]]
|
||||
symbols["type_id"] = "$$" + symbols["symbol"]
|
||||
|
||||
self.symbol_list = symbols
|
||||
|
||||
def status(self) -> str:
|
||||
"""Checks CoinGecko /ping endpoint for API issues.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Human readable text on status of CoinGecko API
|
||||
"""
|
||||
status = r.get(
|
||||
"https://api.coingecko.com/api/v3/ping",
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
try:
|
||||
status.raise_for_status()
|
||||
return (
|
||||
f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
)
|
||||
except r.HTTPError:
|
||||
return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
|
||||
def price_reply(self, coin: Coin) -> str:
|
||||
"""Returns current market price or after hours if its available for a given coin symbol.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list
|
||||
List of coin symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
markdown formatted string of the symbols price and movement.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": coin.id,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
):
|
||||
try:
|
||||
data = resp[coin.id]
|
||||
|
||||
price = data[self.vs_currency]
|
||||
change = data[self.vs_currency + "_24h_change"]
|
||||
if change is None:
|
||||
change = 0
|
||||
except KeyError:
|
||||
return f"{coin.id} returned an error."
|
||||
|
||||
message = f"The current price of {coin.name} is $**{price:,}**"
|
||||
|
||||
# Determine wording of change text
|
||||
if change > 0:
|
||||
message += f", the coin is currently **up {change:.3f}%** for today"
|
||||
elif change < 0:
|
||||
message += f", the coin is currently **down {change:.3f}%** for today"
|
||||
else:
|
||||
message += ", the coin hasn't shown any movement today."
|
||||
|
||||
else:
|
||||
message = f"The price for {coin.name} is not available. If you suspect this is an error run `/status`"
|
||||
|
||||
return message
|
||||
|
||||
def intra_reply(self, symbol: Coin) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol since the last market open.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
f"/coins/{symbol.id}/ohlc",
|
||||
params={"vs_currency": self.vs_currency, "days": 1},
|
||||
):
|
||||
df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna()
|
||||
df["Date"] = pd.to_datetime(df["Date"], unit="ms")
|
||||
df = df.set_index("Date")
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def chart_reply(self, symbol: Coin) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
f"/coins/{symbol.id}/ohlc",
|
||||
params={"vs_currency": self.vs_currency, "days": 30},
|
||||
):
|
||||
df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna()
|
||||
df["Date"] = pd.to_datetime(df["Date"], unit="ms")
|
||||
df = df.set_index("Date")
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def stat_reply(self, symbol: Coin) -> str:
|
||||
"""Gathers key statistics on coin. Mostly just CoinGecko scores.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : Coin
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Preformatted markdown.
|
||||
"""
|
||||
|
||||
if data := self.get(
|
||||
f"/coins/{symbol.id}",
|
||||
params={
|
||||
"localization": "false",
|
||||
},
|
||||
):
|
||||
return f"""
|
||||
[{data['name']}]({data['links']['homepage'][0]}) Statistics:
|
||||
Market Cap: ${data['market_data']['market_cap'][self.vs_currency]:,}
|
||||
Market Cap Ranking: {data.get('market_cap_rank',"Not Available")}
|
||||
CoinGecko Scores:
|
||||
Overall: {data.get('coingecko_score','Not Available')}
|
||||
Development: {data.get('developer_score','Not Available')}
|
||||
Community: {data.get('community_score','Not Available')}
|
||||
Public Interest: {data.get('public_interest_score','Not Available')}
|
||||
"""
|
||||
else:
|
||||
return f"{symbol.symbol} returned an error."
|
||||
|
||||
def cap_reply(self, coin: Coin) -> str:
|
||||
"""Gets market cap for Coin
|
||||
|
||||
Parameters
|
||||
----------
|
||||
coin : Coin
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Preformatted markdown.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": coin.id,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_market_cap": "true",
|
||||
},
|
||||
):
|
||||
log.debug(resp)
|
||||
try:
|
||||
data = resp[coin.id]
|
||||
|
||||
price = data[self.vs_currency]
|
||||
cap = data[self.vs_currency + "_market_cap"]
|
||||
except KeyError:
|
||||
return f"{coin.id} returned an error."
|
||||
|
||||
if cap == 0:
|
||||
return f"The market cap for {coin.name} is not available for unknown reasons."
|
||||
|
||||
message = (
|
||||
f"The current price of {coin.name} is $**{price:,}** and"
|
||||
+ " its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}"
|
||||
)
|
||||
|
||||
else:
|
||||
message = f"The Coin: {coin.name} was not found or returned and error."
|
||||
|
||||
return message
|
||||
|
||||
def info_reply(self, symbol: Coin) -> str:
|
||||
"""Gets coin description
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : Coin
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Preformatted markdown.
|
||||
"""
|
||||
|
||||
if data := self.get(
|
||||
f"/coins/{symbol.id}",
|
||||
params={"localization": "false"},
|
||||
):
|
||||
try:
|
||||
return markdownify(data["description"]["en"])
|
||||
except KeyError:
|
||||
return f"{symbol} does not have a description available."
|
||||
|
||||
return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist."
|
||||
|
||||
def spark_reply(self, symbol: Coin) -> str:
|
||||
change = self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": symbol.id,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
)[symbol.id]["usd_24h_change"]
|
||||
|
||||
return f"`{symbol.tag}`: {symbol.name}, {change:.2f}%"
|
||||
|
||||
def trending(self) -> list[str]:
|
||||
"""Gets current coins trending on coingecko
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
list of $$ID: NAME, CHANGE%
|
||||
"""
|
||||
|
||||
coins = self.get("/search/trending")
|
||||
try:
|
||||
trending = []
|
||||
for coin in coins["coins"]:
|
||||
c = coin["item"]
|
||||
|
||||
sym = c["symbol"].upper()
|
||||
name = c["name"]
|
||||
change = self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": c["id"],
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
)[c["id"]]["usd_24h_change"]
|
||||
|
||||
msg = f"`$${sym}`: {name}, {change:.2f}%"
|
||||
|
||||
trending.append(msg)
|
||||
|
||||
except Exception as e:
|
||||
log.warning(e)
|
||||
return self.trending_cache
|
||||
|
||||
self.trending_cache = trending
|
||||
return trending
|
||||
|
||||
def batch_price(self, coins: list[Coin]) -> list[str]:
|
||||
"""Gets price of a list of coins all in one API call
|
||||
|
||||
Parameters
|
||||
----------
|
||||
coins : list[Coin]
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
returns preformatted list of strings detailing price movement of each coin passed in.
|
||||
"""
|
||||
query = ",".join([c.id for c in coins])
|
||||
|
||||
prices = self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": query,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
)
|
||||
|
||||
replies = []
|
||||
for coin in coins:
|
||||
if coin.id in prices:
|
||||
p = prices[coin.id]
|
||||
|
||||
if p.get("usd_24h_change") is None:
|
||||
p["usd_24h_change"] = 0
|
||||
|
||||
replies.append(
|
||||
f"{coin.name}: ${p.get('usd',0):,} and has moved {p.get('usd_24h_change',0.0):.2f}% in the past 24 hours."
|
||||
)
|
||||
|
||||
return replies
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
import pandas as pd
|
||||
import requests as r
|
||||
import schedule
|
||||
from markdownify import markdownify
|
||||
from common.Symbol import Coin
|
||||
from common.utilities import rate_limited
|
||||
|
||||
import time
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class cg_Crypto:
|
||||
"""
|
||||
Functions for finding crypto info
|
||||
"""
|
||||
|
||||
vs_currency = "usd" # simple/supported_vs_currencies for list of options
|
||||
|
||||
trending_cache: List[str] = []
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.get_symbol_list()
|
||||
schedule.every().day.do(self.get_symbol_list)
|
||||
|
||||
# Coingecko's rate limit is 30 requests per minute.
|
||||
# Since there are two bots sharing the same IP, we allocate half of that limit to each bot.
|
||||
# This results in a rate limit of 15 requests per minute for each bot.
|
||||
# Given this, the rate limit effectively becomes 1 request every 4 seconds for each bot.
|
||||
@rate_limited(0.25)
|
||||
def get(self, endpoint, params: dict = {}, timeout=10) -> dict:
|
||||
url = "https://api.coingecko.com/api/v3" + endpoint
|
||||
resp = r.get(url, params=params, timeout=timeout)
|
||||
# Make sure API returned a proper status code
|
||||
|
||||
if resp.status_code == 429:
|
||||
log.warning(
|
||||
f"CoinGecko returned 429 - Too Many Requests for endpoint: {endpoint}. Sleeping and trying again."
|
||||
)
|
||||
time.sleep(10)
|
||||
return self.get(endpoint=endpoint, params=params, timeout=timeout)
|
||||
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except r.exceptions.HTTPError as e:
|
||||
log.error(e)
|
||||
return {}
|
||||
|
||||
# Make sure API returned valid JSON
|
||||
try:
|
||||
resp_json = resp.json()
|
||||
return resp_json
|
||||
except r.exceptions.JSONDecodeError as e:
|
||||
log.error(e)
|
||||
return {}
|
||||
|
||||
def symbol_id(self, symbol) -> str:
|
||||
try:
|
||||
return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[
|
||||
0
|
||||
]
|
||||
except KeyError:
|
||||
return ""
|
||||
|
||||
def get_symbol_list(self):
|
||||
raw_symbols = self.get("/coins/list")
|
||||
symbols = pd.DataFrame(data=raw_symbols)
|
||||
|
||||
# Removes all binance-peg symbols
|
||||
symbols = symbols[~symbols["id"].str.contains("binance-peg")]
|
||||
|
||||
symbols["description"] = (
|
||||
"$$" + symbols["symbol"].str.upper() + ": " + symbols["name"]
|
||||
)
|
||||
symbols = symbols[["id", "symbol", "name", "description"]]
|
||||
symbols["type_id"] = "$$" + symbols["symbol"]
|
||||
|
||||
self.symbol_list = symbols
|
||||
|
||||
def status(self) -> str:
|
||||
"""Checks CoinGecko /ping endpoint for API issues.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Human readable text on status of CoinGecko API
|
||||
"""
|
||||
status = r.get(
|
||||
"https://api.coingecko.com/api/v3/ping",
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
try:
|
||||
status.raise_for_status()
|
||||
return f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
except r.HTTPError:
|
||||
return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} seconds."
|
||||
|
||||
def price_reply(self, coin: Coin) -> str:
|
||||
"""Returns current market price or after hours if its available for a given coin symbol.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list
|
||||
List of coin symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
markdown formatted string of the symbols price and movement.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": coin.id,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
):
|
||||
try:
|
||||
data = resp[coin.id]
|
||||
|
||||
price = data[self.vs_currency]
|
||||
change = data[self.vs_currency + "_24h_change"]
|
||||
if change is None:
|
||||
change = 0
|
||||
except KeyError:
|
||||
return f"{coin.id} returned an error."
|
||||
|
||||
message = f"The current price of {coin.name} is $**{price:,}**"
|
||||
|
||||
# Determine wording of change text
|
||||
if change > 0:
|
||||
message += f", the coin is currently **up {change:.3f}%** for today"
|
||||
elif change < 0:
|
||||
message += f", the coin is currently **down {change:.3f}%** for today"
|
||||
else:
|
||||
message += ", the coin hasn't shown any movement today."
|
||||
|
||||
else:
|
||||
message = f"The price for {coin.name} is not available. If you suspect this is an error run `/status`"
|
||||
|
||||
return message
|
||||
|
||||
def intra_reply(self, symbol: Coin) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol since the last market open.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
f"/coins/{symbol.id}/ohlc",
|
||||
params={"vs_currency": self.vs_currency, "days": 1},
|
||||
):
|
||||
df = pd.DataFrame(
|
||||
resp, columns=["Date", "Open", "High", "Low", "Close"]
|
||||
).dropna()
|
||||
df["Date"] = pd.to_datetime(df["Date"], unit="ms")
|
||||
df = df.set_index("Date")
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def chart_reply(self, symbol: Coin) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
f"/coins/{symbol.id}/ohlc",
|
||||
params={"vs_currency": self.vs_currency, "days": 30},
|
||||
):
|
||||
df = pd.DataFrame(
|
||||
resp, columns=["Date", "Open", "High", "Low", "Close"]
|
||||
).dropna()
|
||||
df["Date"] = pd.to_datetime(df["Date"], unit="ms")
|
||||
df = df.set_index("Date")
|
||||
return df
|
||||
|
||||
return pd.DataFrame()
|
||||
|
||||
def stat_reply(self, symbol: Coin) -> str:
|
||||
"""Gathers key statistics on coin. Mostly just CoinGecko scores.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : Coin
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Preformatted markdown.
|
||||
"""
|
||||
|
||||
if data := self.get(
|
||||
f"/coins/{symbol.id}",
|
||||
params={
|
||||
"localization": "false",
|
||||
},
|
||||
):
|
||||
return f"""
|
||||
[{data['name']}]({data['links']['homepage'][0]}) Statistics:
|
||||
Market Cap: ${data['market_data']['market_cap'][self.vs_currency]:,}
|
||||
Market Cap Ranking: {data.get('market_cap_rank',"Not Available")}
|
||||
CoinGecko Scores:
|
||||
Overall: {data.get('coingecko_score','Not Available')}
|
||||
Development: {data.get('developer_score','Not Available')}
|
||||
Community: {data.get('community_score','Not Available')}
|
||||
Public Interest: {data.get('public_interest_score','Not Available')}
|
||||
"""
|
||||
else:
|
||||
return f"{symbol.symbol} returned an error."
|
||||
|
||||
def cap_reply(self, coin: Coin) -> str:
|
||||
"""Gets market cap for Coin
|
||||
|
||||
Parameters
|
||||
----------
|
||||
coin : Coin
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Preformatted markdown.
|
||||
"""
|
||||
|
||||
if resp := self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": coin.id,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_market_cap": "true",
|
||||
},
|
||||
):
|
||||
log.debug(resp)
|
||||
try:
|
||||
data = resp[coin.id]
|
||||
|
||||
price = data[self.vs_currency]
|
||||
cap = data[self.vs_currency + "_market_cap"]
|
||||
except KeyError:
|
||||
return f"{coin.id} returned an error."
|
||||
|
||||
if cap == 0:
|
||||
return f"The market cap for {coin.name} is not available for unknown reasons."
|
||||
|
||||
message = (
|
||||
f"The current price of {coin.name} is $**{price:,}** and"
|
||||
+ " its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}"
|
||||
)
|
||||
|
||||
else:
|
||||
message = f"The Coin: {coin.name} was not found or returned and error."
|
||||
|
||||
return message
|
||||
|
||||
def info_reply(self, symbol: Coin) -> str:
|
||||
"""Gets coin description
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : Coin
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Preformatted markdown.
|
||||
"""
|
||||
|
||||
if data := self.get(
|
||||
f"/coins/{symbol.id}",
|
||||
params={"localization": "false"},
|
||||
):
|
||||
try:
|
||||
return markdownify(data["description"]["en"])
|
||||
except KeyError:
|
||||
return f"{symbol} does not have a description available."
|
||||
|
||||
return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist."
|
||||
|
||||
def spark_reply(self, symbol: Coin) -> str:
|
||||
change = self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": symbol.id,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
)[symbol.id]["usd_24h_change"]
|
||||
|
||||
return f"`{symbol.tag}`: {symbol.name}, {change:.2f}%"
|
||||
|
||||
def trending(self) -> list[str]:
|
||||
"""Gets current coins trending on coingecko
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
list of $$ID: NAME, CHANGE%
|
||||
"""
|
||||
|
||||
coins = self.get("/search/trending")
|
||||
try:
|
||||
trending = []
|
||||
for coin in coins["coins"]:
|
||||
c = coin["item"]
|
||||
|
||||
sym = c["symbol"].upper()
|
||||
name = c["name"]
|
||||
change = self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": c["id"],
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
)[c["id"]]["usd_24h_change"]
|
||||
|
||||
msg = f"`$${sym}`: {name}, {change:.2f}%"
|
||||
|
||||
trending.append(msg)
|
||||
|
||||
except Exception as e:
|
||||
log.warning(e)
|
||||
return self.trending_cache
|
||||
|
||||
self.trending_cache = trending
|
||||
return trending
|
||||
|
||||
def batch_price(self, coins: list[Coin]) -> list[str]:
|
||||
"""Gets price of a list of coins all in one API call
|
||||
|
||||
Parameters
|
||||
----------
|
||||
coins : list[Coin]
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
returns preformatted list of strings detailing price movement of each coin passed in.
|
||||
"""
|
||||
query = ",".join([c.id for c in coins])
|
||||
|
||||
prices = self.get(
|
||||
"/simple/price",
|
||||
params={
|
||||
"ids": query,
|
||||
"vs_currencies": self.vs_currency,
|
||||
"include_24hr_change": "true",
|
||||
},
|
||||
)
|
||||
|
||||
replies = []
|
||||
for coin in coins:
|
||||
if coin.id in prices:
|
||||
p = prices[coin.id]
|
||||
|
||||
if p.get("usd_24h_change") is None:
|
||||
p["usd_24h_change"] = 0
|
||||
|
||||
replies.append(
|
||||
f"{coin.name}: ${p.get('usd',0):,} and has moved {p.get('usd_24h_change',0.0):.2f}% in the past 24 hours."
|
||||
)
|
||||
|
||||
return replies
|
||||
|
@@ -1,407 +1,407 @@
|
||||
"""Function that routes symbols to the correct API provider.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
from typing import Dict
|
||||
|
||||
import pandas as pd
|
||||
import schedule
|
||||
from cachetools import TTLCache, cached
|
||||
|
||||
from common.cg_Crypto import cg_Crypto
|
||||
from common.MarketData import MarketData
|
||||
from common.Symbol import Coin, Stock, Symbol
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Router:
|
||||
STOCK_REGEX = "(?:^|[^\\$])\\$([a-zA-Z.]{1,6})"
|
||||
CRYPTO_REGEX = "[$]{2}([a-zA-Z]{1,20})"
|
||||
trending_count: Dict[str, float] = {}
|
||||
|
||||
def __init__(self):
|
||||
self.stock = MarketData()
|
||||
self.crypto = cg_Crypto()
|
||||
|
||||
schedule.every().hour.do(self.trending_decay)
|
||||
|
||||
def trending_decay(self, decay=0.5):
|
||||
"""Decays the value of each trending stock by a multiplier"""
|
||||
t_copy = {}
|
||||
dead_keys = []
|
||||
if self.trending_count:
|
||||
t_copy = self.trending_count.copy()
|
||||
for key in t_copy.keys():
|
||||
if t_copy[key] < 0.01:
|
||||
# Prune Keys
|
||||
dead_keys.append(key)
|
||||
else:
|
||||
t_copy[key] = t_copy[key] * decay
|
||||
for dead in dead_keys:
|
||||
t_copy.pop(dead)
|
||||
|
||||
self.trending_count = t_copy.copy()
|
||||
log.info("Decayed trending symbols.")
|
||||
|
||||
def find_symbols(self, text: str, *, trending_weight: int = 1) -> list[Stock | Coin]:
|
||||
"""Finds stock tickers starting with a dollar sign, and cryptocurrencies with two dollar signs
|
||||
in a blob of text and returns them in a list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
text : str
|
||||
Blob of text.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[Symbol]
|
||||
List of stock symbols as Symbol objects
|
||||
"""
|
||||
schedule.run_pending()
|
||||
|
||||
symbols: list[Symbol] = []
|
||||
stock_matches = set(re.findall(self.STOCK_REGEX, text))
|
||||
coin_matches = set(re.findall(self.CRYPTO_REGEX, text))
|
||||
|
||||
for stock_match in stock_matches:
|
||||
# Market data lacks tools to check if a symbol is valid.
|
||||
if stock_info := self.stock.symbol_id(stock_match):
|
||||
symbols.append(Stock(stock_info))
|
||||
else:
|
||||
log.info(f"{stock_match} is not in list of stocks")
|
||||
|
||||
for coin_match in coin_matches:
|
||||
sym = self.crypto.symbol_list[self.crypto.symbol_list["symbol"].str.fullmatch(coin_match.lower(), case=False)]
|
||||
if sym.empty:
|
||||
log.info(f"{coin_match} is not in list of coins")
|
||||
else:
|
||||
symbols.append(Coin(sym))
|
||||
if symbols:
|
||||
for symbol in symbols:
|
||||
self.trending_count[symbol.tag] = self.trending_count.get(symbol.tag, 0) + trending_weight
|
||||
log.debug(self.trending_count)
|
||||
|
||||
return symbols
|
||||
|
||||
def status(self, bot_resp) -> str:
|
||||
"""Checks for any issues with APIs.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Human readable text on status of the bot and relevant APIs
|
||||
"""
|
||||
|
||||
stats = f"""
|
||||
Bot Status:
|
||||
{bot_resp}
|
||||
|
||||
Stock Market Data:
|
||||
{self.stock.status()}
|
||||
|
||||
Cryptocurrency Data:
|
||||
{self.crypto.status()}
|
||||
"""
|
||||
|
||||
log.warning(stats)
|
||||
|
||||
return stats
|
||||
|
||||
def inline_search(self, search: str, matches: int = 5) -> pd.DataFrame:
|
||||
"""Searches based on the shortest symbol that contains the same string as the search.
|
||||
Should be very fast compared to a fuzzy search.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
search : str
|
||||
String used to match against symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[tuple[str, str]]
|
||||
Each tuple contains: (Symbol, Issue Name).
|
||||
"""
|
||||
|
||||
# df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list])
|
||||
df = self.crypto.symbol_list
|
||||
|
||||
df = df[df["description"].str.contains(search, regex=False, case=False)].sort_values(
|
||||
by="type_id", key=lambda x: x.str.len()
|
||||
)
|
||||
|
||||
symbols = df.head(matches)
|
||||
symbols["price_reply"] = symbols["type_id"].apply(
|
||||
lambda sym: self.price_reply(self.find_symbols(sym, trending_weight=0))[0]
|
||||
)
|
||||
|
||||
return symbols
|
||||
|
||||
def price_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Returns current market price or after hours if its available for a given stock symbol.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list
|
||||
List of stock symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
markdown formatted string of the symbols price and movement.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
log.info(symbol)
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.price_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.price_reply(symbol))
|
||||
else:
|
||||
log.info(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def info_reply(self, symbols: list) -> list[str]:
|
||||
"""Gets information on stock symbols.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable formatted
|
||||
string of the symbols information.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.info_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.info_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def intra_reply(self, symbol: Symbol) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol since the last market open.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available.
|
||||
Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
|
||||
if isinstance(symbol, Stock):
|
||||
return self.stock.intra_reply(symbol)
|
||||
elif isinstance(symbol, Coin):
|
||||
return self.crypto.intra_reply(symbol)
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
return pd.DataFrame()
|
||||
|
||||
def chart_reply(self, symbol: Symbol) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available.
|
||||
Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
if isinstance(symbol, Stock):
|
||||
return self.stock.chart_reply(symbol)
|
||||
elif isinstance(symbol, Coin):
|
||||
return self.crypto.chart_reply(symbol)
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
return pd.DataFrame()
|
||||
|
||||
def stat_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Gets key statistics for each symbol in the list
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
formatted string of the symbols statistics.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.stat_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.stat_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def cap_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Gets market cap for each symbol in the list
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
formatted string of the symbols market cap.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.cap_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.cap_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def spark_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Gets change for each symbol and returns it in a compact format
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
List of human readable strings.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.spark_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.spark_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
@cached(cache=TTLCache(maxsize=1024, ttl=600))
|
||||
def trending(self) -> str:
|
||||
"""Checks APIs for trending symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
List of preformatted strings to be sent to user.
|
||||
"""
|
||||
|
||||
# stocks = self.stock.trending()
|
||||
coins = self.crypto.trending()
|
||||
|
||||
reply = ""
|
||||
|
||||
log.warning(self.trending_count)
|
||||
if self.trending_count:
|
||||
reply += "🔥Trending on the Stock Bot:\n`"
|
||||
reply += "━" * len("Trending on the Stock Bot:") + "`\n"
|
||||
|
||||
sorted_trending = [s[0] for s in sorted(self.trending_count.items(), key=lambda item: item[1])][::-1][0:5]
|
||||
log.warning(sorted_trending)
|
||||
for t in sorted_trending:
|
||||
reply += self.spark_reply(self.find_symbols(t))[0] + "\n"
|
||||
|
||||
if coins:
|
||||
reply += "\n\n🦎Trending on CoinGecko:\n`"
|
||||
reply += "━" * len("Trending on CoinGecko:") + "`\n"
|
||||
for coin in coins:
|
||||
reply += coin + "\n"
|
||||
|
||||
if "`$GME" in reply:
|
||||
reply = reply.replace("🔥", "🦍")
|
||||
|
||||
if reply:
|
||||
return reply
|
||||
else:
|
||||
log.warning("Failed to collect trending data.")
|
||||
return "Trending data is not currently available."
|
||||
|
||||
def random_pick(self) -> str:
|
||||
# choice = random.choice(list(self.stock.symbol_list["description"]) + list(self.crypto.symbol_list["description"]))
|
||||
choice = random.choice(list(self.crypto.symbol_list["description"]))
|
||||
hold = (datetime.date.today() + datetime.timedelta(random.randint(1, 365))).strftime("%b %d, %Y")
|
||||
|
||||
return f"{choice}\nBuy and hold until: {hold}"
|
||||
|
||||
def batch_price_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Returns current market price or after hours if its available for a given stock symbol.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list
|
||||
List of stock symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
markdown formatted string of the symbols price and movement.
|
||||
"""
|
||||
replies = []
|
||||
stocks = []
|
||||
coins = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
stocks.append(symbol)
|
||||
elif isinstance(symbol, Coin):
|
||||
coins.append(symbol)
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
if stocks:
|
||||
for stock in stocks:
|
||||
replies.append(self.stock.price_reply(stock))
|
||||
if coins:
|
||||
replies = replies + self.crypto.batch_price(coins)
|
||||
|
||||
return replies
|
||||
|
||||
def options(self, request: str, symbols: list[Symbol]) -> Dict:
|
||||
request = request.lower()
|
||||
if len(symbols) == 1:
|
||||
symbol = symbols[0]
|
||||
request = request.replace(symbol.tag.lower(), symbol.symbol.lower())
|
||||
return self.stock.options_reply(request)
|
||||
else:
|
||||
return self.stock.options_reply(request)
|
||||
"""Function that routes symbols to the correct API provider.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
from typing import Dict
|
||||
|
||||
import pandas as pd
|
||||
import schedule
|
||||
from cachetools import TTLCache, cached
|
||||
|
||||
from common.cg_Crypto import cg_Crypto
|
||||
from common.MarketData import MarketData
|
||||
from common.Symbol import Coin, Stock, Symbol
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Router:
|
||||
STOCK_REGEX = "(?:^|[^\\$])\\$([a-zA-Z.]{1,6})"
|
||||
CRYPTO_REGEX = "[$]{2}([a-zA-Z]{1,20})"
|
||||
trending_count: Dict[str, float] = {}
|
||||
|
||||
def __init__(self):
|
||||
self.stock = MarketData()
|
||||
self.crypto = cg_Crypto()
|
||||
|
||||
schedule.every().hour.do(self.trending_decay)
|
||||
|
||||
def trending_decay(self, decay=0.5):
|
||||
"""Decays the value of each trending stock by a multiplier"""
|
||||
t_copy = {}
|
||||
dead_keys = []
|
||||
if self.trending_count:
|
||||
t_copy = self.trending_count.copy()
|
||||
for key in t_copy.keys():
|
||||
if t_copy[key] < 0.01:
|
||||
# Prune Keys
|
||||
dead_keys.append(key)
|
||||
else:
|
||||
t_copy[key] = t_copy[key] * decay
|
||||
for dead in dead_keys:
|
||||
t_copy.pop(dead)
|
||||
|
||||
self.trending_count = t_copy.copy()
|
||||
log.info("Decayed trending symbols.")
|
||||
|
||||
def find_symbols(self, text: str, *, trending_weight: int = 1) -> list[Stock | Coin]:
|
||||
"""Finds stock tickers starting with a dollar sign, and cryptocurrencies with two dollar signs
|
||||
in a blob of text and returns them in a list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
text : str
|
||||
Blob of text.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[Symbol]
|
||||
List of stock symbols as Symbol objects
|
||||
"""
|
||||
schedule.run_pending()
|
||||
|
||||
symbols: list[Symbol] = []
|
||||
stock_matches = set(re.findall(self.STOCK_REGEX, text))
|
||||
coin_matches = set(re.findall(self.CRYPTO_REGEX, text))
|
||||
|
||||
for stock_match in stock_matches:
|
||||
# Market data lacks tools to check if a symbol is valid.
|
||||
if stock_info := self.stock.symbol_id(stock_match):
|
||||
symbols.append(Stock(stock_info))
|
||||
else:
|
||||
log.info(f"{stock_match} is not in list of stocks")
|
||||
|
||||
for coin_match in coin_matches:
|
||||
sym = self.crypto.symbol_list[self.crypto.symbol_list["symbol"].str.fullmatch(coin_match.lower(), case=False)]
|
||||
if sym.empty:
|
||||
log.info(f"{coin_match} is not in list of coins")
|
||||
else:
|
||||
symbols.append(Coin(sym))
|
||||
if symbols:
|
||||
for symbol in symbols:
|
||||
self.trending_count[symbol.tag] = self.trending_count.get(symbol.tag, 0) + trending_weight
|
||||
log.debug(self.trending_count)
|
||||
|
||||
return symbols
|
||||
|
||||
def status(self, bot_resp) -> str:
|
||||
"""Checks for any issues with APIs.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Human readable text on status of the bot and relevant APIs
|
||||
"""
|
||||
|
||||
stats = f"""
|
||||
Bot Status:
|
||||
{bot_resp}
|
||||
|
||||
Stock Market Data:
|
||||
{self.stock.status()}
|
||||
|
||||
Cryptocurrency Data:
|
||||
{self.crypto.status()}
|
||||
"""
|
||||
|
||||
log.warning(stats)
|
||||
|
||||
return stats
|
||||
|
||||
def inline_search(self, search: str, matches: int = 5) -> pd.DataFrame:
|
||||
"""Searches based on the shortest symbol that contains the same string as the search.
|
||||
Should be very fast compared to a fuzzy search.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
search : str
|
||||
String used to match against symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[tuple[str, str]]
|
||||
Each tuple contains: (Symbol, Issue Name).
|
||||
"""
|
||||
|
||||
# df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list])
|
||||
df = self.crypto.symbol_list
|
||||
|
||||
df = df[df["description"].str.contains(search, regex=False, case=False)].sort_values(
|
||||
by="type_id", key=lambda x: x.str.len()
|
||||
)
|
||||
|
||||
symbols = df.head(matches)
|
||||
symbols["price_reply"] = symbols["type_id"].apply(
|
||||
lambda sym: self.price_reply(self.find_symbols(sym, trending_weight=0))[0]
|
||||
)
|
||||
|
||||
return symbols
|
||||
|
||||
def price_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Returns current market price or after hours if its available for a given stock symbol.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list
|
||||
List of stock symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
markdown formatted string of the symbols price and movement.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
log.info(symbol)
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.price_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.price_reply(symbol))
|
||||
else:
|
||||
log.info(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def info_reply(self, symbols: list) -> list[str]:
|
||||
"""Gets information on stock symbols.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable formatted
|
||||
string of the symbols information.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.info_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.info_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def intra_reply(self, symbol: Symbol) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol since the last market open.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available.
|
||||
Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
|
||||
if isinstance(symbol, Stock):
|
||||
return self.stock.intra_reply(symbol)
|
||||
elif isinstance(symbol, Coin):
|
||||
return self.crypto.intra_reply(symbol)
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
return pd.DataFrame()
|
||||
|
||||
def chart_reply(self, symbol: Symbol) -> pd.DataFrame:
|
||||
"""Returns price data for a symbol of the past month up until the previous trading days close.
|
||||
Also caches multiple requests made in the same day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbol : str
|
||||
Stock symbol.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
Returns a timeseries dataframe with high, low, and volume data if its available.
|
||||
Otherwise returns empty pd.DataFrame.
|
||||
"""
|
||||
if isinstance(symbol, Stock):
|
||||
return self.stock.chart_reply(symbol)
|
||||
elif isinstance(symbol, Coin):
|
||||
return self.crypto.chart_reply(symbol)
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
return pd.DataFrame()
|
||||
|
||||
def stat_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Gets key statistics for each symbol in the list
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
formatted string of the symbols statistics.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.stat_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.stat_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def cap_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Gets market cap for each symbol in the list
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
formatted string of the symbols market cap.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.cap_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.cap_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
def spark_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Gets change for each symbol and returns it in a compact format
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list[str]
|
||||
List of stock symbols
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
List of human readable strings.
|
||||
"""
|
||||
replies = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
replies.append(self.stock.spark_reply(symbol))
|
||||
elif isinstance(symbol, Coin):
|
||||
replies.append(self.crypto.spark_reply(symbol))
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
return replies
|
||||
|
||||
@cached(cache=TTLCache(maxsize=1024, ttl=600))
|
||||
def trending(self) -> str:
|
||||
"""Checks APIs for trending symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[str]
|
||||
List of preformatted strings to be sent to user.
|
||||
"""
|
||||
|
||||
# stocks = self.stock.trending()
|
||||
coins = self.crypto.trending()
|
||||
|
||||
reply = ""
|
||||
|
||||
log.warning(self.trending_count)
|
||||
if self.trending_count:
|
||||
reply += "🔥Trending on the Stock Bot:\n`"
|
||||
reply += "━" * len("Trending on the Stock Bot:") + "`\n"
|
||||
|
||||
sorted_trending = [s[0] for s in sorted(self.trending_count.items(), key=lambda item: item[1])][::-1][0:5]
|
||||
log.warning(sorted_trending)
|
||||
for t in sorted_trending:
|
||||
reply += self.spark_reply(self.find_symbols(t))[0] + "\n"
|
||||
|
||||
if coins:
|
||||
reply += "\n\n🦎Trending on CoinGecko:\n`"
|
||||
reply += "━" * len("Trending on CoinGecko:") + "`\n"
|
||||
for coin in coins:
|
||||
reply += coin + "\n"
|
||||
|
||||
if "`$GME" in reply:
|
||||
reply = reply.replace("🔥", "🦍")
|
||||
|
||||
if reply:
|
||||
return reply
|
||||
else:
|
||||
log.warning("Failed to collect trending data.")
|
||||
return "Trending data is not currently available."
|
||||
|
||||
def random_pick(self) -> str:
|
||||
# choice = random.choice(list(self.stock.symbol_list["description"]) + list(self.crypto.symbol_list["description"]))
|
||||
choice = random.choice(list(self.crypto.symbol_list["description"]))
|
||||
hold = (datetime.date.today() + datetime.timedelta(random.randint(1, 365))).strftime("%b %d, %Y")
|
||||
|
||||
return f"{choice}\nBuy and hold until: {hold}"
|
||||
|
||||
def batch_price_reply(self, symbols: list[Symbol]) -> list[str]:
|
||||
"""Returns current market price or after hours if its available for a given stock symbol.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
symbols : list
|
||||
List of stock symbols.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, str]
|
||||
Each symbol passed in is a key with its value being a human readable
|
||||
markdown formatted string of the symbols price and movement.
|
||||
"""
|
||||
replies = []
|
||||
stocks = []
|
||||
coins = []
|
||||
|
||||
for symbol in symbols:
|
||||
if isinstance(symbol, Stock):
|
||||
stocks.append(symbol)
|
||||
elif isinstance(symbol, Coin):
|
||||
coins.append(symbol)
|
||||
else:
|
||||
log.debug(f"{symbol} is not a Stock or Coin")
|
||||
|
||||
if stocks:
|
||||
for stock in stocks:
|
||||
replies.append(self.stock.price_reply(stock))
|
||||
if coins:
|
||||
replies = replies + self.crypto.batch_price(coins)
|
||||
|
||||
return replies
|
||||
|
||||
def options(self, request: str, symbols: list[Symbol]) -> Dict:
|
||||
request = request.lower()
|
||||
if len(symbols) == 1:
|
||||
symbol = symbols[0]
|
||||
request = request.replace(symbol.tag.lower(), symbol.symbol.lower())
|
||||
return self.stock.options_reply(request)
|
||||
else:
|
||||
return self.stock.options_reply(request)
|
||||
|
@@ -1,31 +1,31 @@
|
||||
import time
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def rate_limited(max_per_second):
|
||||
"""
|
||||
Decorator that ensures the wrapped function is called at most `max_per_second` times per second.
|
||||
"""
|
||||
min_interval = 1.0 / max_per_second
|
||||
|
||||
def decorate(func):
|
||||
last_called = [0.0]
|
||||
|
||||
def rate_limited_function(*args, **kwargs):
|
||||
elapsed = time.time() - last_called[0]
|
||||
left_to_wait = min_interval - elapsed
|
||||
|
||||
if left_to_wait > 0:
|
||||
log.info(f"Rate limit exceeded. Waiting for {left_to_wait:.2f} seconds.")
|
||||
time.sleep(left_to_wait)
|
||||
|
||||
ret = func(*args, **kwargs)
|
||||
last_called[0] = time.time()
|
||||
|
||||
return ret
|
||||
|
||||
return rate_limited_function
|
||||
|
||||
return decorate
|
||||
import time
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def rate_limited(max_per_second):
|
||||
"""
|
||||
Decorator that ensures the wrapped function is called at most `max_per_second` times per second.
|
||||
"""
|
||||
min_interval = 1.0 / max_per_second
|
||||
|
||||
def decorate(func):
|
||||
last_called = [0.0]
|
||||
|
||||
def rate_limited_function(*args, **kwargs):
|
||||
elapsed = time.time() - last_called[0]
|
||||
left_to_wait = min_interval - elapsed
|
||||
|
||||
if left_to_wait > 0:
|
||||
log.info(f"Rate limit exceeded. Waiting for {left_to_wait:.2f} seconds.")
|
||||
time.sleep(left_to_wait)
|
||||
|
||||
ret = func(*args, **kwargs)
|
||||
last_called[0] = time.time()
|
||||
|
||||
return ret
|
||||
|
||||
return rate_limited_function
|
||||
|
||||
return decorate
|
||||
|
Reference in New Issue
Block a user