Video Mosaic on Live Webcam Stream with OpenCV and Numba

What will we cover in this tutorial?

We will investigate whether we can create a decent video mosaic effect on a live webcam stream using OpenCV, Numba and Python. First we will learn the simple way to create a video mosaic and measure its performance. Then we will extend it to a better-quality video mosaic and try to win back performance by lowering the quality.

Step 1: How does simple photo mosaic work?

A photographic mosaic is a photo composed of many other small images. A black and white example is given here.

The above is not a perfect example, as it was generated with an emphasis on speed so that it runs smoothly on a webcam stream. Also, it is done in gray scale to improve performance.

The idea is to rebuild the original image (photograph) from a lot of smaller sampled images. In the example above the original frame is 640×480 pixels and the mosaic is constructed from small images of 16×12 pixels.

The first thing we want to achieve is to create a simple mosaic. A simple mosaic is when the original image is scaled down and each pixel is then exchanged with one small image with the same average color. This is simple and efficient to do.

On a high level this is the process.

  1. Have a collection C of small images used to create the photographic mosaic
  2. Scale down the photo P you want to create a mosaic of.
  3. For each pixel in photo P find the image I from C whose average color is closest to the pixel. Insert image I to represent that pixel.
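The three steps above can be sketched with plain NumPy. This is only an illustration under stated assumptions: the function name simple_mosaic, the four flat tiles and the 2×2 "photo" are made up for the example, and the photo is assumed to be scaled down already.

```python
import numpy as np


def simple_mosaic(photo, tiles):
    """Replace each pixel of a small grayscale photo with the tile of closest mean."""
    # Mean brightness of every tile in the collection C
    tile_means = tiles.reshape(len(tiles), -1).mean(axis=1)
    # For each pixel, the index of the tile whose mean is closest to the pixel value
    idx = np.abs(photo[..., None].astype(np.float64) - tile_means).argmin(axis=-1)
    rows, cols = photo.shape
    tile_h, tile_w = tiles.shape[1:]
    # tiles[idx] has shape (rows, cols, tile_h, tile_w); rearrange it into one image
    return tiles[idx].transpose(0, 2, 1, 3).reshape(rows * tile_h, cols * tile_w)


# Hypothetical data: four flat 16x12 tiles and a 2x2 scaled-down photo
tiles = np.stack([np.full((12, 16), v, dtype=np.uint8) for v in (0, 85, 170, 255)])
photo = np.array([[10, 90], [160, 250]], dtype=np.uint8)
mosaic = simple_mosaic(photo, tiles)
print(mosaic.shape)  # (24, 32)
```

Each pixel of the photo becomes one 16×12 tile, so a 2×2 photo turns into a 32×24 mosaic.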

This explains the simple approach. The next question is: will it be efficient enough to process a live webcam stream?

Step 2: Create a collection of small images

To optimize performance we have chosen to make it in gray scale. The first step is to collect images you want to use. This can be any pictures.

We have used photos from Pexels, which are all free for use without copyright.

What we need is to convert them all to gray scale and resize to fit our purpose.

import cv2
import glob
import os
import numpy as np

output = "small-pics-16x12"
path = "pics"
files = glob.glob(os.path.join(path, "*"))
for file_name in files:
    print(file_name)
    img = cv2.imread(file_name)
    img = cv2.resize(img, (16, 12))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    mean = np.mean(img)
    output_file_name = "image-" + str(mean).replace('.', '-') + ".jpg"
    output_file_name = os.path.join(output, output_file_name)
    print(output_file_name)
    cv2.imwrite(output_file_name, img)

The script assumes that the images we want to convert to gray scale and resize are located in the local folder pics. Further, we assume that the output images (the processed images) will be put in an already existing folder small-pics-16x12.
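One detail of the file names is worth a small experiment. The mean brightness is written into the name, so if the collection is ever ordered by file name, keep in mind that text order is not numeric order. The sketch below reproduces the naming pattern and compares it with a zero-padded variant; padded_tile_name is a hypothetical helper, not part of the script above.

```python
def tile_name(mean):
    """File name pattern used by the script above."""
    return "image-" + str(mean).replace(".", "-") + ".jpg"


def padded_tile_name(mean):
    """Hypothetical variant: zero-padded so that text order equals numeric order."""
    return "image-" + f"{mean:09.4f}".replace(".", "-") + ".jpg"


means = [99.5, 127.25, 8.0]
print(sorted(tile_name(m) for m in means))
# ['image-127-25.jpg', 'image-8-0.jpg', 'image-99-5.jpg']
print(sorted(padded_tile_name(m) for m in means))
# ['image-0008-0000.jpg', 'image-0099-5000.jpg', 'image-0127-2500.jpg']
```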

Step 3: Get a live stream from the webcam

On a high level a live stream from a webcam is given in the following diagram.

This process framework is given in the code below.

import cv2
import numpy as np


def process(frame):
    return frame


def main():
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam; stop if no frame was returned
        ok, frame = cap.read()
        if not ok:
            break
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        updated_frame = process(gray)

        # Show the frame in a window
        cv2.imshow('WebCam', updated_frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()


main()

The above code is just an empty shell, where the call to the process function is where all the processing will happen. This code will just open a window that shows a gray scale image.

Step 4: The simple video mosaic

We need to introduce two main things to create this simple video mosaic.

  1. Loading all the images we need to use (the 16×12 gray scale images).
  2. Fill out the processing of each frame, which replaces each 16×12 box of the frame with the best matching image.

The first step is preprocessing and should be done before we enter the main loop of the webcam capturing. The second part is done in each iteration inside the process function.

import cv2
import numpy as np
import glob
import os


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
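    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    # Order from darkest to brightest. Sorting the file names would order them
    # as text, e.g. "image-127-..." before "image-99-...".
    images.sort(key=np.mean)
    return np.stack(images)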


def process(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            mean = np.mean(roi[:, :])
            roi[:, :] = images[int((len(images)-1)*mean/256)]
    return frame


def main(images):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam; stop if no frame was returned
        ok, frame = cap.read()
        if not ok:
            break
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        mosaic_frame = process(gray, images)

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()



images = preprocess()
main(images)

The preprocessing function reads all the images, converts them to gray scale (to have only 1 channel per pixel), and returns them as a NumPy array to have optimized code.

The process function breaks the frame into blocks of 16×12 pixels, computes the average gray value of each block, and picks the estimated best match. Notice that the average (mean) value is a float, so the collection can usefully hold more than 256 gray scale images.

In this example we used 1,885 images to process it.
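To see how the mean is turned into an index, here is the same expression from process as a small stand-alone function. The name tile_index is an assumption for illustration, and the mapping only makes sense if the collection is ordered from darkest to brightest.

```python
def tile_index(mean, n_images):
    """Map a mean gray value in [0, 256) to an index into the image collection."""
    return int((n_images - 1) * mean / 256)


n_images = 1885
for mean in (0.0, 64.0, 128.0, 255.0):
    print(mean, "->", tile_index(mean, n_images))
# 0.0 -> 0
# 64.0 -> 471
# 128.0 -> 942
# 255.0 -> 1876
```

Since the mean never exceeds 255, the last few indices (1877 to 1884 for 1,885 images) are never selected by this formula.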

A result can be seen here.

The result is decent but not good.

Step 5: Testing the performance and improve it by using Numba

While the performance seems quite good, let us measure it.

We do that with the time library, which first needs to be imported.

import time

Then measure how long the process call takes. The new code is inserted in the main while loop.

        # Update the frame
        start = time.time()
        mosaic_frame = process(gray, images)
        print("Process time", time.time()- start, "seconds")

This will result in the following output.

Process time 0.02651691436767578 seconds
Process time 0.026834964752197266 seconds
Process time 0.025418996810913086 seconds
Process time 0.02562689781188965 seconds
Process time 0.025369882583618164 seconds
Process time 0.025450944900512695 seconds

These are a few lines of the output: about 0.025-0.027 seconds per frame.
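Single measurements fluctuate, so it can help to repeat a call and look at the median. The helper below is a minimal sketch; time_call is a hypothetical name, and time.perf_counter is used instead of time.time because it is meant for measuring short intervals.

```python
import time


def time_call(fn, *args, repeats=20):
    """Return the median wall-clock time of fn(*args) over several runs."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn(*args)
        samples.append(time.perf_counter() - start)
    samples.sort()
    return samples[len(samples) // 2]


# Stand-in workload; in this tutorial it would be process(gray.copy(), images)
elapsed = time_call(sum, range(100_000))
print(f"Median time {elapsed:.6f} seconds")
```

Note that process changes the frame in place, so a copy of the frame should be passed when the same frame is timed repeatedly.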

Let’s try to bring Numba into the equation. Numba is a just-in-time compiler for Python that works well with NumPy code. That means it compiles Python code to machine code for speed. If you are new to Numba we recommend you read this tutorial.

import cv2
import numpy as np
import glob
import os
import time
from numba import jit


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
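    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    # Order from darkest to brightest. Sorting the file names would order them
    # as text, e.g. "image-127-..." before "image-99-...".
    images.sort(key=np.mean)
    return np.stack(images)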


@jit(nopython=True)
def process(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            mean = np.mean(roi[:, :])
            roi[:, :] = images[int((len(images)-1)*mean/256)]
    return frame


def main(images):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam; stop if no frame was returned
        ok, frame = cap.read()
        if not ok:
            break
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        start = time.time()
        mosaic_frame = process(gray, images)
        print("Process time", time.time()- start, "seconds")

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()



images = preprocess()
main(images)

This gives the following performance.

Process time 0.0014820098876953125 seconds
Process time 0.0013887882232666016 seconds
Process time 0.0015859603881835938 seconds
Process time 0.0016350746154785156 seconds
Process time 0.0018379688262939453 seconds
Process time 0.0016241073608398438 seconds

Which is a speed improvement of roughly a factor of 15-20.

Good enough for live streaming. But the result is still not good.

Step 6: A more advanced video mosaic approach

The more advanced video mosaic consists of comparing each box of pixels with every candidate replacement image, pixel by pixel, and picking the closest one.

import cv2
import numpy as np
import glob
import os
import time
from numba import jit


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
    files.sort()
    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    return np.stack(images)


@jit(nopython=True)
def process(frame, images, box_height=12, box_width=16):
    height, width = frame.shape
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame[i:i + box_height, j:j + box_width]
            best_match = np.inf
            best_match_index = 0
            for k in range(images.shape[0]):  # include image 0 in the search
                total_sum = np.sum(np.where(roi > images[k], roi - images[k], images[k] - roi))
                if total_sum < best_match:
                    best_match = total_sum
                    best_match_index = k
            roi[:,:] = images[best_match_index]
    return frame


def main(images):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam; stop if no frame was returned
        ok, frame = cap.read()
        if not ok:
            break
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        start = time.time()
        mosaic_frame = process(gray, images)
        print("Process time", time.time()- start, "seconds")

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()


images = preprocess()
main(images)

There is one line to notice specifically.

total_sum = np.sum(np.where(roi > images[k], roi - images[k], images[k] - roi))

This is needed because we work with unsigned 8-bit integers, where a plain subtraction would wrap around instead of going negative. For each pixel it takes the absolute difference between the region of interest (roi) and images[k], and then sums these differences. This is a very expensive calculation, as we will see.
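The same distance can be checked on a tiny example. The sketch below is not the tutorial's implementation: best_match is a hypothetical helper that widens the values to a signed type before subtracting, and the 2×2 arrays stand in for the 16×12 boxes.

```python
import numpy as np


def best_match(roi, images):
    """Index of the image with the smallest sum of absolute differences to roi."""
    # Widen to a signed type so the subtraction cannot wrap around
    diff = images.astype(np.int16) - roi.astype(np.int16)
    scores = np.abs(diff).sum(axis=(1, 2))
    return int(scores.argmin())


# Hypothetical 2x2 example instead of 16x12 boxes
roi = np.array([[10, 200], [30, 40]], dtype=np.uint8)
images = np.stack([
    np.array([[20, 100], [30, 40]], dtype=np.uint8),  # differences 10, 100, 0, 0
    np.array([[12, 198], [29, 43]], dtype=np.uint8),  # differences 2, 2, 1, 3
])
where_sum = np.sum(np.where(roi > images[0], roi - images[0], images[0] - roi))
print(where_sum)                # 110, computed with the np.where expression
print(best_match(roi, images))  # 1
```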

Performance shows the following.

Process time 7.030380010604858 seconds
Process time 7.034134149551392 seconds
Process time 7.105709075927734 seconds
Process time 7.138839960098267 seconds

Over 7 seconds for each frame. The result is what can be expected with this number of images, but the performance is far too slow for a smooth live webcam stream.

The result can be seen here.

Step 7: Compromise options

There are various ways to trade quality for speed, and we will not investigate them all. Here are some.

  • Use fewer images in our collection (fewer than 1,885). Notice that using half the images, say 900, will only cut the processing time roughly in half.
  • Use bigger images, for example scaling up to 32×24. We would still need to do a lot of processing per pixel, so the speedup might be smaller than hoped.
  • Use a simplified version of the difference calculation (total_sum). This has great potential, but might have undesired effects.
  • Scale down the pixel estimation to do fewer calculations.

We will try the last two.
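A rough count of pixel comparisons helps to judge these options. The function below is a back-of-the-envelope model, not a measurement: comparisons_per_frame is a hypothetical helper that only counts pixel pairs and ignores all other overhead.

```python
def comparisons_per_frame(width=640, height=480, box_w=16, box_h=12,
                          n_images=1885, pixels_per_box=None):
    """Rough count of pixel comparisons for one frame of the exhaustive search."""
    boxes = (width // box_w) * (height // box_h)
    if pixels_per_box is None:
        pixels_per_box = box_w * box_h
    return boxes * n_images * pixels_per_box


full = comparisons_per_frame()
half = comparisons_per_frame(n_images=900)
big = comparisons_per_frame(box_w=32, box_h=24)
tiny = comparisons_per_frame(pixels_per_box=4)
print(f"{full:,}")  # 579,072,000
print(f"{half:,}")  # 276,480,000
print(f"{big:,}")   # 579,072,000
print(f"{tiny:,}")  # 12,064,000
```

In this model bigger boxes do not reduce the number of pixel comparisons at all, while comparing only 4 pixels per box cuts them by a factor of 48. Measured times will not follow the model exactly, since it leaves out everything except the comparisons.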

First, let’s try to exchange the calculation of total_sum, which is our distance function that measures how close our image is. Say, we use this.

                total_sum = np.sum(np.subtract(roi, images[k]))

This wraps around whenever a calculation such as 1 - 2 gives 255 in unsigned 8-bit arithmetic, which is undesired. On the other hand, it might happen in roughly 50% of the cases, and maybe it will skew the calculation evenly for all images.

Let’s try.

Process time 1.857623815536499 seconds
Process time 1.7193729877471924 seconds
Process time 1.7445549964904785 seconds
Process time 1.707035779953003 seconds
Process time 1.6778359413146973 seconds

Wow. That is a speedup of about a factor of 4 per frame. The quality is still fine, but you will notice a poorly mapped image from time to time. But the result is close to the advanced video mosaic and far from the first simple video mosaic.
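Why a tile is occasionally mapped poorly can be seen on two pixels. In the sketch below the arrays are made up, and shortcut_score is a hypothetical wrapper around the cheaper distance from above.

```python
import numpy as np

roi = np.array([100, 100], dtype=np.uint8)
close_tile = np.array([101, 101], dtype=np.uint8)  # off by 1 per pixel
far_tile = np.array([50, 50], dtype=np.uint8)      # off by 50 per pixel


def shortcut_score(roi, tile):
    """The cheaper distance from the text: uint8 subtraction wraps modulo 256."""
    return int(np.sum(np.subtract(roi, tile)))


print(shortcut_score(roi, close_tile))  # 510, since 100 - 101 wraps to 255
print(shortcut_score(roi, far_tile))    # 100
```

The tile that is slightly brighter than the box gets a much worse score than the tile that is far too dark, so the search picks the wrong one in this case.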

Another addition we could make is to estimate each box by only 4 pixels. This should still be better than the simple video mosaic approach. I have given the full code below.

import cv2
import numpy as np
import glob
import os
import time
from numba import jit


def preprocess():
    path = "small-pics-16x12"
    files = glob.glob(os.path.join(path, "*"))
    files.sort()
    images = []
    for filename in files:
        img = cv2.imread(filename)
        images.append(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    return np.stack(images)


def preprocess2(images, scale_width=8, scale_height=6):
    scaled = []
    _, height, width = images.shape
    print("Dimensions", width, height)
    width //= scale_width
    height //= scale_height
    print("Scaled Dimensions", width, height)
    for i in range(images.shape[0]):
        scaled.append(cv2.resize(images[i], (width, height)))
    return np.stack(scaled)


@jit(nopython=True)
def process3(frame, frame_scaled, images, scaled, box_height=12, box_width=16, scale_width=8, scale_height=6):
    height, width = frame.shape
    width //= scale_width
    height //= scale_height
    box_width //= scale_width
    box_height //= scale_height
    for i in range(0, height, box_height):
        for j in range(0, width, box_width):
            roi = frame_scaled[i:i + box_height, j:j + box_width]
            best_match = np.inf
            best_match_index = 0
            for k in range(scaled.shape[0]):  # include image 0 in the search
                total_sum = np.sum(roi - scaled[k])
                if total_sum < best_match:
                    best_match = total_sum
                    best_match_index = k
            frame[i*scale_height:(i + box_height)*scale_height, j*scale_width:(j + box_width)*scale_width] = images[best_match_index]
    return frame


def main(images, scaled):
    # Get the webcam (default webcam is 0)
    cap = cv2.VideoCapture(0)
    # If your webcam does not support 640 x 480, this will find another resolution
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    while True:
        # Read a frame from the webcam; stop if no frame was returned
        ok, frame = cap.read()
        if not ok:
            break
        # Flip the frame
        frame = cv2.flip(frame, 1)
        frame = cv2.resize(frame, (640, 480))
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Update the frame
        start = time.time()
        gray_scaled = cv2.resize(gray, (640//8, 480//6))
        mosaic_frame = process3(gray, gray_scaled, images, scaled)
        print("Process time", time.time()- start, "seconds")

        # Show the frame in a window
        cv2.imshow('Mosaic Video', mosaic_frame)
        cv2.imshow('Webcam', frame)

        # Check if q has been pressed to quit
        if cv2.waitKey(1) == ord('q'):
            break

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()


images = preprocess()
scaled = preprocess2(images)
main(images, scaled)

Here a preprocessing step (preprocess2) has been added. The process time is now as follows.

Process time 0.5559628009796143 seconds
Process time 0.5979928970336914 seconds
Process time 0.5543379783630371 seconds
Process time 0.5621011257171631 seconds

Which is okay, but still less than 2 frames per second.

The result can be seen here.

It is not all bad. It is still better than the simple video mosaic approach.
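To picture what the reduction to 4 pixels per box means, here is a NumPy-only sketch. It is a stand-in, not what the code above does: block_mean is a hypothetical helper that averages non-overlapping blocks, whereas cv2.resize interpolates (bilinear by default), so the actual values will differ.

```python
import numpy as np


def block_mean(image, scale_height=6, scale_width=8):
    """Shrink a 2D array by averaging non-overlapping scale_height x scale_width blocks."""
    h, w = image.shape
    blocks = image.reshape(h // scale_height, scale_height, w // scale_width, scale_width)
    return blocks.mean(axis=(1, 3))


tile = np.arange(12 * 16, dtype=np.float64).reshape(12, 16)
small = block_mean(tile)
print(small.shape)  # (2, 2): four values now describe the whole tile

frame = np.zeros((480, 640))
print(block_mean(frame).shape)  # (80, 80), i.e. a 40x40 grid of 2x2 boxes
```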

The result is not perfect. If you want to use it on a live webcam stream with 25-30 frames per second, you need to find further optimizations or live with the simple video mosaic approach.
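The gap to a live stream can be put into numbers from the timings above. This small calculation only looks at the process call and ignores the time spent on capturing and showing frames.

```python
# Process times in seconds, copied from the output of the last run above
timings = [0.5559628009796143, 0.5979928970336914,
           0.5543379783630371, 0.5621011257171631]

mean_time = sum(timings) / len(timings)
fps = 1.0 / mean_time
budget_25 = 1.0 / 25  # time available per frame at 25 frames per second

print(f"{fps:.2f} frames per second")
print(f"Budget at 25 fps: {budget_25:.3f} s, needed speedup: {mean_time / budget_25:.1f}x")
```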

Performance comparison of Numba vs Vectorization vs Lambda function with NumPy

What will we cover in this tutorial?

We will continue our investigation of Numba from this tutorial.

Numba is a just-in-time compiler for Python that works amazingly with NumPy. As we saw in the last tutorial, the built-in vectorization can, depending on the case and the size of the instance, be faster than Numba.

Here we will explore that further and also see how Numba compares with lambda functions. Lambda functions have the advantage that they can be passed as an argument down to a library that can optimize the performance and not depend on slow Python code.

Step 1: Example of Vectorization slower than Numba

In the previous tutorial we only investigated one example, in which vectorization was faster than Numba. Here we will see that this is not always the case.

import numpy as np
from numba import jit
import time

size = 100
x = np.random.rand(size, size)
y = np.random.rand(size, size)
iterations = 100000


@jit(nopython=True)
def add_numba(a, b):
    c = np.zeros(a.shape)
    for i in range(a.shape[0]):
        for j in range(a.shape[1]):
            c[i, j] = a[i, j] + b[i, j]
    return c


def add_vectorized(a, b):
    return a + b


# We call the function once, to precompile the code
z = add_numba(x, y)
start = time.time()
for _ in range(iterations):
    z = add_numba(x, y)
end = time.time()
print("Elapsed (numba, precompiled) = %s" % (end - start))

start = time.time()
for _ in range(iterations):
    z = add_vectorized(x, y)
end = time.time()
print("Elapsed (vectorized) = %s" % (end - start))

Varying the size of the NumPy array, we can see the performance between the two in the graph below.

Where it is clear that the vectorized approach is slower.

Step 2: Try some more complex example comparing vectorized and Numba

An if-then-else can be expressed in vectorized form using the NumPy where function.

import numpy as np
from numba import jit
import time


size = 1000
x = np.random.rand(size, size)
iterations = 1000


@jit(nopython=True)
def numba(a):
    c = np.zeros(a.shape)
    for i in range(a.shape[0]):
        for j in range(a.shape[1]):
            if a[i, j] < 0.5:
                c[i, j] = 1
    return c


def vectorized(a):
    return np.where(a < 0.5, 1, 0)


# We call the numba function to precompile it before we measure it
z = numba(x)
start = time.time()
for _ in range(iterations):
    z = numba(x)
end = time.time()
print("Elapsed (numba, precompiled) = %s" % (end - start))

start = time.time()
for _ in range(iterations):
    z = vectorized(x)
end = time.time()
print("Elapsed (vectorized) = %s" % (end - start))

This results in the following comparison.

That is close, but the vectorized approach is a bit faster.
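To make the correspondence between the loop and np.where concrete, here is the same decision on a small, made-up array.

```python
import numpy as np

a = np.array([[0.1, 0.7], [0.5, 0.3]])

# Explicit if-then-else, element by element
looped = np.zeros(a.shape)
for i in range(a.shape[0]):
    for j in range(a.shape[1]):
        if a[i, j] < 0.5:
            looped[i, j] = 1

# The same decision expressed with np.where
vectorized = np.where(a < 0.5, 1, 0)

print(vectorized.tolist())                 # [[1, 0], [0, 1]]
print(np.array_equal(looped, vectorized))  # True
```

The values agree, although the loop fills a float array while np.where with integer arguments returns an integer array.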

Step 3: Compare Numba with lambda functions

I am very curious about this. Lambda functions are controversial in Python, and many are not happy about them, as their syntax does not feel aligned with the rest of Python. On the other hand, lambda functions have the advantage that you can pass them down into a library that can optimize over the for-loops.

import numpy as np
from numba import jit
import time

size = 1000
x = np.random.rand(size, size)
iterations = 1000


@jit(nopython=True)
def numba(a):
    c = np.zeros((size, size))
    for i in range(a.shape[0]):
        for j in range(a.shape[1]):
            c[i, j] = a[i, j] + 1
    return c


def lambda_run(a):
    # NumPy arrays have no .apply method (that is a pandas API). Here the
    # lambda is applied to the whole array, so NumPy does the element-wise work.
    add_one = lambda x: x + 1
    return add_one(a)


# Call the numba function to precompile it before time measurement
z = numba(x)
start = time.time()
for _ in range(iterations):
    z = numba(x)
end = time.time()
print("Elapsed (numba, precompiled) = %s" % (end - start))

start = time.time()
for _ in range(iterations):
    z = lambda_run(x)
end = time.time()
print("Elapsed (lambda) = %s" % (end - start))

Resulting in the following performance comparison.

This is again tight, but the lambda approach is still a bit faster.

Remember, this is a simple lambda function and we cannot conclude that lambda functions in general are faster than using Numba.
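How a lambda reaches the array matters a lot. The sketch below shows two ways to apply the same lambda on a small, made-up array: calling it once on the whole array, and wrapping it in np.vectorize, which NumPy documents as a convenience that is essentially a for-loop rather than a performance tool.

```python
import numpy as np

a = np.array([[1.0, 2.0], [3.0, 4.0]])
add_one = lambda v: v + 1

direct = add_one(a)                     # one call; NumPy adds 1 to every element
elementwise = np.vectorize(add_one)(a)  # the lambda is called once per element

print(np.array_equal(direct, elementwise))  # True
```

Both give the same result, but only the first lets NumPy run the loop in compiled code.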

Conclusion

What we have learned since the last tutorial is that there are examples where simple vectorization is slower than Numba. This still leads to the conclusion that performance depends highly on the task. Further, the lambda function seems to give promising performance. Again, this should be compared to the slow approach of a Python for-loop without Numba's just-in-time compiled machine code.

When to use Numba with Python NumPy: Vectorization vs Numba

What will we cover in this tutorial?

You just want your code to run fast, right? Numba is a just-in-time compiler for Python that works amazingly with NumPy. Does that mean we should always use Numba?

Well, let’s try some examples out and learn. If you know about NumPy, you know you should use vectorization to get speed. Does Numba beat that?

Step 1: Let’s learn how Numba works

Numba will compile the Python code into machine code and run it. What about the just-in-time part? It means that the first time you call the function you want turned into machine code, Numba compiles it and then runs it. On any later call it just runs it, as it is already compiled.

Let’s try that.

import numpy as np
from numba import jit
import time


@jit(nopython=True)
def full_sum_numba(a):
    sum = 0.0
    for i in range(a.shape[0]):
        for j in range(a.shape[1]):
            sum += a[i, j]
    return sum


iterations = 1000
size = 10000
x = np.random.rand(size, size)

start = time.time()
full_sum_numba(x)
end = time.time()
print("Elapsed (Numba) = %s" % (end - start))

start = time.time()
full_sum_numba(x)
end = time.time()
print("Elapsed (Numba) = %s" % (end - start))

Where you get.

Elapsed (Numba) = 0.41634082794189453
Elapsed (Numba) = 0.11176300048828125

Where you see a difference in runtime.

Oh, did you get what happened in the code? Well, if you put @jit(nopython=True) in front of a function, Numba will try to compile it and run it as machine code.

As you see above, the first call has an overhead in run-time, because it first compiles the function and then runs it. The second time, it has already been compiled and can run immediately.

Step 2: Compare Numba just-in-time code to native Python code

So let us compare how much you gain by using Numba just-in-time (@jit) in our code.

import numpy as np
from numba import jit
import time


def full_sum(a):
    sum = 0.0
    for i in range(a.shape[0]):
        for j in range(a.shape[1]):
            sum += a[i, j]
    return sum


@jit(nopython=True)
def full_sum_numba(a):
    sum = 0.0
    for i in range(a.shape[0]):
        for j in range(a.shape[1]):
            sum += a[i, j]
    return sum


iterations = 1000
size = 10000
x = np.random.rand(size, size)

start = time.time()
full_sum(x)
end = time.time()
print("Elapsed (No Numba) = %s" % (end - start))

start = time.time()
full_sum_numba(x)
end = time.time()
print("Elapsed (Numba) = %s" % (end - start))

start = time.time()
full_sum_numba(x)
end = time.time()
print("Elapsed (Numba) = %s" % (end - start))

Here we added a native Python function without the @jit in front and compare it with the one that has it. The timings are as follows.

Elapsed (No Numba) = 38.08543515205383
Elapsed (Numba) = 0.41634082794189453
Elapsed (Numba) = 0.11176300048828125

That is some difference. Also, we have plotted a few more runs in the graph below.

It seems pretty evident.

Step 3: Comparing it with Vectorization

If you don’t know what vectorization is, we can recommend this tutorial. The reason to have vectorization is to move the expensive for-loops into the function call to have optimized code run it.

That sounds a lot like what Numba can do. It can change the expensive for-loops into fast machine code.

But which one is faster?

Well, I think there are two parameters to try out. First, the size of the problem. Second, whether the number of iterations matters.

import numpy as np
from numba import jit
import time


@jit(nopython=True)
def full_sum_numba(a):
    sum = 0.0
    for i in range(a.shape[0]):
        for j in range(a.shape[1]):
            sum += a[i, j]
    return sum


def full_sum_vectorized(a):
    return a.sum()


iterations = 1000
size = 10000
x = np.random.rand(size, size)

start = time.time()
full_sum_vectorized(x)
end = time.time()
print("Elapsed (Vectorized) = %s" % (end - start))

start = time.time()
full_sum_numba(x)
end = time.time()
print("Elapsed (Numba) = %s" % (end - start))

start = time.time()
full_sum_numba(x)
end = time.time()
print("Elapsed (Numba) = %s" % (end - start))

As a function of the size.

It is interesting that Numba is faster for small problem sizes, while the vectorized approach seems to outperform Numba for bigger sizes.

The number of iterations only makes the difference bigger.

This is not surprising, as the code in a vectorized call can be more specifically optimized than the more general-purpose Numba approach.

Conclusion

Does that mean that Numba does not pay off?

No, not at all. First of all, we have only tried it for one vectorized approach, which was obviously very easy to optimize. Secondly, not all loops can be turned into vectorized code. In general it is difficult to have a state in a vectorized approach. Hence, if you need to keep track of some internal state in a loop it can be difficult to find a vectorized approach.
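As an illustration of such internal state, consider a running total that resets whenever it passes a limit. The function below is a made-up example (resetting_sum is a hypothetical name): every step depends on the decision taken in the previous one, which makes a vectorized formulation awkward, while the plain loop is the kind of code that should be a natural candidate for @jit.

```python
import numpy as np


def resetting_sum(values, limit):
    """Running total that drops back to zero whenever it exceeds limit."""
    out = np.zeros(values.shape[0])
    total = 0.0
    for i in range(values.shape[0]):
        total += values[i]
        if total > limit:  # the next step depends on this decision
            total = 0.0
        out[i] = total
    return out


print(resetting_sum(np.array([1.0, 2.0, 3.0, 4.0, 5.0]), 5.0).tolist())
# [1.0, 3.0, 0.0, 4.0, 0.0]
```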