mysora/tools/caption/camera_motion_detect.py

133 lines
3.8 KiB
Python

# ref: https://github.com/antiboredom/camera-motion-detector
import argparse
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
tqdm.pandas()
def apply(df, func, **kwargs):
    """Apply ``func`` over ``df`` with a progress-reporting pandas applier.

    Uses ``df.parallel_apply`` when the module-level ``pandas_has_parallel``
    flag is truthy, otherwise ``df.progress_apply``. Extra keyword arguments
    are forwarded unchanged to whichever method is selected.
    """
    # The flag is read at call time, so it can be flipped after import.
    runner = df.parallel_apply if pandas_has_parallel else df.progress_apply
    return runner(func, **kwargs)
# Optional dependency: enable parallel apply only when pandarallel can be
# imported and initialized without raising ImportError.
try:
    from pandarallel import pandarallel

    pandarallel.initialize(progress_bar=True)
except ImportError:
    pandas_has_parallel = False
else:
    pandas_has_parallel = True
def make_empty(new_w, new_h):
    """Return a pixel-coordinate grid for a ``new_w`` x ``new_h`` image.

    Args:
        new_w: Grid width (number of columns).
        new_h: Grid height (number of rows).

    Returns:
        Integer array of shape ``(new_h, new_w, 2)`` in which the entry at
        ``[y, x]`` is ``[x, y]``.
    """
    # np.indices yields the row (y) index plane first, then the column (x)
    # plane; building the grid this way avoids a Python-level loop over every
    # pixel and keeps the (h, w, 2) shape even when a dimension is zero.
    ys, xs = np.indices((new_h, new_w))
    return np.stack((xs, ys), axis=-1)
def get_type(mag, ang, zoom_in, tau_static=1.0, tau_zoom=(0.4, 0.6)):
    """Map per-frame motion statistics to a camera-motion label.

    Checks are applied in priority order: magnitude below ``tau_static`` is
    "static"; ``zoom_in`` below ``tau_zoom[0]`` is "zoom out"; ``zoom_in``
    above ``tau_zoom[1]`` is "zoom in"; otherwise the label is chosen from
    the 90-degree sector that ``ang`` falls into. "unknown" is returned when
    no comparison succeeds.
    """
    if mag < tau_static:
        return "static"

    if zoom_in < tau_zoom[0]:
        return "zoom out"
    if zoom_in > tau_zoom[1]:
        return "zoom in"

    # The "pan left" sector wraps around zero, so it is handled on its own.
    if ang < 45 or ang >= 315:
        return "pan left"

    # Remaining sectors as half-open intervals [low, high).
    sectors = (
        (45, 135, "tilt up"),
        (135, 225, "pan right"),
        (225, 315, "tilt down"),
    )
    for low, high, label in sectors:
        if low <= ang < high:
            return label
    return "unknown"
def get_video_type(frame_types):
    """Aggregate per-frame motion labels into one label for the video.

    Args:
        frame_types: Sequence of per-frame labels.

    Returns:
        * "unknown" when ``frame_types`` is empty (nothing was analysed);
        * the label held by a strict majority of frames, if there is one;
        * "unknown" when there is no majority and some frame is "static";
        * "pan/tilt" when there is no majority and no zoom label occurs;
        * "dynamic" otherwise.
    """
    # Without any analysed frame there is no evidence of motion at all, so do
    # not fall through to the "pan/tilt" branch below.
    if not frame_types:
        return "unknown"

    # Count the number of each type.
    counts = {}
    for frame_type in frame_types:
        counts[frame_type] = counts.get(frame_type, 0) + 1

    # A strict majority is unique, so tie-breaking in max() cannot matter.
    top_type = max(counts, key=counts.get)
    if counts[top_type] > len(frame_types) / 2:
        return top_type

    if "static" in counts:
        return "unknown"
    if "zoom in" not in counts and "zoom out" not in counts:
        return "pan/tilt"
    return "dynamic"
def process(path: str, frame_interval=15) -> str:
    """Classify the camera motion of the video at ``path``.

    Frames are sampled every ``frame_interval`` frames by seeking with
    ``CAP_PROP_POS_FRAMES``. Each sampled frame after the first is compared
    with the first frame using Farneback dense optical flow; the median flow
    magnitude, the median flow angle (in degrees) and a zoom factor are passed
    to ``get_type``, and the per-frame labels are combined by
    ``get_video_type``.

    Args:
        path: Path handed to ``cv2.VideoCapture``.
        frame_interval: Number of frames to advance between samples.

    Returns:
        The label produced by ``get_video_type`` for the sampled frames.

    Raises:
        ValueError: If ``frame_interval`` is smaller than 1, which would make
            the sampling loop re-read the same frame forever.
    """
    if frame_interval < 1:
        raise ValueError(f"frame_interval must be >= 1, got {frame_interval}")

    cap = cv2.VideoCapture(path)
    frame_types = []
    try:
        count = 0
        prvs = None
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if count == 0:
                # First sample: remember it as the reference frame and
                # precompute every pixel's distance to the frame centre.
                # NOTE(review): `prvs` is never reassigned afterwards, so flow
                # is always measured against this first frame rather than the
                # previous sample -- confirm this is intended.
                prvs = frame
                h, w = frame.shape
                empty = make_empty(w, h)
                empty_dists = np.sqrt(
                    np.square(empty[..., 0] - (w / 2)) + np.square(empty[..., 1] - (h / 2))
                )
            else:
                flow = cv2.calcOpticalFlowFarneback(prvs, frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)
                mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1], angleInDegrees=True)
                median_mag = np.median(mag)
                median_ang = np.median(ang)

                # Zoom factor: fraction of pixels whose flow-displaced
                # position is at least as far from the centre as before.
                flow_coords = flow + empty
                xvals = flow_coords[..., 0] - (w / 2)
                yvals = flow_coords[..., 1] - (h / 2)
                dists = np.sqrt(np.square(xvals) + np.square(yvals))
                dist_diff = dists >= empty_dists
                zoom_in_factor = np.count_nonzero(dist_diff) / dist_diff.size

                frame_types.append(get_type(median_mag, median_ang, zoom_in_factor))

            count += frame_interval
            cap.set(cv2.CAP_PROP_POS_FRAMES, count)
    finally:
        # Release the capture even if decoding or flow computation raised.
        cap.release()

    return get_video_type(frame_types)
def main(args):
    """Annotate every video listed in a CSV with its camera-motion label.

    Reads ``args.input`` as CSV, runs ``process`` on each value of the
    ``path`` column, stores the results in a new ``cmotion`` column and writes
    the table next to the input as ``<input stem>_cmotion.csv``.

    Args:
        args: Parsed command-line namespace; only ``args.input`` is read.
    """
    # Strip only a trailing ".csv" and always append the suffix. A plain
    # str.replace would rewrite ".csv" anywhere in the path and, when the
    # input has no ".csv" at all, leave the name unchanged so that the output
    # would overwrite the input file.
    suffix = ".csv"
    if args.input.endswith(suffix):
        stem = args.input[: -len(suffix)]
    else:
        stem = args.input
    output_file = f"{stem}_cmotion{suffix}"

    data = pd.read_csv(args.input)
    data["cmotion"] = apply(data["path"], process)
    data.to_csv(output_file, index=False)
    print(f"Output saved to {output_file}")
if __name__ == "__main__":
    # Command line: one positional CSV path plus an opt-out switch for the
    # parallel apply backend.
    cli = argparse.ArgumentParser()
    cli.add_argument("input", type=str)
    cli.add_argument("--disable-parallel", action="store_true")
    args = cli.parse_args()

    # The switch can only turn parallelism off; it never turns it on.
    pandas_has_parallel = pandas_has_parallel and not args.disable_parallel
    main(args)