# serJD's picture
# Update utils.py
# d476945 verified
import specklepy
from specklepy.api.client import SpeckleClient
from specklepy.api.credentials import get_default_account, get_local_accounts
from specklepy.transports.server import ServerTransport
from specklepy.api import operations
from specklepy.objects.geometry import Polyline, Point, Mesh
import json
import pandas as pd
import numpy as n
from specklepy.api.wrapper import StreamWrapper
import requests
from datetime import datetime
import copy
def get_dataframe(objects_raw, return_original_df=False):
    """
    Create a pandas DataFrame from a list of raw Speckle objects.

    Each object's attribute dictionary (``__dict__``) becomes one row; the
    '@Geometry' attribute is left out. Every column is then passed through
    ``pd.to_numeric``. Values that could not be converted keep their original
    value, so a column holding both numeric and non-numeric entries ends up as
    an object column mixing numbers with the untouched originals.

    Args:
        objects_raw (list): Objects exposing their attributes via ``__dict__``.
        return_original_df (bool, optional): If True, also return the
            DataFrame as it was before numeric conversion. Defaults to False.

    Returns:
        pd.DataFrame or tuple: The converted DataFrame, or
        ``(converted_df, original_df)`` when ``return_original_df`` is True.
    """
    rows = [
        {key: value for key, value in obj_raw.__dict__.items() if key != '@Geometry'}
        for obj_raw in objects_raw
    ]
    df = pd.DataFrame(rows)
    df_original = df.copy()

    for col in df.columns:
        source = df_original[col]
        converted = pd.to_numeric(source, errors='coerce')
        # Entries that had a value before conversion but lost it to coercion.
        lost = converted.isna() & source.notna()
        if lost.any():
            # Restore the originals through an object column. Assigning the
            # result back (instead of calling fillna(inplace=True) on a column
            # selection) also works when pandas Copy-on-Write is active.
            converted = converted.astype(object).where(~lost, source)
        df[col] = converted

    if return_original_df:
        return df, df_original
    return df
def aggregate_data_optimized(df_a, df_b, uuid_col_name, ref_col_name, exclude_columns):
    """
    Left-merge columns of ``df_a`` onto ``df_b`` and report match statistics.

    Rows of ``df_b`` are matched to rows of ``df_a`` where
    ``df_b[ref_col_name] == df_a[uuid_col_name]``. Columns of ``df_a`` that
    also exist in ``df_b`` replace the ``df_b`` column entirely, so rows
    without a match receive missing values in those columns. Columns that only
    exist in ``df_a`` are added.

    Args:
        df_a (pd.DataFrame): Source of the values to bring in.
        df_b (pd.DataFrame): Frame to enrich; its rows are all kept.
        uuid_col_name (str): Key column in ``df_a``.
        ref_col_name (str): Column in ``df_b`` referencing ``uuid_col_name``.
        exclude_columns (iterable or None): ``df_a`` columns to ignore. The
            key column is always used for matching, even if listed here.

    Returns:
        tuple: ``(merged_df, log_dict)``. ``log_dict`` has the keys 'info',
        'warning' and 'summary'; the summary holds 'matched_count',
        'unmatched_count' and 'total_rows_processed' as plain ints, counted
        over the rows of ``df_b``.
    """
    # Internal helper names, chosen so they do not clash with user columns.
    overwrite_suffix = '__from_df_a'
    indicator_col = '__merge_indicator'

    excluded = exclude_columns if exclude_columns is not None else ()
    # The key column must survive the filter, otherwise the merge cannot run.
    columns_to_use = [col for col in df_a.columns if col not in excluded or col == uuid_col_name]
    df_a_filtered = df_a[columns_to_use]

    df_merged = pd.merge(
        df_b,
        df_a_filtered,
        how='left',
        left_on=ref_col_name,
        right_on=uuid_col_name,
        suffixes=(None, overwrite_suffix),
        indicator=indicator_col,
    )

    # A df_b row without a partner appears exactly once, flagged 'left_only'.
    # Deriving the matched count from it stays correct (and non-negative) even
    # when duplicate keys in df_a multiply the merged rows.
    total_rows = int(df_b.shape[0])
    unmatched_count = int((df_merged[indicator_col] == 'left_only').sum())
    df_merged = df_merged.drop(columns=[indicator_col])

    log_dict = {
        'info': [],
        'warning': [],
        'summary': {
            'matched_count': total_rows - unmatched_count,
            'unmatched_count': unmatched_count,
            'total_rows_processed': total_rows,
        },
    }
    log_dict['info'].append("Data aggregation completed successfully.")

    # Overwrite df_b columns with their df_a counterpart, keeping column order.
    for col in columns_to_use:
        suffixed = f'{col}{overwrite_suffix}'
        if col != uuid_col_name and suffixed in df_merged.columns:
            df_merged[col] = df_merged.pop(suffixed)

    # The only suffixed column that can remain is df_a's key, which appears
    # when df_b already has a column with the same name. Drop just that one, so
    # genuine df_b columns (e.g. ones ending in '_y') are never removed.
    df_merged = df_merged.drop(columns=[f'{uuid_col_name}{overwrite_suffix}'], errors='ignore')

    # The key column was only needed for matching if df_b did not have it.
    if uuid_col_name not in df_b.columns:
        df_merged = df_merged.drop(columns=[uuid_col_name], errors='ignore')

    return df_merged, log_dict
def updateStreamAnalysisFast(client, new_data, stream_id, branch_name, geometryGroupPath=None, match_by_id="", return_original=False, comm_message=""):
    """
    Write the values of ``new_data`` onto the objects of the latest commit of
    a branch and send the result back as a new commit.

    Args:
        client: Speckle client used for the branch and commit calls.
        new_data (pd.DataFrame): One row per object; every column is written
            as an attribute of the matched object. The frame is not modified.
        stream_id (str): ID of the stream.
        branch_name (str): Name of the branch to read from and commit to.
        geometryGroupPath (list, optional): Two keys locating the object list
            inside the received commit object. Defaults to
            ["@Speckle", "Geometry"].
        match_by_id (str, optional): Name of the attribute / column used to
            pair rows with objects. If empty, rows are paired with objects by
            position, using the DataFrame index.
        return_original (bool, optional): If True, return the object list
            instead of the commit id. Note that the objects are updated in
            place, so the returned list already carries the new values.
        comm_message (str, optional): Text prepended to the commit message.

    Returns:
        The id returned by ``client.commit.create``, or the object list when
        ``return_original`` is True.
    """
    if geometryGroupPath is None:
        geometryGroupPath = ["@Speckle", "Geometry"]

    branch = client.branch.get(stream_id, branch_name, 2)
    latest_commit = branch.commits.items[0]
    commit = client.commit.get(stream_id, latest_commit.id)

    transport = ServerTransport(client=client, stream_id=stream_id)
    res = operations.receive(commit.referencedObject, transport)
    objects_raw = res[geometryGroupPath[0]][geometryGroupPath[1]]

    # Map ids to objects for O(1) lookup. Keys are stringified because the
    # lookup below stringifies the DataFrame index; without this, integer ids
    # and the positional fallback would never match.
    if match_by_id:
        id_to_object_map = {str(obj[match_by_id]): obj for obj in objects_raw}
        # Work on a re-indexed copy so the caller's DataFrame stays untouched.
        indexed_data = new_data.set_index(match_by_id)
    else:
        id_to_object_map = {str(i): obj for i, obj in enumerate(objects_raw)}
        indexed_data = new_data

    for local_id, updates in indexed_data.iterrows():
        target_object = id_to_object_map.get(str(local_id))
        if target_object is None:
            # No object with this id in the commit; skip the row.
            continue
        for col_name, value in updates.items():
            target_object[col_name] = value

    # Send updated objects back to Speckle
    new_objects_raw_speckle_id = operations.send(base=res, transports=[transport])
    commit_id = client.commit.create(stream_id=stream_id, branch_name=branch_name, object_id=new_objects_raw_speckle_id, message=comm_message + "#+SourceCommit: "+latest_commit.id)
    print("commit created")

    if return_original:
        return objects_raw  # as back-up
    return commit_id
def getSpeckleStream(stream_id,
                     branch_name,
                     client,
                     commit_id=""
                     ):
    """
    Retrieve the data of one commit from a branch of a speckle stream.

    Args:
        stream_id (str): The ID of the speckle stream.
        branch_name (str): The name of the branch within the speckle stream.
        client: A speckle client used to query the branch and the commit.
        commit_id (str or int): Either the id of a commit, an empty string to
            select the latest commit of the branch, or an integer index into
            the branch's commit list (0 being the first entry returned).

    Returns:
        The object received for the selected commit's referenced object.

    Raises:
        TypeError: If ``commit_id`` is neither a string nor an integer.

    The branch and the selected commit are printed for debugging purposes.
    """
    print("updated A")

    # Validate up front: otherwise an unsupported type would only surface
    # later as an unbound local variable.
    is_index = isinstance(commit_id, int) and not isinstance(commit_id, bool)
    if not (isinstance(commit_id, str) or is_index):
        raise TypeError(
            "commit_id must be a commit id string or an integer index, got "
            + type(commit_id).__name__
        )

    # Fetch enough commits for an integer index to be reachable; a single
    # commit is enough for the other cases.
    commits_limit = max(1, commit_id + 1) if is_index else 1

    # set stream and branch
    try:
        branch = client.branch.get(stream_id, branch_name, commits_limit)
    except Exception:
        # Retry once before giving up; a second failure propagates.
        branch = client.branch.get(stream_id, branch_name, commits_limit)
    print(branch)
    print("branch info:", branch)

    if commit_id == "":
        latest_commit = branch.commits.items[0]
        choosen_commit_id = latest_commit.id
        commit = client.commit.get(stream_id, choosen_commit_id)
        print("latest commit ", latest_commit.createdAt, " was choosen")
    elif isinstance(commit_id, str):  # commit uuid
        choosen_commit_id = commit_id
        commit = client.commit.get(stream_id, choosen_commit_id)
        print("provided commit ", choosen_commit_id, " was choosen")
    else:  # integer index into the branch's commits
        choosen_commit_id = branch.commits.items[commit_id].id
        commit = client.commit.get(stream_id, choosen_commit_id)

    print(commit)
    print(commit.referencedObject)

    # get transport
    transport = ServerTransport(client=client, stream_id=stream_id)
    # speckle stream
    res = operations.receive(commit.referencedObject, transport)
    return res