scripto/api/api.py
2024-11-29 22:41:27 +01:00

289 lines
12 KiB
Python

from flask import Flask, request, jsonify
import os
import uuid
from datetime import datetime
from pydub import AudioSegment
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
import requests
# Remplacez par votre token Hugging Face si nécessaire
HUGGINGFACE_TOKEN = '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'
# Créer l'application Flask
app = Flask(__name__)
# Charger le modèle de transcription avec faster-whisper
model = WhisperModel("medium")
# Charger le pipeline de diarisation
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HUGGINGFACE_TOKEN)
# Fonction pour sauvegarder le fichier audio
def saveAudioFile(file, folderName):
try:
# Convertir folderName en chaîne si c'est un UUID
folderName = str(folderName) # Assurez-vous que folderName est une chaîne
# Créer le chemin pour le dossier de session
sessionFolder = os.path.join('work', folderName)
os.makedirs(sessionFolder, exist_ok=True)
# Créer un nom de fichier unique basé sur le timestamp et un UUID
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
fileExtension = os.path.splitext(file.filename)[1] or '.wav'
uniqueFilename = f"{timestamp}_{uuid.uuid4().hex}{fileExtension}"
# Créer le chemin complet pour enregistrer le fichier
savePath = os.path.join(sessionFolder, uniqueFilename)
os.makedirs(os.path.dirname(savePath), exist_ok=True)
# Sauvegarder le fichier sur le disque
file.save(savePath)
# Retourner le chemin du fichier sauvegardé
return savePath
except Exception as e:
return {'error': f"Error saving file: {str(e)}"}
# Fonction pour réaliser la diarisation et la transcription
def diarizeAndTranscribe(audio_file, folderName):
try:
# Charger l'audio avec Pydub
audio = AudioSegment.from_file(audio_file)
# Effectuer la diarisation
diarization = pipeline(audio_file)
# Liste pour stocker les résultats sous forme de tableau
speakerTranscriptions = []
current_speaker = None
current_transcription = ""
current_speaker_role = 'Locuteur'
current_speaker_name = ''
# Extraire les segments et les transcrire
for turn, _, speaker in diarization.itertracks(yield_label=True):
start_time = turn.start * 1000 # Convertir en millisecondes
end_time = turn.end * 1000 # Convertir en millisecondes
if end_time - start_time > 200: # Ignorer les segments de moins de 200 ms
# Extraire le segment audio correspondant
segment_audio = audio[start_time:end_time]
# Sauvegarder le segment temporairement
segment_path = f"work/{folderName}/temp_segment_{speaker}_{int(start_time)}.wav"
segment_audio.export(segment_path, format="wav")
# Transcrire le segment avec faster-whisper
segments, _ = model.transcribe(segment_path,language="fr")
transcription = " ".join([segment.text for segment in segments])
# Fusionner les segments si le même locuteur continue
if speaker == current_speaker:
current_transcription += " " + transcription
else:
# Ajouter le précédent locuteur au résultat final
if current_speaker is not None:
speakerTranscriptions.append([
current_speaker, # Identifiant du locuteur (e.g., SPEAKER_00)
current_transcription.strip(), # Transcription
"Locuteur", # Rôle par défaut
"" # Nom par défaut
])
# Mettre à jour le locuteur actuel et réinitialiser la transcription
current_speaker = speaker
current_transcription = transcription
# Ajouter le dernier locuteur au résultat final
if current_speaker is not None:
speakerTranscriptions.append([current_speaker, current_transcription, current_speaker_role, current_speaker_name])
# Supprimer les segments temporaires
for file in os.listdir(f"work/{folderName}"):
if file.startswith("temp_segment"):
os.remove(f"work/{folderName}/{file}")
return speakerTranscriptions
except Exception as e:
return {'error': f"Diarization/transcription error: {str(e)}"}
@app.route('/work', methods=['POST'])
def work():
try:
# Vérifier si un fichier est présent
if 'file' not in request.files:
print("No file part in the request")
return jsonify({'error': 'No file part in the request'}), 400
file = request.files['file']
# Vérifier si un ID de session a été fourni
sessionId = uuid.uuid4()
sessionName = request.form.get('sessionName')
sessionDate = request.form.get('sessionDate')
sessionEmail = request.form.get('sessionEmail')
sessionSummarizeWithLLM = request.form.get('sessionSummarizeWithLLM')
print(f"Received session info: {sessionName}, {sessionDate}, {sessionEmail}")
# Vérifier si les informations obligatoires sont présentes
if not sessionName or not sessionDate or not sessionEmail:
print("Missing session information")
return jsonify({'error': 'Missing session information'}), 400
# Sauvegarder le fichier audio
savedFilePath = saveAudioFile(file, sessionId)
# Vérifier si savedFilePath est un dictionnaire d'erreur
if isinstance(savedFilePath, dict) and 'error' in savedFilePath:
print(f"Error saving file: {savedFilePath['error']}")
return jsonify(savedFilePath), 500
# Vérifier si le fichier existe et a une taille valide
if not os.path.exists(savedFilePath) or os.path.getsize(savedFilePath) == 0:
print("The file is empty after saving")
return jsonify({'error': 'The file is empty after saving'}), 400
print(f"File saved successfully: {savedFilePath}")
# Effectuer la diarisation et la transcription
speakerTranscriptions = diarizeAndTranscribe(savedFilePath, sessionId)
# Vérifier le succès de la diarisation
if isinstance(speakerTranscriptions, dict) and 'error' in speakerTranscriptions:
print(f"Diarization/transcription error: {speakerTranscriptions['error']}")
return jsonify(speakerTranscriptions), 500
# Enregistrer les informations de transcription
documentOutputContent = ""
documentOutput = f"work/{sessionId}/transcription.txt"
# Titre de la session
documentOutputContent += f"Session: {sessionName}\n"
documentOutputContent += f"Date: {sessionDate}\n"
documentOutputContent += f"Email: {sessionEmail}\n\n"
# list des locuteurs et transcriptions
documentOutputContent += "Locuteurs:\n"
for i in range(len(speakerTranscriptions)):
if len(speakerTranscriptions[i]) < 3:
print(f"Error: Missing data in speakerTranscriptions[{i}]: {speakerTranscriptions[i]}")
continue # Ignorer les entrées mal formées
documentOutputContent += f"{speakerTranscriptions[i][0]} ({speakerTranscriptions[i][2]})\n"
documentOutputContent += "\n"
documentOutputContent += "Transcriptions:\n"
for i in range(len(speakerTranscriptions)):
if len(speakerTranscriptions[i]) < 2:
print(f"Error: Missing transcription data in speakerTranscriptions[{i}]: {speakerTranscriptions[i]}")
continue # Ignorer les entrées mal formées
documentOutputContent += f"{speakerTranscriptions[i][0]}: {speakerTranscriptions[i][1]}\n"
documentOutputContent += "\n"
# Résumé par LLM
if sessionSummarizeWithLLM:
user_input = "Résumez la session suivante: " + documentOutputContent
# Call API (Streaming)
url = "http://localhost:11434/api/generate"
payload = {
"model": "llama3.1",
"prompt": user_input,
"stream": 'false'
}
try:
# Envoi de la requête
response = requests.post(url, json=payload)
response.raise_for_status() # Vérifie les erreurs HTTP
# Déboguer la réponse brute pour s'assurer qu'elle est valide
print(f"Raw response text: {response.text}")
# Charger la réponse JSON ligne par ligne et collecter les parties
for line in response.text.splitlines():
try:
response_json = response.json() # Tenter de lire la réponse JSON
if 'response' in response_json:
documentOutputContent += response_json['response']
if response_json.get('done'): # Si 'done' est vrai, c'est la fin du message
break
except ValueError as e:
print(f"Erreur de format JSON sur une ligne: {line}")
continue
# Affichage du contenu final après avoir assemblé tous les morceaux
print(f"Résumé complet: {documentOutputContent}")
except requests.exceptions.RequestException as e:
print(f"Erreur HTTP: {e}")
except ValueError as e: # Erreur de format JSON
print(f"Erreur de décodage JSON: {e}")
print(f"Texte brut de la réponse causant l'erreur: {response.text}")
# Enregistrer le fichier de transcription
with open(documentOutput, 'w') as f:
f.write(documentOutputContent)
print(f"Speaker transcriptions: {speakerTranscriptions}")
# Reconnaissance des locuteurs par leur phrase 'Hey Scripto, je suis %nom% et j'ai le rôle de %rôle%'
for i in range(len(speakerTranscriptions)):
# Vérifiez si la phrase clé est présente
if 'Hey Scripto, je suis' in speakerTranscriptions[i][1]:
try:
# Extraire le rôle après "et j'ai le rôle de "
role = speakerTranscriptions[i][1].split("et j'ai le rôle de ")[1]
speakerTranscriptions[i][2] = role.strip() # Mettre à jour le rôle
# Extraire le nom après "Hey Scripto, je suis"
name = speakerTranscriptions[i][1].split("Hey Scripto, je suis")[1].split('et')[0]
speakerTranscriptions[i][3] = name.strip() # Mettre à jour le nom
# Nettoyer la transcription en retirant la phrase clé
speakerTranscriptions[i][1] = speakerTranscriptions[i][1].split("Hey Scripto, je suis")[0].strip()
except IndexError as e:
print(f"Error parsing 'Hey Scripto' phrase: {e}")
# Supprimer le fichier temporaire
try:
os.remove(savedFilePath)
print(f"Deleted temporary file: {savedFilePath}")
except Exception as e:
print(f"Failed to delete temporary file: {str(e)}")
return jsonify({'error': f"Failed to delete temporary file: {str(e)}"}), 500
# Supprimer le dossier de la session
# try:
# os.rmdir(f"work/{sessionId}")
# print(f"Deleted session folder: work/{sessionId}")
# except Exception as e:
# print(f"Failed to delete session folder: {str(e)}")
# return jsonify({'error': f"Failed to delete session folder: {str(e)}"}), 500
# Retourner les résultats
return jsonify({
'message': 'File saved, diarization, and transcription completed successfully',
'speakerTranscriptions': speakerTranscriptions
})
except Exception as e:
print(f"Unhandled error: {str(e)}")
return jsonify({'error': f"Internal server error: {str(e)}"}), 500
# Liste des sessions actives
workSession = []
if __name__ == '__main__':
# Lancer l'application Flask
app.run(debug=True, host='0.0.0.0', port=3003)