import json
import os

# NOTE: the special maze tokens below are assumed placeholder names; substitute
# the actual tokens emitted by the MazeWhisper model if they differ.
REP_START = "<rep-start>"
REP_END = "<rep-end>"
REV_START = "<rev-start>"
REV_END = "<rev-end>"
PAUSE = "<pause>"
FILLER = "<filler>"


# Read transcription_cunit.json from session_data/<session_id>.
# Read the text_token item; a repetition starts at <rep-start> and ends at <rep-end>.
# Extract the repetition information and add it to the JSON file:
#   content: the text between <rep-start> and <rep-end>
#   words: the word index of each word in content; the first word in the segment
#          has index 0, and special tokens are ignored when counting
#   mark_location: the index of the last word of content
# Format example:
"""
"repetitions": [
  {
    "content": "now the now the",
    "words": [1, 2, 3, 4],
    "mark_location": 4
  }
],
"""


def annotate_repetition_for_mazewhisper(session_id):
    file_path = f"session_data/{session_id}/transcription_cunit.json"
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return

    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    for segment in data["segments"]:
        text_token = segment.get("text_token", "")
        tokens = text_token.split()

        repetitions_list = []
        inside = False
        rep_words = []
        rep_word_indices = []
        word_count = 0  # segment-level word counter; special tokens excluded

        for tok in tokens:
            if tok == REP_START:
                inside = True
                rep_words, rep_word_indices = [], []
                continue
            if tok == REP_END:
                inside = False
                if rep_words:
                    repetitions_list.append(
                        {
                            "content": " ".join(rep_words),
                            "words": rep_word_indices.copy(),
                            "mark_location": rep_word_indices[-1],
                        }
                    )
                continue
            if tok.startswith("<") and tok.endswith(">"):
                # Other special tokens never advance the word counter.
                continue
            if inside:
                rep_words.append(tok)
                rep_word_indices.append(word_count)
            word_count += 1

        if repetitions_list:
            segment["repetitions"] = repetitions_list

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


# Read transcription_cunit.json from session_data/<session_id>.
# Read the text_token item; a revision starts at <rev-start> and ends at <rev-end>.
# Extract the revision information and add it to the JSON file:
#   content: the text between <rev-start> and <rev-end>
#   words: the word index of each word in content; the first word in the segment
#          has index 0, and special tokens are ignored when counting
#   mark_location: the index of the last word of content
# Format example:
"""
"revisions": [
  {
    "content": "now the now the",
    "words": [1, 2, 3, 4],
    "mark_location": 4
  }
],
"""


def annotate_revision_for_mazewhisper(session_id):
    file_path = f"session_data/{session_id}/transcription_cunit.json"
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return

    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    for segment in data["segments"]:
        text_token = segment.get("text_token", "")
        tokens = text_token.split()

        revisions_list = []
        inside = False
        rev_words = []
        rev_word_indices = []
        word_count = 0  # segment-level word counter; special tokens excluded

        for tok in tokens:
            if tok == REV_START:
                inside = True
                rev_words, rev_word_indices = [], []
                continue
            if tok == REV_END:
                inside = False
                if rev_words:
                    revisions_list.append(
                        {
                            "content": " ".join(rev_words),
                            "words": rev_word_indices.copy(),
                            "mark_location": rev_word_indices[-1],
                        }
                    )
                continue
            if tok.startswith("<") and tok.endswith(">"):
                continue
            if inside:
                rev_words.append(tok)
                rev_word_indices.append(word_count)
            word_count += 1

        if revisions_list:
            segment["revisions"] = revisions_list

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
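# Worked example of the indexing convention used by the two annotators above
# (token names are the assumed placeholders from the top of this file). For a
# segment whose text_token is
#     "so <rep-start> now the now the <rep-end> story begins"
# the non-special words are indexed 0..6 ("so"=0, "now"=1, ...), so
# annotate_repetition_for_mazewhisper records:
#     {"content": "now the now the", "words": [1, 2, 3, 4], "mark_location": 4}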
# Read transcription_cunit.json from session_data/<session_id>.
# Read the text_token item; a pause is represented as <pause>.
# Extract the pause information and add it to the JSON file:
#   start: the end timestamp of the previous non-special token; if there is no
#          previous non-special token, use the end time of the previous segment
#          and also set the current segment's start time to that value
#   end: the start timestamp of the next non-special token; if there is no next
#        non-special token, the <pause> (and any special tokens after it) should
#        move to the start of the next segment
#   duration: the pause duration, calculated from start and end
# Format example:
"""
"pauses": [
  {
    "start": 364.08,
    "end": 369.1,
    "duration": 5.02
  },
  {
    "start": 369.18,
    "end": 369.56,
    "duration": 0.38
  }
],
"""


def annotate_pause_for_mazewhisper(session_id):
    file_path = f"session_data/{session_id}/transcription_cunit.json"
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return

    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    for i, segment in enumerate(data["segments"]):
        text_token = segment.get("text_token", "")
        words = segment.get("words", [])
        pauses_list = []

        if PAUSE in text_token:
            tokens = text_token.split()
            for j, token in enumerate(tokens):
                if token != PAUSE:
                    continue

                # Start time: end timestamp of the previous non-special token.
                start_time = None
                word_idx = 0  # non-special tokens seen before this <pause>
                for k in range(j):
                    if not (tokens[k].startswith("<") and tokens[k].endswith(">")):
                        word_idx += 1
                if 0 < word_idx <= len(words):
                    start_time = words[word_idx - 1].get("end")
                elif i > 0:
                    # No previous word in this segment: use the previous
                    # segment's end time and align this segment's start to it.
                    start_time = data["segments"][i - 1].get("end")
                    segment["start"] = start_time

                # End time: start timestamp of the next non-special token.
                end_time = None
                has_next_word = any(
                    not (tokens[k].startswith("<") and tokens[k].endswith(">"))
                    for k in range(j + 1, len(tokens))
                )
                if has_next_word and word_idx < len(words):
                    end_time = words[word_idx].get("start")
                elif i + 1 < len(data["segments"]):
                    # Moving the <pause> into the next segment is complex; for
                    # now, approximate with the next segment's start time.
                    end_time = data["segments"][i + 1].get("start")

                # Record the pause only when both endpoints are known.
                if start_time is not None and end_time is not None:
                    pauses_list.append(
                        {
                            "start": start_time,
                            "end": end_time,
                            "duration": round(end_time - start_time, 2),
                        }
                    )

        if pauses_list:
            segment["pauses"] = pauses_list

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
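# Timing example (numbers taken from the format example above): if the word
# before a <pause> ends at 364.08 and the next word starts at 369.1, the
# recorded annotation is {"start": 364.08, "end": 369.1, "duration": 5.02},
# where duration = round(369.1 - 364.08, 2).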
# Read transcription_cunit.json from session_data/<session_id>.
# Read the text_token item; a filler word is represented as <filler>.
# Extract the filler-word information and add it to the JSON file:
#   start: set to empty for now
#   end: set to empty for now
#   duration: set to empty for now
# Format example:
"""
"fillerwords": [
  {
    "start": null,
    "end": null,
    "content": "<filler>",
    "duration": null
  }
],
"""
# Then insert "um" as the token preceding each <filler> in the text_token item,
# and insert "um" into the text item and the words list (timestamps set to
# empty for now) at the positions corresponding to <filler> in text_token.
# (This is a preliminary implementation; a next step should use a MazeWhisper
# model that transcribes filler-word content and aligns it directly.)


def annotate_fillerword_for_mazewhisper(session_id):
    file_path = f"session_data/{session_id}/transcription_cunit.json"
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return

    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    for segment in data["segments"]:
        text_token = segment.get("text_token", "")
        words = segment.get("words", [])
        text = segment.get("text", "")

        if FILLER not in text_token:
            continue

        # One placeholder entry per filler token (preliminary implementation).
        fillerwords_list = [
            {"start": None, "end": None, "content": FILLER, "duration": None}
            for _ in range(text_token.count(FILLER))
        ]

        # Insert "um" as the token preceding each <filler> in text_token.
        segment["text_token"] = text_token.replace(FILLER, f"um {FILLER}")

        # Find the word position of each <filler> (special tokens excluded).
        tokens = text_token.split()
        text_words = text.split()
        filler_positions = []
        word_count = 0
        for token in tokens:
            if token == FILLER:
                filler_positions.append(word_count)
            elif not (token.startswith("<") and token.endswith(">")):
                word_count += 1

        # Insert "um" in reverse order so earlier positions stay valid.
        for pos in reversed(filler_positions):
            text_words.insert(pos, "um")
            if pos <= len(words):
                words.insert(pos, {"word": "um", "start": None, "end": None})

        segment["text"] = " ".join(text_words)
        segment["words"] = words
        segment["fillerwords"] = fillerwords_list

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def annotate_maze_for_mazewhisper(session_id):
    annotate_fillerword_for_mazewhisper(session_id)
    annotate_repetition_for_mazewhisper(session_id)
    annotate_revision_for_mazewhisper(session_id)
    annotate_pause_for_mazewhisper(session_id)
    print("Maze annotation completed!")
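# Minimal usage sketch. The session id below is hypothetical; any directory
# session_data/<session_id>/ that contains a transcription_cunit.json works.
if __name__ == "__main__":
    annotate_maze_for_mazewhisper("example_session")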