massimoavvisati commited on
Commit
4a8a7d7
·
verified ·
1 Parent(s): c3bcb97

Update app.py

Browse files

Trying a better solution for temp files

Files changed (1) hide show
  1. app.py +38 -4
app.py CHANGED
@@ -4,6 +4,12 @@ import asyncio
4
  import tempfile
5
  import os
6
 
 
 
 
 
 
 
7
  async def get_voices():
8
  voices = await edge_tts.list_voices()
9
  return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
@@ -18,12 +24,40 @@ async def text_to_speech(text, voice, rate, pitch):
18
  rate_str = f"{rate:+d}%"
19
  pitch_str = f"{pitch:+d}Hz"
20
  communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)
21
- with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
22
- tmp_path = tmp_file.name
23
- await communicate.save(tmp_path)
24
- return tmp_path, None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
 
26
  async def tts_interface(text, voice, rate, pitch):
 
 
 
27
  audio, warning = await text_to_speech(text, voice, rate, pitch)
28
  if warning:
29
  return audio, gr.Warning(warning)
 
4
  import tempfile
5
  import os
6
 
7
+ from pathlib import Path
8
+
9
+ # Crea una directory per i file audio
10
+ AUDIO_DIR = Path("audio_files")
11
+ AUDIO_DIR.mkdir(exist_ok=True)
12
+
13
  async def get_voices():
14
  voices = await edge_tts.list_voices()
15
  return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
 
24
  rate_str = f"{rate:+d}%"
25
  pitch_str = f"{pitch:+d}Hz"
26
  communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)
27
+
28
+ # Crea un nome file univoco basato sul contenuto
29
+ import hashlib
30
+ content_hash = hashlib.md5(f"{text}{voice}{rate}{pitch}".encode()).hexdigest()
31
+ output_path = AUDIO_DIR / f"{content_hash}.mp3"
32
+
33
+ # Genera l'audio solo se non esiste già
34
+ if not output_path.exists():
35
+ await communicate.save(str(output_path))
36
+
37
+ return str(output_path), None
38
+
39
+ # Funzione di pulizia per rimuovere i file più vecchi
40
+ def cleanup_old_files(directory: Path, max_files: int = 100, max_age_hours: int = 24):
41
+ files = list(directory.glob("*.mp3"))
42
+
43
+ # Rimuovi i file più vecchi di max_age_hours
44
+ current_time = datetime.datetime.now()
45
+ for file in files:
46
+ file_age = current_time - datetime.datetime.fromtimestamp(file.stat().st_mtime)
47
+ if file_age.total_seconds() > (max_age_hours * 3600):
48
+ file.unlink()
49
+
50
+ # Se ci sono ancora troppi file, rimuovi i più vecchi
51
+ files = list(directory.glob("*.mp3"))
52
+ if len(files) > max_files:
53
+ files.sort(key=lambda x: x.stat().st_mtime)
54
+ for file in files[:-max_files]:
55
+ file.unlink()
56
 
57
  async def tts_interface(text, voice, rate, pitch):
58
+ # Esegui la pulizia prima di generare un nuovo file
59
+ cleanup_old_files(AUDIO_DIR)
60
+
61
  audio, warning = await text_to_speech(text, voice, rate, pitch)
62
  if warning:
63
  return audio, gr.Warning(warning)