import os
import sys
import json
import subprocess
import numpy as np
import re
import datetime
from typing import List
import torch
from PIL import Image, ExifTags
from PIL.PngImagePlugin import PngInfo
from pathlib import Path
from string import Template
import itertools
import functools

import folder_paths
from .logger import logger
from .image_latent_nodes import *
from .load_video_nodes import LoadVideoUpload, LoadVideoPath, LoadVideoFFmpegUpload, LoadVideoFFmpegPath, LoadImagePath
from .load_images_nodes import LoadImagesFromDirectoryUpload, LoadImagesFromDirectoryPath
from .batched_nodes import VAEEncodeBatched, VAEDecodeBatched
from .utils import ffmpeg_path, get_audio, hash_path, validate_path, requeue_workflow, \
        gifski_path, calculate_file_hash, strip_path, try_download_video, is_url, \
        imageOrLatent, BIGMAX, merge_filter_args, ENCODE_ARGS, floatOrInt, cached
from comfy.utils import ProgressBar
if 'VHS_video_formats' not in folder_paths.folder_names_and_paths:
    folder_paths.folder_names_and_paths["VHS_video_formats"] = ((), {".json"})
if len(folder_paths.folder_names_and_paths['VHS_video_formats'][1]) == 0:
    folder_paths.folder_names_and_paths["VHS_video_formats"][1].add(".json")
audio_extensions = ['mp3', 'mp4', 'wav', 'ogg']

def gen_format_widgets(video_format):
    for k in video_format:
        if k.endswith("_pass"):
            for i in range(len(video_format[k])):
                if isinstance(video_format[k][i], list):
                    item = [video_format[k][i]]
                    yield item
                    video_format[k][i] = item[0]
        else:
            if isinstance(video_format[k], list):
                item = [video_format[k]]
                yield item
                video_format[k] = item[0]
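# gen_format_widgets yields each configurable widget wrapped in a one-element
# list; after the consumer mutates w[0] (as apply_format_widgets does), the
# generator resumes and writes the updated value back into the format dict.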
base_formats_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "video_formats")

def get_video_formats():
    format_files = {}
    for format_name in folder_paths.get_filename_list("VHS_video_formats"):
        format_files[format_name] = folder_paths.get_full_path("VHS_video_formats", format_name)
    for item in os.scandir(base_formats_dir):
        if not item.is_file() or not item.name.endswith('.json'):
            continue
        format_files[item.name[:-5]] = item.path
    formats = []
    format_widgets = {}
    for format_name, path in format_files.items():
        with open(path, 'r') as stream:
            video_format = json.load(stream)
        if "gifski_pass" in video_format and gifski_path is None:
            #Skip format
            continue
        widgets = [w[0] for w in gen_format_widgets(video_format)]
        formats.append("video/" + format_name)
        if (len(widgets) > 0):
            format_widgets["video/" + format_name] = widgets
    return formats, format_widgets
def apply_format_widgets(format_name, kwargs):
    if os.path.exists(os.path.join(base_formats_dir, format_name + ".json")):
        video_format_path = os.path.join(base_formats_dir, format_name + ".json")
    else:
        video_format_path = folder_paths.get_full_path("VHS_video_formats", format_name)
    with open(video_format_path, 'r') as stream:
        video_format = json.load(stream)
    for w in gen_format_widgets(video_format):
        if w[0][0] not in kwargs:
            if len(w[0]) > 2 and 'default' in w[0][2]:
                default = w[0][2]['default']
            else:
                if type(w[0][1]) is list:
                    default = w[0][1][0]
                else:
                    #NOTE: This doesn't respect max/min, but should be good enough as a fallback to a fallback to a fallback
                    default = {"BOOLEAN": False, "INT": 0, "FLOAT": 0, "STRING": ""}[w[0][1]]
            kwargs[w[0][0]] = default
            logger.warn(f"Missing input for {w[0][0]} has been set to {default}")
        if len(w[0]) > 3:
            w[0] = Template(w[0][3]).substitute(val=kwargs[w[0][0]])
        else:
            w[0] = str(kwargs[w[0][0]])
    return video_format
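# Widget entries of the form [name, type, config, template] are collapsed to a
# single argument string: when a fourth element is present it is treated as a
# string.Template receiving the resolved value as $val, otherwise the value is
# simply stringified.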
def tensor_to_int(tensor, bits):
    #TODO: investigate benefit of rounding by adding 0.5 before clip/cast
    tensor = tensor.cpu().numpy() * (2**bits-1)
    return np.clip(tensor, 0, (2**bits-1))

def tensor_to_shorts(tensor):
    return tensor_to_int(tensor, 16).astype(np.uint16)

def tensor_to_bytes(tensor):
    return tensor_to_int(tensor, 8).astype(np.uint8)
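# Frames arrive as float tensors in [0, 1]; these helpers rescale them to the
# full 8-bit or 16-bit range so they can be written to the rawvideo pipe that
# feeds ffmpeg below.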
def ffmpeg_process(args, video_format, video_metadata, file_path, env):
    res = None
    frame_data = yield
    total_frames_output = 0
    if video_format.get('save_metadata', 'False') != 'False':
        os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
        metadata = json.dumps(video_metadata)
        metadata_path = os.path.join(folder_paths.get_temp_directory(), "metadata.txt")
        #metadata from file should escape = ; # \ and newline
        metadata = metadata.replace("\\","\\\\")
        metadata = metadata.replace(";","\\;")
        metadata = metadata.replace("#","\\#")
        metadata = metadata.replace("=","\\=")
        metadata = metadata.replace("\n","\\\n")
        metadata = "comment=" + metadata
        with open(metadata_path, "w") as f:
            f.write(";FFMETADATA1\n")
            f.write(metadata)
        m_args = args[:1] + ["-i", metadata_path] + args[1:] + ["-metadata", "creation_time=now"]
        with subprocess.Popen(m_args + [file_path], stderr=subprocess.PIPE,
                              stdin=subprocess.PIPE, env=env) as proc:
            try:
                while frame_data is not None:
                    proc.stdin.write(frame_data)
                    #TODO: skip flush for increased speed
                    frame_data = yield
                    total_frames_output+=1
                proc.stdin.flush()
                proc.stdin.close()
                res = proc.stderr.read()
            except BrokenPipeError as e:
                err = proc.stderr.read()
                #Check if output file exists. If it does, the re-execution
                #will also fail. This obscures the cause of the error
                #and seems to never occur concurrent to the metadata issue
                if os.path.exists(file_path):
                    raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                            + err.decode(*ENCODE_ARGS))
                #Res was not set
                print(err.decode(*ENCODE_ARGS), end="", file=sys.stderr)
                logger.warn("An error occurred when saving with metadata")
    if res != b'':
        with subprocess.Popen(args + [file_path], stderr=subprocess.PIPE,
                              stdin=subprocess.PIPE, env=env) as proc:
            try:
                while frame_data is not None:
                    proc.stdin.write(frame_data)
                    frame_data = yield
                    total_frames_output+=1
                proc.stdin.flush()
                proc.stdin.close()
                res = proc.stderr.read()
            except BrokenPipeError as e:
                res = proc.stderr.read()
                raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                        + res.decode(*ENCODE_ARGS))
    yield total_frames_output
    if len(res) > 0:
        print(res.decode(*ENCODE_ARGS), end="", file=sys.stderr)
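# ffmpeg_process is a generator used as a sink: prime it with send(None), push
# raw frame bytes with send(frame_data), then send(None) to close stdin and
# receive total_frames_output; one final send(None) lets it print any
# remaining ffmpeg stderr before raising StopIteration.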
def gifski_process(args, video_format, file_path, env):
    frame_data = yield
    with subprocess.Popen(args + video_format['main_pass'] + ['-f', 'yuv4mpegpipe', '-'],
                          stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, env=env) as procff:
        with subprocess.Popen([gifski_path] + video_format['gifski_pass']
                              + ['-q', '-o', file_path, '-'], stderr=subprocess.PIPE,
                              stdin=procff.stdout, stdout=subprocess.PIPE,
                              env=env) as procgs:
            try:
                while frame_data is not None:
                    procff.stdin.write(frame_data)
                    frame_data = yield
                procff.stdin.flush()
                procff.stdin.close()
                resff = procff.stderr.read()
                resgs = procgs.stderr.read()
                outgs = procgs.stdout.read()
            except BrokenPipeError as e:
                procff.stdin.close()
                resff = procff.stderr.read()
                resgs = procgs.stderr.read()
                raise Exception("An error occurred while creating gifski output\n" \
                        + "Make sure you are using gifski --version >=1.32.0\nffmpeg: " \
                        + resff.decode(*ENCODE_ARGS) + '\ngifski: ' + resgs.decode(*ENCODE_ARGS))
    if len(resff) > 0:
        print(resff.decode(*ENCODE_ARGS), end="", file=sys.stderr)
    if len(resgs) > 0:
        print(resgs.decode(*ENCODE_ARGS), end="", file=sys.stderr)
    #should always be empty as the quiet flag is passed
    if len(outgs) > 0:
        print(outgs.decode(*ENCODE_ARGS))
def to_pingpong(inp):
    if not hasattr(inp, "__getitem__"):
        inp = list(inp)
    yield from inp
    for i in range(len(inp)-2, 0, -1):
        yield inp[i]
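# to_pingpong plays the sequence forward and then in reverse, skipping the
# last and first frames on the return trip so neither endpoint is duplicated.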
class VideoCombine:
    @classmethod
    def INPUT_TYPES(s):
        ffmpeg_formats, format_widgets = get_video_formats()
        return {
            "required": {
                "images": (imageOrLatent,),
                "frame_rate": (
                    floatOrInt,
                    {"default": 8, "min": 1, "step": 1},
                ),
                "loop_count": ("INT", {"default": 0, "min": 0, "max": 100, "step": 1}),
                "filename_prefix": ("STRING", {"default": "AnimateDiff"}),
                "format": (["image/gif", "image/webp"] + ffmpeg_formats, {'formats': format_widgets}),
                "pingpong": ("BOOLEAN", {"default": False}),
                "save_output": ("BOOLEAN", {"default": True}),
            },
            "optional": {
                "audio": ("AUDIO",),
                "meta_batch": ("VHS_BatchManager",),
                "vae": ("VAE",),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID"
            },
        }

    RETURN_TYPES = ("VHS_FILENAMES",)
    RETURN_NAMES = ("Filenames",)
    OUTPUT_NODE = True
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "combine_video"
    def combine_video(
        self,
        frame_rate: int,
        loop_count: int,
        images=None,
        latents=None,
        filename_prefix="AnimateDiff",
        format="image/gif",
        pingpong=False,
        save_output=True,
        prompt=None,
        extra_pnginfo=None,
        audio=None,
        unique_id=None,
        manual_format_widgets=None,
        meta_batch=None,
        vae=None,
        **kwargs
    ):
        if latents is not None:
            images = latents
        if images is None:
            return ((save_output, []),)
        if vae is not None:
            if isinstance(images, dict):
                images = images['samples']
            else:
                vae = None

        if isinstance(images, torch.Tensor) and images.size(0) == 0:
            return ((save_output, []),)
        num_frames = len(images)
        pbar = ProgressBar(num_frames)
        if vae is not None:
            downscale_ratio = getattr(vae, "downscale_ratio", 8)
            width = images.size(-1)*downscale_ratio
            height = images.size(-2)*downscale_ratio
            frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1
            #Python 3.12 adds an itertools.batched, but it's easily replicated for legacy support
            def batched(it, n):
                while batch := tuple(itertools.islice(it, n)):
                    yield batch
            def batched_encode(images, vae, frames_per_batch):
                for batch in batched(iter(images), frames_per_batch):
                    image_batch = torch.from_numpy(np.array(batch))
                    yield from vae.decode(image_batch)
            images = batched_encode(images, vae, frames_per_batch)
            first_image = next(images)
            #repush first_image
            images = itertools.chain([first_image], images)
            #A single image has 3 dimensions. Discard higher dimensions
            while len(first_image.shape) > 3:
                first_image = first_image[0]
        else:
            first_image = images[0]
            images = iter(images)
        # get output information
        output_dir = (
            folder_paths.get_output_directory()
            if save_output
            else folder_paths.get_temp_directory()
        )
        (
            full_output_folder,
            filename,
            _,
            subfolder,
            _,
        ) = folder_paths.get_save_image_path(filename_prefix, output_dir)
        output_files = []

        metadata = PngInfo()
        video_metadata = {}
        if prompt is not None:
            metadata.add_text("prompt", json.dumps(prompt))
            video_metadata["prompt"] = json.dumps(prompt)
        if extra_pnginfo is not None:
            for x in extra_pnginfo:
                metadata.add_text(x, json.dumps(extra_pnginfo[x]))
                video_metadata[x] = extra_pnginfo[x]
            extra_options = extra_pnginfo.get('workflow', {}).get('extra', {})
        else:
            extra_options = {}
        metadata.add_text("CreationTime", datetime.datetime.now().isoformat(" ")[:19])

        if meta_batch is not None and unique_id in meta_batch.outputs:
            (counter, output_process) = meta_batch.outputs[unique_id]
        else:
            # comfy counter workaround
            max_counter = 0

            # Loop through the existing files
            matcher = re.compile(f"{re.escape(filename)}_(\\d+)\\D*\\..+", re.IGNORECASE)
            for existing_file in os.listdir(full_output_folder):
                # Check if the file matches the expected format
                match = matcher.fullmatch(existing_file)
                if match:
                    # Extract the numeric portion of the filename
                    file_counter = int(match.group(1))
                    # Update the maximum counter value if necessary
                    if file_counter > max_counter:
                        max_counter = file_counter

            # Increment the counter by 1 to get the next available value
            counter = max_counter + 1
            output_process = None

        # save first frame as png to keep metadata
        first_image_file = f"{filename}_{counter:05}.png"
        file_path = os.path.join(full_output_folder, first_image_file)
        if extra_options.get('VHS_MetadataImage', True) != False:
            Image.fromarray(tensor_to_bytes(first_image)).save(
                file_path,
                pnginfo=metadata,
                compress_level=4,
            )
        output_files.append(file_path)
        format_type, format_ext = format.split("/")
        if format_type == "image":
            if meta_batch is not None:
                raise Exception("Pillow('image/') formats are not compatible with batched output")
            image_kwargs = {}
            if format_ext == "gif":
                image_kwargs['disposal'] = 2
            if format_ext == "webp":
                #Save timestamp information
                exif = Image.Exif()
                exif[ExifTags.IFD.Exif] = {36867: datetime.datetime.now().isoformat(" ")[:19]}
                image_kwargs['exif'] = exif
            file = f"{filename}_{counter:05}.{format_ext}"
            file_path = os.path.join(full_output_folder, file)
            if pingpong:
                images = to_pingpong(images)
            frames = map(lambda x: Image.fromarray(tensor_to_bytes(x)), images)
            # Use pillow directly to save an animated image
            next(frames).save(
                file_path,
                format=format_ext.upper(),
                save_all=True,
                append_images=frames,
                duration=round(1000 / frame_rate),
                loop=loop_count,
                compress_level=4,
                **image_kwargs
            )
            output_files.append(file_path)
        else:
            # Use ffmpeg to save a video
            if ffmpeg_path is None:
                raise ProcessLookupError(f"ffmpeg is required for video outputs and could not be found.\nIn order to use video outputs, you must either:\n- Install imageio-ffmpeg with pip,\n- Place a ffmpeg executable in {os.path.abspath('')}, or\n- Install ffmpeg and add it to the system path.")

            #Acquire additional format_widget values
            if manual_format_widgets is None:
                if prompt is not None:
                    kwargs, passed_kwargs = prompt[unique_id]['inputs'], kwargs
                    kwargs.update(passed_kwargs)
                else:
                    manual_format_widgets = {}
            if kwargs is None:
                missing = {}
                for k in kwargs.keys():
                    if k in manual_format_widgets:
                        kwargs[k] = manual_format_widgets[k]
                    else:
                        missing[k] = kwargs[k]
                if len(missing) > 0:
                    logger.warn("Extra format values were not provided, the following defaults will be used: " + str(kwargs) + "\nThis is likely due to usage of ComfyUI-to-python. These values can be manually set by supplying a manual_format_widgets argument")

            video_format = apply_format_widgets(format_ext, kwargs)
            has_alpha = first_image.shape[-1] == 4
            dim_alignment = video_format.get("dim_alignment", 2)
            if (first_image.shape[1] % dim_alignment) or (first_image.shape[0] % dim_alignment):
                #output frames must be padded
                to_pad = (-first_image.shape[1] % dim_alignment,
                          -first_image.shape[0] % dim_alignment)
                padding = (to_pad[0]//2, to_pad[0] - to_pad[0]//2,
                           to_pad[1]//2, to_pad[1] - to_pad[1]//2)
                padfunc = torch.nn.ReplicationPad2d(padding)
                def pad(image):
                    image = image.permute((2,0,1))  #HWC to CHW
                    padded = padfunc(image.to(dtype=torch.float32))
                    return padded.permute((1,2,0))
                images = map(pad, images)
                new_dims = (-first_image.shape[1] % dim_alignment + first_image.shape[1],
                            -first_image.shape[0] % dim_alignment + first_image.shape[0])
                dimensions = f"{new_dims[0]}x{new_dims[1]}"
                logger.warn("Output images were not of valid resolution and have had padding applied")
            else:
                dimensions = f"{first_image.shape[1]}x{first_image.shape[0]}"
            if loop_count > 0:
                loop_args = ["-vf", "loop=loop=" + str(loop_count) + ":size=" + str(num_frames)]
            else:
                loop_args = []
            if pingpong:
                if meta_batch is not None:
                    logger.error("pingpong is incompatible with batched output")
                images = to_pingpong(images)
            if video_format.get('input_color_depth', '8bit') == '16bit':
                images = map(tensor_to_shorts, images)
                if has_alpha:
                    i_pix_fmt = 'rgba64'
                else:
                    i_pix_fmt = 'rgb48'
            else:
                images = map(tensor_to_bytes, images)
                if has_alpha:
                    i_pix_fmt = 'rgba'
                else:
                    i_pix_fmt = 'rgb24'
            file = f"{filename}_{counter:05}.{video_format['extension']}"
            file_path = os.path.join(full_output_folder, file)
            bitrate_arg = []
            bitrate = video_format.get('bitrate')
            if bitrate is not None:
                bitrate_arg = ["-b:v", str(bitrate) + "M" if video_format.get('megabit') == 'True' else str(bitrate) + "K"]
            args = [ffmpeg_path, "-v", "error", "-f", "rawvideo", "-pix_fmt", i_pix_fmt,
                    "-s", dimensions, "-r", str(frame_rate), "-i", "-"] \
                    + loop_args
            images = map(lambda x: x.tobytes(), images)
            env = os.environ.copy()
            if "environment" in video_format:
                env.update(video_format["environment"])

            if "pre_pass" in video_format:
                if meta_batch is not None:
                    #Performing a prepass requires keeping access to all frames.
                    #Potential solutions include keeping just output frames in
                    #memory or using 3 passes with intermediate file, but
                    #very long gifs probably shouldn't be encouraged
                    raise Exception("Formats which require a pre_pass are incompatible with Batch Manager.")
                images = [b''.join(images)]
                os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
                pre_pass_args = args[:13] + video_format['pre_pass']
                merge_filter_args(pre_pass_args)
                try:
                    subprocess.run(pre_pass_args, input=images[0], env=env,
                                   capture_output=True, check=True)
                except subprocess.CalledProcessError as e:
                    raise Exception("An error occurred in the ffmpeg prepass:\n" \
                            + e.stderr.decode(*ENCODE_ARGS))
            if "inputs_main_pass" in video_format:
                args = args[:13] + video_format['inputs_main_pass'] + args[13:]

            if output_process is None:
                if 'gifski_pass' in video_format:
                    output_process = gifski_process(args, video_format, file_path, env)
                else:
                    args += video_format['main_pass'] + bitrate_arg
                    merge_filter_args(args)
                    output_process = ffmpeg_process(args, video_format, video_metadata, file_path, env)
                #Proceed to first yield
                output_process.send(None)
                if meta_batch is not None:
                    meta_batch.outputs[unique_id] = (counter, output_process)

            for image in images:
                pbar.update(1)
                output_process.send(image)
            if meta_batch is not None:
                requeue_workflow((meta_batch.unique_id, not meta_batch.has_closed_inputs))
            if meta_batch is None or meta_batch.has_closed_inputs:
                #Close pipe and wait for termination.
                try:
                    total_frames_output = output_process.send(None)
                    output_process.send(None)
                except StopIteration:
                    pass
                if meta_batch is not None:
                    meta_batch.outputs.pop(unique_id)
                    if len(meta_batch.outputs) == 0:
                        meta_batch.reset()
            else:
                #batch is unfinished
                #TODO: Check if empty output breaks other custom nodes
                return {"ui": {"unfinished_batch": [True]}, "result": ((save_output, []),)}
            output_files.append(file_path)

            a_waveform = None
            if audio is not None:
                try:
                    #safely check if audio produced by VHS_LoadVideo actually exists
                    a_waveform = audio['waveform']
                except:
                    pass
            if a_waveform is not None:
                # Create audio file if input was provided
                output_file_with_audio = f"{filename}_{counter:05}-audio.{video_format['extension']}"
                output_file_with_audio_path = os.path.join(full_output_folder, output_file_with_audio)
                if "audio_pass" not in video_format:
                    logger.warn("Selected video format does not have explicit audio support")
                    video_format["audio_pass"] = ["-c:a", "libopus"]

                # FFmpeg command with audio re-encoding
                #TODO: expose audio quality options if format widgets makes it in
                #Reconsider forcing apad/shortest
                channels = audio['waveform'].size(1)
                min_audio_dur = total_frames_output / frame_rate + 1
                if video_format.get('trim_to_audio', 'False') != 'False':
                    apad = []
                else:
                    apad = ["-af", "apad=whole_dur=" + str(min_audio_dur)]
                mux_args = [ffmpeg_path, "-v", "error", "-n", "-i", file_path,
                            "-ar", str(audio['sample_rate']), "-ac", str(channels),
                            "-f", "f32le", "-i", "-", "-c:v", "copy"] \
                            + video_format["audio_pass"] \
                            + apad + ["-shortest", output_file_with_audio_path]

                audio_data = audio['waveform'].squeeze(0).transpose(0,1) \
                        .numpy().tobytes()
                merge_filter_args(mux_args, '-af')
                try:
                    res = subprocess.run(mux_args, input=audio_data,
                                         env=env, capture_output=True, check=True)
                except subprocess.CalledProcessError as e:
                    raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                            + e.stderr.decode(*ENCODE_ARGS))
                if res.stderr:
                    print(res.stderr.decode(*ENCODE_ARGS), end="", file=sys.stderr)
                output_files.append(output_file_with_audio_path)
                #Return this file with audio to the webui.
                #It will be muted unless opened or saved with right click
                file = output_file_with_audio
        if extra_options.get('VHS_KeepIntermediate', True) == False:
            for intermediate in output_files[1:-1]:
                if os.path.exists(intermediate):
                    os.remove(intermediate)
        preview = {
            "filename": file,
            "subfolder": subfolder,
            "type": "output" if save_output else "temp",
            "format": format,
            "frame_rate": frame_rate,
            "workflow": first_image_file,
            "fullpath": output_files[-1],
        }
        if num_frames == 1 and 'png' in format and '%03d' in file:
            preview['format'] = 'image/png'
            preview['filename'] = file.replace('%03d', '001')
        return {"ui": {"gifs": [preview]}, "result": ((save_output, output_files),)}
class LoadAudio:
    @classmethod
    def INPUT_TYPES(s):
        #Hide ffmpeg formats if ffmpeg isn't available
        return {
            "required": {
                "audio_file": ("STRING", {"default": "input/", "vhs_path_extensions": ['wav','mp3','ogg','m4a','flac']}),
            },
            "optional": {"seek_seconds": ("FLOAT", {"default": 0, "min": 0})}
        }

    RETURN_TYPES = ("AUDIO",)
    RETURN_NAMES = ("audio",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"
    FUNCTION = "load_audio"

    def load_audio(self, audio_file, seek_seconds):
        audio_file = strip_path(audio_file)
        if audio_file is None or validate_path(audio_file) != True:
            raise Exception("audio_file is not a valid path: " + audio_file)
        if is_url(audio_file):
            audio_file = try_download_video(audio_file) or audio_file
        #Eagerly fetch the audio since the user must be using it if the
        #node executes, unlike Load Video
        return (get_audio(audio_file, start_time=seek_seconds),)

    @classmethod
    def IS_CHANGED(s, audio_file, seek_seconds):
        return hash_path(audio_file)

    @classmethod
    def VALIDATE_INPUTS(s, audio_file, **kwargs):
        return validate_path(audio_file, allow_none=True)
class LoadAudioUpload:
    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = []
        for f in os.listdir(input_dir):
            if os.path.isfile(os.path.join(input_dir, f)):
                file_parts = f.split('.')
                if len(file_parts) > 1 and (file_parts[-1] in audio_extensions):
                    files.append(f)
        return {"required": {
                    "audio": (sorted(files),),
                    "start_time": ("FLOAT", {"default": 0, "min": 0, "max": 10000000, "step": 0.01}),
                    "duration": ("FLOAT", {"default": 0, "min": 0, "max": 10000000, "step": 0.01}),
                    },
                }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"

    RETURN_TYPES = ("AUDIO",)
    RETURN_NAMES = ("audio",)
    FUNCTION = "load_audio"

    def load_audio(self, start_time, duration, **kwargs):
        audio_file = folder_paths.get_annotated_filepath(strip_path(kwargs['audio']))
        if audio_file is None or validate_path(audio_file) != True:
            raise Exception("audio_file is not a valid path: " + audio_file)
        return (get_audio(audio_file, start_time, duration),)

    @classmethod
    def IS_CHANGED(s, audio, start_time, duration):
        audio_file = folder_paths.get_annotated_filepath(strip_path(audio))
        return hash_path(audio_file)

    @classmethod
    def VALIDATE_INPUTS(s, audio, **kwargs):
        audio_file = folder_paths.get_annotated_filepath(strip_path(audio))
        return validate_path(audio_file, allow_none=True)
class AudioToVHSAudio:
    """Legacy method for external nodes that utilized VHS_AUDIO,
    VHS_AUDIO is deprecated as a format and should no longer be used"""
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"audio": ("AUDIO",)}}

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"

    RETURN_TYPES = ("VHS_AUDIO",)
    RETURN_NAMES = ("vhs_audio",)
    FUNCTION = "convert_audio"

    def convert_audio(self, audio):
        ar = str(audio['sample_rate'])
        ac = str(audio['waveform'].size(1))
        mux_args = [ffmpeg_path, "-f", "f32le", "-ar", ar, "-ac", ac,
                    "-i", "-", "-f", "wav", "-"]

        audio_data = audio['waveform'].squeeze(0).transpose(0,1) \
                .numpy().tobytes()
        try:
            res = subprocess.run(mux_args, input=audio_data,
                                 capture_output=True, check=True)
        except subprocess.CalledProcessError as e:
            raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                    + e.stderr.decode(*ENCODE_ARGS))
        if res.stderr:
            print(res.stderr.decode(*ENCODE_ARGS), end="", file=sys.stderr)
        return (lambda: res.stdout,)
class VHSAudioToAudio:
    """Legacy method for external nodes that utilized VHS_AUDIO,
    VHS_AUDIO is deprecated as a format and should no longer be used"""
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"vhs_audio": ("VHS_AUDIO",)}}

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"

    RETURN_TYPES = ("AUDIO",)
    RETURN_NAMES = ("audio",)
    FUNCTION = "convert_audio"

    def convert_audio(self, vhs_audio):
        if not vhs_audio or not vhs_audio():
            raise Exception("audio input is not valid")
        args = [ffmpeg_path, "-i", '-']
        try:
            res = subprocess.run(args + ["-f", "f32le", "-"], input=vhs_audio(),
                                 capture_output=True, check=True)
            audio = torch.frombuffer(bytearray(res.stdout), dtype=torch.float32)
        except subprocess.CalledProcessError as e:
            raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                    + e.stderr.decode(*ENCODE_ARGS))
        match = re.search(', (\\d+) Hz, (\\w+), ', res.stderr.decode(*ENCODE_ARGS))
        if match:
            ar = int(match.group(1))
            #NOTE: Just throwing an error for other channel types right now
            #Will deal with issues if they come
            ac = {"mono": 1, "stereo": 2}[match.group(2)]
        else:
            ar = 44100
            ac = 2
        audio = audio.reshape((-1, ac)).transpose(0,1).unsqueeze(0)
        return ({'waveform': audio, 'sample_rate': ar},)
class PruneOutputs:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "filenames": ("VHS_FILENAMES",),
                "options": (["Intermediate", "Intermediate and Utility"],)
            }
        }

    RETURN_TYPES = ()
    OUTPUT_NODE = True
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "prune_outputs"

    def prune_outputs(self, filenames, options):
        if len(filenames[1]) == 0:
            return ()
        assert(len(filenames[1]) <= 3 and len(filenames[1]) >= 2)
        delete_list = []
        if options in ["Intermediate", "Intermediate and Utility", "All"]:
            delete_list += filenames[1][1:-1]
        if options in ["Intermediate and Utility", "All"]:
            delete_list.append(filenames[1][0])
        if options in ["All"]:
            delete_list.append(filenames[1][-1])

        output_dirs = [folder_paths.get_output_directory(),
                       folder_paths.get_temp_directory()]
        for file in delete_list:
            #Check that path is actually an output directory
            if (os.path.commonpath([output_dirs[0], file]) != output_dirs[0]) \
                    and (os.path.commonpath([output_dirs[1], file]) != output_dirs[1]):
                raise Exception("Tried to prune output from invalid directory: " + file)
            if os.path.exists(file):
                os.remove(file)
        return ()
class BatchManager:
    def __init__(self, frames_per_batch=-1):
        self.frames_per_batch = frames_per_batch
        self.inputs = {}
        self.outputs = {}
        self.unique_id = None
        self.has_closed_inputs = False
        self.total_frames = float('inf')

    def reset(self):
        self.close_inputs()
        for key in self.outputs:
            if getattr(self.outputs[key][-1], "gi_suspended", False):
                try:
                    self.outputs[key][-1].send(None)
                except StopIteration:
                    pass
        self.__init__(self.frames_per_batch)

    def has_open_inputs(self):
        return len(self.inputs) > 0

    def close_inputs(self):
        for key in self.inputs:
            if getattr(self.inputs[key][-1], "gi_suspended", False):
                try:
                    self.inputs[key][-1].send(1)
                except StopIteration:
                    pass
        self.inputs = {}

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "frames_per_batch": ("INT", {"default": 16, "min": 1, "max": BIGMAX, "step": 1})
            },
            "hidden": {
                "prompt": "PROMPT",
                "unique_id": "UNIQUE_ID"
            },
        }

    RETURN_TYPES = ("VHS_BatchManager",)
    RETURN_NAMES = ("meta_batch",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "update_batch"

    def update_batch(self, frames_per_batch, prompt=None, unique_id=None):
        if unique_id is not None and prompt is not None:
            requeue = prompt[unique_id]['inputs'].get('requeue', 0)
        else:
            requeue = 0
        if requeue == 0:
            self.reset()
            self.frames_per_batch = frames_per_batch
            self.unique_id = unique_id
        else:
            num_batches = (self.total_frames+self.frames_per_batch-1)//frames_per_batch
            print(f'Meta-Batch {requeue}/{num_batches}')
        #onExecuted seems to not be called unless some message is sent
        return (self,)
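# VideoCombine parks its (counter, encoder generator) in meta_batch.outputs
# keyed by unique_id and calls requeue_workflow until has_closed_inputs is
# set; reset() then drains any still-suspended generators before
# reinitializing the manager's state.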
class VideoInfo:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video_info": ("VHS_VIDEOINFO",),
            }
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = ("FLOAT", "INT", "FLOAT", "INT", "INT", "FLOAT", "INT", "FLOAT", "INT", "INT")
    RETURN_NAMES = (
        "source_fps🟨",
        "source_frame_count🟨",
        "source_duration🟨",
        "source_width🟨",
        "source_height🟨",
        "loaded_fps🟦",
        "loaded_frame_count🟦",
        "loaded_duration🟦",
        "loaded_width🟦",
        "loaded_height🟦",
    )
    FUNCTION = "get_video_info"

    def get_video_info(self, video_info):
        keys = ["fps", "frame_count", "duration", "width", "height"]
        source_info = []
        loaded_info = []
        for key in keys:
            source_info.append(video_info[f"source_{key}"])
            loaded_info.append(video_info[f"loaded_{key}"])
        return (*source_info, *loaded_info)
class VideoInfoSource:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video_info": ("VHS_VIDEOINFO",),
            }
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = ("FLOAT", "INT", "FLOAT", "INT", "INT",)
    RETURN_NAMES = (
        "fps🟨",
        "frame_count🟨",
        "duration🟨",
        "width🟨",
        "height🟨",
    )
    FUNCTION = "get_video_info"

    def get_video_info(self, video_info):
        keys = ["fps", "frame_count", "duration", "width", "height"]
        source_info = []
        for key in keys:
            source_info.append(video_info[f"source_{key}"])
        return (*source_info,)
class VideoInfoLoaded:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video_info": ("VHS_VIDEOINFO",),
            }
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = ("FLOAT", "INT", "FLOAT", "INT", "INT",)
    RETURN_NAMES = (
        "fps🟦",
        "frame_count🟦",
        "duration🟦",
        "width🟦",
        "height🟦",
    )
    FUNCTION = "get_video_info"

    def get_video_info(self, video_info):
        keys = ["fps", "frame_count", "duration", "width", "height"]
        loaded_info = []
        for key in keys:
            loaded_info.append(video_info[f"loaded_{key}"])
        return (*loaded_info,)
class SelectFilename:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"filenames": ("VHS_FILENAMES",), "index": ("INT", {"default": -1, "step": 1, "min": -1})}}

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("Filename",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "select_filename"

    def select_filename(self, filenames, index):
        return (filenames[1][index],)
class Unbatch:
    class Any(str):
        def __ne__(self, other):
            return False

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"batched": ("*",)}}

    RETURN_TYPES = (Any('*'),)
    INPUT_IS_LIST = True
    RETURN_NAMES = ("unbatched",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "unbatch"

    def unbatch(self, batched):
        if isinstance(batched[0], torch.Tensor):
            return (torch.cat(batched),)
        if isinstance(batched[0], dict):
            out = batched[0].copy()
            out['samples'] = torch.cat([x['samples'] for x in batched])
            out.pop('batch_index', None)
            return (out,)
        return (functools.reduce(lambda x, y: x+y, batched),)

    @classmethod
    def VALIDATE_INPUTS(cls, input_types):
        return True
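# Unbatch.Any is a str subclass whose __ne__ always returns False, so the '*'
# return type never fails an inequality check during type validation and the
# unbatched output can be connected to inputs of any type.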
class SelectLatest:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"filename_prefix": ("STRING", {'default': 'output/AnimateDiff', 'vhs_path_extensions': []}),
                             "filename_postfix": ("STRING", {"placeholder": ".webm"})}}

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("Filename",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "select_latest"
    EXPERIMENTAL = True

    def select_latest(self, filename_prefix, filename_postfix):
        assert False, "Not Reachable"
NODE_CLASS_MAPPINGS = {
    "VHS_VideoCombine": VideoCombine,
    "VHS_LoadVideo": LoadVideoUpload,
    "VHS_LoadVideoPath": LoadVideoPath,
    "VHS_LoadVideoFFmpeg": LoadVideoFFmpegUpload,
    "VHS_LoadVideoFFmpegPath": LoadVideoFFmpegPath,
    "VHS_LoadImagePath": LoadImagePath,
    "VHS_LoadImages": LoadImagesFromDirectoryUpload,
    "VHS_LoadImagesPath": LoadImagesFromDirectoryPath,
    "VHS_LoadAudio": LoadAudio,
    "VHS_LoadAudioUpload": LoadAudioUpload,
    "VHS_AudioToVHSAudio": AudioToVHSAudio,
    "VHS_VHSAudioToAudio": VHSAudioToAudio,
    "VHS_PruneOutputs": PruneOutputs,
    "VHS_BatchManager": BatchManager,
    "VHS_VideoInfo": VideoInfo,
    "VHS_VideoInfoSource": VideoInfoSource,
    "VHS_VideoInfoLoaded": VideoInfoLoaded,
    "VHS_SelectFilename": SelectFilename,
    # Batched Nodes
    "VHS_VAEEncodeBatched": VAEEncodeBatched,
    "VHS_VAEDecodeBatched": VAEDecodeBatched,
    # Latent and Image nodes
    "VHS_SplitLatents": SplitLatents,
    "VHS_SplitImages": SplitImages,
    "VHS_SplitMasks": SplitMasks,
    "VHS_MergeLatents": MergeLatents,
    "VHS_MergeImages": MergeImages,
    "VHS_MergeMasks": MergeMasks,
    "VHS_GetLatentCount": GetLatentCount,
    "VHS_GetImageCount": GetImageCount,
    "VHS_GetMaskCount": GetMaskCount,
    "VHS_DuplicateLatents": RepeatLatents,
    "VHS_DuplicateImages": RepeatImages,
    "VHS_DuplicateMasks": RepeatMasks,
    "VHS_SelectEveryNthLatent": SelectEveryNthLatent,
    "VHS_SelectEveryNthImage": SelectEveryNthImage,
    "VHS_SelectEveryNthMask": SelectEveryNthMask,
    "VHS_SelectLatents": SelectLatents,
    "VHS_SelectImages": SelectImages,
    "VHS_SelectMasks": SelectMasks,
    "VHS_Unbatch": Unbatch,
    "VHS_SelectLatest": SelectLatest,
}
NODE_DISPLAY_NAME_MAPPINGS = {
    "VHS_VideoCombine": "Video Combine 🎥🅥🅗🅢",
    "VHS_LoadVideo": "Load Video (Upload) 🎥🅥🅗🅢",
    "VHS_LoadVideoPath": "Load Video (Path) 🎥🅥🅗🅢",
    "VHS_LoadVideoFFmpeg": "Load Video FFmpeg (Upload) 🎥🅥🅗🅢",
    "VHS_LoadVideoFFmpegPath": "Load Video FFmpeg (Path) 🎥🅥🅗🅢",
    "VHS_LoadImagePath": "Load Image (Path) 🎥🅥🅗🅢",
    "VHS_LoadImages": "Load Images (Upload) 🎥🅥🅗🅢",
    "VHS_LoadImagesPath": "Load Images (Path) 🎥🅥🅗🅢",
    "VHS_LoadAudio": "Load Audio (Path)🎥🅥🅗🅢",
    "VHS_LoadAudioUpload": "Load Audio (Upload)🎥🅥🅗🅢",
    "VHS_AudioToVHSAudio": "Audio to legacy VHS_AUDIO🎥🅥🅗🅢",
    "VHS_VHSAudioToAudio": "Legacy VHS_AUDIO to Audio🎥🅥🅗🅢",
    "VHS_PruneOutputs": "Prune Outputs 🎥🅥🅗🅢",
    "VHS_BatchManager": "Meta Batch Manager 🎥🅥🅗🅢",
    "VHS_VideoInfo": "Video Info 🎥🅥🅗🅢",
    "VHS_VideoInfoSource": "Video Info (Source) 🎥🅥🅗🅢",
    "VHS_VideoInfoLoaded": "Video Info (Loaded) 🎥🅥🅗🅢",
    "VHS_SelectFilename": "Select Filename 🎥🅥🅗🅢",
    # Batched Nodes
    "VHS_VAEEncodeBatched": "VAE Encode Batched 🎥🅥🅗🅢",
    "VHS_VAEDecodeBatched": "VAE Decode Batched 🎥🅥🅗🅢",
    # Latent and Image nodes
    "VHS_SplitLatents": "Split Latents 🎥🅥🅗🅢",
    "VHS_SplitImages": "Split Images 🎥🅥🅗🅢",
    "VHS_SplitMasks": "Split Masks 🎥🅥🅗🅢",
    "VHS_MergeLatents": "Merge Latents 🎥🅥🅗🅢",
    "VHS_MergeImages": "Merge Images 🎥🅥🅗🅢",
    "VHS_MergeMasks": "Merge Masks 🎥🅥🅗🅢",
    "VHS_GetLatentCount": "Get Latent Count 🎥🅥🅗🅢",
    "VHS_GetImageCount": "Get Image Count 🎥🅥🅗🅢",
    "VHS_GetMaskCount": "Get Mask Count 🎥🅥🅗🅢",
    "VHS_DuplicateLatents": "Repeat Latents 🎥🅥🅗🅢",
    "VHS_DuplicateImages": "Repeat Images 🎥🅥🅗🅢",
    "VHS_DuplicateMasks": "Repeat Masks 🎥🅥🅗🅢",
    "VHS_SelectEveryNthLatent": "Select Every Nth Latent 🎥🅥🅗🅢",
    "VHS_SelectEveryNthImage": "Select Every Nth Image 🎥🅥🅗🅢",
    "VHS_SelectEveryNthMask": "Select Every Nth Mask 🎥🅥🅗🅢",
    "VHS_SelectLatents": "Select Latents 🎥🅥🅗🅢",
    "VHS_SelectImages": "Select Images 🎥🅥🅗🅢",
    "VHS_SelectMasks": "Select Masks 🎥🅥🅗🅢",
    "VHS_Unbatch": "Unbatch 🎥🅥🅗🅢",
    "VHS_SelectLatest": "Select Latest 🎥🅥🅗🅢",
}