Using processes instead of process pool map to solve mem issue

This commit is contained in:
Jacob Zelek
2020-12-17 03:34:00 -08:00
parent 2cdccec800
commit 7a0cf18c94

View File

@@ -24,7 +24,7 @@ from moviepy.editor import VideoFileClip
from PIL import Image from PIL import Image
from click import progressbar from click import progressbar
from collections import namedtuple from collections import namedtuple
from multiprocessing import Pool, cpu_count from multiprocessing import cpu_count, Queue, Process
import glob import glob
import os import os
import random import random
@@ -45,7 +45,8 @@ def generate_video_thumbnails(args):
output_path = args['<output>'] output_path = args['<output>']
parallelism = args.get('[<parallelism>]', cpu_count()*2-1) parallelism = args.get('[<parallelism>]', cpu_count()*2-1)
work_units = [] work_queue = Queue()
work_units = 0
if os.path.isdir(input_path): if os.path.isdir(input_path):
# Ensure output path is also directory # Ensure output path is also directory
@@ -69,20 +70,35 @@ def generate_video_thumbnails(args):
output_path, os.path.basename(file_path) + ".png" output_path, os.path.basename(file_path) + ".png"
) )
work_units.append((file_path, single_output_path, work_queue.put((file_path, single_output_path,
interval, size, columns,)) interval, size, columns,))
work_units += 1
else: else:
work_units.append((input_path, output_path, interval, size, columns,)) work_queue.put((input_path, output_path, interval, size, columns,))
work_units += 1
# Limit the number of parallel jobs if lower number of files # Limit the number of parallel jobs if lower number of files
parallelism = min(int(parallelism), len(work_units)) parallelism = min(int(parallelism), work_units)
# Process all files in parallel # Start worker processes
with Pool(parallelism) as p: processes = []
p.map(process_file, work_units) for i in range(parallelism):
p = Process(target=worker, args=(work_queue,))
p.start()
processes.append(p)
# Block until all processes complete
for p in processes:
p.join()
def process_file(work_unit): def worker(queue):
work_unit = queue.get()
# If no work unit then quit
if not work_unit:
return
input_file, output_file, interval, size, columns = work_unit input_file, output_file, interval, size, columns = work_unit
video_file_clip = VideoFileClip(input_file) video_file_clip = VideoFileClip(input_file)
output_prefix = get_output_prefix() output_prefix = get_output_prefix()