1
0
mirror of https://gitlab.com/simple-stock-bots/simple-stock-bot.git synced 2025-06-16 07:16:40 +00:00

small code clean up and add linting to repo

This commit is contained in:
Anson Biggs 2023-04-07 00:15:01 -06:00
parent b72f0518c2
commit cedacc5749
8 changed files with 98 additions and 148 deletions

12
.vscode/settings.json vendored
View File

@ -1,13 +1,5 @@
{
"workbench.iconTheme": "vscode-icons",
"editor.suggestSelection": "first",
"vsintellicode.modify.editor.suggestSelection": "automaticallyOverrodeDefaultValue",
"python.languageServer": "Pylance",
"git.autofetch": true,
"editor.formatOnSave": true,
"files.associations": {
"DockerDev": "dockerfile",
},
"python.formatting.provider": "black",
"python.showStartPage": false,
"python.linting.mypyEnabled": true,
"python.linting.flake8Enabled": true,
}

View File

@ -5,7 +5,7 @@ import logging
import os
import datetime as dt
from logging import warning
from typing import List, Optional, Tuple
from typing import Dict
import pandas as pd
import requests as r
@ -21,10 +21,7 @@ class MarketData:
SYMBOL_REGEX = "[$]([a-zA-Z]{1,4})"
searched_symbols = {}
otc_list = []
charts = {}
trending_cache = None
charts: Dict[Stock, pd.DataFrame] = {}
def __init__(self) -> None:
"""Creates a Symbol Object
@ -41,9 +38,7 @@ class MarketData:
self.MARKETDATA_TOKEN = ""
except KeyError:
self.MARKETDATA_TOKEN = ""
warning(
"Starting without an MarketData.app Token will not allow you to get market data!"
)
warning("Starting without an MarketData.app Token will not allow you to get market data!")
if self.MARKETDATA_TOKEN != "":
schedule.every().day.do(self.clear_charts)
@ -78,7 +73,8 @@ class MarketData:
except r.exceptions.JSONDecodeError as e:
logging.error(e)
return {}
return {}
def clear_charts(self) -> None:
"""
@ -131,7 +127,6 @@ class MarketData:
except KeyError:
pass
to_date = dt.datetime.today().strftime("%Y-%m-%d")
resolution = "5" # minutes
if data := self.get(

View File

@ -10,9 +10,7 @@ class T_info:
license = re.sub(
r"\b\n",
" ",
r.get(
"https://gitlab.com/simple-stock-bots/simple-telegram-stock-bot/-/raw/master/LICENSE"
).text,
r.get("https://gitlab.com/simple-stock-bots/simple-telegram-stock-bot/-/raw/master/LICENSE").text,
)
help_text = """

113
bot.py
View File

@ -8,7 +8,7 @@ import os
import random
import string
import traceback
from logging import critical, debug, error, info, warning
import logging as log
from uuid import uuid4
import mplfinance as mpf
@ -38,23 +38,21 @@ try:
STRIPE_TOKEN = os.environ["STRIPE"]
except KeyError:
STRIPE_TOKEN = ""
warning("Starting without a STRIPE Token will not allow you to accept Donations!")
log.warning("Starting without a STRIPE Token will not allow you to accept Donations!")
s = Router()
t = T_info()
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
info("Bot script started.")
log.info("Bot script started.")
def start(update: Update, context: CallbackContext):
"""Send help text when the command /start is issued."""
info(f"Start command ran by {update.message.chat.username}")
log.info(f"Start command ran by {update.message.chat.username}")
update.message.reply_text(
text=t.help_text,
parse_mode=telegram.ParseMode.MARKDOWN,
@ -64,7 +62,7 @@ def start(update: Update, context: CallbackContext):
def help(update: Update, context: CallbackContext):
"""Send help text when the command /help is issued."""
info(f"Help command ran by {update.message.chat.username}")
log.info(f"Help command ran by {update.message.chat.username}")
update.message.reply_text(
text=t.help_text,
parse_mode=telegram.ParseMode.MARKDOWN,
@ -74,7 +72,7 @@ def help(update: Update, context: CallbackContext):
def license(update: Update, context: CallbackContext):
"""Send bots license when the /license command is issued."""
info(f"License command ran by {update.message.chat.username}")
log.info(f"License command ran by {update.message.chat.username}")
update.message.reply_text(
text=t.license,
parse_mode=telegram.ParseMode.MARKDOWN,
@ -84,14 +82,10 @@ def license(update: Update, context: CallbackContext):
def status(update: Update, context: CallbackContext):
"""Gather status of bot and dependant services and return important status updates."""
warning(f"Status command ran by {update.message.chat.username}")
bot_resp_time = (
datetime.datetime.now(update.message.date.tzinfo) - update.message.date
)
log.warning(f"Status command ran by {update.message.chat.username}")
bot_resp_time = datetime.datetime.now(update.message.date.tzinfo) - update.message.date
bot_status = s.status(
f"It took {bot_resp_time.total_seconds()} seconds for the bot to get your message."
)
bot_status = s.status(f"It took {bot_resp_time.total_seconds()} seconds for the bot to get your message.")
update.message.reply_text(
text=bot_status,
@ -101,7 +95,7 @@ def status(update: Update, context: CallbackContext):
def donate(update: Update, context: CallbackContext):
"""Sets up donation."""
info(f"Donate command ran by {update.message.chat.username}")
log.info(f"Donate command ran by {update.message.chat.username}")
chat_id = update.message.chat_id
if update.message.text.strip() == "/donate" or "/donate@" in update.message.text:
@ -110,16 +104,16 @@ def donate(update: Update, context: CallbackContext):
parse_mode=telegram.ParseMode.MARKDOWN,
disable_notification=True,
)
amount = 1
amount = 1.0
else:
amount = update.message.text.replace("/donate", "").replace("$", "").strip()
amount = float(update.message.text.replace("/donate", "").replace("$", "").strip())
try:
price = int(float(amount) * 100)
price = int(amount * 100)
except ValueError:
update.message.reply_text(f"{amount} is not a valid donation amount or number.")
return
info(f"Donation amount: {price} by {update.message.chat.username}")
log.info(f"Donation amount: {price} by {update.message.chat.username}")
context.bot.send_invoice(
chat_id=chat_id,
@ -139,7 +133,7 @@ def donate(update: Update, context: CallbackContext):
def precheckout_callback(update: Update, context: CallbackContext):
"""Approves donation"""
info(f"precheckout_callback queried")
log.info("precheckout_callback queried")
query = update.pre_checkout_query
query.answer(ok=True)
@ -153,10 +147,8 @@ def precheckout_callback(update: Update, context: CallbackContext):
def successful_payment_callback(update: Update, context: CallbackContext):
"""Thanks user for donation"""
info(f"Successful payment!")
update.message.reply_text(
"Thank you for your donation! It goes a long way to keeping the bot free!"
)
log.info("Successful payment!")
update.message.reply_text("Thank you for your donation! It goes a long way to keeping the bot free!")
def symbol_detect_image(update: Update, context: CallbackContext):
@ -179,16 +171,16 @@ def symbol_detect(update: Update, context: CallbackContext):
chat_id = update.message.chat_id
if "$" in message:
symbols = s.find_symbols(message)
info("Looking for Symbols")
log.info("Looking for Symbols")
else:
return
except AttributeError as ex:
info(ex)
log.info(ex)
return
if symbols:
# Let user know bot is working
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
info(f"Symbols found: {symbols}")
log.info(f"Symbols found: {symbols}")
for reply in s.price_reply(symbols):
update.message.reply_text(
@ -200,7 +192,7 @@ def symbol_detect(update: Update, context: CallbackContext):
def intra(update: Update, context: CallbackContext):
"""returns a chart of intraday data for a symbol"""
info(f"Intra command ran by {update.message.chat.username}")
log.info(f"Intra command ran by {update.message.chat.username}")
message = update.message.text
chat_id = update.message.chat_id
@ -229,9 +221,7 @@ def intra(update: Update, context: CallbackContext):
)
return
context.bot.send_chat_action(
chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO
)
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO)
buf = io.BytesIO()
mpf.plot(
@ -256,7 +246,7 @@ def intra(update: Update, context: CallbackContext):
def chart(update: Update, context: CallbackContext):
"""returns a chart of the past month of data for a symbol"""
info(f"Chart command ran by {update.message.chat.username}")
log.info(f"Chart command ran by {update.message.chat.username}")
message = update.message.text
chat_id = update.message.chat_id
@ -283,9 +273,7 @@ def chart(update: Update, context: CallbackContext):
disable_notification=True,
)
return
context.bot.send_chat_action(
chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO
)
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.UPLOAD_PHOTO)
buf = io.BytesIO()
mpf.plot(
@ -309,14 +297,14 @@ def chart(update: Update, context: CallbackContext):
def trending(update: Update, context: CallbackContext):
"""returns currently trending symbols and how much they've moved in the past trading day."""
info(f"Trending command ran by {update.message.chat.username}")
log.info(f"Trending command ran by {update.message.chat.username}")
chat_id = update.message.chat_id
context.bot.send_chat_action(chat_id=chat_id, action=telegram.ChatAction.TYPING)
trending_list = s.trending()
info(trending_list)
log.info(trending_list)
update.message.reply_text(
text=trending_list,
@ -331,13 +319,14 @@ def inline_query(update: Update, context: CallbackContext):
in the symbol and returns matches in alphabetical order.
"""
# info(f"Inline command ran by {update.message.chat.username}")
info(f"Query: {update.inline_query.query}")
log.info(f"Query: {update.inline_query.query}")
ignored_queries = {"$", "$$", " ", ""}
if update.inline_query.query.strip() in ignored_queries:
default_message = """
You can type:\n@SimpleStockBot `[search]`\nin any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price.
You can type:\n@SimpleStockBot `[search]`
in any chat or direct message to search for the stock bots full list of stock and crypto symbols and return the price.
"""
update.inline_query.answer(
@ -345,9 +334,7 @@ def inline_query(update: Update, context: CallbackContext):
InlineQueryResultArticle(
str(uuid4()),
title="Please enter a query. It can be a ticker or a name of a company.",
input_message_content=InputTextMessageContent(
default_message, parse_mode=telegram.ParseMode.MARKDOWN
),
input_message_content=InputTextMessageContent(default_message, parse_mode=telegram.ParseMode.MARKDOWN),
)
]
)
@ -360,24 +347,20 @@ def inline_query(update: Update, context: CallbackContext):
InlineQueryResultArticle(
str(uuid4()),
title=row["description"],
input_message_content=InputTextMessageContent(
row["price_reply"], parse_mode=telegram.ParseMode.MARKDOWN
),
input_message_content=InputTextMessageContent(row["price_reply"], parse_mode=telegram.ParseMode.MARKDOWN),
)
)
if len(results) == 5:
update.inline_query.answer(results, cache_time=60 * 60)
info("Inline Command was successful")
log.info("Inline Command was successful")
return
update.inline_query.answer(results)
def rand_pick(update: Update, context: CallbackContext):
"""For the gamblers. Returns a random symbol to buy and a sell date"""
info(
f"Someone is gambling! Random_pick command ran by {update.message.chat.username}"
)
log.info(f"Someone is gambling! Random_pick command ran by {update.message.chat.username}")
update.message.reply_text(
text=s.random_pick(),
@ -388,15 +371,13 @@ def rand_pick(update: Update, context: CallbackContext):
def error(update: Update, context: CallbackContext):
"""Log Errors caused by Updates."""
warning('Update "%s" caused error "%s"', update, error)
log.warning('Update "%s" caused error "%s"', update, error)
tb_list = traceback.format_exception(
None, context.error, context.error.__traceback__
)
tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
tb_string = "".join(tb_list)
err_code = "".join([random.choice(string.ascii_lowercase) for i in range(5)])
warning(f"Logging error: {err_code}")
log.warning(f"Logging error: {err_code}")
if update:
message = (
@ -407,18 +388,14 @@ def error(update: Update, context: CallbackContext):
f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
f"<pre>{html.escape(tb_string)}</pre>"
)
warning(message)
log.warning(message)
else:
warning(tb_string)
log.warning(tb_string)
# update.message.reply_text(
# text=f"An error has occured. Please inform @MisterBiggs if the error persists. Error Code: `{err_code}`",
# parse_mode=telegram.ParseMode.MARKDOWN,
# )
# Finally, send the message
# update.message.reply_text(text=message, parse_mode=telegram.ParseMode.HTML)
# update.message.reply_text(text="Please inform the bot admin of this issue.")
update.message.reply_text(
text=f"An error has occured. Please inform @MisterBiggs if the error persists. Error Code: `{err_code}`",
parse_mode=telegram.ParseMode.MARKDOWN,
)
def main():
@ -457,9 +434,7 @@ def main():
dp.add_handler(PreCheckoutQueryHandler(precheckout_callback))
# Payment success
dp.add_handler(
MessageHandler(Filters.successful_payment, successful_payment_callback)
)
dp.add_handler(MessageHandler(Filters.successful_payment, successful_payment_callback))
# log all errors
dp.add_error_handler(error)

View File

@ -1,10 +1,8 @@
"""Class with functions for running the bot with IEX Cloud.
"""
import logging
from datetime import datetime
from logging import critical, debug, error, info, warning
from typing import List, Optional, Tuple
import logging as log
from typing import List
import pandas as pd
import requests as r
@ -21,8 +19,7 @@ class cg_Crypto:
vs_currency = "usd" # simple/supported_vs_currencies for list of options
searched_symbols = {}
trending_cache = None
trending_cache: List[str] = []
def __init__(self) -> None:
"""Creates a Symbol Object
@ -42,7 +39,7 @@ class cg_Crypto:
try:
resp.raise_for_status()
except r.exceptions.HTTPError as e:
logging.error(e)
log.error(e)
return {}
# Make sure API returned valid JSON
@ -50,35 +47,27 @@ class cg_Crypto:
resp_json = resp.json()
return resp_json
except r.exceptions.JSONDecodeError as e:
logging.error(e)
log.error(e)
return {}
def symbol_id(self, symbol) -> str:
try:
return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[
0
]
return self.symbol_list[self.symbol_list["symbol"] == symbol]["id"].values[0]
except KeyError:
return ""
def get_symbol_list(
self, return_df=False
) -> Optional[Tuple[pd.DataFrame, datetime]]:
def get_symbol_list(self):
raw_symbols = self.get("/coins/list")
symbols = pd.DataFrame(data=raw_symbols)
# Removes all binance-peg symbols
symbols = symbols[~symbols["id"].str.contains("binance-peg")]
symbols["description"] = (
"$$" + symbols["symbol"].str.upper() + ": " + symbols["name"]
)
symbols["description"] = "$$" + symbols["symbol"].str.upper() + ": " + symbols["name"]
symbols = symbols[["id", "symbol", "name", "description"]]
symbols["type_id"] = "$$" + symbols["symbol"]
self.symbol_list = symbols
if return_df:
return symbols, datetime.now()
def status(self) -> str:
"""Checks CoinGecko /ping endpoint for API issues.
@ -95,8 +84,10 @@ class cg_Crypto:
try:
status.raise_for_status()
return f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} Seconds."
except:
return (
f"CoinGecko API responded that it was OK with a {status.status_code} in {status.elapsed.total_seconds()} Seconds."
)
except r.HTTPError:
return f"CoinGecko API returned an error code {status.status_code} in {status.elapsed.total_seconds()} Seconds."
def price_reply(self, coin: Coin) -> str:
@ -165,9 +156,7 @@ class cg_Crypto:
f"/coins/{symbol.id}/ohlc",
params={"vs_currency": self.vs_currency, "days": 1},
):
df = pd.DataFrame(
resp, columns=["Date", "Open", "High", "Low", "Close"]
).dropna()
df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna()
df["Date"] = pd.to_datetime(df["Date"], unit="ms")
df = df.set_index("Date")
return df
@ -193,9 +182,7 @@ class cg_Crypto:
f"/coins/{symbol.id}/ohlc",
params={"vs_currency": self.vs_currency, "days": 30},
):
df = pd.DataFrame(
resp, columns=["Date", "Open", "High", "Low", "Close"]
).dropna()
df = pd.DataFrame(resp, columns=["Date", "Open", "High", "Low", "Close"]).dropna()
df["Date"] = pd.to_datetime(df["Date"], unit="ms")
df = df.set_index("Date")
return df
@ -248,14 +235,14 @@ class cg_Crypto:
"""
if resp := self.get(
f"/simple/price",
"/simple/price",
params={
"ids": coin.id,
"vs_currencies": self.vs_currency,
"include_market_cap": "true",
},
):
debug(resp)
log.debug(resp)
try:
data = resp[coin.id]
@ -267,7 +254,10 @@ class cg_Crypto:
if cap == 0:
return f"The market cap for {coin.name} is not available for unknown reasons."
message = f"The current price of {coin.name} is $**{price:,}** and its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}"
message = (
f"The current price of {coin.name} is $**{price:,}** and"
+ " its market cap is $**{cap:,.2f}** {self.vs_currency.upper()}"
)
else:
message = f"The Coin: {coin.name} was not found or returned and error."
@ -300,7 +290,7 @@ class cg_Crypto:
def spark_reply(self, symbol: Coin) -> str:
change = self.get(
f"/simple/price",
"/simple/price",
params={
"ids": symbol.id,
"vs_currencies": self.vs_currency,
@ -328,7 +318,7 @@ class cg_Crypto:
sym = c["symbol"].upper()
name = c["name"]
change = self.get(
f"/simple/price",
"/simple/price",
params={
"ids": c["id"],
"vs_currencies": self.vs_currency,
@ -341,7 +331,7 @@ class cg_Crypto:
trending.append(msg)
except Exception as e:
logging.warning(e)
log.warning(e)
return self.trending_cache
self.trending_cache = trending
@ -362,7 +352,7 @@ class cg_Crypto:
query = ",".join([c.id for c in coins])
prices = self.get(
f"/simple/price",
"/simple/price",
params={
"ids": query,
"vs_currencies": self.vs_currency,

6
dev-reqs.txt Normal file
View File

@ -0,0 +1,6 @@
-r requirements.txt
black==23.3.0
flake8==5.0.4
Flake8-pyproject==1.2.3
pylama==8.4.1
mypy==1.2.0

8
pyproject.toml Normal file
View File

@ -0,0 +1,8 @@
[tool.black]
line-length = 130
[tool.flake8]
max-line-length = 130
[tool.pycodestyle]
max_line_length = 130

View File

@ -69,11 +69,7 @@ class Router:
coins = set(re.findall(self.CRYPTO_REGEX, text))
for coin in coins:
sym = self.crypto.symbol_list[
self.crypto.symbol_list["symbol"].str.fullmatch(
coin.lower(), case=False
)
]
sym = self.crypto.symbol_list[self.crypto.symbol_list["symbol"].str.fullmatch(coin.lower(), case=False)]
if ~sym.empty:
symbols.append(Coin(sym))
else:
@ -81,9 +77,7 @@ class Router:
if symbols:
info(symbols)
for symbol in symbols:
self.trending_count[symbol.tag] = (
self.trending_count.get(symbol.tag, 0) + trending_weight
)
self.trending_count[symbol.tag] = self.trending_count.get(symbol.tag, 0) + trending_weight
return symbols
@ -128,9 +122,9 @@ class Router:
df = pd.concat([self.stock.symbol_list, self.crypto.symbol_list])
df = df[
df["description"].str.contains(search, regex=False, case=False)
].sort_values(by="type_id", key=lambda x: x.str.len())
df = df[df["description"].str.contains(search, regex=False, case=False)].sort_values(
by="type_id", key=lambda x: x.str.len()
)
symbols = df.head(matches)
symbols["price_reply"] = symbols["type_id"].apply(
@ -335,10 +329,7 @@ class Router:
reply += "🔥Trending on the Stock Bot:\n`"
reply += "" * len("Trending on the Stock Bot:") + "`\n"
sorted_trending = [
s[0]
for s in sorted(self.trending_count.items(), key=lambda item: item[1])
][::-1][0:5]
sorted_trending = [s[0] for s in sorted(self.trending_count.items(), key=lambda item: item[1])][::-1][0:5]
for t in sorted_trending:
reply += self.spark_reply(self.find_symbols(t))[0] + "\n"
@ -365,13 +356,8 @@ class Router:
return "Trending data is not currently available."
def random_pick(self) -> str:
choice = random.choice(
list(self.stock.symbol_list["description"])
+ list(self.crypto.symbol_list["description"])
)
hold = (
datetime.date.today() + datetime.timedelta(random.randint(1, 365))
).strftime("%b %d, %Y")
choice = random.choice(list(self.stock.symbol_list["description"]) + list(self.crypto.symbol_list["description"]))
hold = (datetime.date.today() + datetime.timedelta(random.randint(1, 365))).strftime("%b %d, %Y")
return f"{choice}\nBuy and hold until: {hold}"