133 lines
3.8 KiB
Python
133 lines
3.8 KiB
Python
# ref: https://github.com/antiboredom/camera-motion-detector
|
|
|
|
import argparse
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import pandas as pd
|
|
from tqdm import tqdm
|
|
|
|
tqdm.pandas()
|
|
|
|
|
|
def apply(df, func, **kwargs):
|
|
if pandas_has_parallel:
|
|
return df.parallel_apply(func, **kwargs)
|
|
return df.progress_apply(func, **kwargs)
|
|
|
|
|
|
try:
|
|
from pandarallel import pandarallel
|
|
|
|
pandarallel.initialize(progress_bar=True)
|
|
pandas_has_parallel = True
|
|
except ImportError:
|
|
pandas_has_parallel = False
|
|
|
|
|
|
def make_empty(new_w, new_h):
|
|
empty = []
|
|
for y in range(new_h):
|
|
xvals = []
|
|
for x in range(new_w):
|
|
xvals.append([x, y])
|
|
empty.append(xvals)
|
|
|
|
empty = np.array(empty)
|
|
return empty
|
|
|
|
|
|
def get_type(mag, ang, zoom_in, tau_static=1.0, tau_zoom=(0.4, 0.6)):
|
|
if mag < tau_static:
|
|
return "static"
|
|
if zoom_in < tau_zoom[0]:
|
|
return "zoom out"
|
|
if zoom_in > tau_zoom[1]:
|
|
return "zoom in"
|
|
if ang < 45 or ang >= 315:
|
|
return "pan left"
|
|
if 45 <= ang < 135:
|
|
return "tilt up"
|
|
if 135 <= ang < 225:
|
|
return "pan right"
|
|
if 225 <= ang < 315:
|
|
return "tilt down"
|
|
return "unknown"
|
|
|
|
|
|
def get_video_type(frame_types):
|
|
# count the number of each type
|
|
counts = {}
|
|
max_count = 0
|
|
max_type = None
|
|
for frame_type in frame_types:
|
|
if frame_type not in counts:
|
|
counts[frame_type] = 0
|
|
counts[frame_type] += 1
|
|
if counts[frame_type] > max_count:
|
|
max_count = counts[frame_type]
|
|
max_type = frame_type
|
|
if max_count > len(frame_types) / 2:
|
|
return max_type
|
|
if "static" in counts:
|
|
return "unknown"
|
|
if "zoom in" not in counts and "zoom out" not in counts:
|
|
return "pan/tilt"
|
|
return "dynamic"
|
|
|
|
|
|
def process(path: str, frame_interval=15) -> str:
|
|
cap = cv2.VideoCapture(path)
|
|
count = 0
|
|
prvs = None
|
|
frame_types = []
|
|
while cap.isOpened():
|
|
ret, frame = cap.read()
|
|
if ret:
|
|
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
if count == 0:
|
|
prvs = frame
|
|
h, w = frame.shape
|
|
empty = make_empty(w, h)
|
|
empty_dists = np.sqrt(
|
|
np.square(empty.ravel()[::2] - (w / 2)) + np.square(empty.ravel()[1::2] - (h / 2))
|
|
)
|
|
else:
|
|
flow = cv2.calcOpticalFlowFarneback(prvs, frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)
|
|
mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1], angleInDegrees=True)
|
|
mean_mag = np.median(mag)
|
|
mean_ang = np.median(ang)
|
|
|
|
flow_coords = flow + empty
|
|
xvals = flow_coords.ravel()[::2] - (w / 2)
|
|
yvals = flow_coords.ravel()[1::2] - (h / 2)
|
|
dists = np.sqrt(np.square(xvals) + np.square(yvals))
|
|
dist_diff = dists >= empty_dists
|
|
zoom_in_factor = np.count_nonzero(dist_diff) / len(dist_diff)
|
|
frame_types.append(get_type(mean_mag, mean_ang, zoom_in_factor))
|
|
count += frame_interval
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, count)
|
|
else:
|
|
cap.release()
|
|
break
|
|
video_type = get_video_type(frame_types)
|
|
return video_type
|
|
|
|
|
|
def main(args):
|
|
output_file = args.input.replace(".csv", "_cmotion.csv")
|
|
data = pd.read_csv(args.input)
|
|
data["cmotion"] = apply(data["path"], process)
|
|
data.to_csv(output_file, index=False)
|
|
print(f"Output saved to {output_file}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("input", type=str)
|
|
parser.add_argument("--disable-parallel", action="store_true")
|
|
args = parser.parse_args()
|
|
if args.disable_parallel:
|
|
pandas_has_parallel = False
|
|
main(args)
|