import time from datetime import datetime import streamlit as st from st_aggrid import AgGrid, ColumnsAutoSizeMode from apps.dump_analysis import dump_analysis_space from queries.process_all_db import ( process_all_tech_db, process_all_tech_db_with_stats, process_atoll_db, process_nice_db, ) from queries.process_gsm import process_gsm_data_to_excel, process_gsm_data_to_kml from queries.process_invunit import process_invunit_data_to_excel from queries.process_lte import process_lte_data_to_excel, process_lte_data_to_kml # from queries.process_mal import process_mal_data_to_excel from queries.process_mrbts import process_mrbts_data_to_excel from queries.process_neighbors import process_neighbors_data_to_excel # from queries.process_trx import process_trx_with_bts_name_data_to_excel from queries.process_wcdma import process_wcdma_data_to_excel, process_wcdma_data_to_kml from utils.check_sheet_exist import DumpType, Technology, execute_checks_sheets_exist from utils.utils_vars import GsmAnalysisData, UtilsVars, WcdmaAnalysisData st.title("Database processing") uploaded_file = st.file_uploader("Upload updated dump file", type="xlsb") def process_database(process_func, database_type): if uploaded_file is not None: start_time = time.time() process_func(uploaded_file) execution_time = time.time() - start_time st.write( f"{database_type} database is generated. Execution time: {execution_time:.2f} seconds" ) download_button(database_type) @st.fragment() def download_button(database_type): if database_type == "2G": data = UtilsVars.final_gsm_database file_name = f"2G database_{datetime.now()}.xlsx" elif database_type == "3G": data = UtilsVars.final_wcdma_database file_name = f"3G database_{datetime.now()}.xlsx" elif database_type == "LTE": data = UtilsVars.final_lte_database file_name = f"LTE database_{datetime.now()}.xlsx" elif database_type == "All": data = UtilsVars.final_all_database file_name = f"All databases_{datetime.now()}.xlsx" elif database_type == "NEI": data = UtilsVars.neighbors_database file_name = f"Neighbors databases_{datetime.now()}.xlsx" elif database_type == "MRBTS": data = UtilsVars.final_mrbts_database file_name = f"MRBTS database_{datetime.now()}.xlsx" elif database_type == "INVUNIT": data = UtilsVars.final_invunit_database file_name = f"INVUNIT database_{datetime.now()}.xlsx" elif database_type == "Custom": data = UtilsVars.final_atoll_database file_name = f"Custom database_{datetime.now()}.xlsx" elif database_type == "Nice": data = UtilsVars.final_nice_database file_name = f"Nice database_{datetime.now()}.xlsx" st.download_button( type="primary", label=f"Download {database_type} Database File", data=data, file_name=file_name, mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ) def process_kml_database(process_func, database_type): if uploaded_file is not None: start_time = time.time() process_func(uploaded_file) execution_time = time.time() - start_time st.write( f"{database_type} KML is generated. Execution time: {execution_time:.2f} seconds" ) kml_download_button(database_type) @st.fragment() def kml_download_button(database_type): if database_type == "2G": data = UtilsVars.gsm_kml_file file_name = f"2G kml_{datetime.now()}.kml" elif database_type == "3G": data = UtilsVars.wcdma_kml_file file_name = f"3G kml_{datetime.now()}.kml" elif database_type == "LTE": data = UtilsVars.lte_kml_file file_name = f"LTE kml_{datetime.now()}.kml" st.download_button( type="primary", label=f"Download {database_type} KML File", data=data, file_name=file_name, mime="application/vnd.google-earth.kml+xml", ) def execute_process_all_tech_db(uploaded_file): if uploaded_file is not None: start_time = time.time() process_all_tech_db(uploaded_file) execution_time = time.time() - start_time st.write( f"All databases are generated. Execution time: {execution_time:.2f} seconds" ) download_button("All") def execute_process_custom_db(uploaded_file): if uploaded_file is not None: start_time = time.time() process_atoll_db(uploaded_file) execution_time = time.time() - start_time st.write( f"ATL databases are generated. Execution time: {execution_time:.2f} seconds" ) download_button("Custom") def execute_process_nice_db(uploaded_file): if uploaded_file is not None: start_time = time.time() process_nice_db(uploaded_file) execution_time = time.time() - start_time st.write( f"Nice databases are generated. Execution time: {execution_time:.2f} seconds" ) download_button("Nice") # def execute_process_all_tech_db_with_stats(uploaded_file: str, region_list: list): def execute_process_all_tech_db_with_stats(uploaded_file: str): if uploaded_file is not None: start_time = time.time() process_all_tech_db_with_stats( uploaded_file, # region_list ) execution_time = time.time() - start_time st.write( f"All databases are generated. Execution time: {execution_time:.2f} seconds" ) download_button("All") col1, col2, col3, col4, col5, col6 = st.columns(6) col7, col8, col9, col10, col11, col12 = st.columns(6) if uploaded_file is not None: # UtilsVars.file_path = uploaded_file try: execute_checks_sheets_exist(uploaded_file) if ( Technology.gsm == False and Technology.wcdma == False and Technology.lte == False and Technology.neighbors == False and Technology.trx == False and Technology.mrbts == False and Technology.invunit == False ): st.error( """ Uploaded file does not contain required sheets for any technology. "gsm": ["BTS", "BCF", "TRX","MAL"], "wcdma": ["WCEL", "WBTS", "WNCEL"], "lte": ["LNBTS", "LNCEL", "LNCEL_FDD", "LNCEL_TDD"], "neighbors": ["ADCE", "ADJS", "ADJI", "ADJG", "ADJW", "BTS", "WCEL"], "trx": ["TRX", "BTS"], "mrbts": ["MRBTS"], "invunit": ["INVUNIT"] """ ) if ( Technology.gsm == True and Technology.wcdma == True and Technology.lte == True and Technology.trx == True and Technology.mrbts == True and Technology.invunit == True ): DumpType.full_dump = True with col1: st.button( "Generate All DBs", on_click=lambda: execute_process_all_tech_db(uploaded_file), ) if Technology.gsm == True: with col2: st.button( "Generate 2G DB", on_click=lambda: process_database(process_gsm_data_to_excel, "2G"), ) if Technology.wcdma == True: with col3: st.button( "Generate 3G DB", on_click=lambda: process_database( process_wcdma_data_to_excel, "3G" ), ) if Technology.lte == True: with col4: st.button( "Generate LTE DB", on_click=lambda: process_database(process_lte_data_to_excel, "LTE"), ) if Technology.mrbts == True: with col5: st.button( "Generate MRBTS", on_click=lambda: process_database( process_mrbts_data_to_excel, "MRBTS" ), ) if Technology.invunit == True: with col6: st.button( "Generate INVUNIT", on_click=lambda: process_database( process_invunit_data_to_excel, "INVUNIT" ), ) if Technology.neighbors == True: with col7: st.button( "Generate NEI DB", on_click=lambda: process_database( process_neighbors_data_to_excel, "NEI" ), ) if Technology.gsm == True: with col8: st.button( "Generate 2G KML", on_click=lambda: process_kml_database( process_gsm_data_to_kml, "2G" ), ) if Technology.wcdma == True: with col9: st.button( "Generate 3G KML", on_click=lambda: process_kml_database( process_wcdma_data_to_kml, "3G" ), ) if Technology.lte == True: with col10: st.button( "Generate LTE KML", on_click=lambda: process_kml_database( process_lte_data_to_kml, "LTE" ), ) if DumpType.full_dump == True: with col11: st.button( "Generate ATL DB", on_click=lambda: execute_process_custom_db(uploaded_file), ) if DumpType.full_dump == True: with col12: st.button( "Generate Nice DB", on_click=lambda: execute_process_nice_db(uploaded_file), ) except Exception as e: st.error(f"Error: {e}") ######################## ANALYTICS AND STATS #################################### @st.fragment def table_data(): if UtilsVars.all_db_dfs_names != []: selected_table = st.selectbox("Choose Data", UtilsVars.all_db_dfs_names) table_df = UtilsVars.all_db_dfs[ UtilsVars.all_db_dfs_names.index(selected_table) ] st.write(f"### {selected_table} Data") AgGrid( table_df, fit_columns_on_grid_load=True, theme="streamlit", enable_enterprise_modules=True, filter=True, # columns_auto_size_mode=ColumnsAutoSizeMode.FIT_CONTENTS, ) if uploaded_file is not None: if DumpType.full_dump == True: # regions = st.multiselect( # "Select the region(s) you want to analyze", # ["Test BTS", "SKS", "SEG", "TBC", "KDL", "KKO", "GAO", "MPT", "KYS"], # default=[ # "Test BTS", # "SKS", # "SEG", # "TBC", # "KDL", # "KKO", # "GAO", # "MPT", # "KYS", # ], # ) if st.button("Generate All DBs and Show Stats"): # if regions: execute_process_all_tech_db_with_stats( uploaded_file, # regions ) tab1, tab2 = st.tabs(["🗃 Data", "📈 Chart"]) with tab1: table_data() with tab2: dump_analysis_space()