# MIT License
# Copyright (c) 2022 Hash Minner
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE

import os
import asyncio
import time
import logging

from hachoir.parser import createParser
from hachoir.metadata import extractMetadata

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def _read_metadata(media_file):
    """Return hachoir metadata for *media_file*, or None if it cannot be parsed.

    The parser is closed once extraction is done so the underlying file
    handle is not leaked.
    """
    parser = createParser(media_file)
    if parser is None:
        return None
    with parser:
        return extractMetadata(parser)


async def _run_command(command):
    """Run *command* (an argv list) to completion and return its exit code.

    stdout and stderr are captured through pipes and drained by
    ``communicate()``. A non-zero exit status is logged together with the
    captured stderr; no exception is raised for it.
    """
    process = await asyncio.create_subprocess_exec(
        *command,
        # both streams must be pipes for communicate() to collect them
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Wait for the subprocess to finish
    _, stderr = await process.communicate()
    if process.returncode != 0:
        logger.warning(
            "%s exited with code %s: %s",
            command[0],
            process.returncode,
            # lenient decoding: tool output is not guaranteed to be UTF-8
            stderr.decode(errors="replace").strip(),
        )
    return process.returncode


async def place_water_mark(input_file, output_file, water_mark_file):
    """Overlay *water_mark_file* on *input_file* and write *output_file*.

    The watermark is first rescaled to half the width of the input image
    (written next to the output as ``<output_file>.watermark.png``) and then
    overlaid on the bottom-right corner of the input.

    Returns *output_file*. The path is returned even when ffmpeg fails;
    failures are only logged. If the width of *input_file* cannot be
    determined, the watermark step is skipped and *input_file* is returned
    unchanged.
    """
    watermarked_file = f"{output_file}.watermark.png"
    metadata = _read_metadata(input_file)
    if metadata is None or not metadata.has("width"):
        logger.warning(
            "could not read width of %s; skipping watermark", input_file
        )
        return input_file
    width = metadata.get("width")
    # https://stackoverflow.com/a/34547184/4723940
    # Every option is its own argv entry: no shell is involved, so a
    # combined "-y -v quiet" string would reach ffmpeg as one bogus option.
    await _run_command([
        "ffmpeg",
        "-i", water_mark_file,
        "-y",
        "-v", "quiet",
        "-vf", f"scale={width}*0.5:-1",
        watermarked_file,
    ])
    # https://stackoverflow.com/a/16235519
    # The filter expression is passed without surrounding quotes; with no
    # shell to strip them they would become part of the filter name.
    await _run_command([
        "ffmpeg",
        "-i", input_file,
        "-i", watermarked_file,
        "-filter_complex",
        "overlay=(main_w-overlay_w):(main_h-overlay_h)",
        output_file,
    ])
    return output_file


async def take_screen_shot(video_file, output_directory, ttl):
    """Grab a single frame of *video_file* at offset *ttl* as a JPEG.

    The image is written to *output_directory* under a name derived from
    the current timestamp. Returns the image path, or None if ffmpeg did
    not produce the file.
    """
    # https://stackoverflow.com/a/13891070/4723940
    out_put_file_name = f"{output_directory}/{time.time()}.jpg"
    await _run_command([
        "ffmpeg",
        "-ss", str(ttl),
        "-i", video_file,
        "-vframes", "1",
        out_put_file_name,
    ])
    return out_put_file_name if os.path.lexists(out_put_file_name) else None


# https://github.com/Nekmo/telegram-upload/blob/master/telegram_upload/video.py#L26
async def cult_small_video(video_file, output_directory, start_time, end_time):
    """Cut the span *start_time*..*end_time* out of *video_file* into an MP4.

    The clip is written to *output_directory* under a name derived from the
    current timestamp (rounded to whole seconds). Returns the clip path, or
    None if ffmpeg did not produce the file.
    """
    # https://stackoverflow.com/a/13891070/4723940
    out_put_file_name = f"{output_directory}/{round(time.time())}.mp4"
    await _run_command([
        "ffmpeg",
        "-i", video_file,
        # argv entries must be strings, so numeric offsets are converted
        "-ss", str(start_time),
        "-to", str(end_time),
        "-async", "1",
        "-strict", "-2",
        out_put_file_name,
    ])
    return out_put_file_name if os.path.lexists(out_put_file_name) else None


async def generate_screen_shots(
    video_file,
    output_directory,
    is_watermarkable,
    wf,
    min_duration,
    no_of_photos
):
    """Take *no_of_photos* evenly spaced screenshots of *video_file*.

    Screenshots are written to *output_directory*. When *is_watermarkable*
    is truthy, each successfully captured screenshot is additionally passed
    through ``place_water_mark`` with *wf* as the watermark image.

    Returns a list with one entry per requested screenshot (an entry is
    None when that screenshot could not be captured), an empty list when
    *no_of_photos* is not positive, or None when the video's duration
    cannot be read or does not exceed *min_duration*.
    """
    metadata = _read_metadata(video_file)
    duration = 0
    if metadata is not None and metadata.has("duration"):
        # total_seconds() keeps whole days, which .seconds would discard
        duration = int(metadata.get('duration').total_seconds())
    if duration <= min_duration:
        return None
    if no_of_photos <= 0:
        # nothing requested; also avoids dividing by zero below
        return []
    images = []
    ttl_step = duration // no_of_photos
    current_ttl = ttl_step
    for _ in range(no_of_photos):
        ss_img = await take_screen_shot(
            video_file, output_directory, current_ttl
        )
        current_ttl += ttl_step
        # a failed capture (None) has nothing to watermark
        if is_watermarkable and ss_img is not None:
            ss_img = await place_water_mark(
                ss_img,
                f"{output_directory}/{time.time()}.jpg",
                wf
            )
        images.append(ss_img)
    return images