diff --git a/common/MarketData.py b/common/MarketData.py index dde8951..d90b37a 100644 --- a/common/MarketData.py +++ b/common/MarketData.py @@ -1,362 +1,371 @@ -import datetime as dt -import logging -import os -from collections import OrderedDict -from typing import Dict - -import humanize -import pandas as pd -import pytz -import requests as r -import schedule - -from common.Symbol import Stock - -log = logging.getLogger(__name__) - - -class MarketData: - """ - Functions for finding stock market information about symbols from MarkData.app - """ - - SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})" - - symbol_list: Dict[str, Dict] = {} - charts: Dict[Stock, pd.DataFrame] = {} - - openTime = dt.time(hour=9, minute=30, second=0) - marketTimeZone = pytz.timezone("US/Eastern") - - def __init__(self) -> None: - """Creates a Symbol Object - - Parameters - ---------- - MARKETDATA_TOKEN : str - MarketData.app API Token - """ - - try: - self.MARKETDATA_TOKEN = os.environ["MARKETDATA"] - - if self.MARKETDATA_TOKEN == "TOKEN": - self.MARKETDATA_TOKEN = "" - except KeyError: - self.MARKETDATA_TOKEN = "" - log.warning("Starting without an MarketData.app Token will not allow you to get market data!") - log.warning("Use this affiliate link so that the bot can stay free:") - log.warning("https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=repo") - - if self.MARKETDATA_TOKEN != "": - schedule.every().day.do(self.clear_charts) - - self.get_symbol_list() - schedule.every().day.do(self.get_symbol_list) - - def get(self, endpoint, params=None, timeout=10, headers=None) -> dict: - url = "https://api.marketdata.app/v1/" + endpoint - - if params is None: - params = {} - - # set token param if it wasn't passed. - params["token"] = self.MARKETDATA_TOKEN - - # Undocumented query variable that ensures bot usage can be - # monitored even if someone doesn't make it through an affiliate link. - params["application"] = "simplestockbot" - - if headers is None: - headers = {} - headers = {"User-Agent": "Simple Stock Bot anson@ansonbiggs.com"} | headers - - resp = r.get(url, params=params, timeout=timeout, headers=headers) - - logging.error(resp.headers.items()) - - # Make sure API returned a proper status code - try: - resp.raise_for_status() - except r.exceptions.HTTPError as e: - logging.error(e) - return {} - - # Make sure API returned valid JSON - try: - resp_json = resp.json() - - match resp_json["s"]: - case "ok": - return resp_json - case "no_data": - return resp_json - case "error": - logging.error("MarketData Error:\n" + resp_json["errmsg"]) - return {} - - except r.exceptions.JSONDecodeError as e: - logging.error(e) - - return {} - - def symbol_id(self, symbol: str) -> Dict[str, Dict]: - return self.symbol_list.get(symbol.upper(), None) - - def get_symbol_list(self): - # Doesn't use `self.get`` since needs are much different - sec_resp = r.get( - "https://www.sec.gov/files/company_tickers.json", - headers={ - "User-Agent": "Simple Stock Bot anson@ansonbiggs.com", - "Accept-Encoding": "gzip, deflate", - "Host": "www.sec.gov", - }, - ) - sec_resp.raise_for_status() - sec_data = sec_resp.json() - - for rank, ticker_info in sec_data.items(): - self.symbol_list[ticker_info["ticker"]] = { - "ticker": ticker_info["ticker"], - "title": ticker_info["title"], - "mkt_cap_rank": rank, - } - - def clear_charts(self) -> None: - """ - Clears cache of chart data. - Charts are cached so that only 1 API call per 24 hours is needed since the - chart data is expensive and a large download. - """ - self.charts = {} - - def status(self) -> str: - # TODO: At the moment this API is poorly documented, this function likely needs to be revisited later. - - try: - status = r.get( - "https://stats.uptimerobot.com/api/getMonitorList/6Kv3zIow0A", - timeout=5, - ) - status.raise_for_status() - except r.HTTPError: - return f"API returned an HTTP error code {status.status_code} in {status.elapsed.total_seconds()} seconds." - except r.Timeout: - return "API timed out before it was able to give status. This is likely due to a surge in usage or a complete outage." - - statusJSON = status.json() - - if statusJSON["status"] == "ok": - return ( - f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds." - ) - else: - return f"MarketData.app is currently reporting the following status: {statusJSON['status']}" - - def price_reply(self, symbol: Stock) -> str: - """Returns price movement of Stock for the last market day, or after hours. - - Parameters - ---------- - symbol : Stock - - Returns - ------- - str - Formatted markdown - """ - - if quoteResp := self.get(f"stocks/quotes/{symbol.symbol}/"): - price = round(quoteResp["last"][0], 2) - - try: - changePercent = round(quoteResp["changepct"][0], 2) - except TypeError: - return f"The price of {symbol.name} is ${price}" - - message = f"The current price of {symbol.name} is ${price} and " - - if changePercent > 0.0: - message += f"is currently up {changePercent}% for the day." - elif changePercent < 0.0: - message += f"is currently down {changePercent}% for the day." - else: - message += "hasn't shown any movement for the day." - - return message - else: - return f"Getting a quote for {symbol} encountered an error." - - def spark_reply(self, symbol: Stock) -> str: - if quoteResp := self.get(f"stocks/quotes/{symbol}/"): - try: - changePercent = round(quoteResp["changepct"][0], 2) - return f"`{symbol.tag}`: {changePercent}%" - except TypeError: - pass - - return f"`{symbol.tag}`" - - def intra_reply(self, symbol: Stock) -> pd.DataFrame: - """Returns price data for a symbol of the past month up until the previous trading days close. - Also caches multiple requests made in the same day. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. - """ - schedule.run_pending() - - try: - return self.charts[symbol.id.upper()] - except KeyError: - pass - - resolution = "15" # minutes - now = dt.datetime.now(self.marketTimeZone) - - if self.openTime < now.time(): - startTime = now.replace(hour=9, minute=30) - else: - startTime = now - dt.timedelta(days=1) - - if data := self.get( - f"stocks/candles/{resolution}/{symbol}", - params={"from": startTime.timestamp(), "to": now.timestamp(), "extended": True}, - ): - data.pop("s") - df = pd.DataFrame(data) - df["t"] = pd.to_datetime(df["t"], unit="s", utc=True) - df.set_index("t", inplace=True) - - df.rename( - columns={ - "o": "Open", - "h": "High", - "l": "Low", - "c": "Close", - "v": "Volume", - }, - inplace=True, - ) - - self.charts[symbol.id.upper()] = df - return df - - return pd.DataFrame() - - def chart_reply(self, symbol: Stock) -> pd.DataFrame: - """Returns price data for a symbol of the past month up until the previous trading days close. - Also caches multiple requests made in the same day. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. - """ - schedule.run_pending() - - try: - return self.charts[symbol.id.upper()] - except KeyError: - pass - - to_date = dt.datetime.today().strftime("%Y-%m-%d") - from_date = (dt.datetime.today() - dt.timedelta(days=30)).strftime("%Y-%m-%d") - resultion = "daily" - - if data := self.get( - f"stocks/candles/{resultion}/{symbol}", - params={ - "from": from_date, - "to": to_date, - }, - ): - data.pop("s") - - df = pd.DataFrame(data) - df["t"] = pd.to_datetime(df["t"], unit="s") - df.set_index("t", inplace=True) - - df.rename( - columns={ - "o": "Open", - "h": "High", - "l": "Low", - "c": "Close", - "v": "Volume", - }, - inplace=True, - ) - - self.charts[symbol.id.upper()] = df - return df - - return pd.DataFrame() - - def options_reply(self, request: str) -> str: - """Undocumented API Usage!""" - - options_data = self.get(f"options/quotes/{request}") - - for key in options_data.keys(): - options_data[key] = options_data[key][0] - - options_data["underlying"] = "$" + options_data["underlying"] - - options_data["updated"] = humanize.naturaltime(dt.datetime.now() - dt.datetime.fromtimestamp(options_data["updated"])) - - options_data["expiration"] = humanize.naturaltime( - dt.datetime.now() - dt.datetime.fromtimestamp(options_data["expiration"]) - ) - - options_data["firstTraded"] = humanize.naturaltime( - dt.datetime.now() - dt.datetime.fromtimestamp(options_data["firstTraded"]) - ) - - rename = { - "optionSymbol": "Option Symbol", - "underlying": "Underlying", - "expiration": "Expiration", - "side": "side", - "strike": "strike", - "firstTraded": "First Traded", - "updated": "Last Updated", - "bid": "bid", - "bidSize": "bidSize", - "mid": "mid", - "ask": "ask", - "askSize": "askSize", - "last": "last", - "openInterest": "Open Interest", - "volume": "Volume", - "inTheMoney": "inTheMoney", - "intrinsicValue": "Intrinsic Value", - "extrinsicValue": "Extrinsic Value", - "underlyingPrice": "Underlying Price", - "iv": "Implied Volatility", - "delta": "delta", - "gamma": "gamma", - "theta": "theta", - "vega": "vega", - "rho": "rho", - } - - options_cleaned = OrderedDict() - for old, new in rename.items(): - if old in options_data: - options_cleaned[new] = options_data[old] - - return options_cleaned +import datetime as dt +import logging +import os +from collections import OrderedDict +from typing import Dict + +import humanize +import pandas as pd +import pytz +import requests as r +import schedule + + +from common.Symbol import Stock + +log = logging.getLogger(__name__) + + +class MarketData: + """ + Functions for finding stock market information about symbols from MarkData.app + """ + + SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})" + + symbol_list: Dict[str, Dict] = {} + charts: Dict[Stock, pd.DataFrame] = {} + + openTime = dt.time(hour=9, minute=30, second=0) + marketTimeZone = pytz.timezone("US/Eastern") + + def __init__(self) -> None: + """Creates a Symbol Object + + Parameters + ---------- + MARKETDATA_TOKEN : str + MarketData.app API Token + """ + + try: + self.MARKETDATA_TOKEN = os.environ["MARKETDATA"] + + if self.MARKETDATA_TOKEN == "TOKEN": + self.MARKETDATA_TOKEN = "" + except KeyError: + self.MARKETDATA_TOKEN = "" + log.warning( + "Starting without an MarketData.app Token will not allow you to get market data!" + ) + log.warning("Use this affiliate link so that the bot can stay free:") + log.warning( + "https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=repo" + ) + + if self.MARKETDATA_TOKEN != "": + schedule.every().day.do(self.clear_charts) + + self.get_symbol_list() + schedule.every().day.do(self.get_symbol_list) + + def get(self, endpoint, params=None, timeout=10, headers=None) -> dict: + url = "https://api.marketdata.app/v1/" + endpoint + + if params is None: + params = {} + + # set token param if it wasn't passed. + params["token"] = self.MARKETDATA_TOKEN + + # Undocumented query variable that ensures bot usage can be + # monitored even if someone doesn't make it through an affiliate link. + params["application"] = "simplestockbot" + + if headers is None: + headers = {} + headers = {"User-Agent": "Simple Stock Bot anson@ansonbiggs.com"} | headers + + resp = r.get(url, params=params, timeout=timeout, headers=headers) + + logging.error(resp.headers.items()) + + # Make sure API returned a proper status code + try: + resp.raise_for_status() + except r.exceptions.HTTPError as e: + logging.error(e) + return {} + + # Make sure API returned valid JSON + try: + resp_json = resp.json() + + match resp_json["s"]: + case "ok": + return resp_json + case "no_data": + return resp_json + case "error": + logging.error("MarketData Error:\n" + resp_json["errmsg"]) + return {} + + except r.exceptions.JSONDecodeError as e: + logging.error(e) + + return {} + + def symbol_id(self, symbol: str) -> Dict[str, Dict]: + return self.symbol_list.get(symbol.upper(), None) + + def get_symbol_list(self): + # Doesn't use `self.get()` since needs are much different + sec_resp = r.get( + "https://www.sec.gov/files/company_tickers.json", + headers={ + "User-Agent": "Simple Stock Bot anson@ansonbiggs.com", + "Accept-Encoding": "gzip, deflate", + "Host": "www.sec.gov", + }, + ) + sec_resp.raise_for_status() + sec_data = sec_resp.json() + + for rank, ticker_info in sec_data.items(): + self.symbol_list[ticker_info["ticker"]] = { + "ticker": ticker_info["ticker"], + "title": ticker_info["title"], + "mkt_cap_rank": rank, + } + + def clear_charts(self) -> None: + """ + Clears cache of chart data. + Charts are cached so that only 1 API call per 24 hours is needed since the + chart data is expensive and a large download. + """ + self.charts = {} + + def status(self) -> str: + # TODO: At the moment this API is poorly documented, this function likely needs to be revisited later. + + try: + status = r.get( + "https://stats.uptimerobot.com/api/getMonitorList/6Kv3zIow0A", + timeout=5, + ) + status.raise_for_status() + except r.HTTPError: + return f"API returned an HTTP error code {status.status_code} in {status.elapsed.total_seconds()} seconds." + except r.Timeout: + return "API timed out before it was able to give status. This is likely due to a surge in usage or a complete outage." + + statusJSON = status.json() + + if statusJSON["status"] == "ok": + return f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds." + else: + return f"MarketData.app is currently reporting the following status: {statusJSON['status']}" + + def price_reply(self, symbol: Stock) -> str: + """Returns price movement of Stock for the last market day, or after hours. + + Parameters + ---------- + symbol : Stock + + Returns + ------- + str + Formatted markdown + """ + + if quoteResp := self.get(f"stocks/quotes/{symbol.symbol}/"): + price = round(quoteResp["last"][0], 2) + + try: + changePercent = round(quoteResp["changepct"][0], 2) + except TypeError: + return f"The price of {symbol.name} is ${price}" + + message = f"The current price of {symbol.name} is ${price} and " + + if changePercent > 0.0: + message += f"is currently up {changePercent}% for the day." + elif changePercent < 0.0: + message += f"is currently down {changePercent}% for the day." + else: + message += "hasn't shown any movement for the day." + + return message + else: + return f"Getting a quote for {symbol} encountered an error." + + def spark_reply(self, symbol: Stock) -> str: + if quoteResp := self.get(f"stocks/quotes/{symbol}/"): + try: + changePercent = round(quoteResp["changepct"][0], 2) + return f"`{symbol.tag}`: {changePercent}%" + except TypeError: + pass + + return f"`{symbol.tag}`" + + def intra_reply(self, symbol: Stock) -> pd.DataFrame: + """Returns price data for a symbol of the past month up until the previous trading days close. + Also caches multiple requests made in the same day. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. + """ + schedule.run_pending() + + try: + return self.charts[symbol.id.upper()] + except KeyError: + pass + + resolution = "15" # minutes + now = dt.datetime.now(self.marketTimeZone) + + if self.openTime < now.time(): + startTime = now.replace(hour=9, minute=30) + else: + startTime = now - dt.timedelta(days=1) + + if data := self.get( + f"stocks/candles/{resolution}/{symbol}", + params={ + "from": startTime.timestamp(), + "to": now.timestamp(), + "extended": True, + }, + ): + data.pop("s") + df = pd.DataFrame(data) + df["t"] = pd.to_datetime(df["t"], unit="s", utc=True) + df.set_index("t", inplace=True) + + df.rename( + columns={ + "o": "Open", + "h": "High", + "l": "Low", + "c": "Close", + "v": "Volume", + }, + inplace=True, + ) + + self.charts[symbol.id.upper()] = df + return df + + return pd.DataFrame() + + def chart_reply(self, symbol: Stock) -> pd.DataFrame: + """Returns price data for a symbol of the past month up until the previous trading days close. + Also caches multiple requests made in the same day. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. + """ + schedule.run_pending() + + try: + return self.charts[symbol.id.upper()] + except KeyError: + pass + + to_date = dt.datetime.today().strftime("%Y-%m-%d") + from_date = (dt.datetime.today() - dt.timedelta(days=30)).strftime("%Y-%m-%d") + resultion = "daily" + + if data := self.get( + f"stocks/candles/{resultion}/{symbol}", + params={ + "from": from_date, + "to": to_date, + }, + ): + data.pop("s") + + df = pd.DataFrame(data) + df["t"] = pd.to_datetime(df["t"], unit="s") + df.set_index("t", inplace=True) + + df.rename( + columns={ + "o": "Open", + "h": "High", + "l": "Low", + "c": "Close", + "v": "Volume", + }, + inplace=True, + ) + + self.charts[symbol.id.upper()] = df + return df + + return pd.DataFrame() + + def options_reply(self, request: str) -> str: + """Undocumented API Usage!""" + + options_data = self.get(f"options/quotes/{request}") + + for key in options_data.keys(): + options_data[key] = options_data[key][0] + + options_data["underlying"] = "$" + options_data["underlying"] + + options_data["updated"] = humanize.naturaltime( + dt.datetime.now() - dt.datetime.fromtimestamp(options_data["updated"]) + ) + + options_data["expiration"] = humanize.naturaltime( + dt.datetime.now() - dt.datetime.fromtimestamp(options_data["expiration"]) + ) + + options_data["firstTraded"] = humanize.naturaltime( + dt.datetime.now() - dt.datetime.fromtimestamp(options_data["firstTraded"]) + ) + + rename = { + "optionSymbol": "Option Symbol", + "underlying": "Underlying", + "expiration": "Expiration", + "side": "side", + "strike": "strike", + "firstTraded": "First Traded", + "updated": "Last Updated", + "bid": "bid", + "bidSize": "bidSize", + "mid": "mid", + "ask": "ask", + "askSize": "askSize", + "last": "last", + "openInterest": "Open Interest", + "volume": "Volume", + "inTheMoney": "inTheMoney", + "intrinsicValue": "Intrinsic Value", + "extrinsicValue": "Extrinsic Value", + "underlyingPrice": "Underlying Price", + "iv": "Implied Volatility", + "delta": "delta", + "gamma": "gamma", + "theta": "theta", + "vega": "vega", + "rho": "rho", + } + + options_cleaned = OrderedDict() + for old, new in rename.items(): + if old in options_data: + options_cleaned[new] = options_data[old] + + return options_cleaned diff --git a/common/Symbol.py b/common/Symbol.py index cdd8602..d7d502e 100644 --- a/common/Symbol.py +++ b/common/Symbol.py @@ -1,52 +1,55 @@ -import logging - -import pandas as pd - - -class Symbol: - """ - symbol: What the user calls it. ie tsla or btc - id: What the api expects. ie tsla or bitcoin - name: Human readable. ie Tesla or Bitcoin - tag: Uppercase tag to call the symbol. ie $TSLA or $$BTC - """ - - currency = "usd" - pass - - def __init__(self, symbol) -> None: - self.symbol = symbol - self.id = symbol - self.name = symbol - self.tag = "$" + symbol - - def __repr__(self) -> str: - return f"<{self.__class__.__name__} instance of {self.id} at {id(self)}>" - - def __str__(self) -> str: - return self.id - - -class Stock(Symbol): - """Stock Market Object. Gets data from MarketData""" - - def __init__(self, symbol_info: dict) -> None: - self.symbol = symbol_info["ticker"] - self.id = symbol_info["ticker"] - self.name = symbol_info["title"] - self.tag = "$" + symbol_info["ticker"] - self.market_cap_rank = symbol_info["mkt_cap_rank"] - - -class Coin(Symbol): - """Cryptocurrency Object. Gets data from CoinGecko.""" - - def __init__(self, symbol: pd.DataFrame) -> None: - if len(symbol) > 1: - logging.info(f"Crypto with shared id:\n\t{symbol.id}") - symbol = symbol.head(1) - - self.symbol = symbol.symbol.values[0] - self.id = symbol.id.values[0] - self.name = symbol.name.values[0] - self.tag = symbol.type_id.values[0].upper() +import logging + +import pandas as pd + + +class Symbol: + """ + symbol: What the user calls it. ie tsla or btc + id: What the api expects. ie tsla or bitcoin + name: Human readable. ie Tesla or Bitcoin + tag: Uppercase tag to call the symbol. ie $TSLA or $$BTC + """ + + currency = "usd" + pass + + def __init__(self, symbol) -> None: + self.symbol = symbol + self.id = symbol + self.name = symbol + self.tag = "$" + symbol + + def __repr__(self) -> str: + return f"<{self.__class__.__name__} instance of {self.id} at {id(self)}>" + + def __str__(self) -> str: + return self.id + + def __hash__(self): + return hash(self.id) + + +class Stock(Symbol): + """Stock Market Object. Gets data from MarketData""" + + def __init__(self, symbol_info: dict) -> None: + self.symbol = symbol_info["ticker"] + self.id = symbol_info["ticker"] + self.name = symbol_info["title"] + self.tag = "$" + symbol_info["ticker"] + self.market_cap_rank = symbol_info["mkt_cap_rank"] + + +class Coin(Symbol): + """Cryptocurrency Object. Gets data from CoinGecko.""" + + def __init__(self, symbol: pd.DataFrame) -> None: + if len(symbol) > 1: + logging.info(f"Crypto with shared id:\n\t{symbol.id}") + symbol = symbol.head(1) + + self.symbol = symbol.symbol.values[0] + self.id = symbol.id.values[0] + self.name = symbol.name.values[0] + self.tag = symbol.type_id.values[0].upper() diff --git a/common/cg_Crypto.py b/common/cg_Crypto.py index 4ba41a0..136edc7 100644 --- a/common/cg_Crypto.py +++ b/common/cg_Crypto.py @@ -1,381 +1,388 @@ -import logging -from typing import List - -import pandas as pd -import requests as r -import schedule -from markdownify import markdownify - -from common.Symbol import Coin -from common.utilities import rate_limited - -import time - -log = logging.getLogger(__name__) - - -class cg_Crypto: - """ - Functions for finding crypto info - """ - - vs_currency = "usd" # simple/supported_vs_currencies for list of options - - trending_cache: List[str] = [] - - def __init__(self) -> None: - self.get_symbol_list() - schedule.every().day.do(self.get_symbol_list) - - # Coingecko's rate limit is 30 requests per minute. - # Since there are two bots sharing the same IP, we allocate half of that limit to each bot. - # This results in a rate limit of 15 requests per minute for each bot. - # Given this, the rate limit effectively becomes 1 request every 4 seconds for each bot. - @rate_limited(0.25) - def get(self, endpoint, params: dict = {}, timeout=10) -> dict: - url = "https://api.coingecko.com/api/v3" + endpoint - resp = r.get(url, params=params, timeout=timeout) - # Make sure API returned a proper status code - - if resp.status_code == 429: - log.warning(f"CoinGecko returned 429 - Too Many Requests for endpoint: {endpoint}. Sleeping and trying again.") - time.sleep(10) - return self.get(endpoint=endpoint, params=params, timeout=timeout) - - try: - resp.raise_for_status() - except r.exceptions.HTTPError as e: - log.error(e) - return {} - - # Make sure API returned valid JSON - try: - resp_json = resp.json() - return resp_json - except r.exceptions.JSONDecodeError as e: - log.error(e) - return {} - - def symbol_id(self, symbol) -> str: - try: - return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[0] - except KeyError: - return "" - - def get_symbol_list(self): - raw_symbols = self.get("/coins/list") - symbols = pd.DataFrame(data=raw_symbols) - - # Removes all binance-peg symbols - symbols = symbols[~symbols["id"].str.contains("binance-peg")] - - symbols["description"] = "$$" + symbols["symbol"].str.upper() + ": " + symbols["name"] - symbols = symbols[["id", "symbol", "name", "description"]] - symbols["type_id"] = "$$" + symbols["symbol"] - - self.symbol_list = symbols - - def status(self) -> str: - """Checks CoinGecko /ping endpoint for API issues. - - Returns - ------- - str - Human readable text on status of CoinGecko API - """ - status = r.get( - "https://api.coingecko.com/api/v3/ping", - timeout=5, - ) - - try: - status.raise_for_status() - return ( - f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds." - ) - except r.HTTPError: - return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} seconds." - - def price_reply(self, coin: Coin) -> str: - """Returns current market price or after hours if its available for a given coin symbol. - - Parameters - ---------- - symbols : list - List of coin symbols. - - Returns - ------- - Dict[str, str] - Each symbol passed in is a key with its value being a human readable - markdown formatted string of the symbols price and movement. - """ - - if resp := self.get( - "/simple/price", - params={ - "ids": coin.id, - "vs_currencies": self.vs_currency, - "include_24hr_change": "true", - }, - ): - try: - data = resp[coin.id] - - price = data[self.vs_currency] - change = data[self.vs_currency + "_24h_change"] - if change is None: - change = 0 - except KeyError: - return f"{coin.id} returned an error." - - message = f"The current price of {coin.name} is $**{price:,}**" - - # Determine wording of change text - if change > 0: - message += f", the coin is currently **up {change:.3f}%** for today" - elif change < 0: - message += f", the coin is currently **down {change:.3f}%** for today" - else: - message += ", the coin hasn't shown any movement today." - - else: - message = f"The price for {coin.name} is not available. If you suspect this is an error run `/status`" - - return message - - def intra_reply(self, symbol: Coin) -> pd.DataFrame: - """Returns price data for a symbol since the last market open. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. - """ - - if resp := self.get( - f"/coins/{symbol.id}/ohlc", - params={"vs_currency": self.vs_currency, "days": 1}, - ): - df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna() - df["Date"] = pd.to_datetime(df["Date"], unit="ms") - df = df.set_index("Date") - return df - - return pd.DataFrame() - - def chart_reply(self, symbol: Coin) -> pd.DataFrame: - """Returns price data for a symbol of the past month up until the previous trading days close. - Also caches multiple requests made in the same day. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. - """ - - if resp := self.get( - f"/coins/{symbol.id}/ohlc", - params={"vs_currency": self.vs_currency, "days": 30}, - ): - df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna() - df["Date"] = pd.to_datetime(df["Date"], unit="ms") - df = df.set_index("Date") - return df - - return pd.DataFrame() - - def stat_reply(self, symbol: Coin) -> str: - """Gathers key statistics on coin. Mostly just CoinGecko scores. - - Parameters - ---------- - symbol : Coin - - Returns - ------- - str - Preformatted markdown. - """ - - if data := self.get( - f"/coins/{symbol.id}", - params={ - "localization": "false", - }, - ): - return f""" - [{data['name']}]({data['links']['homepage'][0]}) Statistics: - Market Cap: ${data['market_data']['market_cap'][self.vs_currency]:,} - Market Cap Ranking: {data.get('market_cap_rank',"Not Available")} - CoinGecko Scores: - Overall: {data.get('coingecko_score','Not Available')} - Development: {data.get('developer_score','Not Available')} - Community: {data.get('community_score','Not Available')} - Public Interest: {data.get('public_interest_score','Not Available')} - """ - else: - return f"{symbol.symbol} returned an error." - - def cap_reply(self, coin: Coin) -> str: - """Gets market cap for Coin - - Parameters - ---------- - coin : Coin - - Returns - ------- - str - Preformatted markdown. - """ - - if resp := self.get( - "/simple/price", - params={ - "ids": coin.id, - "vs_currencies": self.vs_currency, - "include_market_cap": "true", - }, - ): - log.debug(resp) - try: - data = resp[coin.id] - - price = data[self.vs_currency] - cap = data[self.vs_currency + "_market_cap"] - except KeyError: - return f"{coin.id} returned an error." - - if cap == 0: - return f"The market cap for {coin.name} is not available for unknown reasons." - - message = ( - f"The current price of {coin.name} is $**{price:,}** and" - + " its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}" - ) - - else: - message = f"The Coin: {coin.name} was not found or returned and error." - - return message - - def info_reply(self, symbol: Coin) -> str: - """Gets coin description - - Parameters - ---------- - symbol : Coin - - Returns - ------- - str - Preformatted markdown. - """ - - if data := self.get( - f"/coins/{symbol.id}", - params={"localization": "false"}, - ): - try: - return markdownify(data["description"]["en"]) - except KeyError: - return f"{symbol} does not have a description available." - - return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." - - def spark_reply(self, symbol: Coin) -> str: - change = self.get( - "/simple/price", - params={ - "ids": symbol.id, - "vs_currencies": self.vs_currency, - "include_24hr_change": "true", - }, - )[symbol.id]["usd_24h_change"] - - return f"`{symbol.tag}`: {symbol.name}, {change:.2f}%" - - def trending(self) -> list[str]: - """Gets current coins trending on coingecko - - Returns - ------- - list[str] - list of $$ID: NAME, CHANGE% - """ - - coins = self.get("/search/trending") - try: - trending = [] - for coin in coins["coins"]: - c = coin["item"] - - sym = c["symbol"].upper() - name = c["name"] - change = self.get( - "/simple/price", - params={ - "ids": c["id"], - "vs_currencies": self.vs_currency, - "include_24hr_change": "true", - }, - )[c["id"]]["usd_24h_change"] - - msg = f"`$${sym}`: {name}, {change:.2f}%" - - trending.append(msg) - - except Exception as e: - log.warning(e) - return self.trending_cache - - self.trending_cache = trending - return trending - - def batch_price(self, coins: list[Coin]) -> list[str]: - """Gets price of a list of coins all in one API call - - Parameters - ---------- - coins : list[Coin] - - Returns - ------- - list[str] - returns preformatted list of strings detailing price movement of each coin passed in. - """ - query = ",".join([c.id for c in coins]) - - prices = self.get( - "/simple/price", - params={ - "ids": query, - "vs_currencies": self.vs_currency, - "include_24hr_change": "true", - }, - ) - - replies = [] - for coin in coins: - if coin.id in prices: - p = prices[coin.id] - - if p.get("usd_24h_change") is None: - p["usd_24h_change"] = 0 - - replies.append( - f"{coin.name}: ${p.get('usd',0):,} and has moved {p.get('usd_24h_change',0.0):.2f}% in the past 24 hours." - ) - - return replies +import logging +from typing import List + +import pandas as pd +import requests as r +import schedule +from markdownify import markdownify +from common.Symbol import Coin +from common.utilities import rate_limited + +import time + +log = logging.getLogger(__name__) + + +class cg_Crypto: + """ + Functions for finding crypto info + """ + + vs_currency = "usd" # simple/supported_vs_currencies for list of options + + trending_cache: List[str] = [] + + def __init__(self) -> None: + self.get_symbol_list() + schedule.every().day.do(self.get_symbol_list) + + # Coingecko's rate limit is 30 requests per minute. + # Since there are two bots sharing the same IP, we allocate half of that limit to each bot. + # This results in a rate limit of 15 requests per minute for each bot. + # Given this, the rate limit effectively becomes 1 request every 4 seconds for each bot. + @rate_limited(0.25) + def get(self, endpoint, params: dict = {}, timeout=10) -> dict: + url = "https://api.coingecko.com/api/v3" + endpoint + resp = r.get(url, params=params, timeout=timeout) + # Make sure API returned a proper status code + + if resp.status_code == 429: + log.warning( + f"CoinGecko returned 429 - Too Many Requests for endpoint: {endpoint}. Sleeping and trying again." + ) + time.sleep(10) + return self.get(endpoint=endpoint, params=params, timeout=timeout) + + try: + resp.raise_for_status() + except r.exceptions.HTTPError as e: + log.error(e) + return {} + + # Make sure API returned valid JSON + try: + resp_json = resp.json() + return resp_json + except r.exceptions.JSONDecodeError as e: + log.error(e) + return {} + + def symbol_id(self, symbol) -> str: + try: + return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[ + 0 + ] + except KeyError: + return "" + + def get_symbol_list(self): + raw_symbols = self.get("/coins/list") + symbols = pd.DataFrame(data=raw_symbols) + + # Removes all binance-peg symbols + symbols = symbols[~symbols["id"].str.contains("binance-peg")] + + symbols["description"] = ( + "$$" + symbols["symbol"].str.upper() + ": " + symbols["name"] + ) + symbols = symbols[["id", "symbol", "name", "description"]] + symbols["type_id"] = "$$" + symbols["symbol"] + + self.symbol_list = symbols + + def status(self) -> str: + """Checks CoinGecko /ping endpoint for API issues. + + Returns + ------- + str + Human readable text on status of CoinGecko API + """ + status = r.get( + "https://api.coingecko.com/api/v3/ping", + timeout=5, + ) + + try: + status.raise_for_status() + return f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} seconds." + except r.HTTPError: + return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} seconds." + + def price_reply(self, coin: Coin) -> str: + """Returns current market price or after hours if its available for a given coin symbol. + + Parameters + ---------- + symbols : list + List of coin symbols. + + Returns + ------- + Dict[str, str] + Each symbol passed in is a key with its value being a human readable + markdown formatted string of the symbols price and movement. + """ + + if resp := self.get( + "/simple/price", + params={ + "ids": coin.id, + "vs_currencies": self.vs_currency, + "include_24hr_change": "true", + }, + ): + try: + data = resp[coin.id] + + price = data[self.vs_currency] + change = data[self.vs_currency + "_24h_change"] + if change is None: + change = 0 + except KeyError: + return f"{coin.id} returned an error." + + message = f"The current price of {coin.name} is $**{price:,}**" + + # Determine wording of change text + if change > 0: + message += f", the coin is currently **up {change:.3f}%** for today" + elif change < 0: + message += f", the coin is currently **down {change:.3f}%** for today" + else: + message += ", the coin hasn't shown any movement today." + + else: + message = f"The price for {coin.name} is not available. If you suspect this is an error run `/status`" + + return message + + def intra_reply(self, symbol: Coin) -> pd.DataFrame: + """Returns price data for a symbol since the last market open. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. + """ + + if resp := self.get( + f"/coins/{symbol.id}/ohlc", + params={"vs_currency": self.vs_currency, "days": 1}, + ): + df = pd.DataFrame( + resp, columns=["Date", "Open", "High", "Low", "Close"] + ).dropna() + df["Date"] = pd.to_datetime(df["Date"], unit="ms") + df = df.set_index("Date") + return df + + return pd.DataFrame() + + def chart_reply(self, symbol: Coin) -> pd.DataFrame: + """Returns price data for a symbol of the past month up until the previous trading days close. + Also caches multiple requests made in the same day. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. + """ + + if resp := self.get( + f"/coins/{symbol.id}/ohlc", + params={"vs_currency": self.vs_currency, "days": 30}, + ): + df = pd.DataFrame( + resp, columns=["Date", "Open", "High", "Low", "Close"] + ).dropna() + df["Date"] = pd.to_datetime(df["Date"], unit="ms") + df = df.set_index("Date") + return df + + return pd.DataFrame() + + def stat_reply(self, symbol: Coin) -> str: + """Gathers key statistics on coin. Mostly just CoinGecko scores. + + Parameters + ---------- + symbol : Coin + + Returns + ------- + str + Preformatted markdown. + """ + + if data := self.get( + f"/coins/{symbol.id}", + params={ + "localization": "false", + }, + ): + return f""" + [{data['name']}]({data['links']['homepage'][0]}) Statistics: + Market Cap: ${data['market_data']['market_cap'][self.vs_currency]:,} + Market Cap Ranking: {data.get('market_cap_rank',"Not Available")} + CoinGecko Scores: + Overall: {data.get('coingecko_score','Not Available')} + Development: {data.get('developer_score','Not Available')} + Community: {data.get('community_score','Not Available')} + Public Interest: {data.get('public_interest_score','Not Available')} + """ + else: + return f"{symbol.symbol} returned an error." + + def cap_reply(self, coin: Coin) -> str: + """Gets market cap for Coin + + Parameters + ---------- + coin : Coin + + Returns + ------- + str + Preformatted markdown. + """ + + if resp := self.get( + "/simple/price", + params={ + "ids": coin.id, + "vs_currencies": self.vs_currency, + "include_market_cap": "true", + }, + ): + log.debug(resp) + try: + data = resp[coin.id] + + price = data[self.vs_currency] + cap = data[self.vs_currency + "_market_cap"] + except KeyError: + return f"{coin.id} returned an error." + + if cap == 0: + return f"The market cap for {coin.name} is not available for unknown reasons." + + message = ( + f"The current price of {coin.name} is $**{price:,}** and" + + " its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}" + ) + + else: + message = f"The Coin: {coin.name} was not found or returned and error." + + return message + + def info_reply(self, symbol: Coin) -> str: + """Gets coin description + + Parameters + ---------- + symbol : Coin + + Returns + ------- + str + Preformatted markdown. + """ + + if data := self.get( + f"/coins/{symbol.id}", + params={"localization": "false"}, + ): + try: + return markdownify(data["description"]["en"]) + except KeyError: + return f"{symbol} does not have a description available." + + return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." + + def spark_reply(self, symbol: Coin) -> str: + change = self.get( + "/simple/price", + params={ + "ids": symbol.id, + "vs_currencies": self.vs_currency, + "include_24hr_change": "true", + }, + )[symbol.id]["usd_24h_change"] + + return f"`{symbol.tag}`: {symbol.name}, {change:.2f}%" + + def trending(self) -> list[str]: + """Gets current coins trending on coingecko + + Returns + ------- + list[str] + list of $$ID: NAME, CHANGE% + """ + + coins = self.get("/search/trending") + try: + trending = [] + for coin in coins["coins"]: + c = coin["item"] + + sym = c["symbol"].upper() + name = c["name"] + change = self.get( + "/simple/price", + params={ + "ids": c["id"], + "vs_currencies": self.vs_currency, + "include_24hr_change": "true", + }, + )[c["id"]]["usd_24h_change"] + + msg = f"`$${sym}`: {name}, {change:.2f}%" + + trending.append(msg) + + except Exception as e: + log.warning(e) + return self.trending_cache + + self.trending_cache = trending + return trending + + def batch_price(self, coins: list[Coin]) -> list[str]: + """Gets price of a list of coins all in one API call + + Parameters + ---------- + coins : list[Coin] + + Returns + ------- + list[str] + returns preformatted list of strings detailing price movement of each coin passed in. + """ + query = ",".join([c.id for c in coins]) + + prices = self.get( + "/simple/price", + params={ + "ids": query, + "vs_currencies": self.vs_currency, + "include_24hr_change": "true", + }, + ) + + replies = [] + for coin in coins: + if coin.id in prices: + p = prices[coin.id] + + if p.get("usd_24h_change") is None: + p["usd_24h_change"] = 0 + + replies.append( + f"{coin.name}: ${p.get('usd',0):,} and has moved {p.get('usd_24h_change',0.0):.2f}% in the past 24 hours." + ) + + return replies diff --git a/common/symbol_router.py b/common/symbol_router.py index e09a453..044f64c 100644 --- a/common/symbol_router.py +++ b/common/symbol_router.py @@ -1,407 +1,407 @@ -"""Function that routes symbols to the correct API provider. -""" - -import datetime -import logging -import random -import re -from typing import Dict - -import pandas as pd -import schedule -from cachetools import TTLCache, cached - -from common.cg_Crypto import cg_Crypto -from common.MarketData import MarketData -from common.Symbol import Coin, Stock, Symbol - -log = logging.getLogger(__name__) - - -class Router: - STOCK_REGEX = "(?:^|[^\\$])\\$([a-zA-Z.]{1,6})" - CRYPTO_REGEX = "[$]{2}([a-zA-Z]{1,20})" - trending_count: Dict[str, float] = {} - - def __init__(self): - self.stock = MarketData() - self.crypto = cg_Crypto() - - schedule.every().hour.do(self.trending_decay) - - def trending_decay(self, decay=0.5): - """Decays the value of each trending stock by a multiplier""" - t_copy = {} - dead_keys = [] - if self.trending_count: - t_copy = self.trending_count.copy() - for key in t_copy.keys(): - if t_copy[key] < 0.01: - # Prune Keys - dead_keys.append(key) - else: - t_copy[key] = t_copy[key] * decay - for dead in dead_keys: - t_copy.pop(dead) - - self.trending_count = t_copy.copy() - log.info("Decayed trending symbols.") - - def find_symbols(self, text: str, *, trending_weight: int = 1) -> list[Stock | Coin]: - """Finds stock tickers starting with a dollar sign, and cryptocurrencies with two dollar signs - in a blob of text and returns them in a list. - - Parameters - ---------- - text : str - Blob of text. - - Returns - ------- - list[Symbol] - List of stock symbols as Symbol objects - """ - schedule.run_pending() - - symbols: list[Symbol] = [] - stock_matches = set(re.findall(self.STOCK_REGEX, text)) - coin_matches = set(re.findall(self.CRYPTO_REGEX, text)) - - for stock_match in stock_matches: - # Market data lacks tools to check if a symbol is valid. - if stock_info := self.stock.symbol_id(stock_match): - symbols.append(Stock(stock_info)) - else: - log.info(f"{stock_match} is not in list of stocks") - - for coin_match in coin_matches: - sym = self.crypto.symbol_list[self.crypto.symbol_list["symbol"].str.fullmatch(coin_match.lower(), case=False)] - if sym.empty: - log.info(f"{coin_match} is not in list of coins") - else: - symbols.append(Coin(sym)) - if symbols: - for symbol in symbols: - self.trending_count[symbol.tag] = self.trending_count.get(symbol.tag, 0) + trending_weight - log.debug(self.trending_count) - - return symbols - - def status(self, bot_resp) -> str: - """Checks for any issues with APIs. - - Returns - ------- - str - Human readable text on status of the bot and relevant APIs - """ - - stats = f""" - Bot Status: - {bot_resp} - - Stock Market Data: - {self.stock.status()} - - Cryptocurrency Data: - {self.crypto.status()} - """ - - log.warning(stats) - - return stats - - def inline_search(self, search: str, matches: int = 5) -> pd.DataFrame: - """Searches based on the shortest symbol that contains the same string as the search. - Should be very fast compared to a fuzzy search. - - Parameters - ---------- - search : str - String used to match against symbols. - - Returns - ------- - list[tuple[str, str]] - Each tuple contains: (Symbol, Issue Name). - """ - - # df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list]) - df = self.crypto.symbol_list - - df = df[df["description"].str.contains(search, regex=False, case=False)].sort_values( - by="type_id", key=lambda x: x.str.len() - ) - - symbols = df.head(matches) - symbols["price_reply"] = symbols["type_id"].apply( - lambda sym: self.price_reply(self.find_symbols(sym, trending_weight=0))[0] - ) - - return symbols - - def price_reply(self, symbols: list[Symbol]) -> list[str]: - """Returns current market price or after hours if its available for a given stock symbol. - - Parameters - ---------- - symbols : list - List of stock symbols. - - Returns - ------- - Dict[str, str] - Each symbol passed in is a key with its value being a human readable - markdown formatted string of the symbols price and movement. - """ - replies = [] - - for symbol in symbols: - log.info(symbol) - if isinstance(symbol, Stock): - replies.append(self.stock.price_reply(symbol)) - elif isinstance(symbol, Coin): - replies.append(self.crypto.price_reply(symbol)) - else: - log.info(f"{symbol} is not a Stock or Coin") - - return replies - - def info_reply(self, symbols: list) -> list[str]: - """Gets information on stock symbols. - - Parameters - ---------- - symbols : list[str] - List of stock symbols. - - Returns - ------- - Dict[str, str] - Each symbol passed in is a key with its value being a human readable formatted - string of the symbols information. - """ - replies = [] - - for symbol in symbols: - if isinstance(symbol, Stock): - replies.append(self.stock.info_reply(symbol)) - elif isinstance(symbol, Coin): - replies.append(self.crypto.info_reply(symbol)) - else: - log.debug(f"{symbol} is not a Stock or Coin") - - return replies - - def intra_reply(self, symbol: Symbol) -> pd.DataFrame: - """Returns price data for a symbol since the last market open. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. - Otherwise returns empty pd.DataFrame. - """ - - if isinstance(symbol, Stock): - return self.stock.intra_reply(symbol) - elif isinstance(symbol, Coin): - return self.crypto.intra_reply(symbol) - else: - log.debug(f"{symbol} is not a Stock or Coin") - return pd.DataFrame() - - def chart_reply(self, symbol: Symbol) -> pd.DataFrame: - """Returns price data for a symbol of the past month up until the previous trading days close. - Also caches multiple requests made in the same day. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. - Otherwise returns empty pd.DataFrame. - """ - if isinstance(symbol, Stock): - return self.stock.chart_reply(symbol) - elif isinstance(symbol, Coin): - return self.crypto.chart_reply(symbol) - else: - log.debug(f"{symbol} is not a Stock or Coin") - return pd.DataFrame() - - def stat_reply(self, symbols: list[Symbol]) -> list[str]: - """Gets key statistics for each symbol in the list - - Parameters - ---------- - symbols : list[str] - List of stock symbols - - Returns - ------- - Dict[str, str] - Each symbol passed in is a key with its value being a human readable - formatted string of the symbols statistics. - """ - replies = [] - - for symbol in symbols: - if isinstance(symbol, Stock): - replies.append(self.stock.stat_reply(symbol)) - elif isinstance(symbol, Coin): - replies.append(self.crypto.stat_reply(symbol)) - else: - log.debug(f"{symbol} is not a Stock or Coin") - - return replies - - def cap_reply(self, symbols: list[Symbol]) -> list[str]: - """Gets market cap for each symbol in the list - - Parameters - ---------- - symbols : list[str] - List of stock symbols - - Returns - ------- - Dict[str, str] - Each symbol passed in is a key with its value being a human readable - formatted string of the symbols market cap. - """ - replies = [] - - for symbol in symbols: - if isinstance(symbol, Stock): - replies.append(self.stock.cap_reply(symbol)) - elif isinstance(symbol, Coin): - replies.append(self.crypto.cap_reply(symbol)) - else: - log.debug(f"{symbol} is not a Stock or Coin") - - return replies - - def spark_reply(self, symbols: list[Symbol]) -> list[str]: - """Gets change for each symbol and returns it in a compact format - - Parameters - ---------- - symbols : list[str] - List of stock symbols - - Returns - ------- - list[str] - List of human readable strings. - """ - replies = [] - - for symbol in symbols: - if isinstance(symbol, Stock): - replies.append(self.stock.spark_reply(symbol)) - elif isinstance(symbol, Coin): - replies.append(self.crypto.spark_reply(symbol)) - else: - log.debug(f"{symbol} is not a Stock or Coin") - - return replies - - @cached(cache=TTLCache(maxsize=1024, ttl=600)) - def trending(self) -> str: - """Checks APIs for trending symbols. - - Returns - ------- - list[str] - List of preformatted strings to be sent to user. - """ - - # stocks = self.stock.trending() - coins = self.crypto.trending() - - reply = "" - - log.warning(self.trending_count) - if self.trending_count: - reply += "πŸ”₯Trending on the Stock Bot:\n`" - reply += "━" * len("Trending on the Stock Bot:") + "`\n" - - sorted_trending = [s[0] for s in sorted(self.trending_count.items(), key=lambda item: item[1])][::-1][0:5] - log.warning(sorted_trending) - for t in sorted_trending: - reply += self.spark_reply(self.find_symbols(t))[0] + "\n" - - if coins: - reply += "\n\n🦎Trending on CoinGecko:\n`" - reply += "━" * len("Trending on CoinGecko:") + "`\n" - for coin in coins: - reply += coin + "\n" - - if "`$GME" in reply: - reply = reply.replace("πŸ”₯", "🦍") - - if reply: - return reply - else: - log.warning("Failed to collect trending data.") - return "Trending data is not currently available." - - def random_pick(self) -> str: - # choice = random.choice(list(self.stock.symbol_list["description"]) + list(self.crypto.symbol_list["description"])) - choice = random.choice(list(self.crypto.symbol_list["description"])) - hold = (datetime.date.today() + datetime.timedelta(random.randint(1, 365))).strftime("%b %d, %Y") - - return f"{choice}\nBuy and hold until: {hold}" - - def batch_price_reply(self, symbols: list[Symbol]) -> list[str]: - """Returns current market price or after hours if its available for a given stock symbol. - - Parameters - ---------- - symbols : list - List of stock symbols. - - Returns - ------- - Dict[str, str] - Each symbol passed in is a key with its value being a human readable - markdown formatted string of the symbols price and movement. - """ - replies = [] - stocks = [] - coins = [] - - for symbol in symbols: - if isinstance(symbol, Stock): - stocks.append(symbol) - elif isinstance(symbol, Coin): - coins.append(symbol) - else: - log.debug(f"{symbol} is not a Stock or Coin") - - if stocks: - for stock in stocks: - replies.append(self.stock.price_reply(stock)) - if coins: - replies = replies + self.crypto.batch_price(coins) - - return replies - - def options(self, request: str, symbols: list[Symbol]) -> Dict: - request = request.lower() - if len(symbols) == 1: - symbol = symbols[0] - request = request.replace(symbol.tag.lower(), symbol.symbol.lower()) - return self.stock.options_reply(request) - else: - return self.stock.options_reply(request) +"""Function that routes symbols to the correct API provider. +""" + +import datetime +import logging +import random +import re +from typing import Dict + +import pandas as pd +import schedule +from cachetools import TTLCache, cached + +from common.cg_Crypto import cg_Crypto +from common.MarketData import MarketData +from common.Symbol import Coin, Stock, Symbol + +log = logging.getLogger(__name__) + + +class Router: + STOCK_REGEX = "(?:^|[^\\$])\\$([a-zA-Z.]{1,6})" + CRYPTO_REGEX = "[$]{2}([a-zA-Z]{1,20})" + trending_count: Dict[str, float] = {} + + def __init__(self): + self.stock = MarketData() + self.crypto = cg_Crypto() + + schedule.every().hour.do(self.trending_decay) + + def trending_decay(self, decay=0.5): + """Decays the value of each trending stock by a multiplier""" + t_copy = {} + dead_keys = [] + if self.trending_count: + t_copy = self.trending_count.copy() + for key in t_copy.keys(): + if t_copy[key] < 0.01: + # Prune Keys + dead_keys.append(key) + else: + t_copy[key] = t_copy[key] * decay + for dead in dead_keys: + t_copy.pop(dead) + + self.trending_count = t_copy.copy() + log.info("Decayed trending symbols.") + + def find_symbols(self, text: str, *, trending_weight: int = 1) -> list[Stock | Coin]: + """Finds stock tickers starting with a dollar sign, and cryptocurrencies with two dollar signs + in a blob of text and returns them in a list. + + Parameters + ---------- + text : str + Blob of text. + + Returns + ------- + list[Symbol] + List of stock symbols as Symbol objects + """ + schedule.run_pending() + + symbols: list[Symbol] = [] + stock_matches = set(re.findall(self.STOCK_REGEX, text)) + coin_matches = set(re.findall(self.CRYPTO_REGEX, text)) + + for stock_match in stock_matches: + # Market data lacks tools to check if a symbol is valid. + if stock_info := self.stock.symbol_id(stock_match): + symbols.append(Stock(stock_info)) + else: + log.info(f"{stock_match} is not in list of stocks") + + for coin_match in coin_matches: + sym = self.crypto.symbol_list[self.crypto.symbol_list["symbol"].str.fullmatch(coin_match.lower(), case=False)] + if sym.empty: + log.info(f"{coin_match} is not in list of coins") + else: + symbols.append(Coin(sym)) + if symbols: + for symbol in symbols: + self.trending_count[symbol.tag] = self.trending_count.get(symbol.tag, 0) + trending_weight + log.debug(self.trending_count) + + return symbols + + def status(self, bot_resp) -> str: + """Checks for any issues with APIs. + + Returns + ------- + str + Human readable text on status of the bot and relevant APIs + """ + + stats = f""" + Bot Status: + {bot_resp} + + Stock Market Data: + {self.stock.status()} + + Cryptocurrency Data: + {self.crypto.status()} + """ + + log.warning(stats) + + return stats + + def inline_search(self, search: str, matches: int = 5) -> pd.DataFrame: + """Searches based on the shortest symbol that contains the same string as the search. + Should be very fast compared to a fuzzy search. + + Parameters + ---------- + search : str + String used to match against symbols. + + Returns + ------- + list[tuple[str, str]] + Each tuple contains: (Symbol, Issue Name). + """ + + # df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list]) + df = self.crypto.symbol_list + + df = df[df["description"].str.contains(search, regex=False, case=False)].sort_values( + by="type_id", key=lambda x: x.str.len() + ) + + symbols = df.head(matches) + symbols["price_reply"] = symbols["type_id"].apply( + lambda sym: self.price_reply(self.find_symbols(sym, trending_weight=0))[0] + ) + + return symbols + + def price_reply(self, symbols: list[Symbol]) -> list[str]: + """Returns current market price or after hours if its available for a given stock symbol. + + Parameters + ---------- + symbols : list + List of stock symbols. + + Returns + ------- + Dict[str, str] + Each symbol passed in is a key with its value being a human readable + markdown formatted string of the symbols price and movement. + """ + replies = [] + + for symbol in symbols: + log.info(symbol) + if isinstance(symbol, Stock): + replies.append(self.stock.price_reply(symbol)) + elif isinstance(symbol, Coin): + replies.append(self.crypto.price_reply(symbol)) + else: + log.info(f"{symbol} is not a Stock or Coin") + + return replies + + def info_reply(self, symbols: list) -> list[str]: + """Gets information on stock symbols. + + Parameters + ---------- + symbols : list[str] + List of stock symbols. + + Returns + ------- + Dict[str, str] + Each symbol passed in is a key with its value being a human readable formatted + string of the symbols information. + """ + replies = [] + + for symbol in symbols: + if isinstance(symbol, Stock): + replies.append(self.stock.info_reply(symbol)) + elif isinstance(symbol, Coin): + replies.append(self.crypto.info_reply(symbol)) + else: + log.debug(f"{symbol} is not a Stock or Coin") + + return replies + + def intra_reply(self, symbol: Symbol) -> pd.DataFrame: + """Returns price data for a symbol since the last market open. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. + Otherwise returns empty pd.DataFrame. + """ + + if isinstance(symbol, Stock): + return self.stock.intra_reply(symbol) + elif isinstance(symbol, Coin): + return self.crypto.intra_reply(symbol) + else: + log.debug(f"{symbol} is not a Stock or Coin") + return pd.DataFrame() + + def chart_reply(self, symbol: Symbol) -> pd.DataFrame: + """Returns price data for a symbol of the past month up until the previous trading days close. + Also caches multiple requests made in the same day. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. + Otherwise returns empty pd.DataFrame. + """ + if isinstance(symbol, Stock): + return self.stock.chart_reply(symbol) + elif isinstance(symbol, Coin): + return self.crypto.chart_reply(symbol) + else: + log.debug(f"{symbol} is not a Stock or Coin") + return pd.DataFrame() + + def stat_reply(self, symbols: list[Symbol]) -> list[str]: + """Gets key statistics for each symbol in the list + + Parameters + ---------- + symbols : list[str] + List of stock symbols + + Returns + ------- + Dict[str, str] + Each symbol passed in is a key with its value being a human readable + formatted string of the symbols statistics. + """ + replies = [] + + for symbol in symbols: + if isinstance(symbol, Stock): + replies.append(self.stock.stat_reply(symbol)) + elif isinstance(symbol, Coin): + replies.append(self.crypto.stat_reply(symbol)) + else: + log.debug(f"{symbol} is not a Stock or Coin") + + return replies + + def cap_reply(self, symbols: list[Symbol]) -> list[str]: + """Gets market cap for each symbol in the list + + Parameters + ---------- + symbols : list[str] + List of stock symbols + + Returns + ------- + Dict[str, str] + Each symbol passed in is a key with its value being a human readable + formatted string of the symbols market cap. + """ + replies = [] + + for symbol in symbols: + if isinstance(symbol, Stock): + replies.append(self.stock.cap_reply(symbol)) + elif isinstance(symbol, Coin): + replies.append(self.crypto.cap_reply(symbol)) + else: + log.debug(f"{symbol} is not a Stock or Coin") + + return replies + + def spark_reply(self, symbols: list[Symbol]) -> list[str]: + """Gets change for each symbol and returns it in a compact format + + Parameters + ---------- + symbols : list[str] + List of stock symbols + + Returns + ------- + list[str] + List of human readable strings. + """ + replies = [] + + for symbol in symbols: + if isinstance(symbol, Stock): + replies.append(self.stock.spark_reply(symbol)) + elif isinstance(symbol, Coin): + replies.append(self.crypto.spark_reply(symbol)) + else: + log.debug(f"{symbol} is not a Stock or Coin") + + return replies + + @cached(cache=TTLCache(maxsize=1024, ttl=600)) + def trending(self) -> str: + """Checks APIs for trending symbols. + + Returns + ------- + list[str] + List of preformatted strings to be sent to user. + """ + + # stocks = self.stock.trending() + coins = self.crypto.trending() + + reply = "" + + log.warning(self.trending_count) + if self.trending_count: + reply += "πŸ”₯Trending on the Stock Bot:\n`" + reply += "━" * len("Trending on the Stock Bot:") + "`\n" + + sorted_trending = [s[0] for s in sorted(self.trending_count.items(), key=lambda item: item[1])][::-1][0:5] + log.warning(sorted_trending) + for t in sorted_trending: + reply += self.spark_reply(self.find_symbols(t))[0] + "\n" + + if coins: + reply += "\n\n🦎Trending on CoinGecko:\n`" + reply += "━" * len("Trending on CoinGecko:") + "`\n" + for coin in coins: + reply += coin + "\n" + + if "`$GME" in reply: + reply = reply.replace("πŸ”₯", "🦍") + + if reply: + return reply + else: + log.warning("Failed to collect trending data.") + return "Trending data is not currently available." + + def random_pick(self) -> str: + # choice = random.choice(list(self.stock.symbol_list["description"]) + list(self.crypto.symbol_list["description"])) + choice = random.choice(list(self.crypto.symbol_list["description"])) + hold = (datetime.date.today() + datetime.timedelta(random.randint(1, 365))).strftime("%b %d, %Y") + + return f"{choice}\nBuy and hold until: {hold}" + + def batch_price_reply(self, symbols: list[Symbol]) -> list[str]: + """Returns current market price or after hours if its available for a given stock symbol. + + Parameters + ---------- + symbols : list + List of stock symbols. + + Returns + ------- + Dict[str, str] + Each symbol passed in is a key with its value being a human readable + markdown formatted string of the symbols price and movement. + """ + replies = [] + stocks = [] + coins = [] + + for symbol in symbols: + if isinstance(symbol, Stock): + stocks.append(symbol) + elif isinstance(symbol, Coin): + coins.append(symbol) + else: + log.debug(f"{symbol} is not a Stock or Coin") + + if stocks: + for stock in stocks: + replies.append(self.stock.price_reply(stock)) + if coins: + replies = replies + self.crypto.batch_price(coins) + + return replies + + def options(self, request: str, symbols: list[Symbol]) -> Dict: + request = request.lower() + if len(symbols) == 1: + symbol = symbols[0] + request = request.replace(symbol.tag.lower(), symbol.symbol.lower()) + return self.stock.options_reply(request) + else: + return self.stock.options_reply(request) diff --git a/common/utilities.py b/common/utilities.py index 6e0e5ca..6a19019 100644 --- a/common/utilities.py +++ b/common/utilities.py @@ -1,31 +1,31 @@ -import time -import logging - -log = logging.getLogger(__name__) - - -def rate_limited(max_per_second): - """ - Decorator that ensures the wrapped function is called at most `max_per_second` times per second. - """ - min_interval = 1.0 / max_per_second - - def decorate(func): - last_called = [0.0] - - def rate_limited_function(*args, **kwargs): - elapsed = time.time() - last_called[0] - left_to_wait = min_interval - elapsed - - if left_to_wait > 0: - log.info(f"Rate limit exceeded. Waiting for {left_to_wait:.2f} seconds.") - time.sleep(left_to_wait) - - ret = func(*args, **kwargs) - last_called[0] = time.time() - - return ret - - return rate_limited_function - - return decorate +import time +import logging + +log = logging.getLogger(__name__) + + +def rate_limited(max_per_second): + """ + Decorator that ensures the wrapped function is called at most `max_per_second` times per second. + """ + min_interval = 1.0 / max_per_second + + def decorate(func): + last_called = [0.0] + + def rate_limited_function(*args, **kwargs): + elapsed = time.time() - last_called[0] + left_to_wait = min_interval - elapsed + + if left_to_wait > 0: + log.info(f"Rate limit exceeded. Waiting for {left_to_wait:.2f} seconds.") + time.sleep(left_to_wait) + + ret = func(*args, **kwargs) + last_called[0] = time.time() + + return ret + + return rate_limited_function + + return decorate diff --git a/discord/D_info.py b/discord/D_info.py index 34dadb6..1508136 100644 --- a/discord/D_info.py +++ b/discord/D_info.py @@ -1,59 +1,59 @@ -"""Functions and Info specific to the discord Bot -""" - -import re - -import requests as r - - -class D_info: - license = re.sub( - r"\b\n", - " ", - r.get("https://gitlab.com/simple-stock-bots/simple-stock-bot/-/raw/master/LICENSE").text, - ) - - help_text = """ -Thanks for using this bot. If you like it, [support me with a beer](https://www.buymeacoffee.com/Anson). 🍻 - -For stock data or hosting your own bot, use my link. This helps keep the bot free: -[marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=discord). - -**Updates**: Join the bot's discord: https://t.me/simplestockbotnews. - -**Documentation**: All details about the bot are at [docs](https://simplestockbot.com). - -The bot reads _"Symbols"_. Use `$` for stock tickers and `$$` for cryptocurrencies. For example: -- `/chart $$eth` gives Ethereum's monthly chart. -- `/dividend $psec` shows Prospect Capital's dividend. - -Type any symbol, and the bot shows its price. Like: `Is $$btc rising since $tsla accepts it?` will give Bitcoin and Tesla prices. - -**Commands** -- `/donate [USD amount]`: Support the bot. πŸŽ—οΈ -- `/intra $[symbol]`: See stock's latest movement. πŸ“ˆ -- `/chart $[symbol]`: View a month's stock activity. πŸ“Š -- `/trending`: Check trending stocks and cryptos. πŸ’¬ -- `/help`: Need help? Ask here. πŸ†˜ - -**Inline Features** -Type @SimpleStockBot `[search]` anywhere to find and get stock/crypto prices. Note: Prices might be delayed up to an hour. - -Data from: [marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=discord). - -Issues with the bot? Use `/status` or [contact us](https://simplestockbot.com/contact). - """ - - donate_text = """ -Simple Stock Bot runs purely on [donations.](https://www.buymeacoffee.com/Anson) -Every donation supports server costs and -[marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=discord) provides our data. - -**How to Donate?** -1. Use `/donate [amount in USD]` command. - - E.g., `/donate 2` donates 2 USD. -2. Or, donate at [buymeacoffee](https://www.buymeacoffee.com/Anson). - - It's quick, doesn't need an account, and accepts Paypal or Credit card. - -Questions? Visit our [website](https://simplestockbot.com). -""" +"""Functions and Info specific to the discord Bot +""" + +import re + +import requests as r + + +class D_info: + license = re.sub( + r"\b\n", + " ", + r.get("https://gitlab.com/simple-stock-bots/simple-stock-bot/-/raw/master/LICENSE").text, + ) + + help_text = """ +Thanks for using this bot. If you like it, [support me with a beer](https://www.buymeacoffee.com/Anson). 🍻 + +For stock data or hosting your own bot, use my link. This helps keep the bot free: +[marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=discord). + +**Updates**: Join the bot's discord: https://t.me/simplestockbotnews. + +**Documentation**: All details about the bot are at [docs](https://simplestockbot.com). + +The bot reads _"Symbols"_. Use `$` for stock tickers and `$$` for cryptocurrencies. For example: +- `/chart $$eth` gives Ethereum's monthly chart. +- `/dividend $psec` shows Prospect Capital's dividend. + +Type any symbol, and the bot shows its price. Like: `Is $$btc rising since $tsla accepts it?` will give Bitcoin and Tesla prices. + +**Commands** +- `/donate [USD amount]`: Support the bot. πŸŽ—οΈ +- `/intra $[symbol]`: See stock's latest movement. πŸ“ˆ +- `/chart $[symbol]`: View a month's stock activity. πŸ“Š +- `/trending`: Check trending stocks and cryptos. πŸ’¬ +- `/help`: Need help? Ask here. πŸ†˜ + +**Inline Features** +Type @SimpleStockBot `[search]` anywhere to find and get stock/crypto prices. Note: Prices might be delayed up to an hour. + +Data from: [marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=discord). + +Issues with the bot? Use `/status` or [contact us](https://simplestockbot.com/contact). + """ + + donate_text = """ +Simple Stock Bot runs purely on [donations.](https://www.buymeacoffee.com/Anson) +Every donation supports server costs and +[marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=discord) provides our data. + +**How to Donate?** +1. Use `/donate [amount in USD]` command. + - E.g., `/donate 2` donates 2 USD. +2. Or, donate at [buymeacoffee](https://www.buymeacoffee.com/Anson). + - It's quick, doesn't need an account, and accepts Paypal or Credit card. + +Questions? Visit our [website](https://simplestockbot.com). +""" diff --git a/discord/bot.py b/discord/bot.py index eeb9cab..3054400 100644 --- a/discord/bot.py +++ b/discord/bot.py @@ -1,256 +1,256 @@ -import datetime -import io -import logging -import os - -import mplfinance as mpf -import nextcord -from D_info import D_info -from nextcord.ext import commands - -from common.symbol_router import Router - -DISCORD_TOKEN = os.environ["DISCORD"] - -s = Router() -d = D_info() - - -intents = nextcord.Intents.default() - - -client = nextcord.Client(intents=intents) -bot = commands.Bot(command_prefix="/", description=d.help_text, intents=intents) - -logger = logging.getLogger("nextcord") -logger.setLevel(logging.INFO) -handler = logging.FileHandler(filename="nextcord.log", encoding="utf-8", mode="w") -handler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(name)s: %(message)s")) -logger.addHandler(handler) - - -@bot.event -async def on_ready(): - logging.info("Starting Simple Stock Bot") - logging.info(f"Logged in as {bot.user.name} {bot.user.id}") - - -@bot.command() -async def status(ctx: commands): - """Debug command for diagnosing if the bot is experiencing any issues.""" - logging.info(f"Status command ran by {ctx.message.author}") - message = "" - try: - message = "Contact MisterBiggs#0465 if you need help.\n" - message += s.status(f"Bot recieved your message in: {bot.latency*10:.4f} seconds") + "\n" - - except Exception as ex: - logging.critical(ex) - message += ( - f"*\n\nERROR ENCOUNTERED:*\n{ex}\n\n" - + "*The bot encountered an error while attempting to find errors. Please contact the bot admin.*" - ) - await ctx.send(message) - - -@bot.command() -async def license(ctx: commands): - """Returns the bots license agreement.""" - await ctx.send(d.license) - - -@bot.command() -async def donate(ctx: commands): - """Details on how to support the development and hosting of the bot.""" - await ctx.send(d.donate_text) - - -@bot.command() -async def search(ctx: commands, *, query: str): - """Search for a stock symbol using either symbol of company name.""" - results = s.search_symbols(query) - if results: - reply = "*Search Results:*\n`$ticker: Company Name`\n" - for query in results: - reply += "`" + query[1] + "`\n" - await ctx.send(reply) - - -@bot.command() -async def crypto(ctx: commands, _: str): - """Get the price of a cryptocurrency using in USD.""" - await ctx.send("Crypto now has native support. Any crypto can be called using two dollar signs: `$$eth` `$$btc` `$$doge`") - - -@bot.command() -async def intra(ctx: commands, sym: str): - """Get a chart for the stocks movement since market open.""" - symbols = s.find_symbols(sym) - - if len(symbols): - symbol = symbols[0] - else: - await ctx.send("No symbols or coins found.") - return - - df = s.intra_reply(symbol) - if df.empty: - await ctx.send("Invalid symbol please see `/help` for usage details.") - return - with ctx.channel.typing(): - buf = io.BytesIO() - mpf.plot( - df, - type="renko", - title=f"\n{symbol.name}", - volume="volume" in df.keys(), - style="yahoo", - savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), - ) - - buf.seek(0) - - # Get price so theres no request lag after the image is sent - price_reply = s.price_reply([symbol])[0] - await ctx.send( - file=nextcord.File( - buf, - filename=f"{symbol.name}:intra{datetime.date.today().strftime('%S%M%d%b%Y')}.png", - ), - content=f"\nIntraday chart for {symbol.name} from {df.first_valid_index().strftime('%d %b at %H:%M')} to" - + f" {df.last_valid_index().strftime('%d %b at %H:%M')}", - ) - await ctx.send(price_reply) - - -@bot.command() -async def chart(ctx: commands, sym: str): - """returns a chart of the past month of data for a symbol""" - - symbols = s.find_symbols(sym) - - if len(symbols): - symbol = symbols[0] - else: - await ctx.send("No symbols or coins found.") - return - - df = s.chart_reply(symbol) - if df.empty: - await ctx.send("Invalid symbol please see `/help` for usage details.") - return - with ctx.channel.typing(): - buf = io.BytesIO() - mpf.plot( - df, - type="candle", - title=f"\n{symbol.name}", - volume="volume" in df.keys(), - style="yahoo", - savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), - ) - buf.seek(0) - - # Get price so theres no request lag after the image is sent - price_reply = s.price_reply([symbol])[0] - await ctx.send( - file=nextcord.File( - buf, - filename=f"{symbol.name}:1M{datetime.date.today().strftime('%d%b%Y')}.png", - ), - content=f"\n1 Month chart for {symbol.name} from {df.first_valid_index().strftime('%d, %b %Y')}" - + f" to {df.last_valid_index().strftime('%d, %b %Y')}", - ) - await ctx.send(price_reply) - - -@bot.command() -async def cap(ctx: commands, sym: str): - """Get the market cap of a symbol""" - symbols = s.find_symbols(sym) - if symbols: - with ctx.channel.typing(): - for reply in s.cap_reply(symbols): - await ctx.send(reply) - - -@bot.command() -async def trending(ctx: commands): - """Get a list of Trending Stocks and Coins""" - with ctx.channel.typing(): - await ctx.send(s.trending()) - - -@bot.event -async def on_message(message): - # Ignore messages from the bot itself - if message.author.id == bot.user.id: - return - - content_lower = message.content.lower() - - # Process commands starting with "/" - if message.content.startswith("/"): - await bot.process_commands(message) - return - - symbols = None - if "$" in message.content: - symbols = s.find_symbols(message.content) - - if "call" in content_lower or "put" in content_lower: - await handle_options(message, symbols) - return - - if symbols: - for reply in s.price_reply(symbols): - await message.channel.send(reply) - return - - -async def handle_options(message, symbols): - logging.info("Options detected") - try: - options_data = s.options(message.content.lower(), symbols) - - # Create the embed directly within the function - embed = nextcord.Embed(title=options_data["Option Symbol"], description=options_data["Underlying"], color=0x3498DB) - - # Key details - details = ( - f"Expiration: {options_data['Expiration']}\n" f"Side: {options_data['side']}\n" f"Strike: {options_data['strike']}" - ) - embed.add_field(name="Details", value=details, inline=False) - - # Pricing info - pricing_info = ( - f"Bid: {options_data['bid']} (Size: {options_data['bidSize']})\n" - f"Mid: {options_data['mid']}\n" - f"Ask: {options_data['ask']} (Size: {options_data['askSize']})\n" - f"Last: {options_data['last']}" - ) - embed.add_field(name="Pricing", value=pricing_info, inline=False) - - # Volume and open interest - volume_info = f"Open Interest: {options_data['Open Interest']}\n" f"Volume: {options_data['Volume']}" - embed.add_field(name="Activity", value=volume_info, inline=False) - - # Greeks - greeks_info = ( - f"IV: {options_data['Implied Volatility']}\n" - f"Delta: {options_data['delta']}\n" - f"Gamma: {options_data['gamma']}\n" - f"Theta: {options_data['theta']}\n" - f"Vega: {options_data['vega']}\n" - f"Rho: {options_data['rho']}" - ) - embed.add_field(name="Greeks", value=greeks_info, inline=False) - - # Send the created embed - await message.channel.send(embed=embed) - - except KeyError as ex: - logging.warning(f"KeyError processing options for message {message.content}: {ex}") - - -bot.run(DISCORD_TOKEN) +import datetime +import io +import logging +import os + +import mplfinance as mpf +import nextcord +from D_info import D_info +from nextcord.ext import commands + +from common.symbol_router import Router + +DISCORD_TOKEN = os.environ["DISCORD"] + +s = Router() +d = D_info() + + +intents = nextcord.Intents.default() + + +client = nextcord.Client(intents=intents) +bot = commands.Bot(command_prefix="/", description=d.help_text, intents=intents) + +logger = logging.getLogger("nextcord") +logger.setLevel(logging.INFO) +handler = logging.FileHandler(filename="nextcord.log", encoding="utf-8", mode="w") +handler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(name)s: %(message)s")) +logger.addHandler(handler) + + +@bot.event +async def on_ready(): + logging.info("Starting Simple Stock Bot") + logging.info(f"Logged in as {bot.user.name} {bot.user.id}") + + +@bot.command() +async def status(ctx: commands): + """Debug command for diagnosing if the bot is experiencing any issues.""" + logging.info(f"Status command ran by {ctx.message.author}") + message = "" + try: + message = "Contact MisterBiggs#0465 if you need help.\n" + message += s.status(f"Bot recieved your message in: {bot.latency*10:.4f} seconds") + "\n" + + except Exception as ex: + logging.critical(ex) + message += ( + f"*\n\nERROR ENCOUNTERED:*\n{ex}\n\n" + + "*The bot encountered an error while attempting to find errors. Please contact the bot admin.*" + ) + await ctx.send(message) + + +@bot.command() +async def license(ctx: commands): + """Returns the bots license agreement.""" + await ctx.send(d.license) + + +@bot.command() +async def donate(ctx: commands): + """Details on how to support the development and hosting of the bot.""" + await ctx.send(d.donate_text) + + +@bot.command() +async def search(ctx: commands, *, query: str): + """Search for a stock symbol using either symbol of company name.""" + results = s.search_symbols(query) + if results: + reply = "*Search Results:*\n`$ticker: Company Name`\n" + for query in results: + reply += "`" + query[1] + "`\n" + await ctx.send(reply) + + +@bot.command() +async def crypto(ctx: commands, _: str): + """Get the price of a cryptocurrency using in USD.""" + await ctx.send("Crypto now has native support. Any crypto can be called using two dollar signs: `$$eth` `$$btc` `$$doge`") + + +@bot.command() +async def intra(ctx: commands, sym: str): + """Get a chart for the stocks movement since market open.""" + symbols = s.find_symbols(sym) + + if len(symbols): + symbol = symbols[0] + else: + await ctx.send("No symbols or coins found.") + return + + df = s.intra_reply(symbol) + if df.empty: + await ctx.send("Invalid symbol please see `/help` for usage details.") + return + with ctx.channel.typing(): + buf = io.BytesIO() + mpf.plot( + df, + type="renko", + title=f"\n{symbol.name}", + volume="volume" in df.keys(), + style="yahoo", + savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), + ) + + buf.seek(0) + + # Get price so theres no request lag after the image is sent + price_reply = s.price_reply([symbol])[0] + await ctx.send( + file=nextcord.File( + buf, + filename=f"{symbol.name}:intra{datetime.date.today().strftime('%S%M%d%b%Y')}.png", + ), + content=f"\nIntraday chart for {symbol.name} from {df.first_valid_index().strftime('%d %b at %H:%M')} to" + + f" {df.last_valid_index().strftime('%d %b at %H:%M')}", + ) + await ctx.send(price_reply) + + +@bot.command() +async def chart(ctx: commands, sym: str): + """returns a chart of the past month of data for a symbol""" + + symbols = s.find_symbols(sym) + + if len(symbols): + symbol = symbols[0] + else: + await ctx.send("No symbols or coins found.") + return + + df = s.chart_reply(symbol) + if df.empty: + await ctx.send("Invalid symbol please see `/help` for usage details.") + return + with ctx.channel.typing(): + buf = io.BytesIO() + mpf.plot( + df, + type="candle", + title=f"\n{symbol.name}", + volume="volume" in df.keys(), + style="yahoo", + savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), + ) + buf.seek(0) + + # Get price so theres no request lag after the image is sent + price_reply = s.price_reply([symbol])[0] + await ctx.send( + file=nextcord.File( + buf, + filename=f"{symbol.name}:1M{datetime.date.today().strftime('%d%b%Y')}.png", + ), + content=f"\n1 Month chart for {symbol.name} from {df.first_valid_index().strftime('%d, %b %Y')}" + + f" to {df.last_valid_index().strftime('%d, %b %Y')}", + ) + await ctx.send(price_reply) + + +@bot.command() +async def cap(ctx: commands, sym: str): + """Get the market cap of a symbol""" + symbols = s.find_symbols(sym) + if symbols: + with ctx.channel.typing(): + for reply in s.cap_reply(symbols): + await ctx.send(reply) + + +@bot.command() +async def trending(ctx: commands): + """Get a list of Trending Stocks and Coins""" + with ctx.channel.typing(): + await ctx.send(s.trending()) + + +@bot.event +async def on_message(message): + # Ignore messages from the bot itself + if message.author.id == bot.user.id: + return + + content_lower = message.content.lower() + + # Process commands starting with "/" + if message.content.startswith("/"): + await bot.process_commands(message) + return + + symbols = None + if "$" in message.content: + symbols = s.find_symbols(message.content) + + if "call" in content_lower or "put" in content_lower: + await handle_options(message, symbols) + return + + if symbols: + for reply in s.price_reply(symbols): + await message.channel.send(reply) + return + + +async def handle_options(message, symbols): + logging.info("Options detected") + try: + options_data = s.options(message.content.lower(), symbols) + + # Create the embed directly within the function + embed = nextcord.Embed(title=options_data["Option Symbol"], description=options_data["Underlying"], color=0x3498DB) + + # Key details + details = ( + f"Expiration: {options_data['Expiration']}\n" f"Side: {options_data['side']}\n" f"Strike: {options_data['strike']}" + ) + embed.add_field(name="Details", value=details, inline=False) + + # Pricing info + pricing_info = ( + f"Bid: {options_data['bid']} (Size: {options_data['bidSize']})\n" + f"Mid: {options_data['mid']}\n" + f"Ask: {options_data['ask']} (Size: {options_data['askSize']})\n" + f"Last: {options_data['last']}" + ) + embed.add_field(name="Pricing", value=pricing_info, inline=False) + + # Volume and open interest + volume_info = f"Open Interest: {options_data['Open Interest']}\n" f"Volume: {options_data['Volume']}" + embed.add_field(name="Activity", value=volume_info, inline=False) + + # Greeks + greeks_info = ( + f"IV: {options_data['Implied Volatility']}\n" + f"Delta: {options_data['delta']}\n" + f"Gamma: {options_data['gamma']}\n" + f"Theta: {options_data['theta']}\n" + f"Vega: {options_data['vega']}\n" + f"Rho: {options_data['rho']}" + ) + embed.add_field(name="Greeks", value=greeks_info, inline=False) + + # Send the created embed + await message.channel.send(embed=embed) + + except KeyError as ex: + logging.warning(f"KeyError processing options for message {message.content}: {ex}") + + +bot.run(DISCORD_TOKEN) diff --git a/pyproject.toml b/pyproject.toml index 8c189a5..c4799a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,2 @@ -[tool.black] -line-length = 130 - -[tool.flake8] -max-line-length = 130 - -[tool.pycodestyle] -max_line_length = 130 - [tool.ruff] line-length = 130 \ No newline at end of file diff --git a/telegram/T_info.py b/telegram/T_info.py index b1cd35c..b157aaa 100644 --- a/telegram/T_info.py +++ b/telegram/T_info.py @@ -1,70 +1,70 @@ -"""Functions and Info specific to the Telegram Bot -""" - -import re - -import requests as r - - -class T_info: - license = re.sub( - r"\b\n", - " ", - r.get("https://gitlab.com/simple-stock-bots/simple-stock-bot/-/raw/master/LICENSE").text, - ) - - help_text = """ -Appreciate this bot? Show support by [buying me a beer](https://www.buymeacoffee.com/Anson) 🍻. - -Want stock data or to host your own bot? Help keep this bot free by using my -[affiliate link](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=telegram). - -πŸ“’ Stay updated on the bot's Telegram: https://t.me/simplestockbotnews. - -**Guide**: All about using and setting up the bot is in the [docs](https://simplestockbot.com). - -The bot recognizes _"Symbols"_. `$` for stocks and `$$` for cryptos. Example: -- `/chart $$eth` gets a month's Ethereum chart. -- `/dividend $psec` shows Prospect Capital's dividend info. - -Mention a symbol, and the bot reveals its price. -E.g., `What's $$btc's price since $tsla accepts it?` gives Bitcoin and Tesla prices. - -**Commands** -- `/donate [USD]`: Support the bot. πŸŽ—οΈ -- `/intra $[symbol]`: Today's stock activity. πŸ“ˆ -- `/chart $[symbol]`: Past month's stock chart. πŸ“Š -- `/trending`: What's hot in stocks and cryptos. πŸ’¬ -- `/help`: Bot assistance. πŸ†˜ - -**Inline Features** -Search with @SimpleStockBot `[query]` anywhere. -Pick a ticker, and the bot shares the current price in chat. Note: Prices can lag by an hour. - -Data thanks to [marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=telegram). - -Bot issues? Use `/status` or [contact us](https://simplestockbot.com/contact). - - """ - - donate_text = """ -Support Simple Stock Bot through [donations](https://www.buymeacoffee.com/Anson). -All funds help maintain servers, with data from -[marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=telegram). - -**How to Donate?** -1. Use `/donate [amount in USD]`. E.g., `/donate 2` donates 2 USD. -2. Or, quickly donate at [buymeacoffee](https://www.buymeacoffee.com/Anson). No account needed, accepts Paypal & Credit card. - -For questions, visit our [website](https://simplestockbot.com). -""" - - -# Not used by the bot but for updating commands with BotFather -commands = """ -donate - Donate to the bot πŸŽ—οΈ -help - Get some help using the bot. πŸ†˜ -trending - Trending Stocks and Cryptos. πŸ’¬ -intra - $[symbol] Plot since the last market open. πŸ“ˆ -chart - $[chart] Plot of the past month. πŸ“Š -""" +"""Functions and Info specific to the Telegram Bot +""" + +import re + +import requests as r + + +class T_info: + license = re.sub( + r"\b\n", + " ", + r.get("https://gitlab.com/simple-stock-bots/simple-stock-bot/-/raw/master/LICENSE").text, + ) + + help_text = """ +Appreciate this bot? Show support by [buying me a beer](https://www.buymeacoffee.com/Anson) 🍻. + +Want stock data or to host your own bot? Help keep this bot free by using my +[affiliate link](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=telegram). + +πŸ“’ Stay updated on the bot's Telegram: https://t.me/simplestockbotnews. + +**Guide**: All about using and setting up the bot is in the [docs](https://simplestockbot.com). + +The bot recognizes _"Symbols"_. `$` for stocks and `$$` for cryptos. Example: +- `/chart $$eth` gets a month's Ethereum chart. +- `/dividend $psec` shows Prospect Capital's dividend info. + +Mention a symbol, and the bot reveals its price. +E.g., `What's $$btc's price since $tsla accepts it?` gives Bitcoin and Tesla prices. + +**Commands** +- `/donate [USD]`: Support the bot. πŸŽ—οΈ +- `/intra $[symbol]`: Today's stock activity. πŸ“ˆ +- `/chart $[symbol]`: Past month's stock chart. πŸ“Š +- `/trending`: What's hot in stocks and cryptos. πŸ’¬ +- `/help`: Bot assistance. πŸ†˜ + +**Inline Features** +Search with @SimpleStockBot `[query]` anywhere. +Pick a ticker, and the bot shares the current price in chat. Note: Prices can lag by an hour. + +Data thanks to [marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=telegram). + +Bot issues? Use `/status` or [contact us](https://simplestockbot.com/contact). + + """ + + donate_text = """ +Support Simple Stock Bot through [donations](https://www.buymeacoffee.com/Anson). +All funds help maintain servers, with data from +[marketdata.app](https://dashboard.marketdata.app/marketdata/aff/go/misterbiggs?keyword=telegram). + +**How to Donate?** +1. Use `/donate [amount in USD]`. E.g., `/donate 2` donates 2 USD. +2. Or, quickly donate at [buymeacoffee](https://www.buymeacoffee.com/Anson). No account needed, accepts Paypal & Credit card. + +For questions, visit our [website](https://simplestockbot.com). +""" + + +# Not used by the bot but for updating commands with BotFather +commands = """ +donate - Donate to the bot πŸŽ—οΈ +help - Get some help using the bot. πŸ†˜ +trending - Trending Stocks and Cryptos. πŸ’¬ +intra - $[symbol] Plot since the last market open. πŸ“ˆ +chart - $[chart] Plot of the past month. πŸ“Š +""" diff --git a/telegram/bot.py b/telegram/bot.py index 5c08fc0..9f23e49 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -1,508 +1,508 @@ -# Works with Python 3.8 -import datetime -import html -import io -import json -import logging -import os -import random -import string -import traceback -from uuid import uuid4 - -import mplfinance as mpf -from T_info import T_info - -import telegram -from common.symbol_router import Router -from telegram import InlineQueryResultArticle, InputTextMessageContent, LabeledPrice, Update -from telegram.ext import ( - Application, - CommandHandler, - ContextTypes, - InlineQueryHandler, - MessageHandler, - PreCheckoutQueryHandler, - filters, -) - -# Enable logging -logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO) - -# set higher logging level for httpx to avoid all GET and POST requests being logged -logging.getLogger("httpx").setLevel(logging.WARNING) - -log = logging.getLogger(__name__) - -TELEGRAM_TOKEN = os.environ["TELEGRAM"] - -try: - STRIPE_TOKEN = os.environ["STRIPE"] -except KeyError: - STRIPE_TOKEN = "" - log.warning("Starting without a STRIPE Token will not allow you to accept Donations!") - -s = Router() -t = T_info() - - -log.info("Bot script started.") - - -async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Send help text when the command /start is issued.""" - log.info(f"Start command ran by {update.message.chat.username}") - await update.message.reply_text( - text=t.help_text, - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -async def help(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Send help text when the command /help is issued.""" - log.info(f"Help command ran by {update.message.chat.username}") - await update.message.reply_text( - text=t.help_text, - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -async def license(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Send bots license when the /license command is issued.""" - log.info(f"License command ran by {update.message.chat.username}") - await update.message.reply_text( - text=t.license, - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -async def status(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Gather status of bot and dependant services and return important status updates.""" - log.warning(f"Status command ran by {update.message.chat.username}") - bot_resp_time = datetime.datetime.now(update.message.date.tzinfo) - update.message.date - - bot_status = s.status(f"It took {bot_resp_time.total_seconds()} seconds for the bot to get your message.") - - await update.message.reply_text( - text=bot_status, - parse_mode=telegram.constants.ParseMode.MARKDOWN, - ) - - -async def donate(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Sets up donation.""" - log.info(f"Donate command ran by {update.message.chat.username}") - chat_id = update.message.chat_id - - if update.message.text.strip() == "/donate" or "/donate@" in update.message.text: - await update.message.reply_text( - text=t.donate_text, - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - amount = 1.0 - else: - amount = float(update.message.text.replace("/donate", "").replace("$", "").strip()) - - try: - price = int(amount * 100) - except ValueError: - await update.message.reply_text(f"{amount} is not a valid donation amount or number.") - return - log.info(f"Donation amount: {price} by {update.message.chat.username}") - - await context.bot.send_invoice( - chat_id=chat_id, - title="Simple Stock Bot Donation", - description=f"Simple Stock Bot Donation of ${amount} by {update.message.chat.username}", - payload=f"simple-stock-bot-{chat_id}", - provider_token=STRIPE_TOKEN, - currency="USD", - prices=[LabeledPrice("Donation:", price)], - start_parameter="", - # suggested_tip_amounts=[100, 500, 1000, 2000], - photo_url="https://simple-stock-bots.gitlab.io/docs/img/Telegram.png", - photo_width=500, - photo_height=500, - ) - - -async def precheckout_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Approves donation""" - log.info("precheckout_callback queried") - query = update.pre_checkout_query - - await query.answer(ok=True) - # I dont think I need to check since its only donations. - # if query.invoice_payload == "simple-stock-bot": - # # answer False pre_checkout_query - # await query.answer(ok=True) - # else: - # await query.answer(ok=False, error_message="Something went wrong...") - - -async def successful_payment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Thanks user for donation""" - log.info("Successful payment!") - await update.message.reply_text("Thank you for your donation! It goes a long way to keeping the bot free!") - - -async def symbol_detect_image(update: Update, context: ContextTypes.DEFAULT_TYPE): - """ - Makes image captions into text then passes the `update` and `context` - to symbol detect so that it can reply cashtags in image captions. - """ - try: - if update.message.caption: - update.message.text = update.message.caption - await symbol_detect(update, context) - except AttributeError: - return - - -async def symbol_detect(update: Update, context: ContextTypes.DEFAULT_TYPE): - """ - Runs on any message that doesn't have a command and searches for cashtags, - then returns the prices of any symbols found. - """ - try: - message = update.message.text - chat_id = update.message.chat_id - if "$" in message: - log.info("Looking for Symbols") - symbols = s.find_symbols(message) - else: - return - except AttributeError as ex: - log.info(ex) - return - - # Detect Options - if ("call" in message.lower()) or ("put" in message.lower()): - log.info("Options detected") - await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING) - try: - options_data = s.options(message, symbols) - - await update.message.reply_text( - text=generate_options_reply(options_data), - parse_mode=telegram.constants.ParseMode.MARKDOWN, - ) - return - except KeyError as ex: - logging.warning(ex) - pass - - if symbols: - log.info(f"Symbols found: {symbols}") - await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING) - - for reply in s.price_reply(symbols): - await update.message.reply_text( - text=reply, - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -def generate_options_reply(options_data: dict): - # Header with Option Symbol and Underlying - message_text = f"*{options_data['Option Symbol']} ({options_data['Underlying']})*\n\n" - - # Key details - details = ( - f"*Expiration:* `{options_data['Expiration']}`\n" - f"*Side:* `{options_data['side']}`\n" - f"*Strike:* `{options_data['strike']}`\n" - f"*First Traded:* `{options_data['First Traded']}`\n" - f"*Last Updated:* `{options_data['Last Updated']}`\n\n" - ) - message_text += details - - # Pricing info - pricing_info = ( - f"*Bid:* `{options_data['bid']}` (Size: `{options_data['bidSize']}`)\n" - f"*Mid:* `{options_data['mid']}`\n" - f"*Ask:* `{options_data['ask']}` (Size: `{options_data['askSize']}`)\n" - f"*Last:* `{options_data['last']}`\n\n" - ) - message_text += pricing_info - - # Volume and open interest - volume_info = f"*Open Interest:* `{options_data['Open Interest']}`\n" f"*Volume:* `{options_data['Volume']}`\n\n" - message_text += volume_info - - # Greeks - greeks_info = ( - f"*IV:* `{options_data['Implied Volatility']}`\n" - f"*Delta:* `{options_data['delta']}`\n" - f"*Gamma:* `{options_data['gamma']}`\n" - f"*Theta:* `{options_data['theta']}`\n" - f"*Vega:* `{options_data['vega']}`\n" - f"*Rho:* `{options_data['rho']}`\n" - ) - message_text += greeks_info - - return message_text - - -async def intra(update: Update, context: ContextTypes.DEFAULT_TYPE): - """returns a chart of intraday data for a symbol""" - log.info(f"Intra command ran by {update.message.chat.username}") - - message = update.message.text - chat_id = update.message.chat_id - - if message.strip().split("@")[0] == "/intra": - await update.message.reply_text( - "This command returns a chart of the stocks movement since the most recent market open.\nExample: /intra $tsla" - ) - return - - symbols = s.find_symbols(message, trending_weight=5) - symbol = symbols[0] - - if len(symbols): - symbol = symbols[0] - else: - await update.message.reply_text("No symbols or coins found.") - return - - df = s.intra_reply(symbol) - if df.empty: - await update.message.reply_text( - text="Invalid symbol please see `/help` for usage details.", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - return - - await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.UPLOAD_PHOTO) - - buf = io.BytesIO() - mpf.plot( - df, - type="renko", - title=f"\n{symbol.name}", - volume="Volume" in df.keys(), - style="yahoo", - savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), - ) - buf.seek(0) - - await update.message.reply_photo( - photo=buf, - caption=f"\nIntraday chart for {symbol.name} from {df.first_valid_index().strftime('%d %b at %H:%M')} to" - + f" {df.last_valid_index().strftime('%d %b at %H:%M %Z')}" - + f"\n\n{s.price_reply([symbol])[0]}", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -async def chart(update: Update, context: ContextTypes.DEFAULT_TYPE): - """returns a chart of the past month of data for a symbol""" - log.info(f"Chart command ran by {update.message.chat.username}") - - message = update.message.text - chat_id = update.message.chat_id - - if message.strip().split("@")[0] == "/chart": - await update.message.reply_text( - "This command returns a chart of the stocks movement for the past month.\nExample: /chart $tsla" - ) - return - - symbols = s.find_symbols(message, trending_weight=10) - - if len(symbols): - symbol = symbols[0] - else: - await update.message.reply_text("No symbols or coins found.") - return - - df = s.chart_reply(symbol) - if df.empty: - await update.message.reply_text( - text="Invalid symbol please see `/help` for usage details.", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - return - await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.UPLOAD_PHOTO) - - buf = io.BytesIO() - mpf.plot( - df, - type="candle", - title=f"\n{symbol.name}", - volume="Volume" in df.keys(), - style="yahoo", - savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), - ) - buf.seek(0) - - await update.message.reply_photo( - photo=buf, - caption=f"\n1 Month chart for {symbol.name} from {df.first_valid_index().strftime('%d, %b %Y')}" - + f" to {df.last_valid_index().strftime('%d, %b %Y')}\n\n{s.price_reply([symbol])[0]}", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -async def trending(update: Update, context: ContextTypes.DEFAULT_TYPE): - """returns currently trending symbols and how much they've moved in the past trading day.""" - log.info(f"Trending command ran by {update.message.chat.username}") - - chat_id = update.message.chat_id - - await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING) - - trending_list = s.trending() - - await update.message.reply_text( - text=trending_list, - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """ - Handles inline query. Searches by looking if query is contained - in the symbol and returns matches in alphabetical order. - """ - - if not update.inline_query.query: - return - - print(f"Query: {update.inline_query.query}") - - ignored_queries = {"$", "$$", " ", ""} - - if update.inline_query.query.strip() in ignored_queries: - default_message = """ - You can type:\n@SimpleStockBot `[search]` - in any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price. - """ - - await update.inline_query.answer( - [ - InlineQueryResultArticle( - str(uuid4()), - title="Please enter a query. It can be a ticker or a name of a company.", - input_message_content=InputTextMessageContent( - default_message, parse_mode=telegram.constants.ParseMode.MARKDOWN - ), - ) - ] - ) - - matches = s.inline_search(update.inline_query.query) - - results = [] - for _, row in matches.iterrows(): - results.append( - InlineQueryResultArticle( - str(uuid4()), - title=row["description"], - input_message_content=InputTextMessageContent( - row["price_reply"], parse_mode=telegram.constants.ParseMode.MARKDOWN - ), - ) - ) - - if len(results) == 5: - await update.inline_query.answer(results, cache_time=60 * 60) - log.info("Inline Command was successful") - return - await update.inline_query.answer(results) - - -async def rand_pick(update: Update, context: ContextTypes.DEFAULT_TYPE): - """For the gamblers. Returns a random symbol to buy and a sell date""" - log.info(f"Someone is gambling! Random_pick command ran by {update.message.chat.username}") - - await update.message.reply_text( - text=s.random_pick(), - parse_mode=telegram.constants.ParseMode.MARKDOWN, - disable_notification=True, - ) - - -async def error(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Log Errors caused by Updates.""" - log.warning('Update "%s" caused error "%s"', update, error) - - tb_list = traceback.format_exception(None, context.error, context.error.__traceback__) - tb_string = "".join(tb_list) - - err_code = "".join([random.choice(string.ascii_lowercase) for i in range(5)]) - log.warning(f"Logging error: {err_code}") - - if update: - log.warning( - f"An exception was raised while handling an update\n" - f"\tupdate = {html.escape(json.dumps(update.to_dict(), indent=2, ensure_ascii=False))}\n" - f"\tcontext.chat_data = {str(context.chat_data)}\n" - f"\tcontext.user_data = {str(context.user_data)}\n" - f"\t{html.escape(tb_string)}" - ) - - await update.message.reply_text( - text=f"An error has occured. Please inform @MisterBiggs if the error persists. Error Code: `{err_code}`", - parse_mode=telegram.constants.ParseMode.MARKDOWN, - ) - else: - log.warning("No message to send to user.") - log.warning(tb_string) - - -def main(): - """Start the context.bot.""" - # Create the EventHandler and pass it your bot's token. - application = Application.builder().token(TELEGRAM_TOKEN).build() - - # on different commands - answer in Telegram - application.add_handler(CommandHandler("start", start)) - application.add_handler(CommandHandler("help", help)) - application.add_handler(CommandHandler("license", license)) - application.add_handler(CommandHandler("trending", trending)) - application.add_handler(CommandHandler("random", rand_pick)) - application.add_handler(CommandHandler("donate", donate)) - application.add_handler(CommandHandler("status", status)) - application.add_handler(CommandHandler("inline", inline_query)) - - # Charting can be slow so they run async. - application.add_handler(CommandHandler("intra", intra, block=False)) - application.add_handler(CommandHandler("intraday", intra, block=False)) - application.add_handler(CommandHandler("day", intra, block=False)) - application.add_handler(CommandHandler("chart", chart, block=False)) - application.add_handler(CommandHandler("month", chart, block=False)) - - # on noncommand i.e message - echo the message on Telegram - application.add_handler(MessageHandler(filters.TEXT, symbol_detect)) - application.add_handler(MessageHandler(filters.PHOTO, symbol_detect_image)) - - # Inline Bot commands - application.add_handler(InlineQueryHandler(inline_query)) - - # Pre-checkout handler to final check - application.add_handler(PreCheckoutQueryHandler(precheckout_callback)) - - # Payment success - application.add_handler(MessageHandler(filters.SUCCESSFUL_PAYMENT, successful_payment_callback)) - - # log all errors - application.add_error_handler(error) - - # Start the Bot - application.run_polling(allowed_updates=Update.ALL_TYPES) - - -if __name__ == "__main__": - main() +# Works with Python 3.8 +import datetime +import html +import io +import json +import logging +import os +import random +import string +import traceback +from uuid import uuid4 + +import mplfinance as mpf +from T_info import T_info + +import telegram +from common.symbol_router import Router +from telegram import InlineQueryResultArticle, InputTextMessageContent, LabeledPrice, Update +from telegram.ext import ( + Application, + CommandHandler, + ContextTypes, + InlineQueryHandler, + MessageHandler, + PreCheckoutQueryHandler, + filters, +) + +# Enable logging +logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO) + +# set higher logging level for httpx to avoid all GET and POST requests being logged +logging.getLogger("httpx").setLevel(logging.WARNING) + +log = logging.getLogger(__name__) + +TELEGRAM_TOKEN = os.environ["TELEGRAM"] + +try: + STRIPE_TOKEN = os.environ["STRIPE"] +except KeyError: + STRIPE_TOKEN = "" + log.warning("Starting without a STRIPE Token will not allow you to accept Donations!") + +s = Router() +t = T_info() + + +log.info("Bot script started.") + + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Send help text when the command /start is issued.""" + log.info(f"Start command ran by {update.message.chat.username}") + await update.message.reply_text( + text=t.help_text, + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +async def help(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Send help text when the command /help is issued.""" + log.info(f"Help command ran by {update.message.chat.username}") + await update.message.reply_text( + text=t.help_text, + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +async def license(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Send bots license when the /license command is issued.""" + log.info(f"License command ran by {update.message.chat.username}") + await update.message.reply_text( + text=t.license, + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +async def status(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Gather status of bot and dependant services and return important status updates.""" + log.warning(f"Status command ran by {update.message.chat.username}") + bot_resp_time = datetime.datetime.now(update.message.date.tzinfo) - update.message.date + + bot_status = s.status(f"It took {bot_resp_time.total_seconds()} seconds for the bot to get your message.") + + await update.message.reply_text( + text=bot_status, + parse_mode=telegram.constants.ParseMode.MARKDOWN, + ) + + +async def donate(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Sets up donation.""" + log.info(f"Donate command ran by {update.message.chat.username}") + chat_id = update.message.chat_id + + if update.message.text.strip() == "/donate" or "/donate@" in update.message.text: + await update.message.reply_text( + text=t.donate_text, + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + amount = 1.0 + else: + amount = float(update.message.text.replace("/donate", "").replace("$", "").strip()) + + try: + price = int(amount * 100) + except ValueError: + await update.message.reply_text(f"{amount} is not a valid donation amount or number.") + return + log.info(f"Donation amount: {price} by {update.message.chat.username}") + + await context.bot.send_invoice( + chat_id=chat_id, + title="Simple Stock Bot Donation", + description=f"Simple Stock Bot Donation of ${amount} by {update.message.chat.username}", + payload=f"simple-stock-bot-{chat_id}", + provider_token=STRIPE_TOKEN, + currency="USD", + prices=[LabeledPrice("Donation:", price)], + start_parameter="", + # suggested_tip_amounts=[100, 500, 1000, 2000], + photo_url="https://simple-stock-bots.gitlab.io/docs/img/Telegram.png", + photo_width=500, + photo_height=500, + ) + + +async def precheckout_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Approves donation""" + log.info("precheckout_callback queried") + query = update.pre_checkout_query + + await query.answer(ok=True) + # I dont think I need to check since its only donations. + # if query.invoice_payload == "simple-stock-bot": + # # answer False pre_checkout_query + # await query.answer(ok=True) + # else: + # await query.answer(ok=False, error_message="Something went wrong...") + + +async def successful_payment_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Thanks user for donation""" + log.info("Successful payment!") + await update.message.reply_text("Thank you for your donation! It goes a long way to keeping the bot free!") + + +async def symbol_detect_image(update: Update, context: ContextTypes.DEFAULT_TYPE): + """ + Makes image captions into text then passes the `update` and `context` + to symbol detect so that it can reply cashtags in image captions. + """ + try: + if update.message.caption: + update.message.text = update.message.caption + await symbol_detect(update, context) + except AttributeError: + return + + +async def symbol_detect(update: Update, context: ContextTypes.DEFAULT_TYPE): + """ + Runs on any message that doesn't have a command and searches for cashtags, + then returns the prices of any symbols found. + """ + try: + message = update.message.text + chat_id = update.message.chat_id + if "$" in message: + log.info("Looking for Symbols") + symbols = s.find_symbols(message) + else: + return + except AttributeError as ex: + log.info(ex) + return + + # Detect Options + if ("call" in message.lower()) or ("put" in message.lower()): + log.info("Options detected") + await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING) + try: + options_data = s.options(message, symbols) + + await update.message.reply_text( + text=generate_options_reply(options_data), + parse_mode=telegram.constants.ParseMode.MARKDOWN, + ) + return + except KeyError as ex: + logging.warning(ex) + pass + + if symbols: + log.info(f"Symbols found: {symbols}") + await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING) + + for reply in s.price_reply(symbols): + await update.message.reply_text( + text=reply, + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +def generate_options_reply(options_data: dict): + # Header with Option Symbol and Underlying + message_text = f"*{options_data['Option Symbol']} ({options_data['Underlying']})*\n\n" + + # Key details + details = ( + f"*Expiration:* `{options_data['Expiration']}`\n" + f"*Side:* `{options_data['side']}`\n" + f"*Strike:* `{options_data['strike']}`\n" + f"*First Traded:* `{options_data['First Traded']}`\n" + f"*Last Updated:* `{options_data['Last Updated']}`\n\n" + ) + message_text += details + + # Pricing info + pricing_info = ( + f"*Bid:* `{options_data['bid']}` (Size: `{options_data['bidSize']}`)\n" + f"*Mid:* `{options_data['mid']}`\n" + f"*Ask:* `{options_data['ask']}` (Size: `{options_data['askSize']}`)\n" + f"*Last:* `{options_data['last']}`\n\n" + ) + message_text += pricing_info + + # Volume and open interest + volume_info = f"*Open Interest:* `{options_data['Open Interest']}`\n" f"*Volume:* `{options_data['Volume']}`\n\n" + message_text += volume_info + + # Greeks + greeks_info = ( + f"*IV:* `{options_data['Implied Volatility']}`\n" + f"*Delta:* `{options_data['delta']}`\n" + f"*Gamma:* `{options_data['gamma']}`\n" + f"*Theta:* `{options_data['theta']}`\n" + f"*Vega:* `{options_data['vega']}`\n" + f"*Rho:* `{options_data['rho']}`\n" + ) + message_text += greeks_info + + return message_text + + +async def intra(update: Update, context: ContextTypes.DEFAULT_TYPE): + """returns a chart of intraday data for a symbol""" + log.info(f"Intra command ran by {update.message.chat.username}") + + message = update.message.text + chat_id = update.message.chat_id + + if message.strip().split("@")[0] == "/intra": + await update.message.reply_text( + "This command returns a chart of the stocks movement since the most recent market open.\nExample: /intra $tsla" + ) + return + + symbols = s.find_symbols(message, trending_weight=5) + symbol = symbols[0] + + if len(symbols): + symbol = symbols[0] + else: + await update.message.reply_text("No symbols or coins found.") + return + + df = s.intra_reply(symbol) + if df.empty: + await update.message.reply_text( + text="Invalid symbol please see `/help` for usage details.", + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + return + + await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.UPLOAD_PHOTO) + + buf = io.BytesIO() + mpf.plot( + df, + type="renko", + title=f"\n{symbol.name}", + volume="Volume" in df.keys(), + style="yahoo", + savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), + ) + buf.seek(0) + + await update.message.reply_photo( + photo=buf, + caption=f"\nIntraday chart for {symbol.name} from {df.first_valid_index().strftime('%d %b at %H:%M')} to" + + f" {df.last_valid_index().strftime('%d %b at %H:%M %Z')}" + + f"\n\n{s.price_reply([symbol])[0]}", + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +async def chart(update: Update, context: ContextTypes.DEFAULT_TYPE): + """returns a chart of the past month of data for a symbol""" + log.info(f"Chart command ran by {update.message.chat.username}") + + message = update.message.text + chat_id = update.message.chat_id + + if message.strip().split("@")[0] == "/chart": + await update.message.reply_text( + "This command returns a chart of the stocks movement for the past month.\nExample: /chart $tsla" + ) + return + + symbols = s.find_symbols(message, trending_weight=10) + + if len(symbols): + symbol = symbols[0] + else: + await update.message.reply_text("No symbols or coins found.") + return + + df = s.chart_reply(symbol) + if df.empty: + await update.message.reply_text( + text="Invalid symbol please see `/help` for usage details.", + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + return + await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.UPLOAD_PHOTO) + + buf = io.BytesIO() + mpf.plot( + df, + type="candle", + title=f"\n{symbol.name}", + volume="Volume" in df.keys(), + style="yahoo", + savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), + ) + buf.seek(0) + + await update.message.reply_photo( + photo=buf, + caption=f"\n1 Month chart for {symbol.name} from {df.first_valid_index().strftime('%d, %b %Y')}" + + f" to {df.last_valid_index().strftime('%d, %b %Y')}\n\n{s.price_reply([symbol])[0]}", + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +async def trending(update: Update, context: ContextTypes.DEFAULT_TYPE): + """returns currently trending symbols and how much they've moved in the past trading day.""" + log.info(f"Trending command ran by {update.message.chat.username}") + + chat_id = update.message.chat_id + + await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING) + + trending_list = s.trending() + + await update.message.reply_text( + text=trending_list, + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """ + Handles inline query. Searches by looking if query is contained + in the symbol and returns matches in alphabetical order. + """ + + if not update.inline_query.query: + return + + print(f"Query: {update.inline_query.query}") + + ignored_queries = {"$", "$$", " ", ""} + + if update.inline_query.query.strip() in ignored_queries: + default_message = """ + You can type:\n@SimpleStockBot `[search]` + in any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price. + """ + + await update.inline_query.answer( + [ + InlineQueryResultArticle( + str(uuid4()), + title="Please enter a query. It can be a ticker or a name of a company.", + input_message_content=InputTextMessageContent( + default_message, parse_mode=telegram.constants.ParseMode.MARKDOWN + ), + ) + ] + ) + + matches = s.inline_search(update.inline_query.query) + + results = [] + for _, row in matches.iterrows(): + results.append( + InlineQueryResultArticle( + str(uuid4()), + title=row["description"], + input_message_content=InputTextMessageContent( + row["price_reply"], parse_mode=telegram.constants.ParseMode.MARKDOWN + ), + ) + ) + + if len(results) == 5: + await update.inline_query.answer(results, cache_time=60 * 60) + log.info("Inline Command was successful") + return + await update.inline_query.answer(results) + + +async def rand_pick(update: Update, context: ContextTypes.DEFAULT_TYPE): + """For the gamblers. Returns a random symbol to buy and a sell date""" + log.info(f"Someone is gambling! Random_pick command ran by {update.message.chat.username}") + + await update.message.reply_text( + text=s.random_pick(), + parse_mode=telegram.constants.ParseMode.MARKDOWN, + disable_notification=True, + ) + + +async def error(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Log Errors caused by Updates.""" + log.warning('Update "%s" caused error "%s"', update, error) + + tb_list = traceback.format_exception(None, context.error, context.error.__traceback__) + tb_string = "".join(tb_list) + + err_code = "".join([random.choice(string.ascii_lowercase) for i in range(5)]) + log.warning(f"Logging error: {err_code}") + + if update: + log.warning( + f"An exception was raised while handling an update\n" + f"\tupdate = {html.escape(json.dumps(update.to_dict(), indent=2, ensure_ascii=False))}\n" + f"\tcontext.chat_data = {str(context.chat_data)}\n" + f"\tcontext.user_data = {str(context.user_data)}\n" + f"\t{html.escape(tb_string)}" + ) + + await update.message.reply_text( + text=f"An error has occured. Please inform @MisterBiggs if the error persists. Error Code: `{err_code}`", + parse_mode=telegram.constants.ParseMode.MARKDOWN, + ) + else: + log.warning("No message to send to user.") + log.warning(tb_string) + + +def main(): + """Start the context.bot.""" + # Create the EventHandler and pass it your bot's token. + application = Application.builder().token(TELEGRAM_TOKEN).build() + + # on different commands - answer in Telegram + application.add_handler(CommandHandler("start", start)) + application.add_handler(CommandHandler("help", help)) + application.add_handler(CommandHandler("license", license)) + application.add_handler(CommandHandler("trending", trending)) + application.add_handler(CommandHandler("random", rand_pick)) + application.add_handler(CommandHandler("donate", donate)) + application.add_handler(CommandHandler("status", status)) + application.add_handler(CommandHandler("inline", inline_query)) + + # Charting can be slow so they run async. + application.add_handler(CommandHandler("intra", intra, block=False)) + application.add_handler(CommandHandler("intraday", intra, block=False)) + application.add_handler(CommandHandler("day", intra, block=False)) + application.add_handler(CommandHandler("chart", chart, block=False)) + application.add_handler(CommandHandler("month", chart, block=False)) + + # on noncommand i.e message - echo the message on Telegram + application.add_handler(MessageHandler(filters.TEXT, symbol_detect)) + application.add_handler(MessageHandler(filters.PHOTO, symbol_detect_image)) + + # Inline Bot commands + application.add_handler(InlineQueryHandler(inline_query)) + + # Pre-checkout handler to final check + application.add_handler(PreCheckoutQueryHandler(precheckout_callback)) + + # Payment success + application.add_handler(MessageHandler(filters.SUCCESSFUL_PAYMENT, successful_payment_callback)) + + # log all errors + application.add_error_handler(error) + + # Start the Bot + application.run_polling(allowed_updates=Update.ALL_TYPES) + + +if __name__ == "__main__": + main() diff --git a/tests.py b/tests.py index 996bcff..0549f0d 100644 --- a/tests.py +++ b/tests.py @@ -1,23 +1,21 @@ -import time - -import keyboard - -tests = """$$xno -$tsla -/intra $tsla -/intra $$btc -/chart $tsla -/chart $$btc -/help -/trending""".split( - "\n" -) - -print("press enter to start") -keyboard.wait("enter") - -for test in tests: - print(test) - keyboard.write(test) - time.sleep(1) - keyboard.press_and_release("enter") +import time + +import keyboard + +tests = """$$xno +$tsla +/intra $tsla +/intra $$btc +/chart $tsla +/chart $$btc +/help +/trending""".split("\n") + +print("press enter to start") +keyboard.wait("enter") + +for test in tests: + print(test) + keyboard.write(test) + time.sleep(1) + keyboard.press_and_release("enter")