scripto/api/api.py.old
2024-11-29 22:41:27 +01:00

243 lines
9.1 KiB
Python

from flask import Flask, request, jsonify
import os
import uuid
from datetime import datetime
from pydub import AudioSegment
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
# Remplacez par votre token Hugging Face si nécessaire
HUGGINGFACE_TOKEN = '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'
# Créer l'application Flask
app = Flask(__name__)
# Charger le modèle de transcription avec faster-whisper
model = WhisperModel("medium")
# Charger le pipeline de diarisation
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HUGGINGFACE_TOKEN)
# Fonction pour sauvegarder le fichier audio
def save_audio_file(file, sessionId):
try:
# Create a folder for the session
session_folder = os.path.join('treatment', sessionId)
os.makedirs(session_folder, exist_ok=True)
# Générer un nom unique pour le fichier
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
file_extension = os.path.splitext(file.filename)[1] or '.wav'
unique_filename = f"{timestamp}_{uuid.uuid4().hex}{file_extension}"
# Sauvegarder dans le dossier 'treatment'
save_path = os.path.join(session_folder, unique_filename)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
file.save(save_path)
return save_path
except Exception as e:
return {'error': f"Error saving file: {str(e)}"}
# Fonction pour réaliser la diarisation et la transcription
def diarizeAndTranscribe(audio_file, sessionId):
try:
# Charger l'audio avec Pydub
audio = AudioSegment.from_file(audio_file)
# Effectuer la diarisation
diarization = pipeline(audio_file)
# Liste pour stocker les résultats sous forme de tableau
speakerTranscriptions = []
current_speaker = None
current_transcription = ""
current_speaker_role = 'Locuteur'
current_speaker_name = ''
# Extraire les segments et les transcrire
for turn, _, speaker in diarization.itertracks(yield_label=True):
start_time = turn.start * 1000 # Convertir en millisecondes
end_time = turn.end * 1000 # Convertir en millisecondes
# Extraire le segment audio correspondant
segment_audio = audio[start_time:end_time]
# Sauvegarder le segment temporairement
segment_path = f"treatment/{sessionId}/temp_segment_{speaker}_{int(start_time)}.wav"
segment_audio.export(segment_path, format="wav")
# Transcrire le segment avec faster-whisper
segments, _ = model.transcribe(segment_path)
transcription = " ".join([segment.text for segment in segments])
# Fusionner les segments si le même locuteur continue
if speaker == current_speaker:
current_transcription += " " + transcription
else:
# Ajouter le précédent locuteur au résultat final
if current_speaker is not None:
speakerTranscriptions.append([current_speaker, current_transcription])
# Mettre à jour le locuteur actuel et réinitialiser la transcription
current_speaker = speaker
current_transcription = transcription
# Ajouter le dernier locuteur au résultat final
if current_speaker is not None:
speakerTranscriptions.append([current_speaker, current_transcription, current_speaker_role,current_speaker_name])
# Supprimer les segments temporaires
for file in os.listdir(f"treatment/{sessionId}"):
if file.startswith("temp_segment"):
os.remove(f"treatment/{sessionId}/{file}")
print(speakerTranscriptions)
return speakerTranscriptions
except Exception as e:
return {'error': f"Diarization/transcription error: {str(e)}"}
# Route pour sauvegarder et traiter le fichier audio
@app.route('/diarize', methods=['POST'])
def diarize():
# Vérifier si un fichier est présent
if 'file' not in request.files:
return jsonify({'error': 'No file part in the request'}), 400
file = request.files['file']
# Vérifier si le fichier est sélectionné
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
# Vérifier si un ID de session a été fourni
sessionId = request.form.get('sessionId')
if not sessionId:
return jsonify({'error': 'Missing sessionId'}), 400
# Sauvegarder le fichier audio
savedFilePath = save_audio_file(file, sessionId)
# Vérifier la sauvegarde
if isinstance(savedFilePath, dict) and 'error' in savedFilePath:
return jsonify(savedFilePath), 500
if os.path.getsize(savedFilePath) == 0:
return jsonify({'error': 'The file is empty after saving'}), 400
# Effectuer la diarisation et la transcription
speakerTranscriptions = diarizeAndTranscribe(savedFilePath,sessionId)
# Vérifier le succès de la diarisation
if isinstance(speakerTranscriptions, dict) and 'error' in speakerTranscriptions:
return jsonify(speakerTranscriptions), 500
# Supprimer le fichier temporaire
try:
os.remove(savedFilePath)
except Exception as e:
return jsonify({'error': f"Failed to delete temporary file: {str(e)}"}), 500
# Retourner les résultats
return jsonify({
'message': 'File saved, diarization, and transcription completed successfully',
'speakerTranscriptions': speakerTranscriptions
})
@app.route('/workfinale', methods=['POST'])
def workfinale():
# Vérifier si un fichier est présent
if 'file' not in request.files:
return jsonify({'error': 'No file part in the request'}), 400
file = request.files['file']
# Vérifier si un ID de session a été fourni
sessionId = request.form.get('sessionId')
sessionName = request.form.get('sessionName')
sessionDate = request.form.get('sessionDate')
sessionEmail = request.form.get('sessionEmail')
sessionSummarizeWithLLM = request.form.get('sessionSummarizeWithLLM')
if not sessionId or not sessionName or not sessionDate or not sessionEmail or not sessionSummarizeWithLLM:
return jsonify({'error':'Mssing information'}), 400
# Sauvegarder le fichier audio
savedFilePath = save_audio_file(file, sessionId)
if os.path.getsize(savedFilePath) == 0:
return jsonify({'error': 'The file is empty after saving'}), 400
# Effectuer la diarisation et la transcription
speakerTranscriptions = diarizeAndTranscribe(savedFilePath,sessionId)
# Vérifier le succès de la diarisation
if isinstance(speakerTranscriptions, dict) and 'error' in speakerTranscriptions:
return jsonify(speakerTranscriptions), 500
# Reconnaissance des locuteurs par leur phrase 'Hey Scripto, je suis %nom% et j'ai le rôle de %rôle%'
for i in range(len(speakerTranscriptions)):
# Vérifie si la transcription contient la phrase clé
if 'Hey Scripto, je suis' in speakerTranscriptions[i][1]:
# Extraire le rôle après "et j'ai le rôle de "
role = speakerTranscriptions[i][1].split('et j\'ai le rôle de ')[1]
speakerTranscriptions[i][3] = role # Mettre à jour le rôle dans speaker_role
# Extraire le texte avant "Hey Scripto, je suis"
transcription = speakerTranscriptions[i][0].split('Hey Scripto, je suis')[0]
speakerTranscriptions[i][3] = transcription # Mettre à jour la transcription
# Supprimer le fichier temporaire
try:
os.remove(savedFilePath)
except Exception as e:
return jsonify({'error': f"Failed to delete temporary file: {str(e)}"}), 500
# Supprimer le dossier de la session
try:
os.rmdir(f"treatment/{sessionId}")
except Exception as e:
return jsonify({'error': f"Failed to delete session folder: {str(e)}"}), 500
# Retourner les résultats
return jsonify({
'message': 'File saved, diarization, and transcription completed successfully',
'speakerTranscriptions': speakerTranscriptions
})
# Route pour initialiser une session
@app.route('/start', methods=['POST'])
def init():
session_id = request.json.get('sessionId')
if not session_id:
return jsonify({'error': 'Missing sessionId'}), 400
# Ajouter la session à une liste
workSession.append(session_id)
return jsonify({
'message': 'Session initialized',
'sessionId': session_id
})
# Route pour terminer une session
@app.route('/stop', methods=['POST'])
def finish():
session_id = request.json.get('sessionId')
if not session_id:
return jsonify({'error': 'Missing sessionId'}), 400
# Retirer la session de la liste
if session_id in workSession:
workSession.remove(session_id)
return jsonify({'message': 'Session finished'})
else:
return jsonify({'error': 'Session not found'}), 404
# Liste des sessions actives
workSession = []
if __name__ == '__main__':
# Lancer l'application Flask
app.run(debug=True, host='0.0.0.0', port=3003)