# SATEv1.5/annotation/annotation_for_mazewhisper.py
import json
import os
# Read transcription_cunit.json from session_data/<session_id>.
# In the text_token item, a repetition starts at <REPSTART> and ends at <REPEND>.
# Extract the repetition information and add it to the JSON file. The information includes:
# content: the text between <REPSTART> and <REPEND>
# words: the segment-level index of each word in content; the first word of the
#        segment has index 0, and special tokens are skipped when counting
# mark_location: the index of the last word of content
# format example:
"""
"repetitions": [
{
"content": "now the now the",
"words": [
1,
2,
3,
4
],
"mark_location": 4
}
],
"""
def annotate_repetition_for_mazewhisper(session_id):
file_path = f"session_data/{session_id}/transcription_cunit.json"
if not os.path.exists(file_path):
print(f"File not found: {file_path}")
return
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
for segment in data["segments"]:
text_token = segment.get("text_token", "")
tokens = text_token.split()
repetitions_list = []
inside = False
rep_words = []
rep_word_indices = []
word_count = 0 # segment-level word counter
for tok in tokens:
if tok == "<REPSTART>":
inside = True
rep_words, rep_word_indices = [], []
continue
elif tok == "<REPEND>":
inside = False
if rep_words:
clean_rep_words = [w for w in rep_words
if not (w.startswith("<") and w.endswith(">"))]
repetitions_list.append(
{
"content": " ".join(clean_rep_words),
"words": rep_word_indices.copy(),
"mark_location": rep_word_indices[-1],
}
)
continue
            if tok.startswith("<") and tok.endswith(">"):
                continue  # special tokens are never counted as words
            if inside:
                rep_words.append(tok)
                rep_word_indices.append(word_count)
            word_count += 1
if repetitions_list:
segment["repetitions"] = repetitions_list
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
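
# Worked example for annotate_repetition_for_mazewhisper (illustrative; the
# token stream below is hypothetical). For the segment text_token
#     "well <REPSTART> now the now the <REPEND> boy"
# words are indexed ignoring special tokens (well=0, now=1, the=2, now=3,
# the=4, boy=5), so the marked span produces:
#     {"content": "now the now the", "words": [1, 2, 3, 4], "mark_location": 4}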
# Read transcription_cunit.json from session_data/<session_id>.
# In the text_token item, a revision starts at <REVSTART> and ends at <REVEND>.
# Extract the revision information and add it to the JSON file. The information includes:
# content: the text between <REVSTART> and <REVEND>
# words: the segment-level index of each word in content; the first word of the
#        segment has index 0, and special tokens are skipped when counting
# mark_location: the index of the last word of content
# format example:
"""
"revisions": [
{
"content": "now the now the",
"words": [
1,
2,
3,
4
],
"mark_location": 4
}
],
"""
def annotate_revision_for_mazewhisper(session_id):
file_path = f"session_data/{session_id}/transcription_cunit.json"
if not os.path.exists(file_path):
print(f"File not found: {file_path}")
return
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
for segment in data["segments"]:
text_token = segment.get("text_token", "")
tokens = text_token.split()
revisions_list = []
inside = False
rev_words = []
rev_word_indices = []
word_count = 0
for tok in tokens:
if tok == "<REVSTART>":
inside = True
rev_words, rev_word_indices = [], []
continue
elif tok == "<REVEND>":
inside = False
if rev_words:
clean_rev_words = [w for w in rev_words
if not (w.startswith("<") and w.endswith(">"))]
revisions_list.append(
{
"content": " ".join(clean_rev_words),
"words": rev_word_indices.copy(),
"mark_location": rev_word_indices[-1],
}
)
continue
            if tok.startswith("<") and tok.endswith(">"):
                continue  # special tokens are never counted as words
            if inside:
                rev_words.append(tok)
                rev_word_indices.append(word_count)
            word_count += 1
if revisions_list:
segment["revisions"] = revisions_list
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
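
# Worked example for annotate_revision_for_mazewhisper (illustrative; the
# token stream below is hypothetical). For the segment text_token
#     "I want <REVSTART> a red <REVEND> a blue car"
# words are indexed ignoring special tokens (I=0, want=1, a=2, red=3, a=4,
# blue=5, car=6), so the marked span produces:
#     {"content": "a red", "words": [2, 3], "mark_location": 3}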
# Read transcription_cunit.json from session_data/<session_id>.
# In the text_token item, a pause is represented as <PAUSE>.
# Extract the pause information and add it to the JSON file. The information includes:
# start: the end timestamp of the previous non-special token.
#        If there is no previous non-special token, use the end time of the previous
#        segment and also set the current segment's start time to that value.
# end: the start timestamp of the next non-special token.
#      If there is no next non-special token, the <PAUSE> (and any special tokens that
#      follow it) should be moved to the start of the next segment.
# duration: the pause duration computed from start and end.
# format example:
"""
"pauses": [
{
"start": 364.08,
"end": 369.1,
"duration": 5.02
},
{
"start": 369.18,
"end": 369.56,
"duration": 0.38
}
],
"""
def annotate_pause_for_mazewhisper(session_id):
file_path = f"session_data/{session_id}/transcription_cunit.json"
if not os.path.exists(file_path):
print(f"File not found: {file_path}")
return
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
for i, segment in enumerate(data['segments']):
text_token = segment.get('text_token', '')
words = segment.get('words', [])
pauses_list = []
if '<PAUSE>' in text_token:
tokens = text_token.split()
for j, token in enumerate(tokens):
if token == '<PAUSE>':
# Find start time: end timestamp of previous non-special token
start_time = None
# Look backwards in current segment for previous word
word_idx = 0
for k in range(j):
if not (tokens[k].startswith('<') and tokens[k].endswith('>')):
word_idx += 1
if word_idx > 0 and word_idx <= len(words):
start_time = words[word_idx - 1].get('end')
elif i > 0: # Use end time of previous segment
start_time = data['segments'][i - 1].get('end')
# Update current segment start time
segment['start'] = start_time
# Find end time: start timestamp of next non-special token
end_time = None
# Look forwards in current segment for next word
                    next_word_idx = 0
                    for k in range(j + 1, len(tokens)):
                        if not (tokens[k].startswith('<') and tokens[k].endswith('>')):
                            # word_idx words precede the pause, so the next word is
                            # words[word_idx]; record its 1-based position.
                            next_word_idx = word_idx + 1
                            break
if next_word_idx > 0 and next_word_idx <= len(words):
end_time = words[next_word_idx - 1].get('start')
                    elif i < len(data['segments']) - 1:
                        # The spec says to move the <PAUSE> (and any trailing special
                        # tokens) to the start of the next segment; as a simpler
                        # approximation, use the next segment's start time as the end.
                        end_time = data['segments'][i + 1].get('start')
# If we have both start and end times, calculate duration
if start_time is not None and end_time is not None:
duration = round(end_time - start_time, 2)
pause_info = {
"start": start_time,
"end": end_time,
"duration": duration
}
pauses_list.append(pause_info)
# Add pauses to this segment if any found
if pauses_list:
segment['pauses'] = pauses_list
# Save the updated data back to file
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return
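
# Worked example for annotate_pause_for_mazewhisper (illustrative; the
# timestamps below are hypothetical). For the segment text_token
#     "so <PAUSE> then"
# where the word "so" ends at 364.08 and the word "then" starts at 369.10,
# the annotation produced is:
#     {"start": 364.08, "end": 369.1, "duration": 5.02}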
# Read transcription_cunit.json from session_data/<session_id>.
# In the text_token item, a filler word is represented as <FILLER>.
# Extract the filler word information and add it to the JSON file. The information includes:
# start: left empty for now (serialized as null)
# end: left empty for now
# duration: left empty for now
# format example:
"""
"fillerwords": [
    {
        "start": null,
        "end": null,
        "content": "",
        "duration": null
    }
],
"""
# Then insert "um" as the token immediately before each <FILLER> in the text_token item.
# Then insert "um" into the text item and the word list (timestamps left empty for now)
# at the position corresponding to the <FILLER> location in text_token.
# (This is a preliminary implementation; a next step should use MazeWhisper to
# transcribe the filler word content and align it directly.)
def annotate_fillerword_for_mazewhisper(session_id):
file_path = f"session_data/{session_id}/transcription_cunit.json"
if not os.path.exists(file_path):
print(f"File not found: {file_path}")
return
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
for segment in data['segments']:
text_token = segment.get('text_token', '')
words = segment.get('words', [])
text = segment.get('text', '')
fillerwords_list = []
if '<FILLER>' in text_token:
# Create filler word entries (preliminary implementation)
filler_count = text_token.count('<FILLER>')
for _ in range(filler_count):
filler_info = {
"start": None,
"end": None,
"content": "",
"duration": None
}
fillerwords_list.append(filler_info)
# Insert "um" before each <FILLER> in text_token
updated_text_token = text_token.replace('<FILLER>', 'um <FILLER>')
segment['text_token'] = updated_text_token
# Insert "um" in text and words list at corresponding locations
tokens = text_token.split()
text_words = text.split()
# Find positions of <FILLER> and insert "um"
filler_positions = []
word_count = 0
for i, token in enumerate(tokens):
if token == '<FILLER>':
filler_positions.append(word_count)
elif not (token.startswith('<') and token.endswith('>')):
word_count += 1
# Insert "um" in reverse order to maintain indices
for pos in reversed(filler_positions):
text_words.insert(pos, 'um')
# Insert in words list with empty timestamps
um_word = {
"word": "um",
"start": None,
"end": None
}
if pos <= len(words):
words.insert(pos, um_word)
# Update text and words in segment
segment['text'] = ' '.join(text_words)
segment['words'] = words
# Add fillerwords to this segment
segment['fillerwords'] = fillerwords_list
# Save the updated data back to file
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return
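
# Worked example for annotate_fillerword_for_mazewhisper (illustrative). A
# segment with text_token "<FILLER> I went" and text "I went" becomes
# text_token "um <FILLER> I went" and text "um I went", with
#     {"word": "um", "start": None, "end": None}
# inserted at the front of the word list and one fillerwords entry (all
# fields empty) appended to the segment.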
def annotate_maze_for_mazewhisper(session_id):
annotate_fillerword_for_mazewhisper(session_id)
annotate_repetition_for_mazewhisper(session_id)
annotate_revision_for_mazewhisper(session_id)
annotate_pause_for_mazewhisper(session_id)
print("Maze annotation completed!")
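
# Minimal usage sketch: the session id below is a placeholder and assumes a
# directory session_data/<session_id>/ containing a transcription_cunit.json
# produced by the MazeWhisper transcription step.
if __name__ == "__main__":
    annotate_maze_for_mazewhisper("example_session")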