from flask import Flask, request, jsonify import os import uuid from datetime import datetime from pydub import AudioSegment from faster_whisper import WhisperModel from pyannote.audio import Pipeline import requests # Remplacez par votre token Hugging Face si nécessaire HUGGINGFACE_TOKEN = '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' # Créer l'application Flask app = Flask(__name__) # Charger le modèle de transcription avec faster-whisper model = WhisperModel("medium") # Charger le pipeline de diarisation pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=HUGGINGFACE_TOKEN) # Fonction pour sauvegarder le fichier audio def saveAudioFile(file, folderName): try: # Convertir folderName en chaîne si c'est un UUID folderName = str(folderName) # Assurez-vous que folderName est une chaîne # Créer le chemin pour le dossier de session sessionFolder = os.path.join('work', folderName) os.makedirs(sessionFolder, exist_ok=True) # Créer un nom de fichier unique basé sur le timestamp et un UUID timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") fileExtension = os.path.splitext(file.filename)[1] or '.wav' uniqueFilename = f"{timestamp}_{uuid.uuid4().hex}{fileExtension}" # Créer le chemin complet pour enregistrer le fichier savePath = os.path.join(sessionFolder, uniqueFilename) os.makedirs(os.path.dirname(savePath), exist_ok=True) # Sauvegarder le fichier sur le disque file.save(savePath) # Retourner le chemin du fichier sauvegardé return savePath except Exception as e: return {'error': f"Error saving file: {str(e)}"} # Fonction pour réaliser la diarisation et la transcription def diarizeAndTranscribe(audio_file, folderName): try: # Charger l'audio avec Pydub audio = AudioSegment.from_file(audio_file) # Effectuer la diarisation diarization = pipeline(audio_file) # Liste pour stocker les résultats sous forme de tableau speakerTranscriptions = [] current_speaker = None current_transcription = "" current_speaker_role = 'Locuteur' current_speaker_name = '' # Extraire les segments et les transcrire for turn, _, speaker in diarization.itertracks(yield_label=True): start_time = turn.start * 1000 # Convertir en millisecondes end_time = turn.end * 1000 # Convertir en millisecondes if end_time - start_time > 200: # Ignorer les segments de moins de 200 ms # Extraire le segment audio correspondant segment_audio = audio[start_time:end_time] # Sauvegarder le segment temporairement segment_path = f"work/{folderName}/temp_segment_{speaker}_{int(start_time)}.wav" segment_audio.export(segment_path, format="wav") # Transcrire le segment avec faster-whisper segments, _ = model.transcribe(segment_path,language="fr") transcription = " ".join([segment.text for segment in segments]) # Fusionner les segments si le même locuteur continue if speaker == current_speaker: current_transcription += " " + transcription else: # Ajouter le précédent locuteur au résultat final if current_speaker is not None: speakerTranscriptions.append([ current_speaker, # Identifiant du locuteur (e.g., SPEAKER_00) current_transcription.strip(), # Transcription "Locuteur", # Rôle par défaut "" # Nom par défaut ]) # Mettre à jour le locuteur actuel et réinitialiser la transcription current_speaker = speaker current_transcription = transcription # Ajouter le dernier locuteur au résultat final if current_speaker is not None: speakerTranscriptions.append([current_speaker, current_transcription, current_speaker_role, current_speaker_name]) # Supprimer les segments temporaires for file in os.listdir(f"work/{folderName}"): if file.startswith("temp_segment"): os.remove(f"work/{folderName}/{file}") return speakerTranscriptions except Exception as e: return {'error': f"Diarization/transcription error: {str(e)}"} @app.route('/work', methods=['POST']) def work(): try: # Vérifier si un fichier est présent if 'file' not in request.files: print("No file part in the request") return jsonify({'error': 'No file part in the request'}), 400 file = request.files['file'] # Vérifier si un ID de session a été fourni sessionId = uuid.uuid4() sessionName = request.form.get('sessionName') sessionDate = request.form.get('sessionDate') sessionEmail = request.form.get('sessionEmail') sessionSummarizeWithLLM = request.form.get('sessionSummarizeWithLLM') print(f"Received session info: {sessionName}, {sessionDate}, {sessionEmail}") # Vérifier si les informations obligatoires sont présentes if not sessionName or not sessionDate or not sessionEmail: print("Missing session information") return jsonify({'error': 'Missing session information'}), 400 # Sauvegarder le fichier audio savedFilePath = saveAudioFile(file, sessionId) # Vérifier si savedFilePath est un dictionnaire d'erreur if isinstance(savedFilePath, dict) and 'error' in savedFilePath: print(f"Error saving file: {savedFilePath['error']}") return jsonify(savedFilePath), 500 # Vérifier si le fichier existe et a une taille valide if not os.path.exists(savedFilePath) or os.path.getsize(savedFilePath) == 0: print("The file is empty after saving") return jsonify({'error': 'The file is empty after saving'}), 400 print(f"File saved successfully: {savedFilePath}") # Effectuer la diarisation et la transcription speakerTranscriptions = diarizeAndTranscribe(savedFilePath, sessionId) # Vérifier le succès de la diarisation if isinstance(speakerTranscriptions, dict) and 'error' in speakerTranscriptions: print(f"Diarization/transcription error: {speakerTranscriptions['error']}") return jsonify(speakerTranscriptions), 500 # Enregistrer les informations de transcription documentOutputContent = "" documentOutput = f"work/{sessionId}/transcription.txt" # Titre de la session documentOutputContent += f"Session: {sessionName}\n" documentOutputContent += f"Date: {sessionDate}\n" documentOutputContent += f"Email: {sessionEmail}\n\n" # list des locuteurs et transcriptions documentOutputContent += "Locuteurs:\n" for i in range(len(speakerTranscriptions)): if len(speakerTranscriptions[i]) < 3: print(f"Error: Missing data in speakerTranscriptions[{i}]: {speakerTranscriptions[i]}") continue # Ignorer les entrées mal formées documentOutputContent += f"{speakerTranscriptions[i][0]} ({speakerTranscriptions[i][2]})\n" documentOutputContent += "\n" documentOutputContent += "Transcriptions:\n" for i in range(len(speakerTranscriptions)): if len(speakerTranscriptions[i]) < 2: print(f"Error: Missing transcription data in speakerTranscriptions[{i}]: {speakerTranscriptions[i]}") continue # Ignorer les entrées mal formées documentOutputContent += f"{speakerTranscriptions[i][0]}: {speakerTranscriptions[i][1]}\n" documentOutputContent += "\n" # Résumé par LLM if sessionSummarizeWithLLM: user_input = "Résumez la session suivante: " + documentOutputContent # Call API (Streaming) url = "http://localhost:11434/api/generate" payload = { "model": "llama3.1", "prompt": user_input, "stream": 'false' } try: # Envoi de la requête response = requests.post(url, json=payload) response.raise_for_status() # Vérifie les erreurs HTTP # Déboguer la réponse brute pour s'assurer qu'elle est valide print(f"Raw response text: {response.text}") # Charger la réponse JSON ligne par ligne et collecter les parties for line in response.text.splitlines(): try: response_json = response.json() # Tenter de lire la réponse JSON if 'response' in response_json: documentOutputContent += response_json['response'] if response_json.get('done'): # Si 'done' est vrai, c'est la fin du message break except ValueError as e: print(f"Erreur de format JSON sur une ligne: {line}") continue # Affichage du contenu final après avoir assemblé tous les morceaux print(f"Résumé complet: {documentOutputContent}") except requests.exceptions.RequestException as e: print(f"Erreur HTTP: {e}") except ValueError as e: # Erreur de format JSON print(f"Erreur de décodage JSON: {e}") print(f"Texte brut de la réponse causant l'erreur: {response.text}") # Enregistrer le fichier de transcription with open(documentOutput, 'w') as f: f.write(documentOutputContent) print(f"Speaker transcriptions: {speakerTranscriptions}") # Reconnaissance des locuteurs par leur phrase 'Hey Scripto, je suis %nom% et j'ai le rôle de %rôle%' for i in range(len(speakerTranscriptions)): # Vérifiez si la phrase clé est présente if 'Hey Scripto, je suis' in speakerTranscriptions[i][1]: try: # Extraire le rôle après "et j'ai le rôle de " role = speakerTranscriptions[i][1].split("et j'ai le rôle de ")[1] speakerTranscriptions[i][2] = role.strip() # Mettre à jour le rôle # Extraire le nom après "Hey Scripto, je suis" name = speakerTranscriptions[i][1].split("Hey Scripto, je suis")[1].split('et')[0] speakerTranscriptions[i][3] = name.strip() # Mettre à jour le nom # Nettoyer la transcription en retirant la phrase clé speakerTranscriptions[i][1] = speakerTranscriptions[i][1].split("Hey Scripto, je suis")[0].strip() except IndexError as e: print(f"Error parsing 'Hey Scripto' phrase: {e}") # Supprimer le fichier temporaire try: os.remove(savedFilePath) print(f"Deleted temporary file: {savedFilePath}") except Exception as e: print(f"Failed to delete temporary file: {str(e)}") return jsonify({'error': f"Failed to delete temporary file: {str(e)}"}), 500 # Supprimer le dossier de la session # try: # os.rmdir(f"work/{sessionId}") # print(f"Deleted session folder: work/{sessionId}") # except Exception as e: # print(f"Failed to delete session folder: {str(e)}") # return jsonify({'error': f"Failed to delete session folder: {str(e)}"}), 500 # Retourner les résultats return jsonify({ 'message': 'File saved, diarization, and transcription completed successfully', 'speakerTranscriptions': speakerTranscriptions }) except Exception as e: print(f"Unhandled error: {str(e)}") return jsonify({'error': f"Internal server error: {str(e)}"}), 500 # Liste des sessions actives workSession = [] if __name__ == '__main__': # Lancer l'application Flask app.run(debug=True, host='0.0.0.0', port=3003)