diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index e907d8d..7d7528f 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,9 +7,13 @@ RUN pip install --user -r requirements.txt FROM python:3.9-slim -COPY --from=builder /root/.local /root/.local -RUN pip install --no-cache-dir black +ENV MPLBACKEND=Agg +COPY --from=builder /root/.local /root/.local + +RUN pip install --no-cache-dir black +ENV TELEGRAM=TOKEN +ENV IEX=TOKEN COPY . . diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index efc4208..de09b46 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -10,7 +10,8 @@ "settings": {}, // Add the IDs of extensions you want installed when the container is created. "extensions": [ - "ms-python.python" + "ms-python.python", + "ms-azuretools.vscode-docker" ] // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..a33a1b2 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Telegram Bot", + "type": "python", + "request": "launch", + "program": "bot.py", + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 4e2ad64..7ae49d2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,6 +7,8 @@ RUN pip install --user -r requirements.txt FROM python:3.9-slim +ENV MPLBACKEND=Agg + COPY --from=builder /root/.local /root/.local diff --git a/IEX_Symbol.py b/IEX_Symbol.py index 1eaa159..40038a0 100644 --- a/IEX_Symbol.py +++ b/IEX_Symbol.py @@ -1,520 +1,487 @@ -"""Class with functions for running the bot with IEX Cloud. -""" - -import logging -import os -from datetime import datetime -from logging import warning -from typing import List, Optional, Tuple - -import pandas as pd -import requests as r -import schedule -from fuzzywuzzy import fuzz - -from Symbol import Stock - - -class IEX_Symbol: - """ - Functions for finding stock market information about symbols. - """ - - SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})" - - searched_symbols = {} - otc_list = [] - charts = {} - trending_cache = None - - def __init__(self) -> None: - """Creates a Symbol Object - - Parameters - ---------- - IEX_TOKEN : str - IEX API Token - """ - try: - self.IEX_TOKEN = os.environ["IEX"] - except KeyError: - self.IEX_TOKEN = "" - warning( - "Starting without an IEX Token will not allow you to get market data!" - ) - - if self.IEX_TOKEN != "": - self.get_symbol_list() - - schedule.every().day.do(self.get_symbol_list) - schedule.every().day.do(self.clear_charts) - - def get(self, endpoint, params: dict = {}, timeout=5) -> dict: - - url = "https://cloud.iexapis.com/stable" + endpoint - - # set token param if it wasn't passed. - params["token"] = params.get("token", self.IEX_TOKEN) - - resp = r.get(url, params=params, timeout=timeout) - - # Make sure API returned a proper status code - try: - resp.raise_for_status() - except r.exceptions.HTTPError as e: - logging.error(e) - return {} - - # Make sure API returned valid JSON - try: - resp_json = resp.json() - return resp_json - except r.exceptions.JSONDecodeError as e: - logging.error(e) - return {} - - def clear_charts(self) -> None: - """ - Clears cache of chart data. - Charts are cached so that only 1 API call per 24 hours is needed since the - chart data is expensive and a large download. - """ - self.charts = {} - - def get_symbol_list( - self, return_df=False - ) -> Optional[Tuple[pd.DataFrame, datetime]]: - """Gets list of all symbols supported by IEX - - Parameters - ---------- - return_df : bool, optional - return the dataframe of all stock symbols, by default False - - Returns - ------- - Optional[Tuple[pd.DataFrame, datetime]] - If `return_df` is set to `True` returns a dataframe, otherwise returns `None`. - """ - - reg_symbols = self.get("/ref-data/symbols") - otc_symbols = self.get("/ref-data/otc/symbols") - - reg = pd.DataFrame(data=reg_symbols) - otc = pd.DataFrame(data=otc_symbols) - self.otc_list = set(otc["symbol"].to_list()) - - symbols = pd.concat([reg, otc]) - - symbols["description"] = "$" + symbols["symbol"] + ": " + symbols["name"] - symbols["id"] = symbols["symbol"] - symbols["type_id"] = "$" + symbols["symbol"].str.lower() - - symbols = symbols[["id", "symbol", "name", "description", "type_id"]] - self.symbol_list = symbols - if return_df: - return symbols, datetime.now() - - def status(self) -> str: - """Checks IEX Status dashboard for any current API issues. - - Returns - ------- - str - Human readable text on status of IEX API - """ - resp = r.get( - "https://pjmps0c34hp7.statuspage.io/api/v2/status.json", - timeout=15, - ) - - if resp.status_code == 200: - status = resp.json()["status"] - else: - return "IEX Cloud did not respond. Please check their status page for more information. https://status.iexapis.com" - - if status["indicator"] == "none": - return "IEX Cloud is currently not reporting any issues with its API." - else: - return ( - f"{status['indicator']}: {status['description']}." - + " Please check the status page for more information. https://status.iexapis.com" - ) - - def search_symbols(self, search: str) -> List[Tuple[str, str]]: - """Performs a fuzzy search to find stock symbols closest to a search term. - - Parameters - ---------- - search : str - String used to search, could be a company name or something close to the companies stock ticker. - - Returns - ------- - List[tuple[str, str]] - A list tuples of every stock sorted in order of how well they match. Each tuple contains: (Symbol, Issue Name). - """ - - schedule.run_pending() - search = search.lower() - try: # https://stackoverflow.com/a/3845776/8774114 - return self.searched_symbols[search] - except KeyError: - pass - - symbols = self.symbol_list - symbols["Match"] = symbols.apply( - lambda x: fuzz.ratio(search, f"{x['symbol']}".lower()), - axis=1, - ) - - symbols.sort_values(by="Match", ascending=False, inplace=True) - if symbols["Match"].head().sum() < 300: - symbols["Match"] = symbols.apply( - lambda x: fuzz.partial_ratio(search, x["name"].lower()), - axis=1, - ) - - symbols.sort_values(by="Match", ascending=False, inplace=True) - symbols = symbols.head(10) - symbol_list = list(zip(list(symbols["symbol"]), list(symbols["description"]))) - self.searched_symbols[search] = symbol_list - return symbol_list - - def price_reply(self, symbol: Stock) -> str: - """Returns price movement of Stock for the last market day, or after hours. - - Parameters - ---------- - symbol : Stock - - Returns - ------- - str - Formatted markdown - """ - - if IEXData := self.get(f"/stock/{symbol.id}/quote"): - - if symbol.symbol.upper() in self.otc_list: - return f"OTC - {symbol.symbol.upper()}, {IEXData['companyName']} most recent price is: $**{IEXData['latestPrice']}**" - - keys = ( - "extendedChangePercent", - "extendedPrice", - "companyName", - "latestPrice", - "changePercent", - ) - - if set(keys).issubset(IEXData): - - if change := IEXData.get("changePercent", 0): - change = round(change * 100, 2) - else: - change = 0 - - if ( - IEXData.get("isUSMarketOpen", True) - or (IEXData["extendedChangePercent"] is None) - or (IEXData["extendedPrice"] is None) - ): # Check if market is open. - message = f"The current stock price of {IEXData['companyName']} is $**{IEXData['latestPrice']}**" - else: - message = ( - f"{IEXData['companyName']} closed at $**{IEXData['latestPrice']}** with a change of {change}%," - + f" after hours _(15 minutes delayed)_ the stock price is $**{IEXData['extendedPrice']}**" - ) - if change := IEXData.get("extendedChangePercent", 0): - change = round(change * 100, 2) - else: - change = 0 - - # Determine wording of change text - if change > 0: - message += f", the stock is currently **up {change}%**" - elif change < 0: - message += f", the stock is currently **down {change}%**" - else: - message += ", the stock hasn't shown any movement today." - else: - message = ( - f"The symbol: {symbol} encountered and error. This could be due to " - ) - - else: - message = f"The symbol: {symbol} was not found." - - return message - - def dividend_reply(self, symbol: Stock) -> str: - """Returns the most recent, or next dividend date for a stock symbol. - - Parameters - ---------- - symbol : Stock - - Returns - ------- - str - Formatted markdown - """ - if symbol.symbol.upper() in self.otc_list: - return "OTC stocks do not currently support any commands." - - if resp := self.get(f"/stock/{symbol.id}/dividends/next"): - try: - IEXData = resp[0] - except IndexError as e: - return f"${symbol.id.upper()} either doesn't exist or pays no dividend." - keys = ( - "amount", - "currency", - "declaredDate", - "exDate", - "frequency", - "paymentDate", - "flag", - ) - - if set(keys).issubset(IEXData): - - if IEXData["currency"] == "USD": - price = f"${IEXData['amount']}" - else: - price = f"{IEXData['amount']} {IEXData['currency']}" - - # Pattern IEX uses for dividend date. - pattern = "%Y-%m-%d" - - declared = datetime.strptime(IEXData["declaredDate"], pattern).strftime( - "%A, %B %w" - ) - ex = datetime.strptime(IEXData["exDate"], pattern).strftime("%A, %B %w") - payment = datetime.strptime(IEXData["paymentDate"], pattern).strftime( - "%A, %B %w" - ) - - daysDelta = ( - datetime.strptime(IEXData["paymentDate"], pattern) - datetime.now() - ).days - - return ( - "The next dividend for " - + f"{self.symbol_list[self.symbol_list['symbol']==symbol.id.upper()]['description'].item()}" # Get full name without api call - + f" is on {payment} which is in {daysDelta} days." - + f" The dividend is for {price} per share." - + f"\n\nThe dividend was declared on {declared} and the ex-dividend date is {ex}" - ) - - return f"${symbol.id.upper()} either doesn't exist or pays no dividend." - - def news_reply(self, symbol: Stock) -> str: - """Gets most recent, english, non-paywalled news - - Parameters - ---------- - symbol : Stock - - Returns - ------- - str - Formatted markdown - """ - if symbol.symbol.upper() in self.otc_list: - return "OTC stocks do not currently support any commands." - - if data := self.get(f"/stock/{symbol.id}/news/last/15"): - line = [] - - for news in data: - if news["lang"] == "en" and not news["hasPaywall"]: - line.append( - f"*{news['source']}*: [{news['headline']}]({news['url']})" - ) - - return f"News for **{symbol.id.upper()}**:\n" + "\n".join(line[:5]) - - else: - return f"No news found for: {symbol.id}\nEither today is boring or the symbol does not exist." - - def info_reply(self, symbol: Stock) -> str: - """Gets description for Stock - - Parameters - ---------- - symbol : Stock - - Returns - ------- - str - Formatted text - """ - if symbol.symbol.upper() in self.otc_list: - return "OTC stocks do not currently support any commands." - - if data := self.get(f"/stock/{symbol.id}/company"): - [data.pop(k) for k in list(data) if data[k] == ""] - - if "description" in data: - return data["description"] - - return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." - - def stat_reply(self, symbol: Stock) -> str: - """Key statistics on a Stock - - Parameters - ---------- - symbol : Stock - - Returns - ------- - str - Formatted markdown - """ - if symbol.symbol.upper() in self.otc_list: - return "OTC stocks do not currently support any commands." - - if data := self.get(f"/stock/{symbol.id}/stats"): - [data.pop(k) for k in list(data) if data[k] == ""] - - m = "" - if "companyName" in data: - m += f"Company Name: {data['companyName']}\n" - if "marketcap" in data: - m += f"Market Cap: ${data['marketcap']:,}\n" - if "week52high" in data: - m += f"52 Week (high-low): {data['week52high']:,} " - if "week52low" in data: - m += f"- {data['week52low']:,}\n" - if "employees" in data: - m += f"Number of Employees: {data['employees']:,}\n" - if "nextEarningsDate" in data: - m += f"Next Earnings Date: {data['nextEarningsDate']}\n" - if "peRatio" in data: - m += f"Price to Earnings: {data['peRatio']:.3f}\n" - if "beta" in data: - m += f"Beta: {data['beta']:.3f}\n" - return m - else: - return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." - - def cap_reply(self, symbol: Stock) -> str: - """Get the Market Cap of a stock""" - - if data := self.get(f"/stable/stock/{symbol.id}/stats"): - - try: - cap = data["marketcap"] - except KeyError: - return f"{symbol.id} returned an error." - - message = f"The current market cap of {symbol.name} is $**{cap:,.2f}**" - - else: - message = f"The Stock: {symbol.name} was not found or returned and error." - - return message - - def intra_reply(self, symbol: Stock) -> pd.DataFrame: - """Returns price data for a symbol since the last market open. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. - """ - if symbol.symbol.upper() in self.otc_list: - return pd.DataFrame() - - if symbol.id.upper() not in list(self.symbol_list["symbol"]): - return pd.DataFrame() - - if data := self.get(f"/stock/{symbol.id}/intraday-prices"): - df = pd.DataFrame(data) - df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"]) - df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"]) - df = df.set_index("DT") - return df - - return pd.DataFrame() - - def chart_reply(self, symbol: Stock) -> pd.DataFrame: - """Returns price data for a symbol of the past month up until the previous trading days close. - Also caches multiple requests made in the same day. - - Parameters - ---------- - symbol : str - Stock symbol. - - Returns - ------- - pd.DataFrame - Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. - """ - schedule.run_pending() - - if symbol.symbol.upper() in self.otc_list: - return pd.DataFrame() - - if symbol.id.upper() not in list(self.symbol_list["symbol"]): - return pd.DataFrame() - - try: # https://stackoverflow.com/a/3845776/8774114 - return self.charts[symbol.id.upper()] - except KeyError: - pass - - if data := self.get( - f"/stock/{symbol.id}/chart/1mm", - params={"chartInterval": 3, "includeToday": "false"}, - ): - df = pd.DataFrame(data) - df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"]) - df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"]) - df = df.set_index("DT") - self.charts[symbol.id.upper()] = df - return df - - return pd.DataFrame() - - def spark_reply(self, symbol: Stock) -> str: - quote = self.get(f"/stock/{symbol.id}/quote") - - open_change = quote.get("changePercent", 0) - after_change = quote.get("extendedChangePercent", 0) - - change = 0 - - if open_change: - change = change + open_change - if after_change: - change = change + after_change - - change = change * 100 - - return f"`{symbol.tag}`: {quote['companyName']}, {change:.2f}%" - - def trending(self) -> list[str]: - """Gets current coins trending on IEX. Only returns when market is open. - - Returns - ------- - list[str] - list of $ID: NAME, CHANGE% - """ - - if data := self.get(f"/stock/market/list/mostactive"): - self.trending_cache = [ - f"`${s['symbol']}`: {s['companyName']}, {100*s['changePercent']:.2f}%" - for s in data - ] - - return self.trending_cache +"""Class with functions for running the bot with IEX Cloud. +""" + +import logging +import os +from datetime import datetime +from logging import warning +from typing import List, Optional, Tuple + +import pandas as pd +import requests as r +import schedule + +from Symbol import Stock + + +class IEX_Symbol: + """ + Functions for finding stock market information about symbols. + """ + + SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})" + + searched_symbols = {} + otc_list = [] + charts = {} + trending_cache = None + + def __init__(self) -> None: + """Creates a Symbol Object + + Parameters + ---------- + IEX_TOKEN : str + IEX API Token + """ + try: + self.IEX_TOKEN = os.environ["IEX"] + + if self.IEX_TOKEN == "TOKEN": + self.IEX_TOKEN = "" + except KeyError: + self.IEX_TOKEN = "" + warning( + "Starting without an IEX Token will not allow you to get market data!" + ) + + if self.IEX_TOKEN != "": + self.get_symbol_list() + + schedule.every().day.do(self.get_symbol_list) + schedule.every().day.do(self.clear_charts) + + def get(self, endpoint, params: dict = {}, timeout=5) -> dict: + + url = "https://cloud.iexapis.com/stable" + endpoint + + # set token param if it wasn't passed. + params["token"] = params.get("token", self.IEX_TOKEN) + + resp = r.get(url, params=params, timeout=timeout) + + # Make sure API returned a proper status code + try: + resp.raise_for_status() + except r.exceptions.HTTPError as e: + logging.error(e) + return {} + + # Make sure API returned valid JSON + try: + resp_json = resp.json() + return resp_json + except r.exceptions.JSONDecodeError as e: + logging.error(e) + return {} + + def clear_charts(self) -> None: + """ + Clears cache of chart data. + Charts are cached so that only 1 API call per 24 hours is needed since the + chart data is expensive and a large download. + """ + self.charts = {} + + def get_symbol_list( + self, return_df=False + ) -> Optional[Tuple[pd.DataFrame, datetime]]: + """Gets list of all symbols supported by IEX + + Parameters + ---------- + return_df : bool, optional + return the dataframe of all stock symbols, by default False + + Returns + ------- + Optional[Tuple[pd.DataFrame, datetime]] + If `return_df` is set to `True` returns a dataframe, otherwise returns `None`. + """ + + reg_symbols = self.get("/ref-data/symbols") + otc_symbols = self.get("/ref-data/otc/symbols") + + reg = pd.DataFrame(data=reg_symbols) + otc = pd.DataFrame(data=otc_symbols) + self.otc_list = set(otc["symbol"].to_list()) + + symbols = pd.concat([reg, otc]) + + symbols["description"] = "$" + symbols["symbol"] + ": " + symbols["name"] + symbols["id"] = symbols["symbol"] + symbols["type_id"] = "$" + symbols["symbol"].str.lower() + + symbols = symbols[["id", "symbol", "name", "description", "type_id"]] + self.symbol_list = symbols + if return_df: + return symbols, datetime.now() + + def status(self) -> str: + """Checks IEX Status dashboard for any current API issues. + + Returns + ------- + str + Human readable text on status of IEX API + """ + + if self.IEX_TOKEN == "": + return "The `IEX_TOKEN` is not set so Stock Market data is not available." + + resp = r.get( + "https://pjmps0c34hp7.statuspage.io/api/v2/status.json", + timeout=15, + ) + + if resp.status_code == 200: + status = resp.json()["status"] + else: + return "IEX Cloud did not respond. Please check their status page for more information. https://status.iexapis.com" + + if status["indicator"] == "none": + return "IEX Cloud is currently not reporting any issues with its API." + else: + return ( + f"{status['indicator']}: {status['description']}." + + " Please check the status page for more information. https://status.iexapis.com" + ) + + def price_reply(self, symbol: Stock) -> str: + """Returns price movement of Stock for the last market day, or after hours. + + Parameters + ---------- + symbol : Stock + + Returns + ------- + str + Formatted markdown + """ + + if IEXData := self.get(f"/stock/{symbol.id}/quote"): + + if symbol.symbol.upper() in self.otc_list: + return f"OTC - {symbol.symbol.upper()}, {IEXData['companyName']} most recent price is: $**{IEXData['latestPrice']}**" + + keys = ( + "extendedChangePercent", + "extendedPrice", + "companyName", + "latestPrice", + "changePercent", + ) + + if set(keys).issubset(IEXData): + + if change := IEXData.get("changePercent", 0): + change = round(change * 100, 2) + else: + change = 0 + + if ( + IEXData.get("isUSMarketOpen", True) + or (IEXData["extendedChangePercent"] is None) + or (IEXData["extendedPrice"] is None) + ): # Check if market is open. + message = f"The current stock price of {IEXData['companyName']} is $**{IEXData['latestPrice']}**" + else: + message = ( + f"{IEXData['companyName']} closed at $**{IEXData['latestPrice']}** with a change of {change}%," + + f" after hours _(15 minutes delayed)_ the stock price is $**{IEXData['extendedPrice']}**" + ) + if change := IEXData.get("extendedChangePercent", 0): + change = round(change * 100, 2) + else: + change = 0 + + # Determine wording of change text + if change > 0: + message += f", the stock is currently **up {change}%**" + elif change < 0: + message += f", the stock is currently **down {change}%**" + else: + message += ", the stock hasn't shown any movement today." + else: + message = ( + f"The symbol: {symbol} encountered and error. This could be due to " + ) + + else: + message = f"The symbol: {symbol} was not found." + + return message + + def dividend_reply(self, symbol: Stock) -> str: + """Returns the most recent, or next dividend date for a stock symbol. + + Parameters + ---------- + symbol : Stock + + Returns + ------- + str + Formatted markdown + """ + if symbol.symbol.upper() in self.otc_list: + return "OTC stocks do not currently support any commands." + + if resp := self.get(f"/stock/{symbol.id}/dividends/next"): + try: + IEXData = resp[0] + except IndexError as e: + logging.info(e) + return f"Getting dividend information for ${symbol.id.upper()} encountered an error. The provider for upcoming dividend information has been having issues recently which has likely caused this error. It is also possible that the stock has no dividend or does not exist." + keys = ( + "amount", + "currency", + "declaredDate", + "exDate", + "frequency", + "paymentDate", + "flag", + ) + + if set(keys).issubset(IEXData): + + if IEXData["currency"] == "USD": + price = f"${IEXData['amount']}" + else: + price = f"{IEXData['amount']} {IEXData['currency']}" + + # Pattern IEX uses for dividend date. + pattern = "%Y-%m-%d" + + declared = datetime.strptime(IEXData["declaredDate"], pattern).strftime( + "%A, %B %w" + ) + ex = datetime.strptime(IEXData["exDate"], pattern).strftime("%A, %B %w") + payment = datetime.strptime(IEXData["paymentDate"], pattern).strftime( + "%A, %B %w" + ) + + daysDelta = ( + datetime.strptime(IEXData["paymentDate"], pattern) - datetime.now() + ).days + + return ( + "The next dividend for " + + f"{self.symbol_list[self.symbol_list['symbol']==symbol.id.upper()]['description'].item()}" # Get full name without api call + + f" is on {payment} which is in {daysDelta} days." + + f" The dividend is for {price} per share." + + f"\n\nThe dividend was declared on {declared} and the ex-dividend date is {ex}" + ) + + return f"Getting dividend information for ${symbol.id.upper()} encountered an error. The provider for upcoming dividend information has been having issues recently which has likely caused this error. It is also possible that the stock has no dividend or does not exist." + + def news_reply(self, symbol: Stock) -> str: + """Gets most recent, english, non-paywalled news + + Parameters + ---------- + symbol : Stock + + Returns + ------- + str + Formatted markdown + """ + if symbol.symbol.upper() in self.otc_list: + return "OTC stocks do not currently support any commands." + + if data := self.get(f"/stock/{symbol.id}/news/last/15"): + line = [] + + for news in data: + if news["lang"] == "en" and not news["hasPaywall"]: + line.append( + f"*{news['source']}*: [{news['headline']}]({news['url']})" + ) + + return f"News for **{symbol.id.upper()}**:\n" + "\n".join(line[:5]) + + else: + return f"No news found for: {symbol.id}\nEither today is boring or the symbol does not exist." + + def info_reply(self, symbol: Stock) -> str: + """Gets description for Stock + + Parameters + ---------- + symbol : Stock + + Returns + ------- + str + Formatted text + """ + if symbol.symbol.upper() in self.otc_list: + return "OTC stocks do not currently support any commands." + + if data := self.get(f"/stock/{symbol.id}/company"): + [data.pop(k) for k in list(data) if data[k] == ""] + + if "description" in data: + return data["description"] + + return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." + + def stat_reply(self, symbol: Stock) -> str: + """Key statistics on a Stock + + Parameters + ---------- + symbol : Stock + + Returns + ------- + str + Formatted markdown + """ + if symbol.symbol.upper() in self.otc_list: + return "OTC stocks do not currently support any commands." + + if data := self.get(f"/stock/{symbol.id}/stats"): + [data.pop(k) for k in list(data) if data[k] == ""] + + m = "" + if "companyName" in data: + m += f"Company Name: {data['companyName']}\n" + if "marketcap" in data: + m += f"Market Cap: ${data['marketcap']:,}\n" + if "week52high" in data: + m += f"52 Week (high-low): {data['week52high']:,} " + if "week52low" in data: + m += f"- {data['week52low']:,}\n" + if "employees" in data: + m += f"Number of Employees: {data['employees']:,}\n" + if "nextEarningsDate" in data: + m += f"Next Earnings Date: {data['nextEarningsDate']}\n" + if "peRatio" in data: + m += f"Price to Earnings: {data['peRatio']:.3f}\n" + if "beta" in data: + m += f"Beta: {data['beta']:.3f}\n" + return m + else: + return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." + + def cap_reply(self, symbol: Stock) -> str: + """Get the Market Cap of a stock""" + + if data := self.get(f"/stock/{symbol.id}/stats"): + + try: + cap = data["marketcap"] + except KeyError: + return f"{symbol.id} returned an error." + + message = f"The current market cap of {symbol.name} is $**{cap:,.2f}**" + + else: + message = f"The Stock: {symbol.name} was not found or returned and error." + + return message + + def intra_reply(self, symbol: Stock) -> pd.DataFrame: + """Returns price data for a symbol since the last market open. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. + """ + if symbol.symbol.upper() in self.otc_list: + return pd.DataFrame() + + if symbol.id.upper() not in list(self.symbol_list["symbol"]): + return pd.DataFrame() + + if data := self.get(f"/stock/{symbol.id}/intraday-prices"): + df = pd.DataFrame(data) + df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"]) + df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"]) + df = df.set_index("DT") + return df + + return pd.DataFrame() + + def chart_reply(self, symbol: Stock) -> pd.DataFrame: + """Returns price data for a symbol of the past month up until the previous trading days close. + Also caches multiple requests made in the same day. + + Parameters + ---------- + symbol : str + Stock symbol. + + Returns + ------- + pd.DataFrame + Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. + """ + schedule.run_pending() + + if symbol.symbol.upper() in self.otc_list: + return pd.DataFrame() + + if symbol.id.upper() not in list(self.symbol_list["symbol"]): + return pd.DataFrame() + + try: # https://stackoverflow.com/a/3845776/8774114 + return self.charts[symbol.id.upper()] + except KeyError: + pass + + if data := self.get( + f"/stock/{symbol.id}/chart/1mm", + params={"chartInterval": 3, "includeToday": "false"}, + ): + df = pd.DataFrame(data) + df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"]) + df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"]) + df = df.set_index("DT") + self.charts[symbol.id.upper()] = df + return df + + return pd.DataFrame() + + def spark_reply(self, symbol: Stock) -> str: + quote = self.get(f"/stock/{symbol.id}/quote") + + open_change = quote.get("changePercent", 0) + after_change = quote.get("extendedChangePercent", 0) + + change = 0 + + if open_change: + change = change + open_change + if after_change: + change = change + after_change + + change = change * 100 + + return f"`{symbol.tag}`: {quote['companyName']}, {change:.2f}%" + + def trending(self) -> list[str]: + """Gets current coins trending on IEX. Only returns when market is open. + + Returns + ------- + list[str] + list of $ID: NAME, CHANGE% + """ + + if data := self.get(f"/stock/market/list/mostactive"): + self.trending_cache = [ + f"`${s['symbol']}`: {s['companyName']}, {100*s['changePercent']:.2f}%" + for s in data + ] + + return self.trending_cache diff --git a/T_info.py b/T_info.py index 1ba184b..bba48ac 100644 --- a/T_info.py +++ b/T_info.py @@ -39,7 +39,7 @@ Simply calling a symbol in any message that the bot can see will also return the - `/help` Get some help using the bot. 🆘 **Inline Features** - You can type @SimpleStockBot `[search]` in any chat or direct message to search for the stock bots full list of stock symbols and return the price of the ticker. Then once you select the ticker want the bot will send a message as you in that chat with the latest stock price. + You can type @SimpleStockBot `[search]` in any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price. Then once you select the ticker want the bot will send a message as you in that chat with the latest stock price. Prices may be delayed by up to an hour. Market data is provided by [IEX Cloud](https://iexcloud.io) @@ -73,3 +73,24 @@ trending - Trending Stocks and Cryptos. 💬 intra - $[symbol] Plot since the last market open. 📈 chart - $[chart] Plot of the past month. 📊 """ # Not used by the bot but for updaing commands with BotFather + + +tests = """ +/info $tsla +/info $$btc +/news $tsla +/news $$btc +/stat $tsla +/stat $$btc +/cap $tsla +/cap $$btc +/dividend $tsla +/dividend $msft +/dividend $$btc +/intra $tsla +/intra $$btc +/chart $tsla +/chart $$btc +/help +/trending +""" diff --git a/bot.py b/bot.py index 88145ba..eb976ae 100644 --- a/bot.py +++ b/bot.py @@ -85,14 +85,17 @@ def license(update: Update, context: CallbackContext): def status(update: Update, context: CallbackContext): """Gather status of bot and dependant services and return important status updates.""" warning(f"Status command ran by {update.message.chat.username}") - bot_resp = datetime.datetime.now(update.message.date.tzinfo) - update.message.date + bot_resp_time = ( + datetime.datetime.now(update.message.date.tzinfo) - update.message.date + ) + + bot_status = s.status( + f"It took {bot_resp_time.total_seconds()} seconds for the bot to get your message." + ) update.message.reply_text( - text=s.status( - f"It took {bot_resp.total_seconds()} seconds for the bot to get your message." - ), + text=bot_status, parse_mode=telegram.ParseMode.MARKDOWN, - disable_notification=True, ) @@ -261,8 +264,8 @@ def information(update: Update, context: CallbackContext): def search(update: Update, context: CallbackContext): """ - Uses fuzzy search on full list of stocks and crypto names - and descriptions then returns the top matches in order. + Searches on full list of stocks and crypto descriptions + then returns the top matches in order of smallest symbol name length. """ info(f"Search command ran by {update.message.chat.username}") message = update.message.text.replace("/search ", "") @@ -275,11 +278,13 @@ def search(update: Update, context: CallbackContext): return context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING) - queries = s.search_symbols(message)[:10] - if queries: - reply = "*Search Results:*\n`$ticker: Company Name`\n`" + ("-" * 21) + "`\n" - for query in queries: - reply += "`" + query[1] + "`\n" + queries = s.inline_search(message, matches=10) + if not queries.empty: + reply = "*Search Results:*\n`$ticker` : Company Name\n`" + ("-" * 21) + "`\n" + for _, query in queries.iterrows(): + desc = query["description"] + reply += "`" + desc.replace(": ", "` : ") + "\n" + update.message.reply_text( text=reply, parse_mode=telegram.ParseMode.MARKDOWN, @@ -466,31 +471,45 @@ def inline_query(update: Update, context: CallbackContext): Handles inline query. Searches by looking if query is contained in the symbol and returns matches in alphabetical order. """ - info(f"Inline command ran by {update.message.chat.username}") + # info(f"Inline command ran by {update.message.chat.username}") info(f"Query: {update.inline_query.query}") - matches = s.inline_search(update.inline_query.query)[:5] - symbols = " ".join([match[1].split(":")[0] for match in matches]) - prices = s.batch_price_reply(s.find_symbols(symbols)) + ignored_queries = {"$", "$$", " ", ""} - results = [] - for match, price in zip(matches, prices): - try: - results.append( + if update.inline_query.query.strip() in ignored_queries: + default_message = """ + You can type:\n@SimpleStockBot `[search]`\nin any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price. + """ + + update.inline_query.answer( + [ InlineQueryResultArticle( str(uuid4()), - title=match[1], + title="Please enter a query. It can be a ticker or a name of a company.", input_message_content=InputTextMessageContent( - price, parse_mode=telegram.ParseMode.MARKDOWN + default_message, parse_mode=telegram.ParseMode.MARKDOWN ), ) + ] + ) + + matches = s.inline_search(update.inline_query.query) + + results = [] + for _, row in matches.iterrows(): + + results.append( + InlineQueryResultArticle( + str(uuid4()), + title=row["description"], + input_message_content=InputTextMessageContent( + row["price_reply"], parse_mode=telegram.ParseMode.MARKDOWN + ), ) - except TypeError: - warning(f"{match} caused error in inline query.") - pass + ) if len(results) == 5: - update.inline_query.answer(results) + update.inline_query.answer(results, cache_time=60 * 60) info("Inline Command was successful") return update.inline_query.answer(results) @@ -568,6 +587,7 @@ def main(): dp.add_handler(CommandHandler("random", rand_pick)) dp.add_handler(CommandHandler("donate", donate)) dp.add_handler(CommandHandler("status", status)) + dp.add_handler(CommandHandler("inline", inline_query)) # Charting can be slow so they run async. dp.add_handler(CommandHandler("intra", intra, run_async=True)) diff --git a/cg_Crypto.py b/cg_Crypto.py index 155475a..b60eddb 100644 --- a/cg_Crypto.py +++ b/cg_Crypto.py @@ -9,7 +9,6 @@ from typing import List, Optional, Tuple import pandas as pd import requests as r import schedule -from fuzzywuzzy import fuzz from markdownify import markdownify from Symbol import Coin @@ -74,7 +73,7 @@ class cg_Crypto: "$$" + symbols["symbol"].str.upper() + ": " + symbols["name"] ) symbols = symbols[["id", "symbol", "name", "description"]] - symbols["type_id"] = "$$" + symbols["id"] + symbols["type_id"] = "$$" + symbols["symbol"] self.symbol_list = symbols if return_df: @@ -99,45 +98,6 @@ class cg_Crypto: except: return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} Seconds." - def search_symbols(self, search: str) -> List[Tuple[str, str]]: - """Performs a fuzzy search to find coin symbols closest to a search term. - - Parameters - ---------- - search : str - String used to search, could be a company name or something close to the companies coin ticker. - - Returns - ------- - List[tuple[str, str]] - A list tuples of every coin sorted in order of how well they match. Each tuple contains: (Symbol, Issue Name). - """ - schedule.run_pending() - search = search.lower() - try: # https://stackoverflow.com/a/3845776/8774114 - return self.searched_symbols[search] - except KeyError: - pass - - symbols = self.symbol_list - symbols["Match"] = symbols.apply( - lambda x: fuzz.ratio(search, f"{x['symbol']}".lower()), - axis=1, - ) - - symbols.sort_values(by="Match", ascending=False, inplace=True) - if symbols["Match"].head().sum() < 300: - symbols["Match"] = symbols.apply( - lambda x: fuzz.partial_ratio(search, x["name"].lower()), - axis=1, - ) - - symbols.sort_values(by="Match", ascending=False, inplace=True) - symbols = symbols.head(10) - symbol_list = list(zip(list(symbols["symbol"]), list(symbols["description"]))) - self.searched_symbols[search] = symbol_list - return symbol_list - def price_reply(self, coin: Coin) -> str: """Returns current market price or after hours if its available for a given coin symbol. diff --git a/requirements.txt b/requirements.txt index 9615514..d73eacb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,6 @@ python-telegram-bot==13.5 requests==2.25.1 pandas==1.2.1 -fuzzywuzzy==0.18.0 -python-Levenshtein==0.12.1 schedule==1.0.0 mplfinance==0.12.7a5 markdownify==0.6.5 diff --git a/symbol_router.py b/symbol_router.py index 55f4f4e..63858bf 100644 --- a/symbol_router.py +++ b/symbol_router.py @@ -9,7 +9,6 @@ from logging import critical, debug, error, info, warning import pandas as pd import schedule from cachetools import TTLCache, cached -from fuzzywuzzy import fuzz from cg_Crypto import cg_Crypto from IEX_Symbol import IEX_Symbol @@ -19,7 +18,6 @@ from Symbol import Coin, Stock, Symbol class Router: STOCK_REGEX = "(?:^|[^\\$])\\$([a-zA-Z.]{1,6})" CRYPTO_REGEX = "[$]{2}([a-zA-Z]{1,20})" - searched_symbols = {} trending_count = {} def __init__(self): @@ -110,45 +108,7 @@ class Router: return stats - def search_symbols(self, search: str) -> list[tuple[str, str]]: - """Performs a fuzzy search to find stock symbols closest to a search term. - - Parameters - ---------- - search : str - String used to search, could be a company name or something close to the companies stock ticker. - - Returns - ------- - list[tuple[str, str]] - A list tuples of every stock sorted in order of how well they match. - Each tuple contains: (Symbol, Issue Name). - """ - - df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list]) - - search = search.lower() - - df["Match"] = df.apply( - lambda x: fuzz.ratio(search, f"{x['symbol']}".lower()), - axis=1, - ) - - df.sort_values(by="Match", ascending=False, inplace=True) - # if df["Match"].head().sum() < 300: - # df["Match"] = df.apply( - # lambda x: fuzz.partial_ratio(search, x["name"].lower()), - # axis=1, - # ) - - # df.sort_values(by="Match", ascending=False, inplace=True) - - symbols = df.head(20) - symbol_list = list(zip(list(symbols["symbol"]), list(symbols["description"]))) - self.searched_symbols[search] = symbol_list - return symbol_list - - def inline_search(self, search: str) -> list[tuple[str, str]]: + def inline_search(self, search: str, matches: int = 5) -> pd.DataFrame: """Searches based on the shortest symbol that contains the same string as the search. Should be very fast compared to a fuzzy search. @@ -165,16 +125,16 @@ class Router: df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list]) - search = search.lower() + df = df[ + df["description"].str.contains(search, regex=False, case=False) + ].sort_values(by="type_id", key=lambda x: x.str.len()) - df = df[df["type_id"].str.contains(search, regex=False)].sort_values( - by="type_id", key=lambda x: x.str.len() + symbols = df.head(matches) + symbols["price_reply"] = symbols["type_id"].apply( + lambda sym: self.price_reply(self.find_symbols(sym))[0] ) - symbols = df.head(20) - symbol_list = list(zip(list(symbols["symbol"]), list(symbols["description"]))) - self.searched_symbols[search] = symbol_list - return symbol_list + return symbols def price_reply(self, symbols: list[Symbol]) -> list[str]: """Returns current market price or after hours if its available for a given stock symbol.