1
0
mirror of https://gitlab.com/simple-stock-bots/simple-stock-bot.git synced 2025-06-16 07:16:40 +00:00

Resolve "Switch to marketdata.app for stock market data"

This commit is contained in:
Anson Biggs 2023-04-07 20:39:24 +00:00
parent 06e1f62fff
commit c2d73815f6
15 changed files with 347 additions and 982 deletions

View File

@ -1,11 +1,11 @@
FROM python:3.9-buster AS builder FROM python:3.11-buster AS builder
COPY requirements.txt /requirements.txt COPY requirements.txt /requirements.txt
RUN pip install --user -r requirements.txt RUN pip install --user -r requirements.txt
FROM python:3.9-slim FROM python:3.11-slim
ENV MPLBACKEND=Agg ENV MPLBACKEND=Agg

View File

@ -5,100 +5,28 @@ black:
- black --check --verbose -- . - black --check --verbose -- .
#Following instructions (as of 2020-04-01): https://docs.gitlab.com/ee/ci/docker/using_kaniko.html build:master:
#Kaniko docs are here: https://github.com/GoogleContainerTools/kaniko
#While this example shows building to multiple registries for all branches, with a few modifications
# it can be used to build non-master branches to a "dev" container registry and only build master to
# a production container registry
image:
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
variables:
#More Information on Kaniko Caching: https://cloud.google.com/build/docs/kaniko-cache
KANIKO_CACHE_ARGS: "--cache=true --cache-copy-layers=true --cache-ttl=24h"
VERSIONLABELMETHOD: "OnlyIfThisCommitHasVersion" # options: "OnlyIfThisCommitHasVersion","LastVersionTagInGit"
IMAGE_LABELS: >
--label org.opencontainers.image.vendor=$CI_SERVER_URL/$GITLAB_USER_LOGIN
--label org.opencontainers.image.authors=$CI_SERVER_URL/$GITLAB_USER_LOGIN
--label org.opencontainers.image.revision=$CI_COMMIT_SHA
--label org.opencontainers.image.source=$CI_PROJECT_URL
--label org.opencontainers.image.documentation=$CI_PROJECT_URL
--label org.opencontainers.image.licenses=$CI_PROJECT_URL
--label org.opencontainers.image.url=$CI_PROJECT_URL
--label vcs-url=$CI_PROJECT_URL
--label com.gitlab.ci.user=$CI_SERVER_URL/$GITLAB_USER_LOGIN
--label com.gitlab.ci.email=$GITLAB_USER_EMAIL
--label com.gitlab.ci.tagorbranch=$CI_COMMIT_REF_NAME
--label com.gitlab.ci.pipelineurl=$CI_PIPELINE_URL
--label com.gitlab.ci.commiturl=$CI_PROJECT_URL/commit/$CI_COMMIT_SHA
--label com.gitlab.ci.cijoburl=$CI_JOB_URL
--label com.gitlab.ci.mrurl=$CI_PROJECT_URL/-/merge_requests/$CI_MERGE_REQUEST_ID
get-latest-git-version:
stage: .pre
image:
name: alpine/git
entrypoint: [""]
rules:
- if: '$VERSIONLABELMETHOD == "LastVersionTagInGit"'
script:
- |
echo "the google kaniko container does not have git and does not have a packge manager to install it"
git clone https://github.com/GoogleContainerTools/kaniko.git
cd kaniko
echo "$(git describe --abbrev=0 --tags)" > ../VERSIONTAG.txt
echo "VERSIONTAG.txt contains $(cat ../VERSIONTAG.txt)"
artifacts:
paths:
- VERSIONTAG.txt
.build_with_kaniko:
#Hidden job to use as an "extends" template
stage: build stage: build
image:
name: gcr.io/kaniko-project/executor:v1.9.0-debug
entrypoint: [""]
script: script:
- | - /kaniko/executor
echo "Building and shipping image to $CI_REGISTRY_IMAGE" --context "${CI_PROJECT_DIR}"
#Build date for opencontainers --dockerfile "${CI_PROJECT_DIR}/Dockerfile"
BUILDDATE="'$(date '+%FT%T%z' | sed -E -n 's/(\+[0-9]{2})([0-9]{2})$/\1:\2/p')'" #rfc 3339 date --destination "${CI_REGISTRY_IMAGE}:latest"
IMAGE_LABELS="$IMAGE_LABELS --label org.opencontainers.image.created=$BUILDDATE --label build-date=$BUILDDATE" rules:
#Description for opencontainers - if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "master"'
BUILDTITLE=$(echo $CI_PROJECT_TITLE | tr " " "_")
IMAGE_LABELS="$IMAGE_LABELS --label org.opencontainers.image.title=$BUILDTITLE --label org.opencontainers.image.description=$BUILDTITLE"
#Add ref.name for opencontainers
IMAGE_LABELS="$IMAGE_LABELS --label org.opencontainers.image.ref.name=$CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME"
#Build Version Label and Tag from git tag, LastVersionTagInGit was placed by a previous job artifact
if [[ "$VERSIONLABELMETHOD" == "LastVersionTagInGit" ]]; then VERSIONLABEL=$(cat VERSIONTAG.txt); fi
if [[ "$VERSIONLABELMETHOD" == "OnlyIfThisCommitHasVersion" ]]; then VERSIONLABEL=$CI_COMMIT_TAG; fi
if [[ ! -z "$VERSIONLABEL" ]]; then
IMAGE_LABELS="$IMAGE_LABELS --label org.opencontainers.image.version=$VERSIONLABEL"
ADDITIONALTAGLIST="$ADDITIONALTAGLIST $VERSIONLABEL"
fi
ADDITIONALTAGLIST="$ADDITIONALTAGLIST $CI_COMMIT_REF_NAME $CI_COMMIT_SHORT_SHA"
if [[ "$CI_COMMIT_BRANCH" == "$CI_DEFAULT_BRANCH" ]]; then ADDITIONALTAGLIST="$ADDITIONALTAGLIST latest"; fi
if [[ -n "$ADDITIONALTAGLIST" ]]; then
for TAG in $ADDITIONALTAGLIST; do
FORMATTEDTAGLIST="${FORMATTEDTAGLIST} --tag $CI_REGISTRY_IMAGE:$TAG ";
done;
fi
#Reformat Docker tags to kaniko's --destination argument:
FORMATTEDTAGLIST=$(echo "${FORMATTEDTAGLIST}" | sed s/\-\-tag/\-\-destination/g)
echo "Kaniko arguments to run: --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/Dockerfile $KANIKO_CACHE_ARGS $FORMATTEDTAGLIST $IMAGE_LABELS"
mkdir -p /kaniko/.docker
echo "{\"auths\":{\"$CI_REGISTRY\":{\"auth\":\"$(echo -n $CI_REGISTRY_USER:$CI_REGISTRY_PASSWORD | base64)\"}}}" > /kaniko/.docker/config.json
/kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/Dockerfile $KANIKO_CACHE_ARGS $FORMATTEDTAGLIST $IMAGE_LABELS
build-for-gitlab-project-registry: build:master:
extends: .build_with_kaniko stage: build
environment: image:
#This is only here for completeness, since there are no CI CD Variables with this scope, the project defaults are used name: gcr.io/kaniko-project/executor:v1.9.0-debug
# to push to this projects docker registry entrypoint: [""]
name: push-to-gitlab-project-registry script:
- /kaniko/executor
--context "${CI_PROJECT_DIR}"
--dockerfile "${CI_PROJECT_DIR}/Dockerfile"
--destination "${CI_REGISTRY_IMAGE}:${CI_COMMIT_SHORT_SHA}"
--destination "${CI_REGISTRY_IMAGE}:${CI_COMMIT_BRANCH}"

7
.vscode/launch.json vendored
View File

@ -1,15 +1,12 @@
{ {
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [ "configurations": [
{ {
"name": "Telegram Bot", "name": "Telegram Bot",
"type": "python", "type": "python",
"request": "launch", "request": "launch",
"program": "bot.py", "program": "bot.py",
"console": "integratedTerminal" "console": "integratedTerminal",
"python": "python3.11"
} }
] ]
} }

12
.vscode/settings.json vendored
View File

@ -1,13 +1,5 @@
{ {
"workbench.iconTheme": "vscode-icons",
"editor.suggestSelection": "first",
"vsintellicode.modify.editor.suggestSelection": "automaticallyOverrodeDefaultValue",
"python.languageServer": "Pylance",
"git.autofetch": true,
"editor.formatOnSave": true,
"files.associations": {
"DockerDev": "dockerfile",
},
"python.formatting.provider": "black", "python.formatting.provider": "black",
"python.showStartPage": false, "python.linting.mypyEnabled": true,
"python.linting.flake8Enabled": true,
} }

View File

@ -1,11 +1,11 @@
FROM python:3.9-buster AS builder FROM python:3.11-buster AS builder
COPY requirements.txt /requirements.txt COPY requirements.txt /requirements.txt
RUN pip install --user -r requirements.txt RUN pip install --user -r requirements.txt
FROM python:3.9-slim FROM python:3.11-slim
ENV MPLBACKEND=Agg ENV MPLBACKEND=Agg

View File

@ -1,497 +0,0 @@
"""Class with functions for running the bot with IEX Cloud.
"""
import logging
import os
from datetime import datetime
from logging import warning
from typing import List, Optional, Tuple
import pandas as pd
import requests as r
import schedule
from Symbol import Stock
class IEX_Symbol:
"""
Functions for finding stock market information about symbols.
"""
SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})"
searched_symbols = {}
otc_list = []
charts = {}
trending_cache = None
def __init__(self) -> None:
"""Creates a Symbol Object
Parameters
----------
IEX_TOKEN : str
IEX API Token
"""
try:
self.IEX_TOKEN = os.environ["IEX"]
if self.IEX_TOKEN == "TOKEN":
self.IEX_TOKEN = ""
except KeyError:
self.IEX_TOKEN = ""
warning(
"Starting without an IEX Token will not allow you to get market data!"
)
if self.IEX_TOKEN != "":
self.get_symbol_list()
schedule.every().day.do(self.get_symbol_list)
schedule.every().day.do(self.clear_charts)
def get(self, endpoint, params: dict = {}, timeout=5) -> dict:
url = "https://cloud.iexapis.com/stable" + endpoint
# set token param if it wasn't passed.
params["token"] = params.get("token", self.IEX_TOKEN)
resp = r.get(url, params=params, timeout=timeout)
# Make sure API returned a proper status code
try:
resp.raise_for_status()
except r.exceptions.HTTPError as e:
logging.error(e)
return {}
# Make sure API returned valid JSON
try:
resp_json = resp.json()
# IEX uses backtick ` as apostrophe which breaks telegram markdown parsing
if type(resp_json) is dict:
resp_json["companyName"] = resp_json.get("companyName", "").replace(
"`", "'"
)
return resp_json
except r.exceptions.JSONDecodeError as e:
logging.error(e)
return {}
def clear_charts(self) -> None:
"""
Clears cache of chart data.
Charts are cached so that only 1 API call per 24 hours is needed since the
chart data is expensive and a large download.
"""
self.charts = {}
def get_symbol_list(
self, return_df=False
) -> Optional[Tuple[pd.DataFrame, datetime]]:
"""Gets list of all symbols supported by IEX
Parameters
----------
return_df : bool, optional
return the dataframe of all stock symbols, by default False
Returns
-------
Optional[Tuple[pd.DataFrame, datetime]]
If `return_df` is set to `True` returns a dataframe, otherwise returns `None`.
"""
reg_symbols = self.get("/ref-data/symbols")
otc_symbols = self.get("/ref-data/otc/symbols")
reg = pd.DataFrame(data=reg_symbols)
otc = pd.DataFrame(data=otc_symbols)
self.otc_list = set(otc["symbol"].to_list())
symbols = pd.concat([reg, otc])
# IEX uses backtick ` as apostrophe which breaks telegram markdown parsing
symbols["name"] = symbols["name"].str.replace("`", "'")
symbols["description"] = "$" + symbols["symbol"] + ": " + symbols["name"]
symbols["id"] = symbols["symbol"]
symbols["type_id"] = "$" + symbols["symbol"].str.lower()
symbols = symbols[["id", "symbol", "name", "description", "type_id"]]
self.symbol_list = symbols
if return_df:
return symbols, datetime.now()
def status(self) -> str:
"""Checks IEX Status dashboard for any current API issues.
Returns
-------
str
Human readable text on status of IEX API
"""
if self.IEX_TOKEN == "":
return "The `IEX_TOKEN` is not set so Stock Market data is not available."
resp = r.get(
"https://pjmps0c34hp7.statuspage.io/api/v2/status.json",
timeout=15,
)
if resp.status_code == 200:
status = resp.json()["status"]
else:
return "IEX Cloud did not respond. Please check their status page for more information. https://status.iexapis.com"
if status["indicator"] == "none":
return "IEX Cloud is currently not reporting any issues with its API."
else:
return (
f"{status['indicator']}: {status['description']}."
+ " Please check the status page for more information. https://status.iexapis.com"
)
def price_reply(self, symbol: Stock) -> str:
"""Returns price movement of Stock for the last market day, or after hours.
Parameters
----------
symbol : Stock
Returns
-------
str
Formatted markdown
"""
if IEXData := self.get(f"/stock/{symbol.id}/quote"):
if symbol.symbol.upper() in self.otc_list:
return f"OTC - {symbol.symbol.upper()}, {IEXData['companyName']} most recent price is: $**{IEXData['latestPrice']}**"
keys = (
"extendedChangePercent",
"extendedPrice",
"companyName",
"latestPrice",
"changePercent",
)
if set(keys).issubset(IEXData):
if change := IEXData.get("changePercent", 0):
change = round(change * 100, 2)
else:
change = 0
if (
IEXData.get("isUSMarketOpen", True)
or (IEXData["extendedChangePercent"] is None)
or (IEXData["extendedPrice"] is None)
): # Check if market is open.
message = f"The current stock price of {IEXData['companyName']} is $**{IEXData['latestPrice']}**"
else:
message = (
f"{IEXData['companyName']} closed at $**{IEXData['latestPrice']}** with a change of {change}%,"
+ f" after hours _(15 minutes delayed)_ the stock price is $**{IEXData['extendedPrice']}**"
)
if change := IEXData.get("extendedChangePercent", 0):
change = round(change * 100, 2)
else:
change = 0
# Determine wording of change text
if change > 0:
message += f", the stock is currently **up {change}%**"
elif change < 0:
message += f", the stock is currently **down {change}%**"
else:
message += ", the stock hasn't shown any movement today."
else:
message = (
f"The symbol: {symbol} encountered and error. This could be due to "
)
else:
message = f"The symbol: {symbol} was not found."
return message
def dividend_reply(self, symbol: Stock) -> str:
"""Returns the most recent, or next dividend date for a stock symbol.
Parameters
----------
symbol : Stock
Returns
-------
str
Formatted markdown
"""
if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands."
if resp := self.get(f"/stock/{symbol.id}/dividends/next"):
try:
IEXData = resp[0]
except IndexError as e:
logging.info(e)
return f"Getting dividend information for ${symbol.id.upper()} encountered an error. The provider for upcoming dividend information has been having issues recently which has likely caused this error. It is also possible that the stock has no dividend or does not exist."
keys = (
"amount",
"currency",
"declaredDate",
"exDate",
"frequency",
"paymentDate",
"flag",
)
if set(keys).issubset(IEXData):
if IEXData["currency"] == "USD":
price = f"${IEXData['amount']}"
else:
price = f"{IEXData['amount']} {IEXData['currency']}"
# Pattern IEX uses for dividend date.
pattern = "%Y-%m-%d"
declared = datetime.strptime(IEXData["declaredDate"], pattern).strftime(
"%A, %B %w"
)
ex = datetime.strptime(IEXData["exDate"], pattern).strftime("%A, %B %w")
payment = datetime.strptime(IEXData["paymentDate"], pattern).strftime(
"%A, %B %w"
)
daysDelta = (
datetime.strptime(IEXData["paymentDate"], pattern) - datetime.now()
).days
return (
"The next dividend for "
+ f"{self.symbol_list[self.symbol_list['symbol']==symbol.id.upper()]['description'].item()}" # Get full name without api call
+ f" is on {payment} which is in {daysDelta} days."
+ f" The dividend is for {price} per share."
+ f"\n\nThe dividend was declared on {declared} and the ex-dividend date is {ex}"
)
return f"Getting dividend information for ${symbol.id.upper()} encountered an error. The provider for upcoming dividend information has been having issues recently which has likely caused this error. It is also possible that the stock has no dividend or does not exist."
def news_reply(self, symbol: Stock) -> str:
"""Gets most recent, english, non-paywalled news
Parameters
----------
symbol : Stock
Returns
-------
str
Formatted markdown
"""
if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands."
if data := self.get(f"/stock/{symbol.id}/news/last/15"):
line = []
for news in data:
if news["lang"] == "en" and not news["hasPaywall"]:
line.append(
f"*{news['source']}*: [{news['headline']}]({news['url']})"
)
return f"News for **{symbol.id.upper()}**:\n" + "\n".join(line[:5])
else:
return f"No news found for: {symbol.id}\nEither today is boring or the symbol does not exist."
def info_reply(self, symbol: Stock) -> str:
"""Gets description for Stock
Parameters
----------
symbol : Stock
Returns
-------
str
Formatted text
"""
if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands."
if data := self.get(f"/stock/{symbol.id}/company"):
[data.pop(k) for k in list(data) if data[k] == ""]
if "description" in data:
return data["description"]
return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist."
def stat_reply(self, symbol: Stock) -> str:
"""Key statistics on a Stock
Parameters
----------
symbol : Stock
Returns
-------
str
Formatted markdown
"""
if symbol.symbol.upper() in self.otc_list:
return "OTC stocks do not currently support any commands."
if data := self.get(f"/stock/{symbol.id}/stats"):
[data.pop(k) for k in list(data) if data[k] == ""]
m = ""
if "companyName" in data:
m += f"Company Name: {data['companyName']}\n"
if "marketcap" in data:
m += f"Market Cap: ${data['marketcap']:,}\n"
if "week52high" in data:
m += f"52 Week (high-low): {data['week52high']:,} "
if "week52low" in data:
m += f"- {data['week52low']:,}\n"
if "employees" in data:
m += f"Number of Employees: {data['employees']:,}\n"
if "nextEarningsDate" in data:
m += f"Next Earnings Date: {data['nextEarningsDate']}\n"
if "peRatio" in data:
m += f"Price to Earnings: {data['peRatio']:.3f}\n"
if "beta" in data:
m += f"Beta: {data['beta']:.3f}\n"
return m
else:
return f"No information found for: {symbol}\nEither today is boring or the symbol does not exist."
def cap_reply(self, symbol: Stock) -> str:
"""Get the Market Cap of a stock"""
if data := self.get(f"/stock/{symbol.id}/stats"):
try:
cap = data["marketcap"]
except KeyError:
return f"{symbol.id} returned an error."
message = f"The current market cap of {symbol.name} is $**{cap:,.2f}**"
else:
message = f"The Stock: {symbol.name} was not found or returned and error."
return message
def intra_reply(self, symbol: Stock) -> pd.DataFrame:
"""Returns price data for a symbol since the last market open.
Parameters
----------
symbol : str
Stock symbol.
Returns
-------
pd.DataFrame
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
"""
if symbol.symbol.upper() in self.otc_list:
return pd.DataFrame()
if symbol.id.upper() not in list(self.symbol_list["symbol"]):
return pd.DataFrame()
if data := self.get(f"/stock/{symbol.id}/intraday-prices"):
df = pd.DataFrame(data)
df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"])
df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"])
df = df.set_index("DT")
return df
return pd.DataFrame()
def chart_reply(self, symbol: Stock) -> pd.DataFrame:
"""Returns price data for a symbol of the past month up until the previous trading days close.
Also caches multiple requests made in the same day.
Parameters
----------
symbol : str
Stock symbol.
Returns
-------
pd.DataFrame
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
"""
schedule.run_pending()
if symbol.symbol.upper() in self.otc_list:
return pd.DataFrame()
if symbol.id.upper() not in list(self.symbol_list["symbol"]):
return pd.DataFrame()
try: # https://stackoverflow.com/a/3845776/8774114
return self.charts[symbol.id.upper()]
except KeyError:
pass
if data := self.get(
f"/stock/{symbol.id}/chart/1mm",
params={"chartInterval": 3, "includeToday": "false"},
):
df = pd.DataFrame(data)
df.dropna(inplace=True, subset=["date", "minute", "high", "low", "volume"])
df["DT"] = pd.to_datetime(df["date"] + "T" + df["minute"])
df = df.set_index("DT")
self.charts[symbol.id.upper()] = df
return df
return pd.DataFrame()
def spark_reply(self, symbol: Stock) -> str:
quote = self.get(f"/stock/{symbol.id}/quote")
open_change = quote.get("changePercent", 0)
after_change = quote.get("extendedChangePercent", 0)
change = 0
if open_change:
change = change + open_change
if after_change:
change = change + after_change
change = change * 100
return f"`{symbol.tag}`: {quote['companyName']}, {change:.2f}%"
def trending(self) -> list[str]:
"""Gets current coins trending on IEX. Only returns when market is open.
Returns
-------
list[str]
list of $ID: NAME, CHANGE%
"""
if data := self.get(f"/stock/market/list/mostactive"):
self.trending_cache = [
f"`${s['symbol']}`: {s['companyName']}, {100*s['changePercent']:.2f}%"
for s in data
]
return self.trending_cache

212
MarketData.py Normal file
View File

@ -0,0 +1,212 @@
"""Class with functions for running the bot with IEX Cloud.
"""
import logging
import os
import datetime as dt
from logging import warning
from typing import Dict
import pandas as pd
import requests as r
import schedule
from Symbol import Stock
class MarketData:
"""
Functions for finding stock market information about symbols from MarkData.app
"""
SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})"
charts: Dict[Stock, pd.DataFrame] = {}
def __init__(self) -> None:
"""Creates a Symbol Object
Parameters
----------
MARKETDATA_TOKEN : str
MarketData.app API Token
"""
try:
self.MARKETDATA_TOKEN = os.environ["MARKETDATA"]
if self.MARKETDATA_TOKEN == "TOKEN":
self.MARKETDATA_TOKEN = ""
except KeyError:
self.MARKETDATA_TOKEN = ""
warning("Starting without an MarketData.app Token will not allow you to get market data!")
if self.MARKETDATA_TOKEN != "":
schedule.every().day.do(self.clear_charts)
def get(self, endpoint, params: dict = {}, timeout=10) -> dict:
url = "https://api.marketdata.app/v1/" + endpoint
# set token param if it wasn't passed.
params["token"] = params.get("token", self.MARKETDATA_TOKEN)
resp = r.get(url, params=params, timeout=timeout)
# Make sure API returned a proper status code
try:
resp.raise_for_status()
except r.exceptions.HTTPError as e:
logging.error(e)
return {}
# Make sure API returned valid JSON
try:
resp_json = resp.json()
match resp_json["s"]:
case "ok":
return resp_json
case "no_data":
return resp_json
case "error":
logging.error("MarketData Error:\n" + resp_json["errmsg"])
return {}
except r.exceptions.JSONDecodeError as e:
logging.error(e)
return {}
def clear_charts(self) -> None:
"""
Clears cache of chart data.
Charts are cached so that only 1 API call per 24 hours is needed since the
chart data is expensive and a large download.
"""
self.charts = {}
def status(self) -> str:
return "status isnt implemented by marketdata.app"
def price_reply(self, symbol: Stock) -> str:
"""Returns price movement of Stock for the last market day, or after hours.
Parameters
----------
symbol : Stock
Returns
-------
str
Formatted markdown
"""
if quoteResp := self.get(f"stocks/quotes/{symbol}/"):
return f"The current price of {quoteResp['symbol']} is ${quoteResp['last']}"
else:
return f"Getting a quote for {symbol} encountered an error."
def intra_reply(self, symbol: Stock) -> pd.DataFrame:
"""Returns price data for a symbol of the past month up until the previous trading days close.
Also caches multiple requests made in the same day.
Parameters
----------
symbol : str
Stock symbol.
Returns
-------
pd.DataFrame
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
"""
schedule.run_pending()
try:
return self.charts[symbol.id.upper()]
except KeyError:
pass
resolution = "5" # minutes
if data := self.get(
f"stocks/candles/{resolution}/{symbol}",
params={
"from": dt.datetime.now().strftime("%Y-%m-%d"),
"to": dt.datetime.now().isoformat(),
},
):
data.pop("s")
df = pd.DataFrame(data)
df["t"] = pd.to_datetime(df["t"], unit="s")
df.set_index("t", inplace=True)
df.rename(
columns={
"o": "Open",
"h": "High",
"l": "Low",
"c": "Close",
"v": "Volume",
},
inplace=True,
)
self.charts[symbol.id.upper()] = df
return df
return pd.DataFrame()
def chart_reply(self, symbol: Stock) -> pd.DataFrame:
"""Returns price data for a symbol of the past month up until the previous trading days close.
Also caches multiple requests made in the same day.
Parameters
----------
symbol : str
Stock symbol.
Returns
-------
pd.DataFrame
Returns a timeseries dataframe with high, low, and volume data if its available. Otherwise returns empty pd.DataFrame.
"""
schedule.run_pending()
try:
return self.charts[symbol.id.upper()]
except KeyError:
pass
to_date = dt.datetime.today().strftime("%Y-%m-%d")
from_date = (dt.datetime.today() - dt.timedelta(days=30)).strftime("%Y-%m-%d")
resultion = "daily"
if data := self.get(
f"stocks/candles/{resultion}/{symbol}",
params={
"from": from_date,
"to": to_date,
},
):
data.pop("s")
df = pd.DataFrame(data)
df["t"] = pd.to_datetime(df["t"], unit="s")
df.set_index("t", inplace=True)
df.rename(
columns={
"o": "Open",
"h": "High",
"l": "Low",
"c": "Close",
"v": "Volume",
},
inplace=True,
)
self.charts[symbol.id.upper()] = df
return df
return pd.DataFrame()

View File

@ -27,17 +27,13 @@ class Symbol:
class Stock(Symbol): class Stock(Symbol):
"""Stock Market Object. Gets data from IEX Cloud""" """Stock Market Object. Gets data from MarketData"""
def __init__(self, symbol: pd.DataFrame) -> None: def __init__(self, symbol: str) -> None:
if len(symbol) > 1: self.symbol = symbol
logging.info(f"Crypto with shared id:\n\t{symbol.id}") self.id = symbol
symbol = symbol.head(1) self.name = "$" + symbol
self.tag = "$" + symbol.upper()
self.symbol = symbol.symbol.values[0]
self.id = symbol.id.values[0]
self.name = symbol.name.values[0]
self.tag = symbol.type_id.values[0].upper()
class Coin(Symbol): class Coin(Symbol):

View File

@ -10,9 +10,7 @@ class T_info:
license = re.sub( license = re.sub(
r"\b\n", r"\b\n",
" ", " ",
r.get( r.get("https://gitlab.com/simple-stock-bots/simple-telegram-stock-bot/-/raw/master/LICENSE").text,
"https://gitlab.com/simple-stock-bots/simple-telegram-stock-bot/-/raw/master/LICENSE"
).text,
) )
help_text = """ help_text = """

280
bot.py
View File

@ -8,7 +8,7 @@ import os
import random import random
import string import string
import traceback import traceback
from logging import critical, debug, error, info, warning import logging as log
from uuid import uuid4 from uuid import uuid4
import mplfinance as mpf import mplfinance as mpf
@ -38,23 +38,21 @@ try:
STRIPE_TOKEN = os.environ["STRIPE"] STRIPE_TOKEN = os.environ["STRIPE"]
except KeyError: except KeyError:
STRIPE_TOKEN = "" STRIPE_TOKEN = ""
warning("Starting without a STRIPE Token will not allow you to accept Donations!") log.warning("Starting without a STRIPE Token will not allow you to accept Donations!")
s = Router() s = Router()
t = T_info() t = T_info()
# Enable logging # Enable logging
logging.basicConfig( logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
info("Bot script started.") log.info("Bot script started.")
def start(update: Update, context: CallbackContext): def start(update: Update, context: CallbackContext):
"""Send help text when the command /start is issued.""" """Send help text when the command /start is issued."""
info(f"Start command ran by {update.message.chat.username}") log.info(f"Start command ran by {update.message.chat.username}")
update.message.reply_text( update.message.reply_text(
text=t.help_text, text=t.help_text,
parse_mode=telegram.ParseMode.MARKDOWN, parse_mode=telegram.ParseMode.MARKDOWN,
@ -64,7 +62,7 @@ def start(update: Update, context: CallbackContext):
def help(update: Update, context: CallbackContext): def help(update: Update, context: CallbackContext):
"""Send help text when the command /help is issued.""" """Send help text when the command /help is issued."""
info(f"Help command ran by {update.message.chat.username}") log.info(f"Help command ran by {update.message.chat.username}")
update.message.reply_text( update.message.reply_text(
text=t.help_text, text=t.help_text,
parse_mode=telegram.ParseMode.MARKDOWN, parse_mode=telegram.ParseMode.MARKDOWN,
@ -74,7 +72,7 @@ def help(update: Update, context: CallbackContext):
def license(update: Update, context: CallbackContext): def license(update: Update, context: CallbackContext):
"""Send bots license when the /license command is issued.""" """Send bots license when the /license command is issued."""
info(f"License command ran by {update.message.chat.username}") log.info(f"License command ran by {update.message.chat.username}")
update.message.reply_text( update.message.reply_text(
text=t.license, text=t.license,
parse_mode=telegram.ParseMode.MARKDOWN, parse_mode=telegram.ParseMode.MARKDOWN,
@ -84,14 +82,10 @@ def license(update: Update, context: CallbackContext):
def status(update: Update, context: CallbackContext): def status(update: Update, context: CallbackContext):
"""Gather status of bot and dependant services and return important status updates.""" """Gather status of bot and dependant services and return important status updates."""
warning(f"Status command ran by {update.message.chat.username}") log.warning(f"Status command ran by {update.message.chat.username}")
bot_resp_time = ( bot_resp_time = datetime.datetime.now(update.message.date.tzinfo) - update.message.date
datetime.datetime.now(update.message.date.tzinfo) - update.message.date
)
bot_status = s.status( bot_status = s.status(f"It took {bot_resp_time.total_seconds()} seconds for the bot to get your message.")
f"It took {bot_resp_time.total_seconds()} seconds for the bot to get your message."
)
update.message.reply_text( update.message.reply_text(
text=bot_status, text=bot_status,
@ -101,7 +95,7 @@ def status(update: Update, context: CallbackContext):
def donate(update: Update, context: CallbackContext): def donate(update: Update, context: CallbackContext):
"""Sets up donation.""" """Sets up donation."""
info(f"Donate command ran by {update.message.chat.username}") log.info(f"Donate command ran by {update.message.chat.username}")
chat_id = update.message.chat_id chat_id = update.message.chat_id
if update.message.text.strip() == "/donate" or "/donate@" in update.message.text: if update.message.text.strip() == "/donate" or "/donate@" in update.message.text:
@ -110,16 +104,16 @@ def donate(update: Update, context: CallbackContext):
parse_mode=telegram.ParseMode.MARKDOWN, parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True, disable_notification=True,
) )
amount = 1 amount = 1.0
else: else:
amount = update.message.text.replace("/donate", "").replace("$", "").strip() amount = float(update.message.text.replace("/donate", "").replace("$", "").strip())
try: try:
price = int(float(amount) * 100) price = int(amount * 100)
except ValueError: except ValueError:
update.message.reply_text(f"{amount} is not a valid donation amount or number.") update.message.reply_text(f"{amount} is not a valid donation amount or number.")
return return
info(f"Donation amount: {price} by {update.message.chat.username}") log.info(f"Donation amount: {price} by {update.message.chat.username}")
context.bot.send_invoice( context.bot.send_invoice(
chat_id=chat_id, chat_id=chat_id,
@ -139,7 +133,7 @@ def donate(update: Update, context: CallbackContext):
def precheckout_callback(update: Update, context: CallbackContext): def precheckout_callback(update: Update, context: CallbackContext):
"""Approves donation""" """Approves donation"""
info(f"precheckout_callback queried") log.info("precheckout_callback queried")
query = update.pre_checkout_query query = update.pre_checkout_query
query.answer(ok=True) query.answer(ok=True)
@ -153,10 +147,8 @@ def precheckout_callback(update: Update, context: CallbackContext):
def successful_payment_callback(update: Update, context: CallbackContext): def successful_payment_callback(update: Update, context: CallbackContext):
"""Thanks user for donation""" """Thanks user for donation"""
info(f"Successful payment!") log.info("Successful payment!")
update.message.reply_text( update.message.reply_text("Thank you for your donation! It goes a long way to keeping the bot free!")
"Thank you for your donation! It goes a long way to keeping the bot free!"
)
def symbol_detect_image(update: Update, context: CallbackContext): def symbol_detect_image(update: Update, context: CallbackContext):
@ -179,16 +171,16 @@ def symbol_detect(update: Update, context: CallbackContext):
chat_id = update.message.chat_id chat_id = update.message.chat_id
if "$" in message: if "$" in message:
symbols = s.find_symbols(message) symbols = s.find_symbols(message)
info("Looking for Symbols") log.info("Looking for Symbols")
else: else:
return return
except AttributeError as ex: except AttributeError as ex:
info(ex) log.info(ex)
return return
if symbols: if symbols:
# Let user know bot is working # Let user know bot is working
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING) context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
info(f"Symbols found: {symbols}") log.info(f"Symbols found: {symbols}")
for reply in s.price_reply(symbols): for reply in s.price_reply(symbols):
update.message.reply_text( update.message.reply_text(
@ -198,113 +190,9 @@ def symbol_detect(update: Update, context: CallbackContext):
) )
def dividend(update: Update, context: CallbackContext):
"""/dividend or /div command and then finds dividend info on that symbol."""
info(f"Dividend command ran by {update.message.chat.username}")
message = update.message.text
chat_id = update.message.chat_id
if message.strip().split("@")[0] == "/dividend":
update.message.reply_text(
"This command gives info on the next dividend date for a symbol.\nExample: /dividend $tsla"
)
return
symbols = s.find_symbols(message)
if symbols:
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
for reply in s.dividend_reply(symbols):
update.message.reply_text(
text=reply,
parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True,
)
def news(update: Update, context: CallbackContext):
"""/news command then finds news info on that symbol."""
info(f"News command ran by {update.message.chat.username}")
message = update.message.text
chat_id = update.message.chat_id
if message.strip().split("@")[0] == "/news":
update.message.reply_text(
"This command gives the most recent english news for a symbol.\nExample: /news $tsla"
)
return
symbols = s.find_symbols(message, trending_weight=10)
if symbols:
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
for reply in s.news_reply(symbols):
update.message.reply_text(
text=reply,
parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True,
)
def information(update: Update, context: CallbackContext):
"""/info command then finds info on that symbol."""
info(f"Information command ran by {update.message.chat.username}")
message = update.message.text
chat_id = update.message.chat_id
if message.strip().split("@")[0] == "/info":
update.message.reply_text(
"This command gives information on a symbol.\nExample: /info $tsla"
)
return
symbols = s.find_symbols(message)
if symbols:
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
for reply in s.info_reply(symbols):
update.message.reply_text(
text=reply,
parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True,
)
def search(update: Update, context: CallbackContext):
"""
Searches on full list of stocks and crypto descriptions
then returns the top matches in order of smallest symbol name length.
"""
info(f"Search command ran by {update.message.chat.username}")
message = update.message.text.replace("/search ", "")
chat_id = update.message.chat_id
if message.strip().split("@")[0] == "/search":
update.message.reply_text(
"This command searches for symbols supported by the bot.\nExample: /search Tesla Motors or /search $tsla"
)
return
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
queries = s.inline_search(message, matches=10)
if not queries.empty:
reply = "*Search Results:*\n`$ticker` : Company Name\n`" + ("-" * 21) + "`\n"
for _, query in queries.iterrows():
desc = query["description"]
reply += "`" + desc.replace(": ", "` : ") + "\n"
update.message.reply_text(
text=reply,
parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True,
)
def intra(update: Update, context: CallbackContext): def intra(update: Update, context: CallbackContext):
"""returns a chart of intraday data for a symbol""" """returns a chart of intraday data for a symbol"""
info(f"Intra command ran by {update.message.chat.username}") log.info(f"Intra command ran by {update.message.chat.username}")
message = update.message.text message = update.message.text
chat_id = update.message.chat_id chat_id = update.message.chat_id
@ -333,16 +221,14 @@ def intra(update: Update, context: CallbackContext):
) )
return return
context.bot.send_chat_action( context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO)
chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO
)
buf = io.BytesIO() buf = io.BytesIO()
mpf.plot( mpf.plot(
df, df,
type="renko", type="renko",
title=f"\n{symbol.name}", title=f"\n{symbol.name}",
volume="volume" in df.keys(), volume="Volume" in df.keys(),
style="yahoo", style="yahoo",
savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), savefig=dict(fname=buf, dpi=400, bbox_inches="tight"),
) )
@ -360,7 +246,7 @@ def intra(update: Update, context: CallbackContext):
def chart(update: Update, context: CallbackContext): def chart(update: Update, context: CallbackContext):
"""returns a chart of the past month of data for a symbol""" """returns a chart of the past month of data for a symbol"""
info(f"Chart command ran by {update.message.chat.username}") log.info(f"Chart command ran by {update.message.chat.username}")
message = update.message.text message = update.message.text
chat_id = update.message.chat_id chat_id = update.message.chat_id
@ -387,16 +273,14 @@ def chart(update: Update, context: CallbackContext):
disable_notification=True, disable_notification=True,
) )
return return
context.bot.send_chat_action( context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO)
chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO
)
buf = io.BytesIO() buf = io.BytesIO()
mpf.plot( mpf.plot(
df, df,
type="candle", type="candle",
title=f"\n{symbol.name}", title=f"\n{symbol.name}",
volume="volume" in df.keys(), volume="Volume" in df.keys(),
style="yahoo", style="yahoo",
savefig=dict(fname=buf, dpi=400, bbox_inches="tight"), savefig=dict(fname=buf, dpi=400, bbox_inches="tight"),
) )
@ -411,66 +295,16 @@ def chart(update: Update, context: CallbackContext):
) )
def stat(update: Update, context: CallbackContext):
"""returns key statistics on symbol"""
info(f"Stat command ran by {update.message.chat.username}")
message = update.message.text
chat_id = update.message.chat_id
if message.strip().split("@")[0] == "/stat":
update.message.reply_text(
"This command returns key statistics for a symbol.\nExample: /stat $tsla"
)
return
symbols = s.find_symbols(message)
if symbols:
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
for reply in s.stat_reply(symbols):
update.message.reply_text(
text=reply,
parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True,
)
def cap(update: Update, context: CallbackContext):
"""returns market cap for symbol"""
info(f"Cap command ran by {update.message.chat.username}")
message = update.message.text
chat_id = update.message.chat_id
if message.strip().split("@")[0] == "/cap":
update.message.reply_text(
"This command returns the market cap for a symbol.\nExample: /cap $tsla"
)
return
symbols = s.find_symbols(message)
if symbols:
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
for reply in s.cap_reply(symbols):
update.message.reply_text(
text=reply,
parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True,
)
def trending(update: Update, context: CallbackContext): def trending(update: Update, context: CallbackContext):
"""returns currently trending symbols and how much they've moved in the past trading day.""" """returns currently trending symbols and how much they've moved in the past trading day."""
info(f"Trending command ran by {update.message.chat.username}") log.info(f"Trending command ran by {update.message.chat.username}")
chat_id = update.message.chat_id chat_id = update.message.chat_id
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING) context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
trending_list = s.trending() trending_list = s.trending()
info(trending_list) log.info(trending_list)
update.message.reply_text( update.message.reply_text(
text=trending_list, text=trending_list,
@ -485,13 +319,14 @@ def inline_query(update: Update, context: CallbackContext):
in the symbol and returns matches in alphabetical order. in the symbol and returns matches in alphabetical order.
""" """
# info(f"Inline command ran by {update.message.chat.username}") # info(f"Inline command ran by {update.message.chat.username}")
info(f"Query: {update.inline_query.query}") log.info(f"Query: {update.inline_query.query}")
ignored_queries = {"$", "$$", " ", ""} ignored_queries = {"$", "$$", " ", ""}
if update.inline_query.query.strip() in ignored_queries: if update.inline_query.query.strip() in ignored_queries:
default_message = """ default_message = """
You can type:\n@SimpleStockBot `[search]`\nin any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price. You can type:\n@SimpleStockBot `[search]`
in any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price.
""" """
update.inline_query.answer( update.inline_query.answer(
@ -499,9 +334,7 @@ def inline_query(update: Update, context: CallbackContext):
InlineQueryResultArticle( InlineQueryResultArticle(
str(uuid4()), str(uuid4()),
title="Please enter a query. It can be a ticker or a name of a company.", title="Please enter a query. It can be a ticker or a name of a company.",
input_message_content=InputTextMessageContent( input_message_content=InputTextMessageContent(default_message, parse_mode=telegram.ParseMode.MARKDOWN),
default_message, parse_mode=telegram.ParseMode.MARKDOWN
),
) )
] ]
) )
@ -510,29 +343,24 @@ def inline_query(update: Update, context: CallbackContext):
results = [] results = []
for _, row in matches.iterrows(): for _, row in matches.iterrows():
results.append( results.append(
InlineQueryResultArticle( InlineQueryResultArticle(
str(uuid4()), str(uuid4()),
title=row["description"], title=row["description"],
input_message_content=InputTextMessageContent( input_message_content=InputTextMessageContent(row["price_reply"], parse_mode=telegram.ParseMode.MARKDOWN),
row["price_reply"], parse_mode=telegram.ParseMode.MARKDOWN
),
) )
) )
if len(results) == 5: if len(results) == 5:
update.inline_query.answer(results, cache_time=60 * 60) update.inline_query.answer(results, cache_time=60 * 60)
info("Inline Command was successful") log.info("Inline Command was successful")
return return
update.inline_query.answer(results) update.inline_query.answer(results)
def rand_pick(update: Update, context: CallbackContext): def rand_pick(update: Update, context: CallbackContext):
"""For the gamblers. Returns a random symbol to buy and a sell date""" """For the gamblers. Returns a random symbol to buy and a sell date"""
info( log.info(f"Someone is gambling! Random_pick command ran by {update.message.chat.username}")
f"Someone is gambling! Random_pick command ran by {update.message.chat.username}"
)
update.message.reply_text( update.message.reply_text(
text=s.random_pick(), text=s.random_pick(),
@ -543,15 +371,13 @@ def rand_pick(update: Update, context: CallbackContext):
def error(update: Update, context: CallbackContext): def error(update: Update, context: CallbackContext):
"""Log Errors caused by Updates.""" """Log Errors caused by Updates."""
warning('Update "%s" caused error "%s"', update, error) log.warning('Update "%s" caused error "%s"', update, error)
tb_list = traceback.format_exception( tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
None, context.error, context.error.__traceback__
)
tb_string = "".join(tb_list) tb_string = "".join(tb_list)
err_code = "".join([random.choice(string.ascii_lowercase) for i in range(5)]) err_code = "".join([random.choice(string.ascii_lowercase) for i in range(5)])
warning(f"Logging error: {err_code}") log.warning(f"Logging error: {err_code}")
if update: if update:
message = ( message = (
@ -562,18 +388,14 @@ def error(update: Update, context: CallbackContext):
f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n" f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
f"<pre>{html.escape(tb_string)}</pre>" f"<pre>{html.escape(tb_string)}</pre>"
) )
warning(message) log.warning(message)
else: else:
warning(tb_string) log.warning(tb_string)
# update.message.reply_text( update.message.reply_text(
# text=f"An error has occured. Please inform @MisterBiggs if the error persists. Error Code: `{err_code}`", text=f"An error has occured. Please inform @MisterBiggs if the error persists. Error Code: `{err_code}`",
# parse_mode=telegram.ParseMode.MARKDOWN, parse_mode=telegram.ParseMode.MARKDOWN,
# ) )
# Finally, send the message
# update.message.reply_text(text=message, parse_mode=telegram.ParseMode.HTML)
# update.message.reply_text(text="Please inform the bot admin of this issue.")
def main(): def main():
@ -588,15 +410,7 @@ def main():
dp.add_handler(CommandHandler("start", start)) dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("help", help)) dp.add_handler(CommandHandler("help", help))
dp.add_handler(CommandHandler("license", license)) dp.add_handler(CommandHandler("license", license))
dp.add_handler(CommandHandler("dividend", dividend))
dp.add_handler(CommandHandler("div", dividend))
dp.add_handler(CommandHandler("news", news))
dp.add_handler(CommandHandler("info", information))
dp.add_handler(CommandHandler("stat", stat))
dp.add_handler(CommandHandler("stats", stat))
dp.add_handler(CommandHandler("cap", cap))
dp.add_handler(CommandHandler("trending", trending)) dp.add_handler(CommandHandler("trending", trending))
dp.add_handler(CommandHandler("search", search))
dp.add_handler(CommandHandler("random", rand_pick)) dp.add_handler(CommandHandler("random", rand_pick))
dp.add_handler(CommandHandler("donate", donate)) dp.add_handler(CommandHandler("donate", donate))
dp.add_handler(CommandHandler("status", status)) dp.add_handler(CommandHandler("status", status))
@ -620,9 +434,7 @@ def main():
dp.add_handler(PreCheckoutQueryHandler(precheckout_callback)) dp.add_handler(PreCheckoutQueryHandler(precheckout_callback))
# Payment success # Payment success
dp.add_handler( dp.add_handler(MessageHandler(Filters.successful_payment, successful_payment_callback))
MessageHandler(Filters.successful_payment, successful_payment_callback)
)
# log all errors # log all errors
dp.add_error_handler(error) dp.add_error_handler(error)

View File

@ -1,10 +1,8 @@
"""Class with functions for running the bot with IEX Cloud. """Class with functions for running the bot with IEX Cloud.
""" """
import logging import logging as log
from datetime import datetime from typing import List
from logging import critical, debug, error, info, warning
from typing import List, Optional, Tuple
import pandas as pd import pandas as pd
import requests as r import requests as r
@ -21,8 +19,7 @@ class cg_Crypto:
vs_currency = "usd" # simple/supported_vs_currencies for list of options vs_currency = "usd" # simple/supported_vs_currencies for list of options
searched_symbols = {} trending_cache: List[str] = []
trending_cache = None
def __init__(self) -> None: def __init__(self) -> None:
"""Creates a Symbol Object """Creates a Symbol Object
@ -36,14 +33,13 @@ class cg_Crypto:
schedule.every().day.do(self.get_symbol_list) schedule.every().day.do(self.get_symbol_list)
def get(self, endpoint, params: dict = {}, timeout=10) -> dict: def get(self, endpoint, params: dict = {}, timeout=10) -> dict:
url = "https://api.coingecko.com/api/v3" + endpoint url = "https://api.coingecko.com/api/v3" + endpoint
resp = r.get(url, params=params, timeout=timeout) resp = r.get(url, params=params, timeout=timeout)
# Make sure API returned a proper status code # Make sure API returned a proper status code
try: try:
resp.raise_for_status() resp.raise_for_status()
except r.exceptions.HTTPError as e: except r.exceptions.HTTPError as e:
logging.error(e) log.error(e)
return {} return {}
# Make sure API returned valid JSON # Make sure API returned valid JSON
@ -51,36 +47,27 @@ class cg_Crypto:
resp_json = resp.json() resp_json = resp.json()
return resp_json return resp_json
except r.exceptions.JSONDecodeError as e: except r.exceptions.JSONDecodeError as e:
logging.error(e) log.error(e)
return {} return {}
def symbol_id(self, symbol) -> str: def symbol_id(self, symbol) -> str:
try: try:
return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[ return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[0]
0
]
except KeyError: except KeyError:
return "" return ""
def get_symbol_list( def get_symbol_list(self):
self, return_df=False
) -> Optional[Tuple[pd.DataFrame, datetime]]:
raw_symbols = self.get("/coins/list") raw_symbols = self.get("/coins/list")
symbols = pd.DataFrame(data=raw_symbols) symbols = pd.DataFrame(data=raw_symbols)
# Removes all binance-peg symbols # Removes all binance-peg symbols
symbols = symbols[~symbols["id"].str.contains("binance-peg")] symbols = symbols[~symbols["id"].str.contains("binance-peg")]
symbols["description"] = ( symbols["description"] = "$$" + symbols["symbol"].str.upper() + ": " + symbols["name"]
"$$" + symbols["symbol"].str.upper() + ": " + symbols["name"]
)
symbols = symbols[["id", "symbol", "name", "description"]] symbols = symbols[["id", "symbol", "name", "description"]]
symbols["type_id"] = "$$" + symbols["symbol"] symbols["type_id"] = "$$" + symbols["symbol"]
self.symbol_list = symbols self.symbol_list = symbols
if return_df:
return symbols, datetime.now()
def status(self) -> str: def status(self) -> str:
"""Checks CoinGecko /ping endpoint for API issues. """Checks CoinGecko /ping endpoint for API issues.
@ -97,8 +84,10 @@ class cg_Crypto:
try: try:
status.raise_for_status() status.raise_for_status()
return f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} Seconds." return (
except: f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} Seconds."
)
except r.HTTPError:
return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} Seconds." return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} Seconds."
def price_reply(self, coin: Coin) -> str: def price_reply(self, coin: Coin) -> str:
@ -167,9 +156,7 @@ class cg_Crypto:
f"/coins/{symbol.id}/ohlc", f"/coins/{symbol.id}/ohlc",
params={"vs_currency": self.vs_currency, "days": 1}, params={"vs_currency": self.vs_currency, "days": 1},
): ):
df = pd.DataFrame( df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna()
resp, columns=["Date", "Open", "High", "Low", "Close"]
).dropna()
df["Date"] = pd.to_datetime(df["Date"], unit="ms") df["Date"] = pd.to_datetime(df["Date"], unit="ms")
df = df.set_index("Date") df = df.set_index("Date")
return df return df
@ -195,9 +182,7 @@ class cg_Crypto:
f"/coins/{symbol.id}/ohlc", f"/coins/{symbol.id}/ohlc",
params={"vs_currency": self.vs_currency, "days": 30}, params={"vs_currency": self.vs_currency, "days": 30},
): ):
df = pd.DataFrame( df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna()
resp, columns=["Date", "Open", "High", "Low", "Close"]
).dropna()
df["Date"] = pd.to_datetime(df["Date"], unit="ms") df["Date"] = pd.to_datetime(df["Date"], unit="ms")
df = df.set_index("Date") df = df.set_index("Date")
return df return df
@ -223,7 +208,6 @@ class cg_Crypto:
"localization": "false", "localization": "false",
}, },
): ):
return f""" return f"""
[{data['name']}]({data['links']['homepage'][0]}) Statistics: [{data['name']}]({data['links']['homepage'][0]}) Statistics:
Market Cap: ${data['market_data']['market_cap'][self.vs_currency]:,} Market Cap: ${data['market_data']['market_cap'][self.vs_currency]:,}
@ -251,14 +235,14 @@ class cg_Crypto:
""" """
if resp := self.get( if resp := self.get(
f"/simple/price", "/simple/price",
params={ params={
"ids": coin.id, "ids": coin.id,
"vs_currencies": self.vs_currency, "vs_currencies": self.vs_currency,
"include_market_cap": "true", "include_market_cap": "true",
}, },
): ):
debug(resp) log.debug(resp)
try: try:
data = resp[coin.id] data = resp[coin.id]
@ -270,7 +254,10 @@ class cg_Crypto:
if cap == 0: if cap == 0:
return f"The market cap for {coin.name} is not available for unknown reasons." return f"The market cap for {coin.name} is not available for unknown reasons."
message = f"The current price of {coin.name} is $**{price:,}** and its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}" message = (
f"The current price of {coin.name} is $**{price:,}** and"
+ " its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}"
)
else: else:
message = f"The Coin: {coin.name} was not found or returned and error." message = f"The Coin: {coin.name} was not found or returned and error."
@ -303,7 +290,7 @@ class cg_Crypto:
def spark_reply(self, symbol: Coin) -> str: def spark_reply(self, symbol: Coin) -> str:
change = self.get( change = self.get(
f"/simple/price", "/simple/price",
params={ params={
"ids": symbol.id, "ids": symbol.id,
"vs_currencies": self.vs_currency, "vs_currencies": self.vs_currency,
@ -331,7 +318,7 @@ class cg_Crypto:
sym = c["symbol"].upper() sym = c["symbol"].upper()
name = c["name"] name = c["name"]
change = self.get( change = self.get(
f"/simple/price", "/simple/price",
params={ params={
"ids": c["id"], "ids": c["id"],
"vs_currencies": self.vs_currency, "vs_currencies": self.vs_currency,
@ -344,7 +331,7 @@ class cg_Crypto:
trending.append(msg) trending.append(msg)
except Exception as e: except Exception as e:
logging.warning(e) log.warning(e)
return self.trending_cache return self.trending_cache
self.trending_cache = trending self.trending_cache = trending
@ -365,7 +352,7 @@ class cg_Crypto:
query = ",".join([c.id for c in coins]) query = ",".join([c.id for c in coins])
prices = self.get( prices = self.get(
f"/simple/price", "/simple/price",
params={ params={
"ids": query, "ids": query,
"vs_currencies": self.vs_currency, "vs_currencies": self.vs_currency,

6
dev-reqs.txt Normal file
View File

@ -0,0 +1,6 @@
-r requirements.txt
black==23.3.0
flake8==5.0.4
Flake8-pyproject==1.2.3
pylama==8.4.1
mypy==1.2.0

8
pyproject.toml Normal file
View File

@ -0,0 +1,8 @@
[tool.black]
line-length = 130
[tool.flake8]
max-line-length = 130
[tool.pycodestyle]
max_line_length = 130

View File

@ -1,6 +1,6 @@
python-telegram-bot==13.5 python-telegram-bot==13.5
requests==2.25.1 requests==2.25.1
pandas==1.2.1 pandas==2.0.0
schedule==1.0.0 schedule==1.0.0
mplfinance==0.12.7a5 mplfinance==0.12.7a5
markdownify==0.6.5 markdownify==0.6.5

View File

@ -12,7 +12,7 @@ import schedule
from cachetools import TTLCache, cached from cachetools import TTLCache, cached
from cg_Crypto import cg_Crypto from cg_Crypto import cg_Crypto
from IEX_Symbol import IEX_Symbol from MarketData import MarketData
from Symbol import Coin, Stock, Symbol from Symbol import Coin, Stock, Symbol
@ -22,7 +22,7 @@ class Router:
trending_count = {} trending_count = {}
def __init__(self): def __init__(self):
self.stock = IEX_Symbol() self.stock = MarketData()
self.crypto = cg_Crypto() self.crypto = cg_Crypto()
schedule.every().hour.do(self.trending_decay) schedule.every().hour.do(self.trending_decay)
@ -64,22 +64,12 @@ class Router:
symbols = [] symbols = []
stocks = set(re.findall(self.STOCK_REGEX, text)) stocks = set(re.findall(self.STOCK_REGEX, text))
for stock in stocks: for stock in stocks:
sym = self.stock.symbol_list[ # Market data lacks tools to check if a symbol is valid.
self.stock.symbol_list["symbol"].str.fullmatch(stock, case=False) symbols.append(Stock(stock))
]
if ~sym.empty:
print(sym)
symbols.append(Stock(sym))
else:
info(f"{stock} is not in list of stocks")
coins = set(re.findall(self.CRYPTO_REGEX, text)) coins = set(re.findall(self.CRYPTO_REGEX, text))
for coin in coins: for coin in coins:
sym = self.crypto.symbol_list[ sym = self.crypto.symbol_list[self.crypto.symbol_list["symbol"].str.fullmatch(coin.lower(), case=False)]
self.crypto.symbol_list["symbol"].str.fullmatch(
coin.lower(), case=False
)
]
if ~sym.empty: if ~sym.empty:
symbols.append(Coin(sym)) symbols.append(Coin(sym))
else: else:
@ -87,9 +77,7 @@ class Router:
if symbols: if symbols:
info(symbols) info(symbols)
for symbol in symbols: for symbol in symbols:
self.trending_count[symbol.tag] = ( self.trending_count[symbol.tag] = self.trending_count.get(symbol.tag, 0) + trending_weight
self.trending_count.get(symbol.tag, 0) + trending_weight
)
return symbols return symbols
@ -134,9 +122,9 @@ class Router:
df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list]) df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list])
df = df[ df = df[df["description"].str.contains(search, regex=False, case=False)].sort_values(
df["description"].str.contains(search, regex=False, case=False) by="type_id", key=lambda x: x.str.len()
].sort_values(by="type_id", key=lambda x: x.str.len()) )
symbols = df.head(matches) symbols = df.head(matches)
symbols["price_reply"] = symbols["type_id"].apply( symbols["price_reply"] = symbols["type_id"].apply(
@ -172,60 +160,6 @@ class Router:
return replies return replies
def dividend_reply(self, symbols: list) -> list[str]:
"""Returns the most recent, or next dividend date for a stock symbol.
Parameters
----------
symbols : list
List of stock symbols.
Returns
-------
Dict[str, str]
Each symbol passed in is a key with its value being a human readable
formatted string of the symbols div dates.
"""
replies = []
for symbol in symbols:
if isinstance(symbol, Stock):
replies.append(self.stock.dividend_reply(symbol))
elif isinstance(symbol, Coin):
replies.append("Cryptocurrencies do no have Dividends.")
else:
debug(f"{symbol} is not a Stock or Coin")
return replies
def news_reply(self, symbols: list) -> list[str]:
"""Gets recent english news on stock symbols.
Parameters
----------
symbols : list
List of stock symbols.
Returns
-------
Dict[str, str]
Each symbol passed in is a key with its value being a human
readable markdown formatted string of the symbols news.
"""
replies = []
for symbol in symbols:
if isinstance(symbol, Stock):
replies.append(self.stock.news_reply(symbol))
elif isinstance(symbol, Coin):
# replies.append(self.crypto.news_reply(symbol))
replies.append(
"News is not yet supported for cryptocurrencies. If you have any suggestions for news sources please contatct @MisterBiggs"
)
else:
debug(f"{symbol} is not a Stock or Coin")
return replies
def info_reply(self, symbols: list) -> list[str]: def info_reply(self, symbols: list) -> list[str]:
"""Gets information on stock symbols. """Gets information on stock symbols.
@ -367,7 +301,8 @@ class Router:
for symbol in symbols: for symbol in symbols:
if isinstance(symbol, Stock): if isinstance(symbol, Stock):
replies.append(self.stock.spark_reply(symbol)) replies.append("Command not supported for stocks.")
# replies.append(self.stock.spark_reply(symbol))
elif isinstance(symbol, Coin): elif isinstance(symbol, Coin):
replies.append(self.crypto.spark_reply(symbol)) replies.append(self.crypto.spark_reply(symbol))
else: else:
@ -394,10 +329,7 @@ class Router:
reply += "🔥Trending on the Stock Bot:\n`" reply += "🔥Trending on the Stock Bot:\n`"
reply += "" * len("Trending on the Stock Bot:") + "`\n" reply += "" * len("Trending on the Stock Bot:") + "`\n"
sorted_trending = [ sorted_trending = [s[0] for s in sorted(self.trending_count.items(), key=lambda item: item[1])][::-1][0:5]
s[0]
for s in sorted(self.trending_count.items(), key=lambda item: item[1])
][::-1][0:5]
for t in sorted_trending: for t in sorted_trending:
reply += self.spark_reply(self.find_symbols(t))[0] + "\n" reply += self.spark_reply(self.find_symbols(t))[0] + "\n"
@ -424,14 +356,8 @@ class Router:
return "Trending data is not currently available." return "Trending data is not currently available."
def random_pick(self) -> str: def random_pick(self) -> str:
choice = random.choice(list(self.stock.symbol_list["description"]) + list(self.crypto.symbol_list["description"]))
choice = random.choice( hold = (datetime.date.today() + datetime.timedelta(random.randint(1, 365))).strftime("%b %d, %Y")
list(self.stock.symbol_list["description"])
+ list(self.crypto.symbol_list["description"])
)
hold = (
datetime.date.today() + datetime.timedelta(random.randint(1, 365))
).strftime("%b %d, %Y")
return f"{choice}\nBuy and hold until: {hold}" return f"{choice}\nBuy and hold until: {hold}"