243 lines
9.1 KiB
Python
243 lines
9.1 KiB
Python
from flask import Flask, request, jsonify
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from pydub import AudioSegment
|
|
from faster_whisper import WhisperModel
|
|
from pyannote.audio import Pipeline
|
|
|
|
# Remplacez par votre token Hugging Face si nécessaire
|
|
HUGGINGFACE_TOKEN = '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'
|
|
|
|
# Créer l'application Flask
|
|
app = Flask(__name__)
|
|
|
|
# Charger le modèle de transcription avec faster-whisper
|
|
model = WhisperModel("medium")
|
|
|
|
# Charger le pipeline de diarisation
|
|
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HUGGINGFACE_TOKEN)
|
|
|
|
# Fonction pour sauvegarder le fichier audio
|
|
def save_audio_file(file, sessionId):
|
|
try:
|
|
# Create a folder for the session
|
|
session_folder = os.path.join('treatment', sessionId)
|
|
os.makedirs(session_folder, exist_ok=True)
|
|
|
|
# Générer un nom unique pour le fichier
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
file_extension = os.path.splitext(file.filename)[1] or '.wav'
|
|
unique_filename = f"{timestamp}_{uuid.uuid4().hex}{file_extension}"
|
|
|
|
# Sauvegarder dans le dossier 'treatment'
|
|
save_path = os.path.join(session_folder, unique_filename)
|
|
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
|
file.save(save_path)
|
|
|
|
return save_path
|
|
except Exception as e:
|
|
return {'error': f"Error saving file: {str(e)}"}
|
|
|
|
# Fonction pour réaliser la diarisation et la transcription
|
|
def diarizeAndTranscribe(audio_file, sessionId):
|
|
try:
|
|
# Charger l'audio avec Pydub
|
|
audio = AudioSegment.from_file(audio_file)
|
|
|
|
# Effectuer la diarisation
|
|
diarization = pipeline(audio_file)
|
|
|
|
# Liste pour stocker les résultats sous forme de tableau
|
|
speakerTranscriptions = []
|
|
|
|
current_speaker = None
|
|
current_transcription = ""
|
|
current_speaker_role = 'Locuteur'
|
|
current_speaker_name = ''
|
|
|
|
# Extraire les segments et les transcrire
|
|
for turn, _, speaker in diarization.itertracks(yield_label=True):
|
|
start_time = turn.start * 1000 # Convertir en millisecondes
|
|
end_time = turn.end * 1000 # Convertir en millisecondes
|
|
|
|
# Extraire le segment audio correspondant
|
|
segment_audio = audio[start_time:end_time]
|
|
|
|
# Sauvegarder le segment temporairement
|
|
segment_path = f"treatment/{sessionId}/temp_segment_{speaker}_{int(start_time)}.wav"
|
|
segment_audio.export(segment_path, format="wav")
|
|
|
|
# Transcrire le segment avec faster-whisper
|
|
segments, _ = model.transcribe(segment_path)
|
|
transcription = " ".join([segment.text for segment in segments])
|
|
|
|
# Fusionner les segments si le même locuteur continue
|
|
if speaker == current_speaker:
|
|
current_transcription += " " + transcription
|
|
else:
|
|
# Ajouter le précédent locuteur au résultat final
|
|
if current_speaker is not None:
|
|
speakerTranscriptions.append([current_speaker, current_transcription])
|
|
# Mettre à jour le locuteur actuel et réinitialiser la transcription
|
|
current_speaker = speaker
|
|
current_transcription = transcription
|
|
|
|
# Ajouter le dernier locuteur au résultat final
|
|
if current_speaker is not None:
|
|
speakerTranscriptions.append([current_speaker, current_transcription, current_speaker_role,current_speaker_name])
|
|
|
|
# Supprimer les segments temporaires
|
|
for file in os.listdir(f"treatment/{sessionId}"):
|
|
if file.startswith("temp_segment"):
|
|
os.remove(f"treatment/{sessionId}/{file}")
|
|
|
|
print(speakerTranscriptions)
|
|
return speakerTranscriptions
|
|
except Exception as e:
|
|
return {'error': f"Diarization/transcription error: {str(e)}"}
|
|
|
|
# Route pour sauvegarder et traiter le fichier audio
|
|
@app.route('/diarize', methods=['POST'])
|
|
def diarize():
|
|
# Vérifier si un fichier est présent
|
|
if 'file' not in request.files:
|
|
return jsonify({'error': 'No file part in the request'}), 400
|
|
|
|
file = request.files['file']
|
|
|
|
# Vérifier si le fichier est sélectionné
|
|
if file.filename == '':
|
|
return jsonify({'error': 'No selected file'}), 400
|
|
|
|
# Vérifier si un ID de session a été fourni
|
|
sessionId = request.form.get('sessionId')
|
|
if not sessionId:
|
|
return jsonify({'error': 'Missing sessionId'}), 400
|
|
|
|
# Sauvegarder le fichier audio
|
|
savedFilePath = save_audio_file(file, sessionId)
|
|
|
|
# Vérifier la sauvegarde
|
|
if isinstance(savedFilePath, dict) and 'error' in savedFilePath:
|
|
return jsonify(savedFilePath), 500
|
|
|
|
if os.path.getsize(savedFilePath) == 0:
|
|
return jsonify({'error': 'The file is empty after saving'}), 400
|
|
|
|
# Effectuer la diarisation et la transcription
|
|
speakerTranscriptions = diarizeAndTranscribe(savedFilePath,sessionId)
|
|
|
|
|
|
# Vérifier le succès de la diarisation
|
|
if isinstance(speakerTranscriptions, dict) and 'error' in speakerTranscriptions:
|
|
return jsonify(speakerTranscriptions), 500
|
|
|
|
# Supprimer le fichier temporaire
|
|
try:
|
|
os.remove(savedFilePath)
|
|
except Exception as e:
|
|
return jsonify({'error': f"Failed to delete temporary file: {str(e)}"}), 500
|
|
|
|
# Retourner les résultats
|
|
return jsonify({
|
|
'message': 'File saved, diarization, and transcription completed successfully',
|
|
'speakerTranscriptions': speakerTranscriptions
|
|
})
|
|
|
|
@app.route('/workfinale', methods=['POST'])
|
|
def workfinale():
|
|
# Vérifier si un fichier est présent
|
|
if 'file' not in request.files:
|
|
return jsonify({'error': 'No file part in the request'}), 400
|
|
|
|
file = request.files['file']
|
|
|
|
# Vérifier si un ID de session a été fourni
|
|
sessionId = request.form.get('sessionId')
|
|
sessionName = request.form.get('sessionName')
|
|
sessionDate = request.form.get('sessionDate')
|
|
sessionEmail = request.form.get('sessionEmail')
|
|
sessionSummarizeWithLLM = request.form.get('sessionSummarizeWithLLM')
|
|
|
|
if not sessionId or not sessionName or not sessionDate or not sessionEmail or not sessionSummarizeWithLLM:
|
|
return jsonify({'error':'Mssing information'}), 400
|
|
|
|
# Sauvegarder le fichier audio
|
|
savedFilePath = save_audio_file(file, sessionId)
|
|
|
|
if os.path.getsize(savedFilePath) == 0:
|
|
return jsonify({'error': 'The file is empty after saving'}), 400
|
|
|
|
# Effectuer la diarisation et la transcription
|
|
speakerTranscriptions = diarizeAndTranscribe(savedFilePath,sessionId)
|
|
|
|
# Vérifier le succès de la diarisation
|
|
if isinstance(speakerTranscriptions, dict) and 'error' in speakerTranscriptions:
|
|
return jsonify(speakerTranscriptions), 500
|
|
|
|
# Reconnaissance des locuteurs par leur phrase 'Hey Scripto, je suis %nom% et j'ai le rôle de %rôle%'
|
|
for i in range(len(speakerTranscriptions)):
|
|
# Vérifie si la transcription contient la phrase clé
|
|
if 'Hey Scripto, je suis' in speakerTranscriptions[i][1]:
|
|
# Extraire le rôle après "et j'ai le rôle de "
|
|
role = speakerTranscriptions[i][1].split('et j\'ai le rôle de ')[1]
|
|
speakerTranscriptions[i][3] = role # Mettre à jour le rôle dans speaker_role
|
|
|
|
# Extraire le texte avant "Hey Scripto, je suis"
|
|
transcription = speakerTranscriptions[i][0].split('Hey Scripto, je suis')[0]
|
|
speakerTranscriptions[i][3] = transcription # Mettre à jour la transcription
|
|
|
|
# Supprimer le fichier temporaire
|
|
try:
|
|
os.remove(savedFilePath)
|
|
except Exception as e:
|
|
return jsonify({'error': f"Failed to delete temporary file: {str(e)}"}), 500
|
|
|
|
# Supprimer le dossier de la session
|
|
try:
|
|
os.rmdir(f"treatment/{sessionId}")
|
|
except Exception as e:
|
|
return jsonify({'error': f"Failed to delete session folder: {str(e)}"}), 500
|
|
# Retourner les résultats
|
|
return jsonify({
|
|
'message': 'File saved, diarization, and transcription completed successfully',
|
|
'speakerTranscriptions': speakerTranscriptions
|
|
})
|
|
|
|
# Route pour initialiser une session
|
|
@app.route('/start', methods=['POST'])
|
|
def init():
|
|
session_id = request.json.get('sessionId')
|
|
if not session_id:
|
|
return jsonify({'error': 'Missing sessionId'}), 400
|
|
|
|
# Ajouter la session à une liste
|
|
workSession.append(session_id)
|
|
|
|
return jsonify({
|
|
'message': 'Session initialized',
|
|
'sessionId': session_id
|
|
})
|
|
|
|
# Route pour terminer une session
|
|
@app.route('/stop', methods=['POST'])
|
|
def finish():
|
|
session_id = request.json.get('sessionId')
|
|
if not session_id:
|
|
return jsonify({'error': 'Missing sessionId'}), 400
|
|
|
|
# Retirer la session de la liste
|
|
if session_id in workSession:
|
|
workSession.remove(session_id)
|
|
return jsonify({'message': 'Session finished'})
|
|
else:
|
|
return jsonify({'error': 'Session not found'}), 404
|
|
|
|
# Liste des sessions actives
|
|
workSession = []
|
|
|
|
if __name__ == '__main__':
|
|
# Lancer l'application Flask
|
|
app.run(debug=True, host='0.0.0.0', port=3003)
|