1
0
mirror of https://gitlab.com/simple-stock-bots/simple-stock-bot.git synced 2025-06-16 07:16:40 +00:00

fixed stock market cap bug

This commit is contained in:
Anson Biggs 2021-11-05 23:31:24 -07:00
parent d533c2c4a2
commit 9d510fc104

View File

@ -1,479 +1,479 @@
"""Class with functions for running the bot with IEX Cloud. """Class with functions for running the bot with IEX Cloud.
""" """
import logging import logging
import os import os
from datetime import datetime from datetime import datetime
from logging import warning from logging import warning
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
import pandas as pd import pandas as pd
import requests as r import requests as r
import schedule import schedule
from Symbol import Stock from Symbol import Stock
class IEX_Symbol: class IEX_Symbol:
""" """
Functions for finding stock market information about symbols. Functions for finding stock market information about symbols.
""" """
SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})" SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})"
searched_symbols = {} searched_symbols = {}
otc_list = [] otc_list = []
charts = {} charts = {}
trending_cache = None trending_cache = None
def __init__(self) -> None: def __init__(self) -> None:
"""Creates a Symbol Object """Creates a Symbol Object
Parameters Parameters
---------- ----------
IEX_TOKEN : str IEX_TOKEN : str
IEX API Token IEX API Token
""" """
try: try:
self.IEX_TOKEN = os.environ["IEX"] self.IEX_TOKEN = os.environ["IEX"]
except KeyError: except KeyError:
self.IEX_TOKEN = "" self.IEX_TOKEN = ""
warning( warning(
"Starting without an IEX Token will not allow you to get market data!" "Starting without an IEX Token will not allow you to get market data!"
) )
if self.IEX_TOKEN != "": if self.IEX_TOKEN != "":
self.get_symbol_list() self.get_symbol_list()
schedule.every().day.do(self.get_symbol_list) schedule.every().day.do(self.get_symbol_list)
schedule.every().day.do(self.clear_charts) schedule.every().day.do(self.clear_charts)
def get(self, endpoint, params: dict = {}, timeout=5) -> dict: def get(self, endpoint, params: dict = {}, timeout=5) -> dict:
url = "https://cloud.iexapis.com/stable" + endpoint url = "https://cloud.iexapis.com/stable" + endpoint
# set token param if it wasn't passed. # set token param if it wasn't passed.
params["token"] = params.get("token", self.IEX_TOKEN) params["token"] = params.get("token", self.IEX_TOKEN)
resp = r.get(url, params=params, timeout=timeout) resp = r.get(url, params=params, timeout=timeout)
# Make sure API returned a proper status code # Make sure API returned a proper status code
try: try:
resp.raise_for_status() resp.raise_for_status()
except r.exceptions.HTTPError as e: except r.exceptions.HTTPError as e:
logging.error(e) logging.error(e)
return {} return {}
# Make sure API returned valid JSON # Make sure API returned valid JSON
try: try:
resp_json = resp.json() resp_json = resp.json()
return resp_json return resp_json
except r.exceptions.JSONDecodeError as e: except r.exceptions.JSONDecodeError as e:
logging.error(e) logging.error(e)
return {} return {}
def clear_charts(self) -> None: def clear_charts(self) -> None:
""" """
Clears cache of chart data. Clears cache of chart data.
Charts are cached so that only 1 API call per 24 hours is needed since the Charts are cached so that only 1 API call per 24 hours is needed since the
chart data is expensive and a large download. chart data is expensive and a large download.
""" """
self.charts = {} self.charts = {}
def get_symbol_list( def get_symbol_list(
self, return_df=False self, return_df=False
) -> Optional[Tuple[pd.DataFrame, datetime]]: ) -> Optional[Tuple[pd.DataFrame, datetime]]:
"""Gets list of all symbols supported by IEX """Gets list of all symbols supported by IEX
Parameters Parameters
---------- ----------
return_df : bool, optional return_df : bool, optional
return the dataframe of all stock symbols, by default False return the dataframe of all stock symbols, by default False
Returns Returns
------- -------
Optional[Tuple[pd.DataFrame, datetime]] Optional[Tuple[pd.DataFrame, datetime]]
If `return_df` is set to `True` returns a dataframe, otherwise returns `None`. If `return_df` is set to `True` returns a dataframe, otherwise returns `None`.
""" """
reg_symbols = self.get("/ref-data/symbols") reg_symbols = self.get("/ref-data/symbols")
otc_symbols = self.get("/ref-data/otc/symbols") otc_symbols = self.get("/ref-data/otc/symbols")
reg = pd.DataFrame(data=reg_symbols) reg = pd.DataFrame(data=reg_symbols)
otc = pd.DataFrame(data=otc_symbols) otc = pd.DataFrame(data=otc_symbols)
self.otc_list = set(otc["symbol"].to_list()) self.otc_list = set(otc["symbol"].to_list())
symbols = pd.concat([reg, otc]) symbols = pd.concat([reg, otc])
symbols["description"] = "$" + symbols["symbol"] + ": " + symbols["name"] symbols["description"] = "$" + symbols["symbol"] + ": " + symbols["name"]
symbols["id"] = symbols["symbol"] symbols["id"] = symbols["symbol"]
symbols["type_id"] = "$" + symbols["symbol"].str.lower() symbols["type_id"] = "$" + symbols["symbol"].str.lower()
symbols = symbols[["id", "symbol", "name", "description", "type_id"]] symbols = symbols[["id", "symbol", "name", "description", "type_id"]]
self.symbol_list = symbols self.symbol_list = symbols
if return_df: if return_df:
return symbols, datetime.now() return symbols, datetime.now()
def status(self) -> str: def status(self) -> str:
"""Checks IEX Status dashboard for any current API issues. """Checks IEX Status dashboard for any current API issues.
Returns Returns
------- -------
str str
Human readable text on status of IEX API Human readable text on status of IEX API
""" """
resp = r.get( resp = r.get(
"https://pjmps0c34hp7.statuspage.io/api/v2/status.json", "https://pjmps0c34hp7.statuspage.io/api/v2/status.json",
timeout=15, timeout=15,
) )
if resp.status_code == 200: if resp.status_code == 200:
status = resp.json()["status"] status = resp.json()["status"]
else: else:
return "IEX Cloud did not respond. Please check their status page for more information. https://status.iexapis.com" return "IEX Cloud did not respond. Please check their status page for more information. https://status.iexapis.com"
if status["indicator"] == "none": if status["indicator"] == "none":
return "IEX Cloud is currently not reporting any issues with its API." return "IEX Cloud is currently not reporting any issues with its API."
else: else:
return ( return (
f"{status['indicator']}: {status['description']}." f"{status['indicator']}: {status['description']}."
+ " Please check the status page for more information. https://status.iexapis.com" + " Please check the status page for more information. https://status.iexapis.com"
) )
def price_reply(self, symbol: Stock) -> str: def price_reply(self, symbol: Stock) -> str:
"""Returns price movement of Stock for the last market day, or after hours. """Returns price movement of Stock for the last market day, or after hours.
Parameters Parameters
---------- ----------
symbol : Stock symbol : Stock
Returns Returns
------- -------
str str
Formatted markdown Formatted markdown
""" """
if IEXData := self.get(f"/stock/{symbol.id}/quote"): if IEXData := self.get(f"/stock/{symbol.id}/quote"):
if symbol.symbol.upper() in self.otc_list: if symbol.symbol.upper() in self.otc_list:
return f"OTC - {symbol.symbol.upper()}, {IEXData['companyName']} most recent price is: $**{IEXData['latestPrice']}**" return f"OTC - {symbol.symbol.upper()}, {IEXData['companyName']} most recent price is: $**{IEXData['latestPrice']}**"
keys = ( keys = (
"extendedChangePercent", "extendedChangePercent",
"extendedPrice", "extendedPrice",
"companyName", "companyName",
"latestPrice", "latestPrice",
"changePercent", "changePercent",
) )
if set(keys).issubset(IEXData): if set(keys).issubset(IEXData):
if change := IEXData.get("changePercent", 0): if change := IEXData.get("changePercent", 0):
change = round(change * 100, 2) change = round(change * 100, 2)
else: else:
change = 0 change = 0
if ( if (
IEXData.get("isUSMarketOpen", True) IEXData.get("isUSMarketOpen", True)
or (IEXData["extendedChangePercent"] is None) or (IEXData["extendedChangePercent"] is None)
or (IEXData["extendedPrice"] is None) or (IEXData["extendedPrice"] is None)
): # Check if market is open. ): # Check if market is open.
message = f"The current stock price of {IEXData['companyName']} is $**{IEXData['latestPrice']}**" message = f"The current stock price of {IEXData['companyName']} is $**{IEXData['latestPrice']}**"
else: else:
message = ( message = (
f"{IEXData['companyName']} closed at $**{IEXData['latestPrice']}** with a change of {change}%," f"{IEXData['companyName']} closed at $**{IEXData['latestPrice']}** with a change of {change}%,"
+ f" after hours _(15 minutes delayed)_ the stock price is $**{IEXData['extendedPrice']}**" + f" after hours _(15 minutes delayed)_ the stock price is $**{IEXData['extendedPrice']}**"
) )
if change := IEXData.get("extendedChangePercent", 0): if change := IEXData.get("extendedChangePercent", 0):
change = round(change * 100, 2) change = round(change * 100, 2)
else: else:
change = 0 change = 0
# Determine wording of change text # Determine wording of change text
if change > 0: if change > 0:
message += f", the stock is currently **up {change}%**" message += f", the stock is currently **up {change}%**"
elif change < 0: elif change < 0:
message += f", the stock is currently **down {change}%**" message += f", the stock is currently **down {change}%**"
else: else:
message += ", the stock hasn't shown any movement today." message += ", the stock hasn't shown any movement today."
else: else:
message = ( message = (
f"The symbol: {symbol} encountered and error. This could be due to " f"The symbol: {symbol} encountered and error. This could be due to "
) )
else: else:
message = f"The symbol: {symbol} was not found." message = f"The symbol: {symbol} was not found."
return message return message
def dividend_reply(self, symbol: Stock) -> str: def dividend_reply(self, symbol: Stock) -> str:
"""Returns the most recent, or next dividend date for a stock symbol. """Returns the most recent, or next dividend date for a stock symbol.
Parameters Parameters
---------- ----------
symbol : Stock symbol : Stock
Returns Returns
------- -------
str str
Formatted markdown Formatted markdown
""" """
if symbol.symbol.upper() in self.otc_list: if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands." return "OTC stocks do not currently support any commands."
if resp := self.get(f"/stock/{symbol.id}/dividends/next"): if resp := self.get(f"/stock/{symbol.id}/dividends/next"):
try: try:
IEXData = resp[0] IEXData = resp[0]
except IndexError as e: except IndexError as e:
return f"${symbol.id.upper()} either doesn't exist or pays no dividend." return f"${symbol.id.upper()} either doesn't exist or pays no dividend."
keys = ( keys = (
"amount", "amount",
"currency", "currency",
"declaredDate", "declaredDate",
"exDate", "exDate",
"frequency", "frequency",
"paymentDate", "paymentDate",
"flag", "flag",
) )
if set(keys).issubset(IEXData): if set(keys).issubset(IEXData):
if IEXData["currency"] == "USD": if IEXData["currency"] == "USD":
price = f"${IEXData['amount']}" price = f"${IEXData['amount']}"
else: else:
price = f"{IEXData['amount']} {IEXData['currency']}" price = f"{IEXData['amount']} {IEXData['currency']}"
# Pattern IEX uses for dividend date. # Pattern IEX uses for dividend date.
pattern = "%Y-%m-%d" pattern = "%Y-%m-%d"
declared = datetime.strptime(IEXData["declaredDate"], pattern).strftime( declared = datetime.strptime(IEXData["declaredDate"], pattern).strftime(
"%A, %B %w" "%A, %B %w"
) )
ex = datetime.strptime(IEXData["exDate"], pattern).strftime("%A, %B %w") ex = datetime.strptime(IEXData["exDate"], pattern).strftime("%A, %B %w")
payment = datetime.strptime(IEXData["paymentDate"], pattern).strftime( payment = datetime.strptime(IEXData["paymentDate"], pattern).strftime(
"%A, %B %w" "%A, %B %w"
) )
daysDelta = ( daysDelta = (
datetime.strptime(IEXData["paymentDate"], pattern) - datetime.now() datetime.strptime(IEXData["paymentDate"], pattern) - datetime.now()
).days ).days
return ( return (
"The next dividend for " "The next dividend for "
+ f"{self.symbol_list[self.symbol_list['symbol']==symbol.id.upper()]['description'].item()}" # Get full name without api call + f"{self.symbol_list[self.symbol_list['symbol']==symbol.id.upper()]['description'].item()}" # Get full name without api call
+ f" is on {payment} which is in {daysDelta} days." + f" is on {payment} which is in {daysDelta} days."
+ f" The dividend is for {price} per share." + f" The dividend is for {price} per share."
+ f"\n\nThe dividend was declared on {declared} and the ex-dividend date is {ex}" + f"\n\nThe dividend was declared on {declared} and the ex-dividend date is {ex}"
) )
return f"${symbol.id.upper()} either doesn't exist or pays no dividend." return f"${symbol.id.upper()} either doesn't exist or pays no dividend."
def news_reply(self, symbol: Stock) -> str: def news_reply(self, symbol: Stock) -> str:
"""Gets most recent, english, non-paywalled news """Gets most recent, english, non-paywalled news
Parameters Parameters
---------- ----------
symbol : Stock symbol : Stock
Returns Returns
------- -------
str str
Formatted markdown Formatted markdown
""" """
if symbol.symbol.upper() in self.otc_list: if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands." return "OTC stocks do not currently support any commands."
if data := self.get(f"/stock/{symbol.id}/news/last/15"): if data := self.get(f"/stock/{symbol.id}/news/last/15"):
line = [] line = []
for news in data: for news in data:
if news["lang"] == "en" and not news["hasPaywall"]: if news["lang"] == "en" and not news["hasPaywall"]:
line.append( line.append(
f"*{news['source']}*: [{news['headline']}]({news['url']})" f"*{news['source']}*: [{news['headline']}]({news['url']})"
) )
return f"News for **{symbol.id.upper()}**:\n" + "\n".join(line[:5]) return f"News for **{symbol.id.upper()}**:\n" + "\n".join(line[:5])
else: else:
return f"No news found for: {symbol.id}\nEither today is boring or the symbol does not exist." return f"No news found for: {symbol.id}\nEither today is boring or the symbol does not exist."
def info_reply(self, symbol: Stock) -> str: def info_reply(self, symbol: Stock) -> str:
"""Gets description for Stock """Gets description for Stock
Parameters Parameters
---------- ----------
symbol : Stock symbol : Stock
Returns Returns
------- -------
str str
Formatted text Formatted text
""" """
if symbol.symbol.upper() in self.otc_list: if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands." return "OTC stocks do not currently support any commands."
if data := self.get(f"/stock/{symbol.id}/company"): if data := self.get(f"/stock/{symbol.id}/company"):
[data.pop(k) for k in list(data) if data[k] == ""] [data.pop(k) for k in list(data) if data[k] == ""]
if "description" in data: if "description" in data:
return data["description"] return data["description"]
return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist."
def stat_reply(self, symbol: Stock) -> str: def stat_reply(self, symbol: Stock) -> str:
"""Key statistics on a Stock """Key statistics on a Stock
Parameters Parameters
---------- ----------
symbol : Stock symbol : Stock
Returns Returns
------- -------
str str
Formatted markdown Formatted markdown
""" """
if symbol.symbol.upper() in self.otc_list: if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands." return "OTC stocks do not currently support any commands."
if data := self.get(f"/stock/{symbol.id}/stats"): if data := self.get(f"/stock/{symbol.id}/stats"):
[data.pop(k) for k in list(data) if data[k] == ""] [data.pop(k) for k in list(data) if data[k] == ""]
m = "" m = ""
if "companyName" in data: if "companyName" in data:
m += f"Company Name: {data['companyName']}\n" m += f"Company Name: {data['companyName']}\n"
if "marketcap" in data: if "marketcap" in data:
m += f"Market Cap: ${data['marketcap']:,}\n" m += f"Market Cap: ${data['marketcap']:,}\n"
if "week52high" in data: if "week52high" in data:
m += f"52 Week (high-low): {data['week52high']:,} " m += f"52 Week (high-low): {data['week52high']:,} "
if "week52low" in data: if "week52low" in data:
m += f"- {data['week52low']:,}\n" m += f"- {data['week52low']:,}\n"
if "employees" in data: if "employees" in data:
m += f"Number of Employees: {data['employees']:,}\n" m += f"Number of Employees: {data['employees']:,}\n"
if "nextEarningsDate" in data: if "nextEarningsDate" in data:
m += f"Next Earnings Date: {data['nextEarningsDate']}\n" m += f"Next Earnings Date: {data['nextEarningsDate']}\n"
if "peRatio" in data: if "peRatio" in data:
m += f"Price to Earnings: {data['peRatio']:.3f}\n" m += f"Price to Earnings: {data['peRatio']:.3f}\n"
if "beta" in data: if "beta" in data:
m += f"Beta: {data['beta']:.3f}\n" m += f"Beta: {data['beta']:.3f}\n"
return m return m
else: else:
return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist." return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist."
def cap_reply(self, symbol: Stock) -> str: def cap_reply(self, symbol: Stock) -> str:
"""Get the Market Cap of a stock""" """Get the Market Cap of a stock"""
if data := self.get(f"/stable/stock/{symbol.id}/stats"): if data := self.get(f"/stock/{symbol.id}/stats"):
try: try:
cap = data["marketcap"] cap = data["marketcap"]
except KeyError: except KeyError:
return f"{symbol.id} returned an error." return f"{symbol.id} returned an error."
message = f"The current market cap of {symbol.name} is $**{cap:,.2f}**" message = f"The current market cap of {symbol.name} is $**{cap:,.2f}**"
else: else:
message = f"The Stock: {symbol.name} was not found or returned and error." message = f"The Stock: {symbol.name} was not found or returned and error."
return message return message
def intra_reply(self, symbol: Stock) -> pd.DataFrame: def intra_reply(self, symbol: Stock) -> pd.DataFrame:
"""Returns price data for a symbol since the last market open. """Returns price data for a symbol since the last market open.
Parameters Parameters
---------- ----------
symbol : str symbol : str
Stock symbol. Stock symbol.
Returns Returns
------- -------
pd.DataFrame pd.DataFrame
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
""" """
if symbol.symbol.upper() in self.otc_list: if symbol.symbol.upper() in self.otc_list:
return pd.DataFrame() return pd.DataFrame()
if symbol.id.upper() not in list(self.symbol_list["symbol"]): if symbol.id.upper() not in list(self.symbol_list["symbol"]):
return pd.DataFrame() return pd.DataFrame()
if data := self.get(f"/stock/{symbol.id}/intraday-prices"): if data := self.get(f"/stock/{symbol.id}/intraday-prices"):
df = pd.DataFrame(data) df = pd.DataFrame(data)
df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"]) df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"])
df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"]) df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"])
df = df.set_index("DT") df = df.set_index("DT")
return df return df
return pd.DataFrame() return pd.DataFrame()
def chart_reply(self, symbol: Stock) -> pd.DataFrame: def chart_reply(self, symbol: Stock) -> pd.DataFrame:
"""Returns price data for a symbol of the past month up until the previous trading days close. """Returns price data for a symbol of the past month up until the previous trading days close.
Also caches multiple requests made in the same day. Also caches multiple requests made in the same day.
Parameters Parameters
---------- ----------
symbol : str symbol : str
Stock symbol. Stock symbol.
Returns Returns
------- -------
pd.DataFrame pd.DataFrame
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame. Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
""" """
schedule.run_pending() schedule.run_pending()
if symbol.symbol.upper() in self.otc_list: if symbol.symbol.upper() in self.otc_list:
return pd.DataFrame() return pd.DataFrame()
if symbol.id.upper() not in list(self.symbol_list["symbol"]): if symbol.id.upper() not in list(self.symbol_list["symbol"]):
return pd.DataFrame() return pd.DataFrame()
try: # https://stackoverflow.com/a/3845776/8774114 try: # https://stackoverflow.com/a/3845776/8774114
return self.charts[symbol.id.upper()] return self.charts[symbol.id.upper()]
except KeyError: except KeyError:
pass pass
if data := self.get( if data := self.get(
f"/stock/{symbol.id}/chart/1mm", f"/stock/{symbol.id}/chart/1mm",
params={"chartInterval": 3, "includeToday": "false"}, params={"chartInterval": 3, "includeToday": "false"},
): ):
df = pd.DataFrame(data) df = pd.DataFrame(data)
df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"]) df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"])
df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"]) df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"])
df = df.set_index("DT") df = df.set_index("DT")
self.charts[symbol.id.upper()] = df self.charts[symbol.id.upper()] = df
return df return df
return pd.DataFrame() return pd.DataFrame()
def spark_reply(self, symbol: Stock) -> str: def spark_reply(self, symbol: Stock) -> str:
quote = self.get(f"/stock/{symbol.id}/quote") quote = self.get(f"/stock/{symbol.id}/quote")
open_change = quote.get("changePercent", 0) open_change = quote.get("changePercent", 0)
after_change = quote.get("extendedChangePercent", 0) after_change = quote.get("extendedChangePercent", 0)
change = 0 change = 0
if open_change: if open_change:
change = change + open_change change = change + open_change
if after_change: if after_change:
change = change + after_change change = change + after_change
change = change * 100 change = change * 100
return f"`{symbol.tag}`: {quote['companyName']}, {change:.2f}%" return f"`{symbol.tag}`: {quote['companyName']}, {change:.2f}%"
def trending(self) -> list[str]: def trending(self) -> list[str]:
"""Gets current coins trending on IEX. Only returns when market is open. """Gets current coins trending on IEX. Only returns when market is open.
Returns Returns
------- -------
list[str] list[str]
list of $ID: NAME, CHANGE% list of $ID: NAME, CHANGE%
""" """
if data := self.get(f"/stock/market/list/mostactive"): if data := self.get(f"/stock/market/list/mostactive"):
self.trending_cache = [ self.trending_cache = [
f"`${s['symbol']}`: {s['companyName']}, {100*s['changePercent']:.2f}%" f"`${s['symbol']}`: {s['companyName']}, {100*s['changePercent']:.2f}%"
for s in data for s in data
] ]
return self.trending_cache return self.trending_cache