import asyncio from shazamio import Shazam import json import time from optparse import OptionParser import ffmpeg import yt_dlp import subprocess #mix = "The Magician b2b A-Trak - Live @ Elsewhere, Brooklyn [1739480907].opus" #output_file = ".".join(mix.split(".")[:-1]) + ".txt" #segment_length = 15 start_from = 0 # TODO: create new folder # TODO: args def convert_seconds_to_hhmmss(seconds): time_struct = time.gmtime(seconds) return time.strftime("%H:%M:%S", time_struct) def get_audio_length(filename): probe = ffmpeg.probe(filename) duration = float(probe['format']['duration']) return duration def extract_audio_segment(input_file, start_time, duration, output_file): ( ffmpeg .input(input_file, ss=start_time, t=duration) .output(output_file, acodec="copy") .run(overwrite_output=True, quiet=True) ) def download_song(url, track_id, output_dir='songs'): subprocess.run(["yt-dlp", '--default-search', 'ytsearch', url, "--output", f"{output_dir}/{track_id}_%(title)s.%(ext)s", "--embed-thumbnail", "--extract-audio"]) async def scan_mix(mix, segment_length): print(f"Processing {mix}...") output_file = ".".join(mix.split(".")[:-1]) + ".txt" with open(output_file, "a") as of: shazam = Shazam() audio_length = get_audio_length(mix) print(f"Got total mix length: {audio_length}") total_segments = int(audio_length // segment_length) print(f"Splitting into {total_segments} segments of {segment_length}s") n_digits = len(str(total_segments)) probe = ffmpeg.probe(mix) audio_codec = next(stream for stream in probe['streams'] if stream['codec_type'] == 'audio')['codec_name'] print(f"Got audio codec: {audio_codec}" + "\n") segment = 0 for s in range(0, int(audio_length), segment_length): if segment >= start_from: line = convert_seconds_to_hhmmss(s) + "\t" extract_audio_segment(mix, s, segment_length, "sample.opus") out = await shazam.recognize('sample.opus') if out.get("track"): name = out["track"]["share"]["subject"] line += name print(f"[{str(int(s/segment_length)).zfill(n_digits)}/{str(total_segments).zfill(n_digits)}] " + line) of.write(line + "\n") of.flush() time.sleep(3) segment += 1 of.close() return ".".join(mix.split(".")[:-1]) + ".txt" def download_all(txt_file): downloaded = [] final_file = ".".join(txt_file.split(".")[:-1]) + "_final.txt" with open(txt_file, "r") as txt: with open(final_file, "w") as final: lines = txt.readlines() last_track = "" last_time = "" ids = 1 for l in lines: timestamp = l.split("\t")[0] track_name = l.split("\t")[-1][:-1] if track_name == last_track and track_name not in downloaded: downloaded.append(track_name) print("Downloading track id " + str(ids) + f" with timestamp {last_time} and name '{track_name}'") download_song(f"ytsearch:{track_name}", ids) final.write(str(ids) + "\t" + last_time + "\t" + track_name) final.flush() ids += 1 last_track = track_name last_time = timestamp final.close() txt.close() #loop = asyncio.get_event_loop() #loop.run_until_complete(main()) def main(): usage = "usage: %prog [options] arg" parser = OptionParser(usage) #parser.add_option("-f", "--file", dest="filename", # help="process audio for FILENAME") parser.add_option("-d", "--download", dest="download", action="store_true") parser.add_option("-t", "--segment", dest="segment_length", type="int") parser.add_option("-v", "--verbose", action="store_true", dest="verbose") parser.add_option("-q", "--quiet", action="store_false", dest="verbose") (options, args) = parser.parse_args() if len(args) != 1: parser.error("incorrect number of arguments") print(parser.largs) #if options.download: print(options.segment_length) print(options.download) if __name__ == "__main__": main()