289 lines
12 KiB
Python
289 lines
12 KiB
Python
from flask import Flask, request, jsonify
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from pydub import AudioSegment
|
|
from faster_whisper import WhisperModel
|
|
from pyannote.audio import Pipeline
|
|
import requests
|
|
|
|
# Remplacez par votre token Hugging Face si nécessaire
|
|
HUGGINGFACE_TOKEN = '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'
|
|
|
|
# Créer l'application Flask
|
|
app = Flask(__name__)
|
|
|
|
# Charger le modèle de transcription avec faster-whisper
|
|
model = WhisperModel("medium")
|
|
|
|
# Charger le pipeline de diarisation
|
|
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HUGGINGFACE_TOKEN)
|
|
|
|
# Fonction pour sauvegarder le fichier audio
|
|
def saveAudioFile(file, folderName):
|
|
try:
|
|
# Convertir folderName en chaîne si c'est un UUID
|
|
folderName = str(folderName) # Assurez-vous que folderName est une chaîne
|
|
|
|
# Créer le chemin pour le dossier de session
|
|
sessionFolder = os.path.join('work', folderName)
|
|
os.makedirs(sessionFolder, exist_ok=True)
|
|
|
|
# Créer un nom de fichier unique basé sur le timestamp et un UUID
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
fileExtension = os.path.splitext(file.filename)[1] or '.wav'
|
|
uniqueFilename = f"{timestamp}_{uuid.uuid4().hex}{fileExtension}"
|
|
|
|
# Créer le chemin complet pour enregistrer le fichier
|
|
savePath = os.path.join(sessionFolder, uniqueFilename)
|
|
os.makedirs(os.path.dirname(savePath), exist_ok=True)
|
|
|
|
# Sauvegarder le fichier sur le disque
|
|
file.save(savePath)
|
|
|
|
# Retourner le chemin du fichier sauvegardé
|
|
return savePath
|
|
except Exception as e:
|
|
return {'error': f"Error saving file: {str(e)}"}
|
|
|
|
|
|
# Fonction pour réaliser la diarisation et la transcription
|
|
def diarizeAndTranscribe(audio_file, folderName):
|
|
try:
|
|
# Charger l'audio avec Pydub
|
|
audio = AudioSegment.from_file(audio_file)
|
|
|
|
# Effectuer la diarisation
|
|
diarization = pipeline(audio_file)
|
|
|
|
# Liste pour stocker les résultats sous forme de tableau
|
|
speakerTranscriptions = []
|
|
|
|
current_speaker = None
|
|
current_transcription = ""
|
|
current_speaker_role = 'Locuteur'
|
|
current_speaker_name = ''
|
|
|
|
# Extraire les segments et les transcrire
|
|
for turn, _, speaker in diarization.itertracks(yield_label=True):
|
|
start_time = turn.start * 1000 # Convertir en millisecondes
|
|
end_time = turn.end * 1000 # Convertir en millisecondes
|
|
|
|
if end_time - start_time > 200: # Ignorer les segments de moins de 200 ms
|
|
# Extraire le segment audio correspondant
|
|
segment_audio = audio[start_time:end_time]
|
|
|
|
# Sauvegarder le segment temporairement
|
|
segment_path = f"work/{folderName}/temp_segment_{speaker}_{int(start_time)}.wav"
|
|
segment_audio.export(segment_path, format="wav")
|
|
|
|
# Transcrire le segment avec faster-whisper
|
|
segments, _ = model.transcribe(segment_path,language="fr")
|
|
transcription = " ".join([segment.text for segment in segments])
|
|
|
|
# Fusionner les segments si le même locuteur continue
|
|
if speaker == current_speaker:
|
|
current_transcription += " " + transcription
|
|
else:
|
|
# Ajouter le précédent locuteur au résultat final
|
|
if current_speaker is not None:
|
|
speakerTranscriptions.append([
|
|
current_speaker, # Identifiant du locuteur (e.g., SPEAKER_00)
|
|
current_transcription.strip(), # Transcription
|
|
"Locuteur", # Rôle par défaut
|
|
"" # Nom par défaut
|
|
])
|
|
# Mettre à jour le locuteur actuel et réinitialiser la transcription
|
|
current_speaker = speaker
|
|
current_transcription = transcription
|
|
|
|
|
|
# Ajouter le dernier locuteur au résultat final
|
|
if current_speaker is not None:
|
|
speakerTranscriptions.append([current_speaker, current_transcription, current_speaker_role, current_speaker_name])
|
|
|
|
# Supprimer les segments temporaires
|
|
for file in os.listdir(f"work/{folderName}"):
|
|
if file.startswith("temp_segment"):
|
|
os.remove(f"work/{folderName}/{file}")
|
|
|
|
return speakerTranscriptions
|
|
except Exception as e:
|
|
return {'error': f"Diarization/transcription error: {str(e)}"}
|
|
|
|
@app.route('/work', methods=['POST'])
|
|
def work():
|
|
try:
|
|
# Vérifier si un fichier est présent
|
|
if 'file' not in request.files:
|
|
print("No file part in the request")
|
|
return jsonify({'error': 'No file part in the request'}), 400
|
|
|
|
file = request.files['file']
|
|
|
|
# Vérifier si un ID de session a été fourni
|
|
sessionId = uuid.uuid4()
|
|
sessionName = request.form.get('sessionName')
|
|
sessionDate = request.form.get('sessionDate')
|
|
sessionEmail = request.form.get('sessionEmail')
|
|
sessionSummarizeWithLLM = request.form.get('sessionSummarizeWithLLM')
|
|
|
|
print(f"Received session info: {sessionName}, {sessionDate}, {sessionEmail}")
|
|
|
|
# Vérifier si les informations obligatoires sont présentes
|
|
if not sessionName or not sessionDate or not sessionEmail:
|
|
print("Missing session information")
|
|
return jsonify({'error': 'Missing session information'}), 400
|
|
|
|
# Sauvegarder le fichier audio
|
|
savedFilePath = saveAudioFile(file, sessionId)
|
|
|
|
# Vérifier si savedFilePath est un dictionnaire d'erreur
|
|
if isinstance(savedFilePath, dict) and 'error' in savedFilePath:
|
|
print(f"Error saving file: {savedFilePath['error']}")
|
|
return jsonify(savedFilePath), 500
|
|
|
|
# Vérifier si le fichier existe et a une taille valide
|
|
if not os.path.exists(savedFilePath) or os.path.getsize(savedFilePath) == 0:
|
|
print("The file is empty after saving")
|
|
return jsonify({'error': 'The file is empty after saving'}), 400
|
|
|
|
print(f"File saved successfully: {savedFilePath}")
|
|
|
|
# Effectuer la diarisation et la transcription
|
|
|
|
speakerTranscriptions = diarizeAndTranscribe(savedFilePath, sessionId)
|
|
|
|
# Vérifier le succès de la diarisation
|
|
if isinstance(speakerTranscriptions, dict) and 'error' in speakerTranscriptions:
|
|
print(f"Diarization/transcription error: {speakerTranscriptions['error']}")
|
|
return jsonify(speakerTranscriptions), 500
|
|
|
|
# Enregistrer les informations de transcription
|
|
documentOutputContent = ""
|
|
documentOutput = f"work/{sessionId}/transcription.txt"
|
|
|
|
# Titre de la session
|
|
documentOutputContent += f"Session: {sessionName}\n"
|
|
documentOutputContent += f"Date: {sessionDate}\n"
|
|
documentOutputContent += f"Email: {sessionEmail}\n\n"
|
|
|
|
# list des locuteurs et transcriptions
|
|
documentOutputContent += "Locuteurs:\n"
|
|
for i in range(len(speakerTranscriptions)):
|
|
if len(speakerTranscriptions[i]) < 3:
|
|
print(f"Error: Missing data in speakerTranscriptions[{i}]: {speakerTranscriptions[i]}")
|
|
continue # Ignorer les entrées mal formées
|
|
documentOutputContent += f"{speakerTranscriptions[i][0]} ({speakerTranscriptions[i][2]})\n"
|
|
documentOutputContent += "\n"
|
|
|
|
|
|
documentOutputContent += "Transcriptions:\n"
|
|
for i in range(len(speakerTranscriptions)):
|
|
if len(speakerTranscriptions[i]) < 2:
|
|
print(f"Error: Missing transcription data in speakerTranscriptions[{i}]: {speakerTranscriptions[i]}")
|
|
continue # Ignorer les entrées mal formées
|
|
documentOutputContent += f"{speakerTranscriptions[i][0]}: {speakerTranscriptions[i][1]}\n"
|
|
documentOutputContent += "\n"
|
|
|
|
# Résumé par LLM
|
|
if sessionSummarizeWithLLM:
|
|
user_input = "Résumez la session suivante: " + documentOutputContent
|
|
|
|
# Call API (Streaming)
|
|
url = "http://localhost:11434/api/generate"
|
|
payload = {
|
|
"model": "llama3.1",
|
|
"prompt": user_input,
|
|
"stream": 'false'
|
|
}
|
|
|
|
try:
|
|
# Envoi de la requête
|
|
response = requests.post(url, json=payload)
|
|
response.raise_for_status() # Vérifie les erreurs HTTP
|
|
|
|
# Déboguer la réponse brute pour s'assurer qu'elle est valide
|
|
print(f"Raw response text: {response.text}")
|
|
|
|
# Charger la réponse JSON ligne par ligne et collecter les parties
|
|
for line in response.text.splitlines():
|
|
try:
|
|
response_json = response.json() # Tenter de lire la réponse JSON
|
|
if 'response' in response_json:
|
|
documentOutputContent += response_json['response']
|
|
if response_json.get('done'): # Si 'done' est vrai, c'est la fin du message
|
|
break
|
|
except ValueError as e:
|
|
print(f"Erreur de format JSON sur une ligne: {line}")
|
|
continue
|
|
|
|
# Affichage du contenu final après avoir assemblé tous les morceaux
|
|
print(f"Résumé complet: {documentOutputContent}")
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Erreur HTTP: {e}")
|
|
except ValueError as e: # Erreur de format JSON
|
|
print(f"Erreur de décodage JSON: {e}")
|
|
print(f"Texte brut de la réponse causant l'erreur: {response.text}")
|
|
|
|
|
|
# Enregistrer le fichier de transcription
|
|
with open(documentOutput, 'w') as f:
|
|
f.write(documentOutputContent)
|
|
|
|
print(f"Speaker transcriptions: {speakerTranscriptions}")
|
|
|
|
# Reconnaissance des locuteurs par leur phrase 'Hey Scripto, je suis %nom% et j'ai le rôle de %rôle%'
|
|
for i in range(len(speakerTranscriptions)):
|
|
# Vérifiez si la phrase clé est présente
|
|
if 'Hey Scripto, je suis' in speakerTranscriptions[i][1]:
|
|
try:
|
|
# Extraire le rôle après "et j'ai le rôle de "
|
|
role = speakerTranscriptions[i][1].split("et j'ai le rôle de ")[1]
|
|
speakerTranscriptions[i][2] = role.strip() # Mettre à jour le rôle
|
|
|
|
# Extraire le nom après "Hey Scripto, je suis"
|
|
name = speakerTranscriptions[i][1].split("Hey Scripto, je suis")[1].split('et')[0]
|
|
speakerTranscriptions[i][3] = name.strip() # Mettre à jour le nom
|
|
|
|
# Nettoyer la transcription en retirant la phrase clé
|
|
speakerTranscriptions[i][1] = speakerTranscriptions[i][1].split("Hey Scripto, je suis")[0].strip()
|
|
except IndexError as e:
|
|
print(f"Error parsing 'Hey Scripto' phrase: {e}")
|
|
|
|
|
|
# Supprimer le fichier temporaire
|
|
try:
|
|
os.remove(savedFilePath)
|
|
print(f"Deleted temporary file: {savedFilePath}")
|
|
except Exception as e:
|
|
print(f"Failed to delete temporary file: {str(e)}")
|
|
return jsonify({'error': f"Failed to delete temporary file: {str(e)}"}), 500
|
|
|
|
# Supprimer le dossier de la session
|
|
# try:
|
|
# os.rmdir(f"work/{sessionId}")
|
|
# print(f"Deleted session folder: work/{sessionId}")
|
|
# except Exception as e:
|
|
# print(f"Failed to delete session folder: {str(e)}")
|
|
# return jsonify({'error': f"Failed to delete session folder: {str(e)}"}), 500
|
|
|
|
# Retourner les résultats
|
|
return jsonify({
|
|
'message': 'File saved, diarization, and transcription completed successfully',
|
|
'speakerTranscriptions': speakerTranscriptions
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f"Unhandled error: {str(e)}")
|
|
return jsonify({'error': f"Internal server error: {str(e)}"}), 500
|
|
|
|
|
|
# Liste des sessions actives
|
|
workSession = []
|
|
|
|
if __name__ == '__main__':
|
|
# Lancer l'application Flask
|
|
app.run(debug=True, host='0.0.0.0', port=3003)
|