Python API minimal examples¶
Video processing¶
This example shows how to initialize the video intelligence class and process a video frame.
import numpy as np
import plumerai_video_intelligence as pvi_api
# Minimal example: create the video intelligence object, feed it frames one by
# one, and print the detected bounding boxes for each frame.

# Settings, to be changed as needed
width = 1600  # camera image width in pixels
height = 1200  # camera image height in pixels

# Initialize the video intelligence algorithm (note: height first, then width)
pvi = pvi_api.VideoIntelligence(height, width)

# Set whether the video stream is night mode (IR) or not.
error_code = pvi.set_night_mode(False)
if error_code != pvi_api.ErrorCode.SUCCESS:
    raise RuntimeError(f"Error in 'set_night_mode': {error_code}")

# Loop over frames in a video stream (example: 10 frames)
for _ in range(10):
    # Some example input here, normally this is where camera data is acquired
    image = np.zeros((height, width, 3), dtype=np.uint8)

    # Duration between the *previous* frame passed to `process_frame` and the
    # *current* frame, in seconds.
    #
    # Here we assume a fixed capture rate of 30 fps, so delta_t = 1 / 30.
    # If your camera runs at a different or variable frame rate, be sure to
    # update `delta_t` accordingly. The function `process_frame` relies on this
    # value to keep motion tracking and temporal filters in sync.
    delta_t = 1.0 / 30.0

    # Process the frame
    error_code = pvi.process_frame(image, delta_t)
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'process_frame': {error_code}")

    # Fetch the detections that belong to the frame that was just processed
    error_code, predictions = pvi.object_detection.get_detections()
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'get_detections': {error_code}")

    # Display the results
    for p in predictions:
        print(
            f"Box of class {p.class_id} "
            f"@(x,y)->({p.x_min:.2f},{p.y_min:.2f})-({p.x_max:.2f},{p.y_max:.2f})"
        )
    if len(predictions) == 0:
        print("No bounding boxes found in this frame")
Automatic face enrollment¶
This example extends the code above and shows how to use the automatic face enrollment functionality.
The changes compared to the minimal example above are highlighted.
import numpy as np
import plumerai_video_intelligence as pvi_api
# Automatic face enrollment example: process frames, report how many faces
# are in the library so far, and print the face ID for each person detection.

# Settings, to be changed as needed
width = 1600  # camera image width in pixels
height = 1200  # camera image height in pixels

# Initialize the video intelligence algorithm (note: height first, then width)
pvi = pvi_api.VideoIntelligence(height, width)

# Set whether the video stream is night mode (IR) or not.
error_code = pvi.set_night_mode(False)
if error_code != pvi_api.ErrorCode.SUCCESS:
    raise RuntimeError(f"Error in 'set_night_mode': {error_code}")

# Loop over frames in a video stream (example: 20 frames)
for _ in range(20):
    # Some example input here, normally this is where camera data is acquired
    image = np.zeros((height, width, 3), dtype=np.uint8)

    # Duration between the *previous* frame passed to `process_frame` and the
    # *current* frame, in seconds.
    #
    # Here we assume a fixed capture rate of 30 fps, so delta_t = 1 / 30.
    # If your camera runs at a different or variable frame rate, be sure to
    # update `delta_t` accordingly. The function `process_frame` relies on this
    # value to keep motion tracking and temporal filters in sync.
    delta_t = 1.0 / 30.0

    # Process the frame
    error_code = pvi.process_frame(image, delta_t)
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'process_frame': {error_code}")

    # Report the number of faces in the library so far. At first the library
    # will be empty, but as soon as a face is well visible for a while, it
    # will be added to the library with a new unique face ID. The library
    # will grow over time, unless `remove_face_embedding` is called.
    error_code, face_ids = pvi.face_enrollment_automatic.get_face_ids()
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'get_face_ids': {error_code}")
    print(f"Total of {len(face_ids)} people in the familiar face ID library")

    error_code, predictions = pvi.object_detection.get_detections()
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'get_detections': {error_code}")

    # Display the results (only person detections are printed)
    for p in predictions:
        if p.class_id == pvi_api.DetectionClass.CLASS_PERSON:
            # After a few frames of showing a clear face in view, the face ID
            # should become non-negative as the face is automatically
            # enrolled.
            face_id = pvi.face_identification.get_face_id(p)
            print(
                f"Person box with face ID {face_id} "
                f"@(x,y)->({p.x_min:.2f},{p.y_min:.2f})-({p.x_max:.2f},{p.y_max:.2f})"
            )
    if len(predictions) == 0:
        print("No bounding boxes found in this frame")
Manual face enrollment¶
This example shows how to use the manual face enrollment functionality. It consists of two main loops:
- An example enrollment loop, which runs for a fixed number of frames and computes a face embedding vector to enroll one person in the face library.
- An example video processing loop, similar to the first example.
import numpy as np
import plumerai_video_intelligence as pvi_api
# Manual face enrollment example: first enroll one person from a fixed number
# of frames and store the resulting embedding under face ID 1, then run the
# regular video processing loop and print the face ID of each person box.

# Settings, to be changed as needed
width = 1600  # camera image width in pixels
height = 1200  # camera image height in pixels

# Initialize the video intelligence algorithm (note: height first, then width)
pvi = pvi_api.VideoIntelligence(height, width)

# Set whether the video stream is night mode (IR) or not.
error_code = pvi.set_night_mode(False)
if error_code != pvi_api.ErrorCode.SUCCESS:
    raise RuntimeError(f"Error in 'set_night_mode': {error_code}")

# ---------------------- Enrollment starting ------------------------------
# A successful start is reported as ENROLLMENT_IN_PROGRESS, not SUCCESS.
error_code = pvi.face_enrollment_manual.start_enrollment()
if error_code != pvi_api.ErrorCode.ENROLLMENT_IN_PROGRESS:
    raise RuntimeError(f"Error in 'start_enrollment': {error_code}")

# Enroll for 10 frames (just an example, more frames is better)
for _ in range(10):
    # Some example input here, normally this is where camera data is acquired
    image = np.zeros((height, width, 3), dtype=np.uint8)

    # Process the frame. If the enrollment frames come from a video source,
    # then use 'process_frame' instead.
    error_code = pvi.single_image(image)
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'single_image': {error_code}")

# Finish enrollment
error_code, embedding = pvi.face_enrollment_manual.finish_enrollment()
if error_code != pvi_api.ErrorCode.SUCCESS:
    raise RuntimeError(f"Error in 'finish_enrollment': {error_code}")

# Add the embedding to the library with face ID '1'.
error_code = pvi.face_enrollment_manual.add_embedding(embedding, face_id=1)
if error_code != pvi_api.ErrorCode.SUCCESS:
    raise RuntimeError(f"Error in 'add_embedding': {error_code}")
# ---------------------- Enrollment finished ------------------------------

# Loop over frames in a video stream (example: 10 frames)
for _ in range(10):
    # Some example input here, normally this is where camera data is acquired
    image = np.zeros((height, width, 3), dtype=np.uint8)

    # Duration between the *previous* frame passed to `process_frame` and the
    # *current* frame, in seconds.
    #
    # Here we assume a fixed capture rate of 30 fps, so delta_t = 1 / 30.
    # If your camera runs at a different or variable frame rate, be sure to
    # update `delta_t` accordingly. The function `process_frame` relies on this
    # value to keep motion tracking and temporal filters in sync.
    delta_t = 1.0 / 30.0

    # Process the frame
    error_code = pvi.process_frame(image, delta_t)
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'process_frame': {error_code}")

    error_code, predictions = pvi.object_detection.get_detections()
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'get_detections': {error_code}")

    # Display the results (only person detections are printed)
    for p in predictions:
        if p.class_id == pvi_api.DetectionClass.CLASS_PERSON:
            face_id = pvi.face_identification.get_face_id(p)
            print(
                f"Person box with face ID {face_id} "
                f"@(x,y)->({p.x_min:.2f},{p.y_min:.2f})-({p.x_max:.2f},{p.y_max:.2f})"
            )
    if len(predictions) == 0:
        print("No bounding boxes found in this frame")
VLM Video Collection and Embedder¶
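This example demonstrates the VLM Video capabilities: it uses motion detection to decide when to start and end a clip while frames are being processed, and then computes embeddings for each finished clip with the VLM Video embedder.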
import numpy as np
import plumerai_video_intelligence as pvi_api
# Settings, to be changed as needed
width = 1600 # camera image width in pixels
height = 1200 # camera image height in pixels
def any_motion_detected(pvi_object: pvi_api.VideoIntelligence) -> bool:
    """Check if any motion was detected in the current frame.

    Args:
        pvi_object: Video intelligence object whose `motion_detection` grid
            is queried.

    Returns:
        True if the largest value in the motion grid exceeds the example
        threshold of 0.1. False otherwise, including when the motion grid is
        not yet ready.

    Raises:
        RuntimeError: If `get_grid` returns an error code other than
            SUCCESS or MOTION_GRID_NOT_YET_READY.
    """
    error_code, motion_grid = pvi_object.motion_detection.get_grid()
    if error_code == pvi_api.ErrorCode.MOTION_GRID_NOT_YET_READY:
        return False  # process another frame and wait for the grid to be ready
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'get_grid': {error_code}")

    # Reshape the returned grid values to (grid_height, grid_width).
    grid_height = pvi_object.motion_detection.get_grid_height()
    grid_width = pvi_object.motion_detection.get_grid_width()
    motion_grid = np.array(motion_grid).reshape(grid_height, grid_width)

    max_motion = np.max(motion_grid, axis=(0, 1))
    # Convert the NumPy boolean to a plain `bool` to match the annotation.
    return bool(max_motion > 0.1)  # Example threshold
# VLM Video example main script: process frames, open a clip when motion is
# detected, close it after a period without motion (or when it gets too
# long), and compute embeddings for every finished clip.

# Initialize the video intelligence algorithm (note: height first, then width)
pvi = pvi_api.VideoIntelligence(height, width)

# Initialize the VLM Video embedder
pvve = pvi_api.VLMVideoEmbedder()

# The VLM Video API processes video in user-defined 'clips'.
clip_has_started = False
clip_start_time = 0.0
time_without_motion = 0.0

# Set whether the video stream is night mode (IR) or not.
error_code = pvi.set_night_mode(False)
if error_code != pvi_api.ErrorCode.SUCCESS:
    raise RuntimeError(f"Error in 'set_night_mode': {error_code}")

# Running stream time in seconds, advanced by `delta_t` after every frame
current_time = 0.0

# Loop over frames in a video stream (example: 10 frames)
for _ in range(10):
    # Some example input here, normally this is where camera data is acquired
    image = np.zeros((height, width, 3), dtype=np.uint8)

    # Duration between the *previous* frame passed to `process_frame` and the
    # *current* frame, in seconds. Variable framerates are supported.
    delta_t = 1.0 / 30.0

    # Process the frame
    error_code = pvi.process_frame(image, delta_t)
    if error_code != pvi_api.ErrorCode.SUCCESS:
        raise RuntimeError(f"Error in 'process_frame': {error_code}")

    if not clip_has_started:
        # Example: check if we should start a new clip based on motion
        # detection. This could also be based on object detection or fixed
        # time intervals.
        should_start_clip = any_motion_detected(pvi)
        if should_start_clip:
            error_code = pvi.vlm_video_collection.start_clip()
            if error_code != pvi_api.ErrorCode.SUCCESS:
                raise RuntimeError(f"Error in 'start_clip': {error_code}")
            clip_has_started = True
            clip_start_time = current_time
            time_without_motion = 0.0
    else:
        # In this example we end the clip when there is no motion detected
        # for at least 2 seconds, and we limit the clip length to 30 seconds.
        # A more sophisticated method could also consider object detections.
        max_clip_duration = 30.0
        min_time_without_motion = 2.0

        if any_motion_detected(pvi):
            time_without_motion = 0.0
        else:
            time_without_motion += delta_t

        clip_duration = current_time - clip_start_time
        should_end_clip = ((clip_duration >= max_clip_duration) or
                           (time_without_motion >= min_time_without_motion))

        if should_end_clip:
            error_code, clip_data = pvi.vlm_video_collection.end_clip()
            if error_code != pvi_api.ErrorCode.SUCCESS:
                raise RuntimeError(f"Error in 'end_clip': {error_code}")
            clip_has_started = False

            # Now process the collected clip data with VLM Video Embedder
            # This is time-consuming: there is also a
            # `compute_single_unit_only` option.
            compute_single_unit_only = False
            error_code, clip_embeddings = pvve.compute_embeddings(
                clip_data, compute_single_unit_only)
            if error_code != pvi_api.ErrorCode.SUCCESS:
                raise RuntimeError(
                    f"Error in 'compute_embeddings': {error_code}")

            # `clip_embeddings` can now be stored for e.g. video search.

    current_time += delta_t