Slideshow with Loading of Photos in a Background Thread in Python

What will we cover in this tutorial?

We will continue the work of this tutorial (Create a Moving Photo Slideshow with Weighted Transitions in OpenCV). The challenge with that construction is that we pre-load all the photos we need. The reason is that loading the photos in each iteration would affect the performance of the slideshow.

The solution we present in this tutorial is to load photos in a background thread. This is not straightforward as we need to ensure the communication between the main thread and the background photo loading thread is done correctly.

The result will be similar to the one in the previous tutorial.

Already done so far and the challenge

In the previous tutorial we made great progress, creating a nice slideshow. The challenge was the long pre-loading time of the photos.

If we did not pre-load the photos, we would need to load them in each iteration. Say, at the beginning of the loop, we would need to load the next photo. This requires disk access, which is quite slow. As the frame is updated quite often, this loading time would keep the position of the photo from updating for a fraction of a second (or more, depending on the photo size and the speed of the processor). This would make the movement of the photo lag and not run smoothly.

Said differently, within one thread the processor can only do one thing at a time. When you tell it to load a photo, it will stop all other work in the program and do that first, before updating the frame. As loading can be a big task, it will take a long time. Since the program needs to update the frame continuously to make the photo move, the pause will be visible no matter when you tell the processor to load the image.

So how can we deal with this?

Use another thread to load the image. Having multiple threads makes it possible to do more than one thing at a time: with two threads, you can do two things at the same time.

Introducing threading

A Python program runs in one thread by default. Hence, it can only do one thing at a time. If you need to do more than one thing at a time, you need to use threading.
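
As a minimal standalone illustration of threading (the names here are made up for the example, not part of the slideshow code), a thread is given a target function and started; the main thread keeps running while the background work happens:

```python
import threading
import time

def load_in_background(results):
    # Simulate a slow task, such as loading a photo from disk
    time.sleep(0.1)
    results.append("photo loaded")

results = []
worker = threading.Thread(target=load_in_background, args=(results,))
worker.start()   # the main thread continues immediately
worker.join()    # wait for the background work to finish
print(results)   # ['photo loaded']
```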

Now this sounds simple, but it introduces new problems.

When working with threading, a lock is a good tool to know. Basically, a lock works like a lock on a door. You can enter and lock the door behind you, so that no one else can enter. When you are done, you unlock the door and leave, and then someone else can enter.

This is the same principle with a lock in threading. You can take a lock and only enter the code after it if no one else (another thread) is holding it. Then, when you are done, you release the lock.
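
A minimal sketch of the principle (standalone, not from the slideshow code): two threads increment a shared counter, and each takes the lock before entering the critical section. The with statement acquires and releases the lock automatically:

```python
import threading

counter = 0
lock = threading.Lock()

def work():
    global counter
    for _ in range(10000):
        with lock:        # "lock the door" before touching shared state
            counter += 1  # critical section
        # the lock is released here, so the other thread can enter

threads = [threading.Thread(target=work) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 20000
```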

We need a stack of photos that can load new photos when needed.

class ImageStack:
    def __init__(self, filenames, size=3):
        if size > len(filenames):
            raise Exception("Not enough file names")
        self.size = size
        self.filenames = filenames
        self.stack = []
        while len(self.stack) < self.size:
            filename = self.filenames[random.randrange(0, len(self.filenames))]
            if any(item[0] == filename for item in self.stack):
                continue
            self.stack.append((filename, Image(filename)))
        # Lock used for accessing the stack
        self.stack_lock = threading.Lock()
        self.add_image_lock = threading.Lock()

    def get_image(self):
        self.stack_lock.acquire()
        filename, img = self.stack.pop()
        print(f"Get image {filename} (stack size: {len(self.stack)})")
        self.stack_lock.release()
        return img

    def add_image(self):
        self.add_image_lock.acquire()
        filename = self.filenames[random.randrange(0, len(self.filenames))]
        self.stack_lock.acquire()
        while any(item[0] == filename for item in self.stack):
            filename = self.filenames[random.randrange(0, len(self.filenames))]
        self.stack_lock.release()
        img = Image(filename)
        self.stack_lock.acquire()
        self.stack.append((filename, img))
        print(f"Add image {filename} (stack size: {len(self.stack)})")
        self.stack_lock.release()
        self.add_image_lock.release()

The above is an image stack with two locks: one for accessing the stack and one for adding images.

The stack lock ensures that only one thread accesses the stack at a time. Consider the following code.

stack = ImageStack(filenames)
load_next_image_thread = threading.Thread(target=stack.add_image)
load_next_image_thread.start()
stack.get_image()

The code above creates an ImageStack (notice that filenames is not defined here). It then starts a new thread that adds a new image, and after that it tries to get an image. This is where the lock comes into the picture: if the thread running add_image has acquired the stack lock, the get_image call cannot proceed (it will wait in its first line to acquire the stack lock).

There are more situations where the lock comes into play. If the call to stack.get_image acquires the stack lock before the add_image thread reaches it, then add_image has to wait until the lock is released by the stack.get_image call.

Threading is a lot of fun but you need to understand how locks work and how to avoid deadlocks.
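
As a hypothetical illustration (not taken from the slideshow code), a deadlock arises when two threads acquire the same two locks in opposite order; the standard cure is to always acquire locks in a fixed order:

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def safe_worker():
    # Every thread takes lock_a before lock_b. With a fixed global
    # order, no cycle of threads waiting on each other can form.
    with lock_a:
        with lock_b:
            pass  # work on the shared resources

# Deadlock-prone variant (avoid!): if another thread instead did
#     with lock_b: with lock_a: ...
# both threads could end up waiting on each other forever.

threads = [threading.Thread(target=safe_worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("no deadlock")
```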

Full code

Below you will find the full code using a threading approach to load photos in the background.

import cv2
import glob
import os
import random
import threading


class Image:
    def __init__(self, filename, time=500, size=500):
        self.filename = filename
        self.size = size
        self.time = time
        self.shifted = 0.0
        img = cv2.imread(filename)
        height, width, _ = img.shape
        if width < height:
            self.height = int(height*size/width)
            self.width = size
            self.img = cv2.resize(img, (self.width, self.height))
            self.shift = self.height - size
            self.shift_height = True
        else:
            self.width = int(width*size/height)
            self.height = size
            self.shift = self.width - size
            self.img = cv2.resize(img, (self.width, self.height))
            self.shift_height = False
        self.delta_shift = self.shift/self.time
        self.reset()

    def reset(self):
        if random.randint(0, 1) == 0:
            self.shifted = 0.0
            self.delta_shift = abs(self.delta_shift)
        else:
            self.shifted = self.shift
            self.delta_shift = -abs(self.delta_shift)

    def get_frame(self):
        if self.shift_height:
            roi = self.img[int(self.shifted):int(self.shifted) + self.size, :, :]
        else:
            roi = self.img[:, int(self.shifted):int(self.shifted) + self.size, :]
        self.shifted += self.delta_shift
        if self.shifted > self.shift:
            self.shifted = self.shift
        if self.shifted < 0:
            self.shifted = 0
        return roi


class ImageStack:
    def __init__(self, filenames, size=3):
        if size > len(filenames):
            raise Exception("Not enough file names")
        self.size = size
        self.filenames = filenames
        self.stack = []
        while len(self.stack) < self.size:
            filename = self.filenames[random.randrange(0, len(self.filenames))]
            if any(item[0] == filename for item in self.stack):
                continue
            self.stack.append((filename, Image(filename)))
        # Lock used for accessing the stack
        self.stack_lock = threading.Lock()
        self.add_image_lock = threading.Lock()

    def get_image(self):
        self.stack_lock.acquire()
        filename, img = self.stack.pop()
        print(f"Get image {filename} (stack size: {len(self.stack)})")
        self.stack_lock.release()
        return img

    def add_image(self):
        self.add_image_lock.acquire()
        filename = self.filenames[random.randrange(0, len(self.filenames))]
        self.stack_lock.acquire()
        while any(item[0] == filename for item in self.stack):
            filename = self.filenames[random.randrange(0, len(self.filenames))]
        self.stack_lock.release()
        img = Image(filename)
        self.stack_lock.acquire()
        self.stack.append((filename, img))
        print(f"Add image {filename} (stack size: {len(self.stack)})")
        self.stack_lock.release()
        self.add_image_lock.release()


def process():
    path = "pics"
    filenames = glob.glob(os.path.join(path, "*"))

    buffer = ImageStack(filenames)

    prev_image = buffer.get_image()
    buffer.add_image()
    current_image = buffer.get_image()
    buffer.add_image()

    while True:
        for i in range(100):
            alpha = i/100
            beta = 1.0 - alpha
            dst = cv2.addWeighted(current_image.get_frame(), alpha, prev_image.get_frame(), beta, 0.0)

            cv2.imshow("Slide", dst)
            if cv2.waitKey(1) == ord('q'):
                return

        for _ in range(300):
            cv2.imshow("Slide", current_image.get_frame())
            if cv2.waitKey(1) == ord('q'):
                return

        prev_image = current_image
        current_image = buffer.get_image()
        load_next_image_process = threading.Thread(target=buffer.add_image)
        load_next_image_process.start()


process()

Create a Moving Photo Slideshow with Weighted Transitions in OpenCV

What will we cover in this tutorial?

In this tutorial you will learn how to make a slideshow of your favorite photos moving across the screen with weighted transitions. This will be done in Python with OpenCV.

See the result in the video below.

Step 1: A simple approach without moving effect

If you want to build something great, start with something simple first. The reason for that is that you will learn along the way. It is difficult to understand all aspects from the beginning.

Start small. Start simple. Learn from each step.

Here we assume that you have all your favorite photos in a folder called pics. In the first run, you just want to show them on your screen one-by-one.

import cv2
import glob
import os

def process():
    path = "pics"
    filenames = glob.glob(os.path.join(path, "*"))

    for filename in filenames:
        print(filename)
        img = cv2.imread(filename)

        cv2.imshow("Slideshow", img)

        if cv2.waitKey(1000) == ord('q'):
            return


process()

As you will realize, this shows the photos in the size they are stored in. Hence, when the photos change, the dimensions of the window change as well (unless two consecutive photos have the exact same dimensions). This does not make for a good user experience. Also, if a photo's dimensions are larger than your screen resolution, it will not be fully visible.

Step 2: Scaling images to fit inside the screen

We want the window where we show the photos to have a fixed size. This is not as simple as it sounds.

Imagine a photo with dimensions 1000 x 2000 pixels, and the next one with 2000 x 1000. How would you scale them down? If you simply scale each one to 500 x 500, the objects in the images will be flattened or narrowed.

Hence, in our first attempt we scale based on the dimensions. That is, a photo with dimensions 1000 x 2000 becomes 500 x 1000, and a photo with dimensions 2000 x 1000 becomes 1000 x 500. Then we crop it to fit the 500 x 500 dimensions, taking the middle of the photo.

import cv2
import glob
import os

def process():
    path = "pics"
    filenames = glob.glob(os.path.join(path, "*"))

    for filename in filenames:
        print(filename)
        img = cv2.imread(filename)

        height, width, _ = img.shape
        if width < height:
            height = int(height*500/width)
            width = 500
            img = cv2.resize(img, (width, height))
            shift = height - 500
            img = img[shift//2:-shift//2,:,:]
        else:
            width = int(width*500/height)
            height = 500
            shift = width - 500
            img = cv2.resize(img, (width, height))
            img = img[:,shift//2:-shift//2,:]

        cv2.imshow("Slideshow", img)

        if cv2.waitKey(1000) == ord('q'):
            return

process()

This gives a better experience, but not perfect.

Step 3: Make a weighted transition between image switches

We make the weighted transition by adding a transition phase between the photos.

import cv2
import glob
import os
import numpy as np

def process():
    path = "pics"
    filenames = glob.glob(os.path.join(path, "*"))

    prev_image = np.zeros((500, 500, 3), np.uint8)
    for filename in filenames:
        print(filename)
        img = cv2.imread(filename)

        height, width, _ = img.shape
        if width < height:
            height = int(height*500/width)
            width = 500
            img = cv2.resize(img, (width, height))
            shift = height - 500
            img = img[shift//2:-shift//2,:,:]
        else:
            width = int(width*500/height)
            height = 500
            shift = width - 500
            img = cv2.resize(img, (width, height))
            img = img[:,shift//2:-shift//2,:]

        for i in range(101):
            alpha = i/100
            beta = 1.0 - alpha
            dst = cv2.addWeighted(img, alpha, prev_image, beta, 0.0)

            cv2.imshow("Slideshow", dst)
            if cv2.waitKey(1) == ord('q'):
                return

        prev_image = img

        if cv2.waitKey(1000) == ord('q'):
            return

process()

Notice the prev_image variable that is needed. It is set to a black image before the loop is entered. The transition is made by using cv2.addWeighted(…) to get the effect.
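
Under the hood, cv2.addWeighted(src1, alpha, src2, beta, gamma) computes alpha*src1 + beta*src2 + gamma per pixel, saturated to the 0-255 range for uint8 images. A small NumPy sketch of the same blend (standalone, not part of the slideshow code):

```python
import numpy as np

img = np.full((2, 2, 3), 200, np.uint8)     # stand-in for the new photo
prev_image = np.zeros((2, 2, 3), np.uint8)  # black, like the first prev_image
alpha, beta = 0.25, 0.75

# Equivalent of cv2.addWeighted(img, alpha, prev_image, beta, 0.0)
dst = np.clip(alpha * img.astype(np.float64) + beta * prev_image.astype(np.float64),
              0, 255).astype(np.uint8)
print(dst[0, 0])  # [50 50 50] -- a quarter of the way into the transition
```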

Step 4: Make the photos move while showing

The idea is to let the photo move. Say a photo is scaled to the dimensions 500 x 1000. Then we want to create a 500 x 500 view of that photo that slides from one end to the other while it is showing.

This requires the photo to have a state, which stores where the current view is.

For this purpose we create a class to represent a photo that keeps the current view. It also includes the resizing.

import cv2
import numpy as np
import glob
import os
import random


class Image:
    def __init__(self, filename, time=500, size=500):
        self.size = size
        self.time = time
        self.shifted = 0.0
        self.img = cv2.imread(filename)
        self.height, self.width, _ = self.img.shape
        if self.width < self.height:
            self.height = int(self.height*size/self.width)
            self.width = size
            self.img = cv2.resize(self.img, (self.width, self.height))
            self.shift = self.height - size
            self.shift_height = True
        else:
            self.width = int(self.width*size/self.height)
            self.height = size
            self.shift = self.width - size
            self.img = cv2.resize(self.img, (self.width, self.height))
            self.shift_height = False
        self.delta_shift = self.shift/self.time

    def reset(self):
        if random.randint(0, 1) == 0:
            self.shifted = 0.0
            self.delta_shift = abs(self.delta_shift)
        else:
            self.shifted = self.shift
            self.delta_shift = -abs(self.delta_shift)

    def get_frame(self):
        if self.shift_height:
            roi = self.img[int(self.shifted):int(self.shifted) + self.size, :, :]
        else:
            roi = self.img[:, int(self.shifted):int(self.shifted) + self.size, :]
        self.shifted += self.delta_shift
        if self.shifted > self.shift:
            self.shifted = self.shift
        if self.shifted < 0:
            self.shifted = 0
        return roi


def process():
    path = "pics"
    filenames = glob.glob(os.path.join(path, "*"))

    cnt = 0
    images = []
    for filename in filenames:
        print(filename)

        img = Image(filename)

        images.append(img)
        if cnt > 300:
            break
        cnt += 1

    prev_image = images[random.randrange(0, len(images))]
    prev_image.reset()

    while True:
        while True:
            img = images[random.randrange(0, len(images))]
            if img != prev_image:
                break
        img.reset()

        for i in range(100):
            alpha = i/100
            beta = 1.0 - alpha
            dst = cv2.addWeighted(img.get_frame(), alpha, prev_image.get_frame(), beta, 0.0)

            cv2.imshow("Slide", dst)
            if cv2.waitKey(1) == ord('q'):
                return

        prev_image = img
        for _ in range(300):
            cv2.imshow("Slide", img.get_frame())
            if cv2.waitKey(1) == ord('q'):
                return


process()

This results in a nice effect where the photos slowly move through the view. Some randomness has also been added: a random photo is picked each time, and the direction of the movement is random as well.

This is a good start of having a nice slideshow of your favorite photos.

Insert a Live Graph into a Webcam Stream with OpenCV

What will we cover in this tutorial?

How to make a simple live graph that updates inside a live webcam stream using OpenCV.

The result can be seen in the video below.

Step 1: A basic webcam flow with OpenCV

If you need to install OpenCV for the first time we suggest you read this tutorial.

A normal webcam flow in Python looks like the following code.

import cv2
 
# Setup webcam camera
cap = cv2.VideoCapture(0)
# Set a smaller resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
 
while True:
    # Capture frame-by-frame
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)
 
    cv2.imshow("Webcam", frame)
 
    if cv2.waitKey(1) == ord('q'):
        break
 
# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()

This will make a live webcam stream from your webcam to a window. That is too easy not to enjoy.

Step 2: Create an object to represent the graph

There are many ways to create a graph. Here we will make an object which will have a representation of the graph. Then it will have a function to update the value and update the graph image.

class Graph:
    def __init__(self, width, height):
        self.height = height
        self.width = width
        self.graph = np.zeros((height, width, 3), np.uint8)

    def update_frame(self, value):
        if value < 0:
            value = 0
        elif value >= self.height:
            value = self.height - 1
        new_graph = np.zeros((self.height, self.width, 3), np.uint8)
        new_graph[:,:-1,:] = self.graph[:,1:,:]
        new_graph[self.height - value:,-1,:] = 255
        self.graph = new_graph

    def get_graph(self):
        return self.graph

This is a simple object that keeps the graph as an OpenCV image (a NumPy array).

The update function first verifies that the value is inside the graph's range.

Then it creates a new graph (new_graph) and copies the values from the previous graph, shifted one position to the left. Finally it draws the new value in white in the last column.
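
The shift can be seen on a tiny array (a standalone sketch, not part of the Graph class): every column moves one step to the left and the rightmost column gets the newest value:

```python
import numpy as np

old = np.array([[1, 2, 3],
                [4, 5, 6]])
new = np.zeros_like(old)
new[:, :-1] = old[:, 1:]  # copy the old columns, shifted one position left
new[:, -1] = 9            # draw the newest value in the last column
print(new)                # [[2 3 9]
                          #  [5 6 9]]
```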

Step 3: Putting it all together

The Graph object created in the last step needs a value. This value can be anything. Here we make a simple measure of how much movement there is in the frame.

This is simply done by comparing the current frame with the previous frame. It could be done directly on the raw frames, but to minimize noise we use grayscale images with Gaussian blur applied. Then the absolute difference from the last frame is taken and summed up.

The value used to scale it down is highly dependent on your webcam settings. If you use another resolution, that will also affect it. Hence, if the graph is always low (zero) or high (above its height), adjust the integer in the division in graph.update_frame(int(difference/42111)).

import cv2
import numpy as np


class Graph:
    def __init__(self, width, height):
        self.height = height
        self.width = width
        self.graph = np.zeros((height, width, 3), np.uint8)

    def update_frame(self, value):
        if value < 0:
            value = 0
        elif value >= self.height:
            value = self.height - 1
        new_graph = np.zeros((self.height, self.width, 3), np.uint8)
        new_graph[:,:-1,:] = self.graph[:,1:,:]
        new_graph[self.height - value:,-1,:] = 255
        self.graph = new_graph

    def get_graph(self):
        return self.graph


# Setup camera
cap = cv2.VideoCapture(0)
# Set a smaller resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

graph = Graph(100, 60)

prev_frame = np.zeros((480, 640), np.uint8)
while True:
    # Capture frame-by-frame
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)
    frame = cv2.resize(frame, (640, 480))

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (25, 25), None)
    diff = cv2.absdiff(prev_frame, gray)
    difference = np.sum(diff)
    prev_frame = gray

    graph.update_frame(int(difference/42111))
    roi = frame[-70:-10, -110:-10,:]
    roi[:] = graph.get_graph()

    cv2.putText(frame, "...wanted a live graph", (20, 430), cv2.FONT_HERSHEY_PLAIN, 1.8, (200, 200, 200), 2)
    cv2.putText(frame, "...measures motion in frame", (20, 460), cv2.FONT_HERSHEY_PLAIN, 1.8, (200, 200, 200), 2)
    cv2.imshow("Webcam", frame)

    if cv2.waitKey(1) == ord('q'):
        break

# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()

ASCII Art of Live Webcam Stream with OpenCV

What will we cover in this tutorial?

Create ASCII Art on a live webcam stream using OpenCV with Python. To improve performance we will use Numba.

The result can look like the video below.

Step 1: A webcam flow with OpenCV in Python

If you need to install OpenCV for the first time we suggest you read this tutorial.

A normal webcam flow in Python looks like the following code.

import cv2

# Setup webcam camera
cap = cv2.VideoCapture(0)
# Set a smaller resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

while True:
    # Capture frame-by-frame
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)

    cv2.imshow("Webcam", frame)

    if cv2.waitKey(1) == ord('q'):
        break

# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()

This will make a live webcam stream from your webcam to a window. That is too easy not to enjoy.

Step 2: Prepare the letters to be used for ASCII art

There are many ways to achieve ASCII art. For ease, we will render all the letters as small grayscale (black and white only) images. You could print the letters directly in the terminal, but that seems to be slower than mapping the small images into a big image representing the ASCII art.

We use OpenCV to create all the letters.

import numpy as np

def generate_ascii_letters():
    images = []
    #letters = "# $%&\\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~"
    letters = " \\ '(),-./:;[]_`{|}~"
    for letter in letters:
        img = np.zeros((12, 16), np.uint8)
        img = cv2.putText(img, letter, (0, 11), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255)
        images.append(img)
    return np.stack(images)

The list images collects all the images we create. At the end (in the return statement) we convert them to a NumPy array of images. This is done for speed, as lists do not work with Numba; it needs the objects to be NumPy arrays.

If you like, you can use all the letters by using the commented-out letters string instead of the smaller one with only special characters. We found the result looks better with the limited set of letters.

An image is created simply as a black NumPy array of size 12×16 (that is, width 16 and height 12). Then we add the letter to the image by using cv2.putText(…).

Step 3: Transforming the webcam frame to only outline the objects

To get a decent result, we found it works well to convert the frames to only the outlines of the objects in the original frame. This can be achieved with Canny edge detection (cv2.Canny(…)). On a live webcam stream it is advisable to apply Gaussian blur first.

import cv2

# Setup camera
cap = cv2.VideoCapture(0)
# Set a smaller resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

while True:
    # Capture frame-by-frame
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)

    gb = cv2.GaussianBlur(frame, (5, 5), 0)
    can = cv2.Canny(gb, 127, 31)

    cv2.imshow('Canny edge detection', can)
    cv2.imshow("Webcam", frame)

    if cv2.waitKey(1) == ord('q'):
        break

# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()

This would result in something like this.

Step 4: Converting the Canny edge detection to ASCII art

This is where all the magic happens. We will take the Canny edge detected image and convert it to ASCII art.

First remember, we have a Numpy array of all the letters we want to use.

def to_ascii_art(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            best_match = np.inf
            best_match_index = 0
            for k in range(1, images.shape[0]):
                total_sum = np.sum(np.absolute(np.subtract(roi, images[k])))
                if total_sum < best_match:
                    best_match = total_sum
                    best_match_index = k
            roi[:,:] = images[best_match_index]
    return frame

The height and the width of the frame are taken, and then we iterate over the frame in small boxes of the size of the letters.

Each box is captured as a region of interest (roi). Then we loop over all possible letters and find the best match. This match is not computed exactly, as that would be quite expensive. Instead we use the approximate calculation done in total_sum.

The correct calculation would be:

total_sum = np.sum(np.where(roi > images[k], np.subtract(roi, images[k]), np.subtract(images[k], roi)))

Alternatively, you could convert the arrays to np.int16 instead of using np.uint8, which is what causes the problem here. Finally, notice that cv2.norm(…) would also solve it, but as we need to optimize the code with Numba, this is not possible, since cv2.norm is not supported by Numba.
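
To see why np.uint8 causes problems: subtraction on unsigned 8-bit values wraps around instead of going negative, so np.absolute cannot repair it afterwards. A small standalone demonstration, including the int16 fix mentioned above:

```python
import numpy as np

roi = np.array([10], np.uint8)
letter = np.array([20], np.uint8)

# uint8 subtraction wraps: 10 - 20 becomes 246, not -10
wrapped = np.absolute(np.subtract(roi, letter))
print(wrapped)  # [246]

# Casting to int16 first gives the true absolute difference
exact = np.absolute(np.subtract(roi.astype(np.int16), letter.astype(np.int16)))
print(exact)  # [10]
```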

Step 5: Adding it all together and use Numba

Now we can put all the code together and try it out. We will also use Numba on the to_ascii_art function to speed it up. If you are new to Numba, we recommend this tutorial.

import cv2
import numpy as np
from numba import jit


@jit(nopython=True)
def to_ascii_art(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            best_match = np.inf
            best_match_index = 0
            for k in range(1, images.shape[0]):
                total_sum = np.sum(np.absolute(np.subtract(roi, images[k])))
                if total_sum < best_match:
                    best_match = total_sum
                    best_match_index = k
            roi[:,:] = images[best_match_index]
    return frame


def generate_ascii_letters():
    images = []
    #letters = "# $%&\\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[]^_`abcdefghijklmnopqrstuvwxyz{|}~"
    letters = " \\ '(),-./:;[]_`{|}~"
    for letter in letters:
        img = np.zeros((12, 16), np.uint8)
        img = cv2.putText(img, letter, (0, 11), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 255)
        images.append(img)
    return np.stack(images)


# Setup camera
cap = cv2.VideoCapture(0)
# Set a smaller resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

images = generate_ascii_letters()

while True:
    # Capture frame-by-frame
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)

    gb = cv2.GaussianBlur(frame, (5, 5), 0)
    can = cv2.Canny(gb, 127, 31)

    ascii_art = to_ascii_art(can, images)

    cv2.imshow('ASCII ART', ascii_art)
    cv2.imshow("Webcam", frame)

    if cv2.waitKey(1) == ord('q'):
        break

# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()

This will give the following result (if you put me in front of the camera).

Also, try using a different character set, for example the full one also given in the code above.

Pandas + GeoPandas + OpenCV: Create a Video of COVID-19 World Map

What will we cover?

How to create a video like the one below using Pandas + GeoPandas + OpenCV in Python.

  1. How to collect the newest COVID-19 data in Python using Pandas.
  2. Prepare the data and calculate the values needed for the Choropleth map.
  3. Get the Choropleth map from GeoPandas and prepare to combine it.
  4. Get the data frame by frame into the video.
  5. Combine it all into a video using OpenCV.

Step 1: Get the daily reported COVID-19 data world wide

This data is available from the European Centre for Disease Prevention and Control and can be found here.

All we need is to download the CSV file, which has all the historic data from all the reporting countries.

This can be done as follows.

import pandas as pd


# Just to get more rows, columns and display width
pd.set_option('display.max_rows', 300)
pd.set_option('display.max_columns', 300)
pd.set_option('display.width', 1000)

# Get the updated data
table = pd.read_csv("https://opendata.ecdc.europa.eu/covid19/casedistribution/csv")

print(table)

This will give us an idea of how the data is structured.

          dateRep  day  month  year  cases  deaths countriesAndTerritories geoId countryterritoryCode  popData2019 continentExp  Cumulative_number_for_14_days_of_COVID-19_cases_per_100000
0      01/10/2020    1     10  2020     14       0             Afghanistan    AF                  AFG   38041757.0         Asia                                           1.040961         
1      30/09/2020   30      9  2020     15       2             Afghanistan    AF                  AFG   38041757.0         Asia                                           1.048847         
2      29/09/2020   29      9  2020     12       3             Afghanistan    AF                  AFG   38041757.0         Asia                                           1.114565         
3      28/09/2020   28      9  2020      0       0             Afghanistan    AF                  AFG   38041757.0         Asia                                           1.343261         
4      27/09/2020   27      9  2020     35       0             Afghanistan    AF                  AFG   38041757.0         Asia                                           1.540413         
...           ...  ...    ...   ...    ...     ...                     ...   ...                  ...          ...          ...                                                ...         
46221  25/03/2020   25      3  2020      0       0                Zimbabwe    ZW                  ZWE   14645473.0       Africa                                                NaN         
46222  24/03/2020   24      3  2020      0       1                Zimbabwe    ZW                  ZWE   14645473.0       Africa                                                NaN         
46223  23/03/2020   23      3  2020      0       0                Zimbabwe    ZW                  ZWE   14645473.0       Africa                                                NaN         
46224  22/03/2020   22      3  2020      1       0                Zimbabwe    ZW                  ZWE   14645473.0       Africa                                                NaN         
46225  21/03/2020   21      3  2020      1       0                Zimbabwe    ZW                  ZWE   14645473.0       Africa                                                NaN         

[46226 rows x 12 columns]

First we want to convert the dateRep column to a date object (it cannot be seen above, but the dates are represented as strings). Then we use it as the index for easier access later.

import pandas as pd


# Just to get more rows, columns and display width
pd.set_option('display.max_rows', 300)
pd.set_option('display.max_columns', 300)
pd.set_option('display.width', 1000)

# Get the updated data
table = pd.read_csv("https://opendata.ecdc.europa.eu/covid19/casedistribution/csv")

# Convert dateRep to date object
table['date'] = pd.to_datetime(table['dateRep'], format='%d/%m/%Y')
# Use date for index
table = table.set_index('date')

Step 2: Prepare data and compute values needed for plot

What makes sense to plot?

Good question. In a Choropleth map each region is colored according to a value. Here, the higher a country's value, the darker red it will be colored.

If we plotted the number of new COVID-19 cases directly, the values would be high for countries with large populations. Hence, the number of COVID-19 cases per 100,000 people is used instead.

Using new COVID-19 cases per 100,000 people can still be volatile and change drastically from day to day. To even that out, a 7-day rolling sum can be used. That is, for each day you take the sum of the last 7 days and continue that process through the data.

To make it even less volatile, the average of the last 14 days of the 7-day rolling sum is used.

And no, it is not something I invented. It is used by the authorities in my home country to decide which countries are open for travel.

This can be calculated directly from the data above.

def get_stat(country_code, table):
    # Copy to avoid pandas' SettingWithCopyWarning when adding columns
    data = table.loc[table['countryterritoryCode'] == country_code].copy()
    # Reverse the rows to get the dates in chronological order
    data = data.reindex(index=data.index[::-1])
    data['7 days sum'] = data['cases'].rolling(7).sum()
    data['7ds/100000'] = data['7 days sum'] * 100000 / data['popData2019']
    data['14 mean'] = data['7ds/100000'].rolling(14).mean()
    return data

The above function takes the table we returned in Step 1 and extracts a country based on a country code. Then it reverses the rows to have the dates in chronological order.

After that, it computes the 7-day rolling sum, scales it to new cases per 100,000 people using the country's population, and finally computes the 14-day average (mean) of that.
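
As a sanity check of the rolling computations, here is a toy example with made-up numbers (a constant 7 new cases per day for a hypothetical population of 70,000):

```python
import pandas as pd

# Hypothetical data: 10 days of 7 new cases/day, population 70,000
df = pd.DataFrame({'cases': [7] * 10, 'popData2019': [70000] * 10})
df['7 days sum'] = df['cases'].rolling(7).sum()
df['7ds/100000'] = df['7 days sum'] * 100000 / df['popData2019']

# Once 7 days of data exist, the rolling sum is 49,
# i.e. 49 * 100000 / 70000 = 70 cases per 100,000 over 7 days
print(df['7ds/100000'].iloc[-1])  # 70.0
```

Note that the first 6 rows are NaN, as a 7-day window needs 7 days of data.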

Step 3: Get the Choropleth map data and prepare it

GeoPandas is an amazing library to create Choropleth maps. But it does need your attention when you combine it with other data.

Here we want to combine it with the country codes (ISO_A3). If you inspect the data, some of the countries are missing that data.

Other than that, the code is straightforward.

import pandas as pd
import geopandas


# Just to get more rows, columns and display width
pd.set_option('display.max_rows', 300)
pd.set_option('display.max_columns', 300)
pd.set_option('display.width', 1000)

# Get the updated data
table = pd.read_csv("https://opendata.ecdc.europa.eu/covid19/casedistribution/csv")

# Convert dateRep to date object
table['date'] = pd.to_datetime(table['dateRep'], format='%d/%m/%Y')
# Use date for index
table = table.set_index('date')


def get_stat(country_code, table):
    # Copy to avoid pandas' SettingWithCopyWarning when adding columns
    data = table.loc[table['countryterritoryCode'] == country_code].copy()
    # Reverse the rows to get the dates in chronological order
    data = data.reindex(index=data.index[::-1])
    data['7 days sum'] = data['cases'].rolling(7).sum()
    data['7ds/100000'] = data['7 days sum'] * 100000 / data['popData2019']
    data['14 mean'] = data['7ds/100000'].rolling(14).mean()
    return data


# Read the data to make a choropleth map
world = geopandas.read_file(geopandas.datasets.get_path('naturalearth_lowres'))
world = world[(world.pop_est > 0) & (world.name != "Antarctica")]

# Store data per country to make it easier
data_by_country = {}

for index, row in world.iterrows():
    # The world data is not fully updated with ISO_A3 names
    if row['iso_a3'] == '-99':
        country = row['name']
        if country == "Norway":
            world.at[index, 'iso_a3'] = 'NOR'
            row['iso_a3'] = "NOR"
        elif country == "France":
            world.at[index, 'iso_a3'] = 'FRA'
            row['iso_a3'] = "FRA"
        elif country == 'Kosovo':
            world.at[index, 'iso_a3'] = 'XKX'
            row['iso_a3'] = "XKX"
        elif country == "Somaliland":
            world.at[index, 'iso_a3'] = '---'
            row['iso_a3'] = "---"
        elif country == "N. Cyprus":
            world.at[index, 'iso_a3'] = '---'
            row['iso_a3'] = "---"

    # Add the data for the country
    data_by_country[row['iso_a3']] = get_stat(row['iso_a3'], table)

This will create a dictionary (data_by_country) with the needed data for each country. Notice we do it like this because not all countries have the same number of data points.

Step 4: Create a Choropleth map for each date and save it as an image

This can be achieved by using matplotlib.

The idea is to go through all dates and, for each country, check whether it has data for that date and use it if so.

import pandas as pd
import geopandas
import matplotlib.pyplot as plt


# Just to get more rows, columns and display width
pd.set_option('display.max_rows', 300)
pd.set_option('display.max_columns', 300)
pd.set_option('display.width', 1000)

# Get the updated data
table = pd.read_csv("https://opendata.ecdc.europa.eu/covid19/casedistribution/csv")

# Convert dateRep to date object
table['date'] = pd.to_datetime(table['dateRep'], format='%d/%m/%Y')
# Use date for index
table = table.set_index('date')


def get_stat(country_code, table):
    # Copy to avoid pandas' SettingWithCopyWarning when adding columns
    data = table.loc[table['countryterritoryCode'] == country_code].copy()
    # Reverse the rows to get the dates in chronological order
    data = data.reindex(index=data.index[::-1])
    data['7 days sum'] = data['cases'].rolling(7).sum()
    data['7ds/100000'] = data['7 days sum'] * 100000 / data['popData2019']
    data['14 mean'] = data['7ds/100000'].rolling(14).mean()
    return data


# Read the data to make a choropleth map
world = geopandas.read_file(geopandas.datasets.get_path('naturalearth_lowres'))
world = world[(world.pop_est > 0) & (world.name != "Antarctica")]

# Store data per country to make it easier
data_by_country = {}

for index, row in world.iterrows():
    # The world data is not fully updated with ISO_A3 names
    if row['iso_a3'] == '-99':
        country = row['name']
        if country == "Norway":
            world.at[index, 'iso_a3'] = 'NOR'
            row['iso_a3'] = "NOR"
        elif country == "France":
            world.at[index, 'iso_a3'] = 'FRA'
            row['iso_a3'] = "FRA"
        elif country == 'Kosovo':
            world.at[index, 'iso_a3'] = 'XKX'
            row['iso_a3'] = "XKX"
        elif country == "Somaliland":
            world.at[index, 'iso_a3'] = '---'
            row['iso_a3'] = "---"
        elif country == "N. Cyprus":
            world.at[index, 'iso_a3'] = '---'
            row['iso_a3'] = "---"

    # Add the data for the country
    data_by_country[row['iso_a3']] = get_stat(row['iso_a3'], table)

# Create an image per date
for day in pd.date_range('12-31-2019', '10-01-2020'):
    print(day)
    world['number'] = 0.0
    for index, row in world.iterrows():
        if day in data_by_country[row['iso_a3']].index:
            world.at[index, 'number'] = data_by_country[row['iso_a3']].loc[day]['14 mean']

    world.plot(column='number', legend=True, cmap='OrRd', figsize=(15, 5))
    plt.title(day.strftime("%Y-%m-%d"))
    plt.savefig(f'image-{day.strftime("%Y-%m-%d")}.png')
    plt.close()

This will create an image for each day. These images will be combined into a video in the next step.

Step 5: Create a video from images with OpenCV

Using OpenCV to create a video from a sequence of images is quite easy. The only thing you need to ensure is that it reads the images in the correct order.
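
Sorting the file names gives the correct order here because the dates in the image-YYYY-MM-DD.png names are zero-padded, so lexicographic order equals chronological order:

```python
# Zero-padded dates make a plain lexicographic sort chronological
names = ['image-2020-10-01.png', 'image-2019-12-31.png', 'image-2020-01-05.png']
print(sorted(names))
# ['image-2019-12-31.png', 'image-2020-01-05.png', 'image-2020-10-01.png']
```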

import cv2
import glob

img_array = []
filenames = glob.glob('image-*.png')
# The file names contain zero-padded dates, so sorting gives chronological order
filenames.sort()
for filename in filenames:
    print(filename)
    img = cv2.imread(filename)
    height, width, layers = img.shape
    size = (width, height)
    img_array.append(img)

# All images are assumed to have the same size (the size of the last one is used)
out = cv2.VideoWriter('covid.avi', cv2.VideoWriter_fourcc(*'DIVX'), 15, size)

for img in img_array:
    out.write(img)
out.release()

Where we use the VideoWriter from OpenCV.

This results in this video.

From Zero to Creating Photo Mosaic using Faces with OpenCV

What will we cover in this tutorial?

  1. Where and how to get images you can use without copyright issues.
  2. How to extract the faces of the images.
  3. Building a Photo Mosaic using the extracted images of faces.

Step 1: Where and how to get images

There exist many datasets of faces, but most have restrictions on them. A great place to find images is Pexels, as they are free to use (see license here).

Also, the Python library pexels-api makes it easy to download a lot of images. It can be installed by the following command.

pip install pexels-api

To use the Pexels API you need to register.

  1. Sign up as a user at Pexels.
  2. Accept the email sent to your inbox (the email address you provide).
  3. Request your API key here.

Then you can download images matching a search query with this Python program.

from pexels_api import API
import requests
import os.path
from pathlib import Path


path = 'pics'
Path(path).mkdir(parents=True, exist_ok=True)

# To get a key: sign up for Pexels https://www.pexels.com/join/
# Request a key: https://www.pexels.com/api/
# - No need to set a URL
# - Accept the email sent to you
# - Refresh the API or see your key here: https://www.pexels.com/api/new/

PEXELS_API_KEY = '--- INSERT YOUR API KEY HERE ---'

api = API(PEXELS_API_KEY)

query = 'person'

api.search(query)
# Get photo entries
photos = api.get_entries()
print("Search: ", query)
print("Total results: ", api.total_results)
MAX_PICS = 1000
print("Fetching max: ", MAX_PICS)

count = 0
while True:
    photos = api.get_entries()
    print(len(photos))
    if len(photos) == 0:
        break
    for photo in photos:
        # Print photographer
        print('Photographer: ', photo.photographer)
        # Print original size url
        print('Photo original size: ', photo.original)

        file = os.path.join(path, query + '-' + str(count).zfill(5) + '.' + photo.original.split('.')[-1])
        count += 1
        print(file)
        picture_request = requests.get(photo.original)
        if picture_request.status_code == 200:
            with open(file, 'wb') as f:
                f.write(picture_request.content)

        # Stop when we reach the maximum number of photos
        if count >= MAX_PICS:
            break

    if count >= MAX_PICS:
        break

    if not api.has_next_page:
        print("Last page: ", api.page)
        break
        # Search next page
    api.search_next_page()

There is an upper limit of 1,000 photos in the above Python program; you can change that if you like. It is set to download the photos returned when you query person. Feel free to change that too.

It takes some time to download all the images and will take up some space.

Step 2: Extract the faces from the photos

This is where OpenCV comes in. It provides a pre-trained model using the Haar Cascade classifier. You need to install the OpenCV library with the following command.

pip install opencv-python

The trained model we use is part of the library, but it is not easily loaded from its bundled location. Therefore we suggest you download it from here (it should be named haarcascade_frontalface_default.xml) and add it to the folder you work from.

We want to use it to identify faces, extract them, and save them in a folder for later use.

import cv2
import numpy as np
import glob
import os
from pathlib import Path


def preprocess(box_width=12, box_height=16):
    path = "pics"
    output = "small-faces"
    Path(output).mkdir(parents=True, exist_ok=True)
    files = glob.glob(os.path.join(path, "*"))
    files.sort()

    face_cascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml")

    images = []
    cnt = 0
    for filename in files:
        print("Processing...", filename)
        frame = cv2.imread(filename)
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame_gray = cv2.equalizeHist(frame_gray)
        faces = face_cascade.detectMultiScale(frame_gray, scaleFactor=1.3, minNeighbors=10, minSize=(350, 350), flags=cv2.CASCADE_SCALE_IMAGE)
        for (x, y, w, h) in faces:
            roi = frame[y:y+h, x:x+w]

            img = cv2.resize(roi, (box_width, box_height))
            images.append(img)

            output_file_name = "face-" + str(cnt).zfill(5) + ".jpg"
            output_file_name = os.path.join(output, output_file_name)
            cv2.imwrite(output_file_name, img)
            # Increment the counter so each face gets a unique file name
            cnt += 1

    return np.stack(images)


preprocess(box_width=12, box_height=16)

It will create a folder called small-faces with small images of the identified faces.

Notice that the Haar Cascade classifier is not perfect. It will miss a lot of faces and produce false positives. It is a good idea to look manually through all the images and delete all false positives (images that do not contain a face).

Step 3: Building our first mosaic photo

The approach is to divide the photo into equally sized boxes and, for each box, find the image (one of our faces) that fits best as a replacement.

To improve the performance of the process function we use Numba, a just-in-time compiler designed to optimize NumPy code in for-loops.

import cv2
import numpy as np
import glob
import os
from numba import jit


@jit(nopython=True)
def process(photo, images, box_width=24, box_height=32):
    height, width, _ = photo.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = photo[i:i + box_height, j:j + box_width]
            best_match = np.inf
            best_match_index = 0
            for k in range(1, images.shape[0]):
                total_sum = np.sum(np.where(roi > images[k], roi - images[k], images[k] - roi))
                if total_sum < best_match:
                    best_match = total_sum
                    best_match_index = k
            photo[i:i + box_height, j:j + box_width] = images[best_match_index]
    return photo


# Load the small face images saved by the preprocess step in Step 2
def load_images(box_width, box_height):
    files = glob.glob(os.path.join("small-faces", "*"))
    files.sort()
    images = []
    for filename in files:
        img = cv2.imread(filename)
        # Resize defensively in case the saved size differs
        images.append(cv2.resize(img, (box_width, box_height)))
    return np.stack(images)


def main():
    photo = cv2.imread("rune.jpg")

    box_width = 12
    box_height = 16
    height, width, _ = photo.shape
    # To make sure that it we can slice the photo in box-sizes
    width = (width//box_width) * box_width
    height = (height//box_height) * box_height
    photo = cv2.resize(photo, (width, height))

    # Load all the images of the faces
    images = load_images(box_width, box_height)

    # Create the mosaic
    mosaic = process(photo.copy(), images, box_width, box_height)

    cv2.imshow("Original", photo)
    cv2.imshow("Result", mosaic)
    cv2.waitKey(0)


main()

To test it we have used the photo of Rune.

This reuses the same images, which gives a decent result. If you want to avoid the visible patterns of repeated images, you can change the code to prevent reuse.

The above example has 606 small images. If you avoid reuse, it quickly runs out of possible images. This would require a bigger collection, or the result becomes questionable.

No reuse of face images to create the Photo Mosaic

The above photo mosaic is created at a downscaled size, but it still does not give a good result if you do not reuse images. This would require a much larger set of images to work from.

Video Mosaic on Live Webcam Stream with OpenCV and Numba

What will we cover in this tutorial?

We will investigate if we can create a decent video mosaic effect on a live webcam stream using OpenCV, Numba and Python. First we will learn the simple way to create a video mosaic and investigate the performance of that. Then we will extend that to create a better quality video mosaic and try to improve the performance by lowering the quality.

Step 1: How does simple photo mosaic work?

A photographic mosaic is a photo generated by other small images. A black and white example is given here.

The above is not a perfect example, as it is generated for speed so it runs smoothly on a webcam stream. It is also done in gray scale to improve performance.

The idea is to reconstruct the original image (photograph) as a mosaic of many smaller sample images. In the above, the original frame is 640×480 pixels and the mosaic is constructed of small images of size 16×12 pixels.

The first thing we want to achieve is to create a simple mosaic. A simple mosaic is when the original image is scaled down and each pixel is then exchanged with one small image with the same average color. This is simple and efficient to do.

On a high level this is the process.

  1. Have a collection C of small images used to create the photographic mosaic
  2. Scale down the photo P you want to create a mosaic of.
  3. For each pixel in photo P, find the image I from C that has the closest average color to the pixel. Insert image I to represent that pixel.

This explains the simple way of doing it. The next question is: will it be efficient enough to process a live webcam stream?
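
A minimal sketch of step 3, using the simplification applied later in this tutorial: a hypothetical collection of four tiny tiles sorted from dark to bright, indexed by the pixel's gray value:

```python
import numpy as np

# Hypothetical collection: four 2x2 tiles with means 0, 85, 170, 255 (dark to bright)
collection = np.stack([np.full((2, 2), v, dtype=np.uint8) for v in (0, 85, 170, 255)])

def pick_tile(pixel_value, images):
    # Map a gray value in [0, 255] to a tile index proportional to brightness
    return images[int((len(images) - 1) * pixel_value / 256)]

print(pick_tile(200, collection)[0, 0])  # 200 maps to the tile with mean 170
```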

Step 2: Create a collection of small images

To optimize performance we have chosen to work in gray scale. The first step is to collect the images you want to use. These can be any pictures.

We have used photos from Pexels, which are all free to use without copyright issues.

We need to convert them all to gray scale and resize them to fit our purpose.

import cv2
import glob
import os
import numpy as np

output = "small-pics-16x12"
path = "pics"
files = glob.glob(os.path.join(path, "*"))
for file_name in files:
    print(file_name)
    img = cv2.imread(file_name)
    img = cv2.resize(img, (16, 12))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    mean = np.mean(img)
    # Encode the mean gray value in the file name so a lexicographic sort
    # roughly orders the images from dark to bright (relied on in Step 4)
    output_file_name = "image-" + str(mean).replace('.', '-') + ".jpg"
    output_file_name = os.path.join(output, output_file_name)
    print(output_file_name)
    cv2.imwrite(output_file_name, img)

The script assumes that the images we want to convert to gray scale and resize are located in the local folder pics. Further, it assumes that the output (processed) images will be put in an already existing folder small-pics-16x12.

Step 3: Get a live stream from the webcam

On a high level a live stream from a webcam is given in the following diagram.

This process framework is given in the code below.

import cv2
import numpy as np


def process(frame):
    return frame


def main():
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam
        _, frame = cap.read()
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        updated_frame = process(gray)

        # Show the frame in a window
        cv2.imshow('WebCam', updated_frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()


main()

The above code is just an empty shell, where all the processing will happen in the call to process. As it is, it will just open a window showing the gray scale image.

Step 4: The simple video mosaic

We need to introduce two main things to create this simple video mosaic.

  1. Loading all the images we need to use (the 16×12 gray scale images).
  2. Fill out the processing of each frame, which replaces each 16×12 box of the frame with the best matching image.

The first step is preprocessing and should be done before we enter the main loop of the webcam capturing. The second part is done in each iteration inside the process function.

import cv2
import numpy as np
import glob
import os


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
    files.sort()
    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    return np.stack(images)


def process(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            mean = np.mean(roi[:, :])
            roi[:, :] = images[int((len(images)-1)*mean/256)]
    return frame


def main(images):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam
        _, frame = cap.read()
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        mosaic_frame = process(gray, images)

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()



images = preprocess()
main(images)

The preprocessing function reads all the images, converts them to gray scale (to have only 1 channel per pixel), and returns them as a NumPy array to have optimized code.

The process function breaks the frame down into blocks of 16×12 pixels, computes the average gray scale value of each block, and picks the estimated best match. Notice the average (mean) value is a float; hence, we can have more than 256 gray scale images.

In this example we used 1,885 images to process it.

A result can be seen here.

The result is decent but not good.

Step 5: Testing the performance and improve it by using Numba

While the performance is quite good, let us test it.

We do that by using the time library.

First you need to import the time library.

import time

Then time the actual process call. The new code is inserted in the main while loop.

        # Update the frame
        start = time.time()
        mosaic_frame = process(gray, images)
        print("Process time", time.time()- start, "seconds")

This will result in the following output.

Process time 0.02651691436767578 seconds
Process time 0.026834964752197266 seconds
Process time 0.025418996810913086 seconds
Process time 0.02562689781188965 seconds
Process time 0.025369882583618164 seconds
Process time 0.025450944900512695 seconds

Or rather, a few lines from it: about 0.025-0.027 seconds per frame.

Let’s try to bring Numba into the equation. Numba is a just-in-time compiler for NumPy code. That means it compiles the Python code to a binary for speed. If you are new to Numba, we recommend you read this tutorial.

import cv2
import numpy as np
import glob
import os
import time
from numba import jit


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
    files.sort()
    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    return np.stack(images)


@jit(nopython=True)
def process(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            mean = np.mean(roi[:, :])
            roi[:, :] = images[int((len(images)-1)*mean/256)]
    return frame


def main(images):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam
        _, frame = cap.read()
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        start = time.time()
        mosaic_frame = process(gray, images)
        print("Process time", time.time()- start, "seconds")

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()



images = preprocess()
main(images)

This gives the following performance.

Process time 0.0014820098876953125 seconds
Process time 0.0013887882232666016 seconds
Process time 0.0015859603881835938 seconds
Process time 0.0016350746154785156 seconds
Process time 0.0018379688262939453 seconds
Process time 0.0016241073608398438 seconds

Which is a speedup by a factor of 15-20.

Fast enough for live streaming, but the quality of the result is still lacking.

Step 6: A more advanced video mosaic approach

The more advanced video mosaic consists of approximating each replacement box by comparing it with each candidate image pixel by pixel.

import cv2
import numpy as np
import glob
import os
import time
from numba import jit


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
    files.sort()
    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    return np.stack(images)


@jit(nopython=True)
def process(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            best_match = np.inf
            best_match_index = 0
            for k in range(1, images.shape[0]):
                total_sum = np.sum(np.where(roi > images[k], roi - images[k], images[k] - roi))
                if total_sum < best_match:
                    best_match = total_sum
                    best_match_index = k
            roi[:,:] = images[best_match_index]
    return frame


def main(images):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam
        _, frame = cap.read()
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        start = time.time()
        mosaic_frame = process(gray, images)
        print("Process time", time.time()- start, "seconds")

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()


images = preprocess()
main(images)

There is one line to notice specifically.

total_sum = np.sum(np.where(roi > images[k], roi - images[k], images[k] - roi))

This is needed because we work with unsigned 8-bit integers. It calculates the absolute difference between each pixel in the region of interest (roi) and images[k] without underflowing. This is a very expensive calculation, as we will see.
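
A small NumPy check of why this is needed with unsigned 8-bit values (toy arrays):

```python
import numpy as np

a = np.array([1], dtype=np.uint8)
b = np.array([2], dtype=np.uint8)

# Plain subtraction wraps around for uint8: 1 - 2 becomes 255
print(a - b)  # [255]

# The where-trick always subtracts the smaller value from the larger,
# yielding the absolute difference without wrap-around
print(np.where(a > b, a - b, b - a))  # [1]
```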

Performance shows the following.

Process time 7.030380010604858 seconds
Process time 7.034134149551392 seconds
Process time 7.105709075927734 seconds
Process time 7.138839960098267 seconds

Over 7 seconds per frame. The result is what can be expected from using this number of images, but the performance is far too slow for a smooth live webcam stream.

The result can be seen here.

Step 7: Compromise options

There are various options to compromise for speed and we will not investigate all. Here are some.

  • Use fewer images in the collection (less than the 1,885 images). Notice that using half the images, say about 900, will only cut the matching time roughly in half.
  • Use bigger boxes, e.g. 32×24 images. We would still process every pixel, hence the speedup might be less than expected.
  • Make a compromised version of the difference calculation (total_sum). This has great potential, but might have undesired effects.
  • Scale down the pixel estimation for fewer calculations.

We will try the last two.

First, let’s try to exchange the calculation of total_sum, which is our distance function measuring how closely an image matches the box. Say we use this.

                total_sum = np.sum(np.subtract(roi, images[k]))

This can overflow: with unsigned 8-bit integers a calculation like 1 - 2 = 255, which is undesired. On the other hand, this might happen in roughly 50% of the cases, and it may skew the calculation evenly across all images.

Let’s try.

Process time 1.857623815536499 seconds
Process time 1.7193729877471924 seconds
Process time 1.7445549964904785 seconds
Process time 1.707035779953003 seconds
Process time 1.6778359413146973 seconds

Wow. That is a speedup by a factor of 4-6 per frame. The quality is still fine, but you will notice a poorly mapped image from time to time. Still, the result is close to the advanced video mosaic and far better than the first simple video mosaic.

Another addition we could make is to estimate each box by only 4 pixels. This should still be better than the simple video mosaic approach. The full code is given below.

import cv2
import numpy as np
import glob
import os
import time
from numba import jit


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
    files.sort()
    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    return np.stack(images)


def preprocess2(images, scale_width=8, scale_height=6):
    scaled = []
    _, height, width = images.shape
    print("Dimensions", width, height)
    width //= scale_width
    height //= scale_height
    print("Scaled Dimensions", width, height)
    for i in range(images.shape[0]):
        scaled.append(cv2.resize(images[i], (width, height)))
    return np.stack(scaled)


@jit(nopython=True)
def process3(frame, frame_scaled, images, scaled, box_height=12, box_width=16, scale_width=8, scale_height=6):
    height, width = frame.shape
    width //= scale_width
    height //= scale_height
    box_width //= scale_width
    box_height //= scale_height
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame_scaled[i:i + box_height, j:j + box_width]
            best_match = np.inf
            best_match_index = 0
            for k in range(1, scaled.shape[0]):
                total_sum = np.sum(roi - scaled[k])
                if total_sum < best_match:
                    best_match = total_sum
                    best_match_index = k
            frame[i*scale_height:(i + box_height)*scale_height, j*scale_width:(j + box_width)*scale_width] = images[best_match_index]
    return frame


def main(images, scaled):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam
        _, frame = cap.read()
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        start = time.time()
        gray_scaled = cv2.resize(gray, (640//8, 480//6))
        mosaic_frame = process3(gray, gray_scaled, images, scaled)
        print("Process time", time.time()- start, "seconds")

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()


images = preprocess()
scaled = preprocess2(images)
main(images, scaled)

We have added an extra preprocessing step (preprocess2) that downscales the library images. The process times are now:

Process time 0.5559628009796143 seconds
Process time 0.5979928970336914 seconds
Process time 0.5543379783630371 seconds
Process time 0.5621011257171631 seconds

Which is okay, but still less than 2 frames per second.

The result can be seen here.

It is not all bad. It is still better than the simple video mosaic approach.

The result is not perfect. If you want to use it on a live webcam stream at 25-30 frames per second, you need to find further optimizations or live with the simple video mosaic approach.
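One possible further optimization, sketched here with plain NumPy rather than the tutorial's Numba code, is to vectorize the inner best-match loop with broadcasting, so a box is compared against every library image in a single operation instead of a Python loop.

```python
import numpy as np


def best_match_index(roi, scaled):
    # roi: (bh, bw) box from the scaled frame; scaled: (n, bh, bw) library.
    # Broadcasting compares the box against every library image at once,
    # summing absolute differences per image (cast to avoid uint8 wrap-around).
    diffs = np.abs(scaled.astype(np.int32) - roi.astype(np.int32))
    return int(diffs.sum(axis=(1, 2)).argmin())


# Tiny example with a 3-image library of 2x2 patches
library = np.array([[[0, 0], [0, 0]],
                    [[100, 100], [100, 100]],
                    [[255, 255], [255, 255]]], dtype=np.uint8)
box = np.full((2, 2), 90, dtype=np.uint8)
print(best_match_index(box, library))  # 1 -- closest to the 100-valued image
```

Whether this beats the Numba loop in practice depends on the library size; it is a direction to benchmark, not a measured result.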

Using Numba for Efficient Frame Modifications in OpenCV

What will we cover in this tutorial?

We will compare the speed for using Numba optimization when making calculations and modifications on frames from a video stream using OpenCV.

In this tutorial we will divide each frame into same size boxes and calculate the average color for each box. Then make a frame which colors each box to that color.

See the effect in the video below. These calculations are expensive in Python, hence we will compare the performance with and without Numba.

Step 1: Understand the process requirements

Each video frame from OpenCV is an image represented by a NumPy array. In this example we will use the webcam to capture a video stream and do the calculations and modifications live on the stream. This sets high requirements to the processing time of each frame.

To keep a fluid motion picture we need to show a new frame every 1/25 of a second. That leaves at most 0.04 seconds per frame to capture it, process it, and update the window with the video stream.

While capturing and updating the window also take time, it is hard to say exactly how fast the frame processing (calculations and modifications) must be, but 0.04 seconds per frame is an upper bound.
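A simple way to check whether a processing step stays within that budget is to time each call, for example with time.perf_counter. This is a minimal sketch (the helper name and the trivial example function are made up for illustration):

```python
import time

FRAME_BUDGET = 1 / 25  # 0.04 seconds per frame


def within_budget(process, frame):
    # Time a single call and compare it to the per-frame budget
    start = time.perf_counter()
    process(frame)
    elapsed = time.perf_counter() - start
    return elapsed <= FRAME_BUDGET


# Example with a trivial processing function
print(within_budget(lambda f: f, None))  # True
```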

Step 2: The calculations and modifications on each frame

Let’s have some fun. The calculations and modification we want to apply to each frame are as follows.

  • Calculations. We divide each frame into small 6×16 pixel areas and calculate the average color for each area. To get the average color we calculate the average of each channel (BGR).
  • Modification. We fill each area entirely with its average color.

This can be done by adding this function to process each frame.

def process(frame, box_height=6, box_width=16):
    height, width, _ = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            b_mean = np.mean(roi[:, :, 0])
            g_mean = np.mean(roi[:, :, 1])
            r_mean = np.mean(roi[:, :, 2])
            roi[:, :, 0] = b_mean
            roi[:, :, 1] = g_mean
            roi[:, :, 2] = r_mean
    return frame

The frame is divided into areas of the box size (box_height × box_width). For each box (roi: Region of Interest) we calculate the average (mean) value of each of the 3 color channels (b_mean, g_mean, r_mean) and overwrite the area with that average color.
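For reference, the same block-averaging effect can be written without explicit Python loops by reshaping the frame into blocks. This is a sketch that assumes the frame dimensions are divisible by the box size, which holds for 640×480 with 16×6 boxes:

```python
import numpy as np


def block_average(frame, box_height=6, box_width=16):
    h, w, c = frame.shape
    # Split into (rows, box_height, cols, box_width, channels) blocks,
    # average each block, then broadcast the mean back over the block
    blocks = frame.reshape(h // box_height, box_height,
                           w // box_width, box_width, c).astype(np.float32)
    means = blocks.mean(axis=(1, 3), keepdims=True)
    out = np.broadcast_to(means, blocks.shape).reshape(h, w, c)
    return out.astype(np.uint8)


frame = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
result = block_average(frame)
# Every pixel inside a box now holds the box's average color
```

We stick with the explicit-loop version in the tutorial because it is the one we want to compare against Numba.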

Step 3: Testing performance for this frame process

To get an estimate of the time spent in the function process, the cProfile library is quite good. It gives a profile of the time spent in each function call, so we get a measure of how much time is spent in process.

We can accomplish that by running this code.

import cv2
import numpy as np
import cProfile


def process(frame, box_height=6, box_width=16):
    height, width, _ = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            b_mean = np.mean(roi[:, :, 0])
            g_mean = np.mean(roi[:, :, 1])
            r_mean = np.mean(roi[:, :, 2])
            roi[:, :, 0] = b_mean
            roi[:, :, 1] = g_mean
            roi[:, :, 2] = r_mean
    return frame


def main(iterations=300):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    for _ in range(iterations):
        # Read a frame from the webcam
        _, frame = cap.read()
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))

        frame = process(frame)

        # Show the frame in a window
        cv2.imshow('WebCam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()

cProfile.run("main()")

Where the interesting output line is given here.

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      300    7.716    0.026   50.184    0.167 TEST2.py:8(process)

Which says we use 0.026 seconds per call in the process function. This is good if the accumulated overhead from the other functions in the main loop is less than 0.014 seconds, so we stay within the 0.04-second budget.

If we investigate the other calls further.

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      300    5.132    0.017    5.132    0.017 {method 'read' of 'cv2.VideoCapture' objects}
      300    0.073    0.000    0.073    0.000 {resize}
      300    2.848    0.009    2.848    0.009 {waitKey}
      300    0.120    0.000    0.120    0.000 {flip}
      300    0.724    0.002    0.724    0.002 {imshow}

Which gives an overhead of approximately 0.028 seconds (0.017 + 0.009 + 0.002) from the read, waitKey and imshow calls in each iteration (resize and flip are negligible). This adds up to a total of 0.054 seconds per frame, or a frame rate of 18.5 frames per second (FPS).
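To spell out the frame-rate arithmetic:

```python
# Per-frame time = process time + overhead from the other calls
process_time = 0.026               # seconds, from the cProfile output
overhead = 0.017 + 0.009 + 0.002   # read + waitKey + imshow
per_frame = process_time + overhead
fps = 1.0 / per_frame
print(round(fps, 1))  # about 18.5 frames per second
```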

This is too slow to make it run smoothly.

Please notice that cProfile itself adds some overhead to the measurements.

Step 4: Introducing Numba to optimize performance

The Numba library is designed to just-in-time compile code to make NumPy-based loops faster. Wow. That is just what we need here. Let’s jump right into it and see how it does.

import cv2
import numpy as np
from numba import jit
import cProfile


@jit(nopython=True)
def process(frame, box_height=6, box_width=16):
    height, width, _ = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            b_mean = np.mean(roi[:, :, 0])
            g_mean = np.mean(roi[:, :, 1])
            r_mean = np.mean(roi[:, :, 2])
            roi[:, :, 0] = b_mean
            roi[:, :, 1] = g_mean
            roi[:, :, 2] = r_mean
    return frame


def main(iterations=300):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    for _ in range(iterations):
        # Read a frame from the webcam
        _, frame = cap.read()
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))

        frame = process(frame)

        # Show the frame in a window
        cv2.imshow('WebCam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()

main(iterations=1)
cProfile.run("main(iterations=300)")

Notice that we first call the main loop with a single iteration. This is done to call the process function once before we measure the performance, as Numba compiles the function on the first call and keeps it compiled.

The result is as follows.

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      300    1.187    0.004    1.187    0.004 TEST2.py:7(pixels)

Which estimates 0.004 seconds per call. This gives a total time of about 0.032 seconds per iteration (0.028 + 0.004), which is sufficient to keep a frame rate of more than 24 frames per second (FPS).

Also, this improves the performance of process by a factor of roughly 6.5 (7.716 / 1.187).

Conclusion

We got the desired speedup to have a live stream from the webcam and process it frame by frame by using Numba. The speedup was approximately 6.5 times.

Average vs Weighted Average Effect in Video using OpenCV

What will we cover in this tutorial?

We compare the difference between using a weighted average and a normal average over the last frames streamed from your webcam using OpenCV in Python.

The effect can be seen in the video below, and the code used to create it is provided below.

Example output Normal Average vs Weighted Average vs One Frame

The code

The code is straightforward and not optimized. The average is calculated using a deque from Python's collections module as a circular buffer.
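A quick illustration of why a deque with maxlen behaves as a circular buffer:

```python
from collections import deque

buffer = deque(maxlen=3)
for frame_number in range(5):
    buffer.append(frame_number)
# Once full, appending discards the oldest item automatically
print(list(buffer))  # [2, 3, 4]
```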

The two classes AverageBuffer and WeightedAverageBuffer share the same constructor and apply method, but each has its own implementation of get_frame, which calculates the average and the weighted average, respectively.

Please notice that the code is not written for efficiency, and AverageBuffer has some easy performance wins if the average is computed more efficiently.

An important point here is that the frames are stored as float32 in the buffers. This is necessary for the calculations on the frames later, where we multiply them by a factor, say 4.

Example: the frames arrive as uint8, i.e. integers 0 to 255. Say we multiply a pixel value of 128 by 4. This gives 128*4 = 512, which wraps around to 0 as a uint8. Hence we get an undesirable effect, and therefore we convert the frames to float32 to avoid it.
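The wrap-around can be seen directly in NumPy:

```python
import numpy as np

pixel = np.array([128], dtype=np.uint8)
print(pixel * 4)                     # wraps around: 512 % 256 = 0
print(pixel.astype(np.float32) * 4)  # correct: 512.0
```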

import cv2
import numpy as np
from collections import deque


class AverageBuffer:
    def __init__(self, maxlen):
        self.buffer = deque(maxlen=maxlen)
        self.shape = None

    def apply(self, frame):
        self.shape = frame.shape
        self.buffer.append(frame)

    def get_frame(self):
        mean_frame = np.zeros(self.shape, dtype='float32')
        for item in self.buffer:
            mean_frame += item
        mean_frame /= len(self.buffer)
        return mean_frame.astype('uint8')


class WeightedAverageBuffer(AverageBuffer):
    def get_frame(self):
        mean_frame = np.zeros(self.shape, dtype='float32')
        i = 0
        for item in self.buffer:
            i += 4
            mean_frame += item*i
        # The weights are 4, 8, ..., i; their sum is i*(i + 4)/8
        mean_frame /= (i*(i + 4))/8.0
        return mean_frame.astype('uint8')

# Setup camera
cap = cv2.VideoCapture(0)
# Set a smaller resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)

average_buffer = AverageBuffer(30)
weighted_buffer = WeightedAverageBuffer(30)

while True:
    # Capture frame-by-frame
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)
    frame = cv2.resize(frame, (320, 240))

    frame_f32 = frame.astype('float32')
    average_buffer.apply(frame_f32)
    weighted_buffer.apply(frame_f32)

    cv2.imshow('WebCam', frame)
    cv2.imshow("Average", average_buffer.get_frame())
    cv2.imshow("Weighted average", weighted_buffer.get_frame())

    if cv2.waitKey(1) == ord('q'):
        break

# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()
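As a sanity check on the normalization in WeightedAverageBuffer: with a full buffer of n frames, the weights are 4, 8, …, 4n, and their sum has a closed form in terms of the final weight i = 4n, which is what get_frame divides by.

```python
n = 30  # buffer length
weights = [4 * k for k in range(1, n + 1)]
i = weights[-1]  # the value of i after the loop, i.e. 4 * n

# Arithmetic series: 4 + 8 + ... + 4n = 2n(n + 1) = i(i + 4)/8
print(sum(weights), i * (i + 4) // 8, 2 * n * (n + 1))  # all equal
```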

Create a Line Drawing from Webcam Stream using OpenCV in Python

What will we cover in this tutorial?

How to convert a webcam stream into a black and white line drawing using OpenCV and Python. Also, how to adjust the parameters while running the live stream.

See result here.

The things you need to use

There are two things you need to use in order to get a good line drawing of your image.

  1. GaussianBlur to smooth out the image, as detecting lines is sensitive to noise.
  2. Canny, which detects the lines.

A 5×5 kernel is advised for the Gaussian blur. Canny then takes two threshold parameters. To find the optimal values for your setup, we have inserted two trackbars where you can set them to any value and see the results.

You can read more about Canny Edge Detection here.

If you need to install OpenCV please read this tutorial.

The code is given below.

import cv2
import numpy as np

# Setup camera
cap = cv2.VideoCapture(0)
# Set a smaller resolution
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)


def nothing(x):
    pass


canny = "Canny"
cv2.namedWindow(canny)
cv2.createTrackbar('Threshold 1', canny, 0, 255, nothing)
cv2.createTrackbar('Threshold 2', canny, 0, 255, nothing)

while True:
    # Capture frame-by-frame
    _, frame = cap.read()
    frame = cv2.flip(frame, 1)

    t1 = cv2.getTrackbarPos('Threshold 1', canny)
    t2 = cv2.getTrackbarPos('Threshold 2', canny)
    gb = cv2.GaussianBlur(frame, (5, 5), 0)
    can = cv2.Canny(gb, t1, t2)

    cv2.imshow(canny, can)

    frame[np.where(can)] = 255
    cv2.imshow('WebCam', frame)
    if cv2.waitKey(1) == ord('q'):
        break

# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()
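Canny outputs white lines on a black background. If you prefer dark lines on a white background, closer to a pencil drawing, the edge image can simply be inverted. A minimal NumPy sketch with a tiny stand-in for a Canny output:

```python
import numpy as np

# A tiny stand-in for a Canny output: 255 where an edge was detected
edges = np.zeros((4, 4), dtype=np.uint8)
edges[1, 1] = 255

drawing = 255 - edges  # black lines on a white background
print(drawing[1, 1], drawing[0, 0])  # 0 255
```

In the live-stream code above, the same inversion could be applied to can before showing it, e.g. with cv2.bitwise_not.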