# --- # jupyter: # jupytext: # text_representation: # extension: .py # format_name: light # format_version: '1.5' # jupytext_version: 1.14.2 # kernelspec: # display_name: Python [conda env:bbytes] * # language: python # name: conda-env-bbytes-py # --- # + import csv import pandas as pd from datetime import datetime, timedelta import numpy as np import datetime as dt import matplotlib.pyplot as plt from pathlib import Path import time import plotly.graph_objects as go import plotly.io as pio from PIL import Image import streamlit as st import plotly.express as px import altair as alt import dateutil.parser from matplotlib.colors import LinearSegmentedColormap # + class color: PURPLE = '\033[95m' CYAN = '\033[96m' DARKCYAN = '\033[36m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def conditional_formatter(value): return "${:.2f}".format(value) if not (abs(value) < 1.00) else "${:.5f}".format(value) @st.cache_data def print_PL(amnt, thresh, extras = "" ): if amnt > 0: return color.BOLD + color.GREEN + str(amnt) + extras + color.END elif amnt < 0: return color.BOLD + color.RED + str(amnt)+ extras + color.END elif np.isnan(amnt): return str(np.nan) else: return str(amnt + extras) @st.cache_data def get_headers(logtype): otimeheader = "" cheader = "" plheader = "" fmat = '%Y-%m-%d %H:%M:%S' if logtype == "ByBit": otimeheader = 'Create Time' cheader = 'Contracts' plheader = 'Closed P&L' fmat = '%Y-%m-%d %H:%M:%S' if logtype == "BitGet": otimeheader = 'Date' cheader = 'Futures' plheader = 'Realized P/L' fmat = '%Y-%m-%d %H:%M:%S' if logtype == "MEXC": otimeheader = 'Trade time' cheader = 'Futures' plheader = 'closing position' fmat = '%Y/%m/%d %H:%M' if logtype == "Binance": otimeheader = 'Date' cheader = 'Symbol' plheader = 'Realized Profit' fmat = '%Y-%m-%d %H:%M:%S' #if logtype == "Kucoin": # otimeheader = 'Time' # cheader = 'Contract' # plheader = '' # fmat = '%Y/%m/%d %H:%M:%S' if logtype == "Kraken": otimeheader = 'time' cheader = 'asset' plheader = 'amount' fmat = '%Y-%m-%d %H:%M:%S.%f' if logtype == "OkX": otimeheader = '\ufeffOrder Time' cheader = '\ufeffInstrument' plheader = '\ufeffPL' fmat = '%Y-%m-%d %H:%M:%S' return otimeheader.lower(), cheader.lower(), plheader.lower(), fmat @st.cache_data def get_coin_info(df_coin, principal_balance,plheader): numtrades = int(len(df_coin)) numwin = int(sum(df_coin[plheader] > 0)) numloss = int(sum(df_coin[plheader] < 0)) winrate = np.round(100*numwin/numtrades,4) grosswin = sum(df_coin[df_coin[plheader] > 0][plheader]) grossloss = sum(df_coin[df_coin[plheader] < 0][plheader]) if grossloss != 0: pfactor = -1*np.round(grosswin/grossloss,2) else: pfactor = np.nan cum_PL = np.round(sum(df_coin[plheader].values),2) cum_PL_perc = np.round(100*cum_PL/principal_balance,2) mean_PL = np.round(sum(df_coin[plheader].values/len(df_coin)),2) mean_PL_perc = np.round(100*mean_PL/principal_balance,2) return numtrades, numwin, numloss, winrate, pfactor, cum_PL, cum_PL_perc, mean_PL, mean_PL_perc @st.cache_data def get_hist_info(df_coin, principal_balance,plheader): numtrades = int(len(df_coin)) numwin = int(sum(df_coin[plheader] > 0)) numloss = int(sum(df_coin[plheader] < 0)) if numtrades != 0: winrate = np.round(100*numwin/numtrades,4) else: winrate = np.nan grosswin = sum(df_coin[df_coin[plheader] > 0][plheader]) grossloss = sum(df_coin[df_coin[plheader] < 0][plheader]) if grossloss != 0: pfactor = -1*np.round(grosswin/grossloss,2) else: pfactor = np.nan return numtrades, numwin, numloss, winrate, pfactor @st.cache_data def get_rolling_stats(df, lev, otimeheader, days): max_roll = (df[otimeheader].max() - df[otimeheader].min()).days if max_roll >= days: rollend = df[otimeheader].max()-timedelta(days=days) rolling_df = df[df[otimeheader] >= rollend] if len(rolling_df) > 0: rolling_perc = rolling_df['Return Per Trade'].dropna().cumprod().values[-1]-1 else: rolling_perc = np.nan else: rolling_perc = np.nan return 100*rolling_perc @st.cache_data def cc_coding(row): return ['background-color: lightgrey'] * len(row) if row['Exit Date'] <= datetime.strptime('2022-12-16 00:00:00','%Y-%m-%d %H:%M:%S').date() else [''] * len(row) def ctt_coding(row): return ['background-color: lightgrey'] * len(row) if row['Exit Date'] <= datetime.strptime('2023-01-02 00:00:00','%Y-%m-%d %H:%M:%S').date() else [''] * len(row) @st.cache_data def my_style(v, props=''): props = 'color:red' if v < 0 else 'color:green' return props def filt_df(df, cheader, symbol_selections): df = df.copy() df = df[df[cheader].isin(symbol_selections)] return df def drop_frac_cents(d): D = np.floor(100*d)/100 return D def load_data(filename, account, exchange, otimeheader, fmat): cols1 = ['id','datetime', 'exchange', 'subaccount', 'pair', 'side', 'action', 'amount', 'price', 'errors'] cols2 = ['id','datetime', 'exchange', 'subaccount', 'pair', 'side', 'action', 'amount', 'price', 'errors', 'P/L', 'P/L %','exit price', 'Lev'] old_df = pd.read_csv("history-old.csv", header = 0, names= cols1) df = pd.read_csv(filename, header = 0, names= cols2) df.loc[df['exit price'] > 0, 'price'] = df.loc[df['exit price'] > 0, 'exit price'] df = pd.concat([old_df, df], ignore_index=True) filtdf = df[(df.exchange == exchange) & (df.subaccount == account)].dropna() if not filtdf.empty: filtdf = filtdf.sort_values('datetime') filtdf = filtdf.iloc[np.where(filtdf.action == 'open')[0][0]:, :] #get first open signal in dataframe tnum = 0 dca = 0 newdf = pd.DataFrame([], columns=['Trade','Signal','Entry Date','Buy Price', 'Sell Price','Exit Date', 'P/L per token', 'P/L %']) for index, row in filtdf.iterrows(): if row.action == 'open': dca += 1 tnum += 1 sig = 'Long' if row.side == 'long' else 'Short' temp = pd.DataFrame({'Trade' :[tnum], 'Signal': [sig], 'Entry Date':[row.datetime],'Buy Price': [row.price], 'Sell Price': [np.nan],'Exit Date': [np.nan], 'P/L per token': [np.nan], 'P/L %': [np.nan], 'DCA': [dca]}) newdf = pd.concat([newdf,temp], ignore_index = True) if row.action == 'close': for j in np.arange(tnum-1, tnum-dca-1,-1): newdf.loc[j,'Sell Price'] = row.price newdf.loc[j,'Exit Date'] = row.datetime dca = 0 newdf['Buy Price'] = pd.to_numeric(newdf['Buy Price']) newdf['Sell Price'] = pd.to_numeric(newdf['Sell Price']) newdf['P/L per token'] = newdf['Sell Price'] - newdf['Buy Price'] newdf['P/L %'] = 100*newdf['P/L per token']/newdf['Buy Price'] newdf = newdf.dropna() else: newdf = pd.DataFrame([], columns=['Trade','Signal','Entry Date','Buy Price', 'Sell Price','Exit Date', 'P/L per token', 'P/L %']) if account == 'Pure Bread (ByBit)': tvdata = pd.read_csv('pb-history-old.csv',header = 0).drop('Unnamed: 0', axis=1) elif account == 'PUMPernickel (ByBit)': tvdata = pd.read_csv('pn-history-old.csv',header = 0).drop('Unnamed: 0', axis=1) else: tvdata = pd.DataFrame([]) if tvdata.empty: df = newdf else: df = pd.concat([tvdata, newdf], ignore_index =True) df = df.sort_values('Entry Date', ascending = True) df.index = range(len(df)) df.Trade = df.index + 1 dateheader = 'Date' theader = 'Time' df[dateheader] = [tradetimes.split(" ")[0] for tradetimes in df[otimeheader].values] df[theader] = [tradetimes.split(" ")[1] for tradetimes in df[otimeheader].values] df[otimeheader] = pd.to_datetime(df[otimeheader]) df['Exit Date'] = pd.to_datetime(df['Exit Date']) df[dateheader] = [dateutil.parser.parse(date).date() for date in df[dateheader]] df[theader] = [dateutil.parser.parse(time).time() for time in df[theader]] return df @st.cache_data def get_account_drawdown(trades, principal_balance): max_draw_perc = 0.00 beg = 0 trades = np.hstack([0.0, trades.dropna().values]) + principal_balance if len(trades) > 2: for ind in range(len(trades)-1): delta = 100*(trades[ind+1:] - trades[ind])/trades[ind] max_draw_perc = min(max_draw_perc, delta.min()) else: max_draw = min(max_draw, trades) max_draw_perc = 100*max_draw/(principal_balance) return max_draw_perc def get_pl(bot_selections, df, dca_amnt, dollar_cap, lev, principal_balance): signal_map = {'Long': 1, 'Short':-1} fees = .075/100 if df.empty: cum_pl = principal_balance effective_return = 0.0 else: if bot_selections == 'ct': dca_map = {1: dca_amnt/100, 2: dca_amnt/100, 3: dca_amnt/100, 4: dca_amnt/100, 5: dca_amnt/100, 6: dca_amnt/100} df['DCA %'] = df['DCA'].map(dca_map) df['Calculated Return %'] = (df['DCA %'])*(df['Signal'].map(signal_map)*(df['Sell Price']-df['Buy Price'])/df['Buy Price']-2*fees) #accounts for fees on open and close of trade df['DCA'] = np.floor(df['DCA'].values) df['Return Per Trade'] = np.nan df['Balance used in Trade'] = np.nan df['New Balance'] = np.nan g = df.groupby('Exit Date').sum(numeric_only=True)['Calculated Return %'].reset_index(name='Return Per Trade') df.loc[df['DCA']==1.0,'Return Per Trade'] = 1+lev*g['Return Per Trade'].values df['Compounded Return'] = df['Return Per Trade'].cumprod() df.loc[df['DCA']==1.0,'New Balance'] = [min(dollar_cap/lev, bal*principal_balance) for bal in df.loc[df['DCA']==1.0,'Compounded Return']] df.loc[df['DCA']==1.0,'Balance used in Trade'] = np.concatenate([[principal_balance], df.loc[df['DCA']==1.0,'New Balance'].values[:-1]]) else: df['Calculated Return %'] = (df['Signal'].map(signal_map)*(df['Sell Price']-df['Buy Price'])/df['Buy Price'])-2*fees #accounts for fees on open and close of trade df['Return Per Trade'] = np.nan g = df.groupby('Exit Date').sum(numeric_only=True)['Calculated Return %'].reset_index(name='Return Per Trade') df['Return Per Trade'] = 1+lev*g['Return Per Trade'].values df['Compounded Return'] = df['Return Per Trade'].cumprod() df['New Balance'] = [min(dollar_cap/lev, bal*principal_balance) for bal in df['Compounded Return']] df['Balance used in Trade'] = np.concatenate([[principal_balance], df['New Balance'].values[:-1]]) df['Net P/L Per Trade'] = drop_frac_cents((df['Return Per Trade']-1)*df['Balance used in Trade']) df['Cumulative P/L'] = df['Net P/L Per Trade'].cumsum() cum_pl = df.loc[df.dropna().index[-1],'Cumulative P/L'] + principal_balance effective_return = 100*((cum_pl - principal_balance)/principal_balance) return df, cum_pl, effective_return def runapp() -> None: no_errors = True otimeheader = 'Exit Date' fmat = '%Y-%m-%d %H:%M:%S' dollar_cap = 1000000000.00 pn_data = load_data('history.csv', 'PUMPernickel (ByBit)', 'Bybit Futures', otimeheader, fmat) pb_data = load_data('history.csv', 'Pure Bread (ByBit)', 'Bybit Futures', otimeheader, fmat) df = pd.concat([pn_data, pb_data]) ct_df = pn_data.copy(deep=True) pb_df = pb_data.copy(deep=True) dateheader = 'Date' theader = 'Time' with st.form("user input", ): st.header("Choose your settings:") if no_errors: with st.container(): col1, col2, col3 = st.columns(3) with col1: try: startdate = st.date_input("Start Date", value=pd.to_datetime(df[otimeheader]).min()) except: st.error("Please select a valid start date.") no_errors = False with col2: try: enddate = st.date_input("End Date", value=datetime.today()) except: st.error("Please select a valid end date.") no_errors = False with col3: principal_balance = st.number_input('Starting Balance', min_value=0.00, value=1000.00, max_value= dollar_cap, step=.01) #st.sidebar.subheader("Customize your Dashboard") if no_errors and (enddate < startdate): st.error("End Date must be later than Start date. Please try again.") no_errors = False if no_errors: dca_amnt = 100/5 with st.container(): col1,col2 = st.columns(2) with col1: st.write("**Pumpernickel (PN)**") ct_lev = st.number_input('PN Leverage', min_value=1, value=1, max_value= 2, step=1) ct_alloc = st.number_input("PN Allocation (%)", min_value=0, value=50, max_value=100, step=1) with col2: st.write("**Pure Bread (PB)**") pb_lev = st.number_input('PB Leverage', min_value=1, value=1, max_value= 3, step=1) pb_alloc = st.number_input("PB Allocation (%)", min_value=0, value=50, max_value=100, step=1) #hack way to get button centered c = st.columns(5) with c[2]: submitted = st.form_submit_button("Get Cookin'!") if submitted and principal_balance *ct_alloc/100 * ct_lev > dollar_cap: ct_lev = np.floor(dollar_cap/(principal_balance*ct_alloc/100)) st.error(f"WARNING:Allocated balance for Pumpernickel exceeds the ${dollar_cap} limit. Using maximum available leverage of {ct_lev}") if submitted and principal_balance *pb_alloc/100 * pb_lev > dollar_cap: pb_lev = np.floor(pb_dollar_cap/(principal_balance*pb_alloc/100)) st.error(f"WARNING:Allocated balance for Pure Bread exceeds the ${dollar_cap} limit. Using maximum available leverage of {pb_lev}") if submitted and (ct_alloc + pb_alloc) > 100: st.error("Invalid allocation amounts. The total allocations must not exceed 100% of available funds. Please check your allocations and try again.") no_errors = False if submitted and (ct_alloc + pb_alloc) < 100: st.error(f'WARNING: The allocation amounts you have selected do not sum to 100%. Only {ct_alloc + pb_alloc}% of the starting balance will be used for trading.') if no_errors == True: df = df[(df[dateheader] >= startdate) & (df[dateheader] <= enddate)] ct_df = ct_df[(ct_df[dateheader] >= startdate) & (ct_df[dateheader] <= enddate)] pb_df = pb_df[(pb_df[dateheader] >= startdate) & (pb_df[dateheader] <= enddate)] if len(df) == 0: st.error("There are no available trades matching your selections. Please try again!") no_errors = False ct_df, ct_cum_pl, ct_effective_return = get_pl('ct', ct_df, dca_amnt, dollar_cap, ct_lev, principal_balance*ct_alloc/100) pb_df, pb_cum_pl, pb_effective_return = get_pl('pb', pb_df, dca_amnt, dollar_cap, pb_lev, principal_balance*pb_alloc/100) combo = pd.concat([ct_df, pb_df], ignore_index = True).sort_values('Entry Date') combo['Cumulative P/L'] = combo['Net P/L Per Trade'].cumsum() max_draw = get_account_drawdown(combo['Cumulative P/L'], principal_balance) cum_pl = ct_cum_pl + pb_cum_pl effective_return = ct_alloc/100*ct_effective_return + pb_alloc/100*pb_effective_return st.header(f"Bread Bundle Results") with st.container(): st.metric( "Total Account Balance", f"${cum_pl:.2f}", f"{100*(cum_pl-principal_balance)/(principal_balance):.2f} %", ) col1, col2 = st.columns(2) with col1: st.metric( "Pumpernickel Balance", f"${ct_cum_pl:.2f}", f"{ct_alloc*(ct_cum_pl-principal_balance*ct_alloc/100)/(principal_balance*ct_alloc/100):.2f} %", ) with col2: st.metric( "Pure Bread Balance", f"${pb_cum_pl:.2f}", f"{pb_alloc*(pb_cum_pl-principal_balance*pb_alloc/100)/(principal_balance*pb_alloc/100):.2f} %", ) ct_df.insert(1, 'Bot', ['PN']*len(ct_df)) if ct_df.empty: grouped_ct = pd.DataFrame([]) else: grouped_ct = ct_df.groupby('Exit Date').agg({'Bot': 'first', 'Signal':'min','Entry Date': 'min','Exit Date': 'max','Buy Price': 'mean', 'Sell Price' : 'max', 'Net P/L Per Trade': 'mean', 'Return Per Trade': 'mean', 'Calculated Return %' : lambda x: np.round(ct_alloc*ct_lev*x.sum(),2), 'DCA': lambda x: int(np.floor(x.max()))}) pb_df.insert(1, 'Bot', ['PB']*len(pb_df)) if pb_df.empty: grouped_pb = pd.DataFrame([]) else: grouped_pb = pb_df.groupby('Exit Date').agg({'Bot': 'first', 'Signal':'min','Entry Date': 'min','Exit Date': 'max','Buy Price': 'mean', 'Sell Price' : 'max', 'Net P/L Per Trade': 'mean', 'Return Per Trade': 'mean', 'Calculated Return %' : lambda x: np.round(pb_alloc*pb_lev*x.sum(),2)}) all_dfs = [grouped_ct, grouped_pb] df = pd.concat([d for d in all_dfs if not d.empty]) df['Entry Date'] = pd.to_datetime(df['Entry Date']) df['Exit Date'] = pd.to_datetime(df['Exit Date']) df.index = range(len(df)) df.sort_values('Exit Date', ascending = True, inplace=True) # Create figure fig = go.Figure() pyLogo = Image.open("logo.png") # Add trace fig.add_trace( go.Scatter(x=df['Exit Date'], y=np.round(df['Net P/L Per Trade'].cumsum().values,2), line_shape='spline', line = {'smoothing': 1.0, 'color' : 'rgba(31, 119, 200,.8)'}, name='Cumulative P/L') ) dfdata = df[(df['Bot'] == 'PN')] eth_buyhold = ((ct_alloc)/100*principal_balance/dfdata['Buy Price'][dfdata.index[0]])*(dfdata['Buy Price']-dfdata['Buy Price'][dfdata.index[0]]) fig.add_trace(go.Scatter(x=dfdata['Exit Date'], y=np.round(eth_buyhold.values,2), line_shape='spline', line = {'smoothing': 1.0, 'color' :'red'}, name = 'ETH Buy & Hold Return') ) dfdata = df[df['Bot'] == 'PB'] doge_buyhold = ((pb_alloc)/100*principal_balance/dfdata['Buy Price'][dfdata.index[0]])*(dfdata['Buy Price']-dfdata['Buy Price'][dfdata.index[0]]) fig.add_trace(go.Scatter(x=dfdata['Exit Date'], y=np.round(doge_buyhold.values,2), line_shape='spline', line = {'smoothing': 1.0, 'color' :'green'}, name = 'DOGE Buy & Hold Return') ) img_width = 2001 img_height = 622 fig.add_layout_image( dict( source=pyLogo, xref="paper", yref="paper", x = 0.1, y = 1, xanchor ="left", yanchor = "top", sizex= 1, sizey= 1, opacity=0.2, layer = "below") ) #style layout fig.update_layout( height = 550, xaxis=dict( title="Exit Date", tickmode='array', showgrid=False ), yaxis=dict( title="Cumulative P/L", showgrid=False ), legend=dict( x=.05, y=0.95, traceorder="normal" ), plot_bgcolor = 'rgba(10, 10, 10, 1)' ) st.plotly_chart(fig, theme=None, use_container_width=True, height=550) df['Per Trade Return Rate'] = df['Return Per Trade']-1 totals = pd.DataFrame([], columns = ['# of Trades', 'Wins', 'Losses', 'Win Rate', 'Profit Factor']) data = get_hist_info(df, principal_balance,'Per Trade Return Rate') totals.loc[len(totals)] = list(i for i in data) totals['Cum. P/L'] = cum_pl-principal_balance totals['Cum. P/L (%)'] = 100*(cum_pl-principal_balance)/principal_balance if df.empty: st.error("Oops! None of the data provided matches your selection(s). Please try again.") else: with st.container(): for row in totals.itertuples(): c1, c2, c3, c4 = st.columns(4) col1, col2, col3, col4 = st.columns(4) with col1: st.metric( "Total Trades", f"{row._1:.0f}", ) with c1: st.metric( "Cumulative P/L", f"${row._6:.2f}", f"{row._7:.2f} %", ) with col2: st.metric( "Wins", f"{row.Wins:.0f}", ) with c2: st.metric( "Profit Factor", f"{row._5:.2f}", ) with col3: st.metric( "Losses", f"{row.Losses:.0f}", ) with c3: st.metric( "Rolling 7 Days", "",#f"{(1+get_rolling_stats(df,lev, otimeheader, 7)/100)*principal_balance:.2f}", f"{(ct_alloc*get_rolling_stats(df[(df['Bot'] == 'PN')],ct_lev, otimeheader, 7) + pb_alloc*get_rolling_stats(df[(df['Bot'] == 'PB')],pb_lev, otimeheader, 7))/100:.2f}%", ) st.metric( "Rolling 90 Days", "",#f"{(1+get_rolling_stats(df,lev, otimeheader, 30)/100)*principal_balance:.2f}", f"{(ct_alloc*get_rolling_stats(df[(df['Bot'] == 'PN')],ct_lev, otimeheader, 90) + pb_alloc*get_rolling_stats(df[(df['Bot'] == 'PB')],pb_lev, otimeheader, 90))/100:.2f}%", ) with col4: st.metric( "Win Rate", f"{row._4:.1f}%", ) with c4: st.metric( "Rolling 30 Days", "",#f"{(1+get_rolling_stats(df,lev, otimeheader, 90)/100)*principal_balance:.2f}", f"{(ct_alloc*get_rolling_stats(df[(df['Bot'] == 'PN')],ct_lev, otimeheader, 30) + pb_alloc*get_rolling_stats(df[(df['Bot'] == 'PB')],pb_lev, otimeheader, 30))/100:.2f}%", ) st.metric( "Max Drawdown", "",#f"{np.round(100*max_draw/principal_balance,2)/100*principal_balance:.2f}", f"{np.round(max_draw,2)}%", ) df.rename(columns={'DCA' : '# of DCAs', 'Buy Price':'Avg. Buy Price', 'Sell Price': 'Avg. Sell Price', 'Net P/L Per Trade':'Net P/L', 'Calculated Return %':'P/L %'}, inplace=True) if '# of DCAs' in df.columns: df['# of DCAs'] = df['# of DCAs'].fillna(1.0) df['# of DCAs'] = [int(i) for i in df['# of DCAs'].values] else: df['# of DCAs'] = np.ones(len(df)) df.sort_values('Entry Date', ascending = True, inplace=True) df.insert(0,'Trade',np.arange(1, len(df)+1)) df.index = range(len(df)) df = df.drop('Per Trade Return Rate', axis=1) df = df.drop('Return Per Trade', axis=1) st.subheader("Trade Logs") st.dataframe(df.style.format({'Entry Date':'{:%m-%d-%Y %H:%M:%S}','Exit Date':'{:%m-%d-%Y %H:%M:%S}','Avg. Buy Price': conditional_formatter, 'Avg. Sell Price': conditional_formatter, 'Net P/L':'${:.2f}', 'P/L %':'{:.2f}%'})\ .applymap(my_style,subset=['Net P/L'])\ .applymap(my_style,subset=['P/L %']), use_container_width=True) if __name__ == "__main__": st.set_page_config( "Trading Bot Dashboard", layout = 'wide' ) runapp() # -