import asyncio from shazamio import Shazam import json import time import ffmpeg mix = "michael.aac" output_file = ".".join(mix.split(".")[:-1]) + ".txt" segment_length = 15 start_from = 0 # Define an offset in HH:MM:SS format initial_offset = "22:49:20" # 30 minutes offset as an example # Function to convert HH:MM:SS to seconds def hhmmss_to_seconds(hhmmss): h, m, s = map(int, hhmmss.split(':')) return h * 3600 + m * 60 + s # Calculate offset in seconds offset_seconds = hhmmss_to_seconds(initial_offset) def convert_seconds_to_hhmmss(seconds): time_struct = time.gmtime(seconds) return time.strftime("%H:%M:%S", time_struct) def get_audio_length(filename): probe = ffmpeg.probe(filename) duration = float(probe['format']['duration']) return duration def extract_audio_segment(input_file, start_time, duration, output_file): ( ffmpeg .input(input_file, ss=start_time, t=duration) .output(output_file, acodec="copy") .run(overwrite_output=True, quiet=True) ) async def main(): with open(output_file, "a") as of: shazam = Shazam() audio_length = get_audio_length(mix) print(f"Got total mix length: {audio_length}") total_segments = int(audio_length // segment_length) print(f"Splitting into {total_segments} segments of {segment_length}s") n_digits = len(str(total_segments)) probe = ffmpeg.probe(mix) audio_codec = next(stream for stream in probe['streams'] if stream['codec_type'] == 'audio')['codec_name'] print(f"Got audio codec: {audio_codec}" + "\n") segment = 0 for s in range(0, int(audio_length), segment_length): if segment >= start_from: relative = convert_seconds_to_hhmmss(s % 86400) real_wall_time = s + offset_seconds real_time_str = convert_seconds_to_hhmmss(real_wall_time % 86400) line = f"[{real_time_str}] ({relative}) " extract_audio_segment(mix, s, segment_length, "sample.aac") out = await shazam.recognize('sample.aac') if out.get("track"): name = out["track"]["share"]["subject"] line += name print(f"[{str(int(s/segment_length)).zfill(n_digits)}/{str(total_segments).zfill(n_digits)}] " + line) of.write(line + "\n") of.flush() time.sleep(3) segment += 1 of.close() print(f"Processing {mix}...") loop = asyncio.get_event_loop() loop.run_until_complete(main())