custom_css = """
<style>
    .container {
        max-width: 100% !important;
        padding-left: 0 !important;
        padding-right: 0 !important;
    }
    .header {
        padding: 30px;
        margin-bottom: 30px;
        text-align: center;
        font-family: 'Helvetica Neue', Arial, sans-serif;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    .header h1 {
        font-size: 36px;
        margin-bottom: 15px;
        font-weight: bold;
        color: #333333; /* Explicitly set heading color */
    }
    .header h2 {
        font-size: 24px;
        margin-bottom: 10px;
        color: #333333; /* Explicitly set subheading color */
    }
    .header p {
        font-size: 18px;
        margin: 5px 0;
        color: #666666;
    }
    .blue-text {
        color: #4a90e2;
    }
    /* Custom styles for slider container */
    .slider-container {
        background-color: white !important;
        padding-top: 0.9em;
        padding-bottom: 0.9em;
    }
    /* Add gap before examples */
    .examples-holder {
        margin-top: 2em;
    }
    /* Set fixed size for example videos */
    .gradio-container .gradio-examples .gr-sample {
        width: 240px !important;
        height: 135px !important;
        object-fit: cover;
        display: inline-block;
        margin-right: 10px;
    }
    .gradio-container .gradio-examples {
        display: flex;
        flex-wrap: wrap;
        gap: 10px;
    }
    /* Ensure the parent container does not stretch */
    .gradio-container .gradio-examples {
        max-width: 100%;
        overflow: hidden;
    }
    /* Additional styles to ensure proper sizing in Safari */
    .gradio-container .gradio-examples .gr-sample img {
        width: 240px !important;
        height: 135px !important;
        object-fit: cover;
    }
</style>
"""
custom_html = custom_css + """
<div class="header">
    <h1><span class="blue-text">The Sound of Water</span>: Inferring Physical Properties from Pouring Liquids</h1>
    <p><a href='https://bpiyush.github.io/pouring-water-website/'>Project Page</a> |
    <a href='https://github.com/bpiyush/SoundOfWater'>Github</a> |
    <a href='#'>Paper</a> |
    <a href='https://huggingface.co/datasets/bpiyush/sound-of-water'>Data</a> |
    <a href='https://huggingface.co/bpiyush/sound-of-water-models'>Models</a></p>
</div>
"""
tips = """
<div>
    <br><br>
    Please give us a ⭐ on <a href='https://github.com/bpiyush/SoundOfWater'>Github</a> if you like our work!
    Tips to get better results:
    <ul>
        <li>Make sure there is not too much background noise, so that the pouring remains clearly audible.</li>
        <li>Only the audio is used during inference; the video frames are not.</li>
    </ul>
</div>
"""
import os
import sys

import gradio as gr
import torch
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams["font.family"] = "serif"
import decord
import PIL, PIL.Image
import librosa
from IPython.display import Markdown, display
import pandas as pd

import shared.utils as su
import sound_of_water.audio_pitch.model as audio_models
import sound_of_water.data.audio_loader as audio_loader
import sound_of_water.data.audio_transforms as at
import sound_of_water.data.csv_loader as csv_loader
def read_html_file(file):
    with open(file) as f:
        return f.read()


def define_axes(figsize=(13, 4), width_ratios=[0.22, 0.78]):
    fig, axes = plt.subplots(
        1, 2, figsize=figsize, width_ratios=width_ratios,
        layout="constrained",
    )
    return fig, axes
def show_frame_and_spectrogram(frame, spectrogram, visualise_args, axes=None):
    """Shows the frame and spectrogram side by side."""
    if axes is None:
        fig, axes = define_axes()
    else:
        assert len(axes) == 2

    ax = axes[0]
    ax.imshow(frame, aspect="auto")
    ax.set_title("Example frame")
    ax.set_xticks([])
    ax.set_yticks([])

    ax = axes[1]
    audio_loader.show_logmelspectrogram(
        S=spectrogram,
        ax=ax,
        show=False,
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
    )


def scatter_pitch(ax, t, f, s=60, marker="o", color="limegreen", label="Pitch"):
    """Scatter plot of pitch."""
    ax.scatter(t, f, color=color, label=label, s=s, marker=marker)
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Frequency (Hz)")
    ax.legend(loc="upper left")
def load_frame(video_path, video_backend="decord"):
    if video_backend == "decord":
        vr = decord.VideoReader(video_path, num_threads=1)
        frame = PIL.Image.fromarray(vr[0].asnumpy())
    elif video_backend == "torchvision":
        import torchvision.io as tio
        video, _, _ = tio.read_video(video_path, pts_unit="sec")
        frame = video[0]
        frame = PIL.Image.fromarray(frame.numpy())
    else:
        raise ValueError(f"Unknown video backend: {video_backend}")
    frame = audio_loader.crop_or_pad_to_size(frame, size=(270, 480))
    return frame
def load_spectrogram(video_path):
    y = audio_loader.load_audio_clips(
        audio_path=video_path,
        clips=None,
        load_entire=True,
        cut_to_clip_len=False,
        **aload_args,
    )[0]
    S = audio_loader.librosa_harmonic_spectrogram_db(
        y,
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
        n_mels=visualise_args["n_mels"],
    )
    return S
# Arguments used for loading audio and visualising the outputs
visualise_args = {
    "sr": 16000,               # audio sample rate (Hz)
    "n_fft": 400,              # STFT window size (samples)
    "hop_length": 320,         # STFT hop length (samples)
    "n_mels": 64,              # number of mel bins in the spectrogram
    "margin": 16.,
    "C": 340 * 100.,           # speed of sound, 340 m/s expressed in cm/s
    "audio_output_fps": 49.,   # frame rate of the audio backbone features
    "w_max": 100.,             # largest wavelength bin (cm)
    "n_bins": 64,              # number of wavelength bins predicted by the model
}
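# The model predicts, per audio frame, a distribution over `n_bins` wavelength
# bins spanning [0, w_max] cm (see `show_output` below). The expected wavelength
# is converted to a pitch via f = C / wavelength, and helpers in `su.physics`
# then turn the wavelength trajectory into the length of the air column and the
# container's height and radius.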
aload_args = {
    "sr": 16000,
    "clip_len": None,
    "backend": "decord",
}

cfg_backbone = {
    "name": "Wav2Vec2WithTimeEncoding",
    "args": dict(),
}
backbone = getattr(audio_models, cfg_backbone["name"])(
    **cfg_backbone["args"],
)

cfg_model = {
    "name": "WavelengthWithTime",
    "args": {
        "axial": True,
        "axial_bins": 64,
        "radial": True,
        "radial_bins": 64,
        "freeze_backbone": True,
        "train_backbone_modules": [6, 7, 8, 9, 10, 11],
        "act": "softmax",
        "criterion": "kl_div",
    }
}
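# Per the config above, the model has two prediction heads ("axial" and
# "radial"), each producing a softmax distribution over 64 wavelength bins and
# trained with a KL-divergence criterion. The Wav2Vec2 backbone is frozen
# except for its last six transformer blocks (modules 6-11).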
def load_model():
    model = getattr(audio_models, cfg_model["name"])(
        backbone=backbone, **cfg_model["args"],
    )
    su.misc.num_params(model)

    # Load the model weights from the trained checkpoint
    # NOTE: Be sure to set the correct path to the checkpoint
    su.log.print_update("[:::] Loading checkpoint ", color="cyan", fillchar=".", pos="left")
    # ckpt_dir = "/work/piyush/pretrained_checkpoints/SoundOfWater"
    ckpt_dir = "./checkpoints"
    ckpt_path = os.path.join(
        ckpt_dir,
        "dsr9mf13_ep100_step12423_real_finetuned_with_cosupervision.pth",
    )
    assert os.path.exists(ckpt_path), \
        f"Checkpoint not found at {ckpt_path}."
    print("Loading checkpoint from: ", ckpt_path)
    ckpt = torch.load(ckpt_path, map_location="cpu")
    msg = model.load_state_dict(ckpt)
    print(msg)

    return model
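# Example usage (sketch): load the model once at startup and reuse it for every
# request rather than reloading the checkpoint per call.
# model = load_model()
# model.eval()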
# Define audio transforms
cfg_transform = {
    "audio": {
        "wave": [
            {
                "name": "AddNoise",
                "args": {
                    "noise_level": 0.001
                },
                "augmentation": True,
            },
            {
                "name": "ChangeVolume",
                "args": {
                    "volume_factor": [0.8, 1.2]
                },
                "augmentation": True,
            },
            {
                "name": "Wav2Vec2WaveformProcessor",
                "args": {
                    "model_name": "facebook/wav2vec2-base-960h",
                    "sr": 16000
                }
            }
        ],
        "spec": None,
    }
}
audio_transform = at.define_audio_transforms(
    cfg_transform, augment=False,
)
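# Note: the transforms flagged with "augmentation": True (AddNoise, ChangeVolume)
# are presumably applied only when augment=True; with augment=False, as here,
# the waveform should only pass through the Wav2Vec2 processor for inference.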
# Define audio pipeline arguments
apipe_args = {
    "spec_args": None,
    "stack": True,
}


def load_audio_tensor(video_path):
    # Load and transform input audio
    audio = audio_loader.load_and_process_audio(
        audio_path=video_path,
        clips=None,
        load_entire=True,
        cut_to_clip_len=False,
        audio_transform=audio_transform,
        aload_args=aload_args,
        apipe_args=apipe_args,
    )[0]
    return audio


def get_model_output(audio, model):
    with torch.no_grad():
        NS = audio.shape[-1]
        duration = NS / 16000
        t = torch.tensor([[0, duration]]).unsqueeze(0)
        x = audio.unsqueeze(0)
        z_audio = model.backbone(x, t)[0][0].cpu()
        y_audio = model(x, t)["axial"][0][0].cpu()
    return z_audio, y_audio
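# `z_audio` holds the backbone's latent audio features (one vector per audio
# frame) and is used for the t-SNE visualisation; `y_audio` holds the axial
# head's per-frame distribution over wavelength bins and is used to estimate
# the pitch and the physical properties in `show_output` below.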
def show_output(frame, S, y_audio, z_audio):
    # duration = S.shape[-1] / visualise_args["sr"]
    # print(S.shape, y_audio.shape, z_audio.shape)
    duration = librosa.get_duration(
        S=S,
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
    )
    timestamps = np.linspace(0., duration, 25)

    # Get timestamps at evaluation frames
    n_frames = len(y_audio)
    timestamps_eval = librosa.frames_to_time(
        np.arange(n_frames),
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
    )

    # Get predicted wavelengths (expectation over the bin distribution)
    # and convert them to frequencies at these times
    wavelengths = y_audio @ torch.linspace(
        0, visualise_args["w_max"], visualise_args["n_bins"],
    )
    f_pred = visualise_args["C"] / wavelengths

    # Keep only the predictions at the 25 evenly spaced timestamps
    indices = su.misc.find_nearest_indices(timestamps_eval, timestamps)
    f_pred = f_pred[indices]
    # print(timestamps, f_pred)

    # Show the predicted pitch overlaid on the spectrogram
    fig, axes = define_axes()
    show_frame_and_spectrogram(frame, S, visualise_args, axes=axes)
    scatter_pitch(axes[1], timestamps, f_pred, color="white", label="Estimated pitch", marker="o", s=70)
    axes[1].set_title("Predicted pitch overlaid on the spectrogram")
    # plt.show()

    # Convert the figure to a PIL Image and return it
    from PIL import Image

    # Draw the figure to a canvas
    canvas = fig.canvas
    canvas.draw()

    # Get the RGB buffer from the figure
    w, h = fig.canvas.get_width_height()
    buf = canvas.tostring_rgb()

    # Create a PIL image from the RGB data
    image = Image.frombytes("RGB", (w, h), buf)

    # Get physical properties
    l_pred = su.physics.estimate_length_of_air_column(wavelengths)
    l_pred_mean = l_pred.mean().item()
    l_pred_mean = np.round(l_pred_mean, 2)
    H_pred = su.physics.estimate_cylinder_height(wavelengths)
    H_pred = np.round(H_pred, 2)
    R_pred = su.physics.estimate_cylinder_radius(wavelengths)
    R_pred = np.round(R_pred, 2)
    # print(f"Estimated length: {l_pred_mean} cm, Estimated height: {H_pred} cm, Estimated radius: {R_pred} cm")

    df_show = pd.DataFrame({
        "Physical Property": ["Container height", "Container radius", "Length of air column (mean)"],
        "Estimated Value (in cm)": [H_pred, R_pred, l_pred_mean],
    })

    tsne_image = su.visualize.show_temporal_tsne(
        z_audio.detach().numpy(), timestamps_eval, show=False,
        figsize=(6, 5), title="Temporal t-SNE of latent features",
        return_as_pil=True,
    )

    return image, df_show, tsne_image
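# ----------------------------------------------------------------------------
# Minimal sketch of how the helpers above could be wired into a Gradio demo.
# This is not necessarily the original Space's layout; `run_demo` and the
# component names below are illustrative.
def run_demo(video_path, model):
    # Full pipeline for one uploaded video: frame + spectrogram for display,
    # audio tensor for the model, then the visualised predictions.
    frame = load_frame(video_path)
    S = load_spectrogram(video_path)
    audio = load_audio_tensor(video_path)
    z_audio, y_audio = get_model_output(audio, model)
    return show_output(frame, S, y_audio, z_audio)

# model = load_model()
# with gr.Blocks() as demo:
#     gr.HTML(custom_html)
#     with gr.Row():
#         video_input = gr.Video(label="Input video")
#         pitch_output = gr.Image(label="Predicted pitch on spectrogram")
#     properties_output = gr.Dataframe(label="Estimated physical properties")
#     tsne_output = gr.Image(label="t-SNE of latent features")
#     gr.HTML(tips)
#     video_input.change(
#         lambda path: run_demo(path, model),
#         inputs=video_input,
#         outputs=[pitch_output, properties_output, tsne_output],
#     )
# demo.launch()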