Task queues with Flask | Learning Flask Ep. 21
An introduction to task queues with Flask and RQ
Sometimes we need to run tasks in the background, outside of the HTTP request/response cycle. Long-running tasks such as image, video or audio processing can take anywhere from minutes to hours, depending on the task at hand.
Picture this scenario.
You have a web application that allows users to upload an image, from which you’re going to produce 5 copies, each a different size to match the various screen sizes used on modern devices, including a thumbnail.
Even if this task only takes a few seconds to process, a few seconds of waiting on the web can feel like an eternity! And we want to keep our users using the application.
This is where task queues come into play!
Task queues allow us to offload jobs to a separate worker process, meaning we can return something to the user immediately whilst the job gets placed in a queue and processed later (depending on how many tasks are currently in the queue, it could start immediately).
There are many use cases for task queues. A few examples include:
- Image/video/audio processing
- Web scraping
- Analysis/complex calculation
- Sending emails
And much more!
Task queues
There are a handful of task queues available for Python, however for this introduction we’re going to use RQ, a simple yet powerful task queue that uses Redis as a message broker.
Tasks are handled by regular Python functions, which we can place in a queue along with the arguments they should be called with.
For example, a very simple function can be used to handle a task:
def task_handler(args):
    # this function takes around 2 minutes to complete
    return True
When we want to add a task to a queue, we do something like the following:
q.enqueue(task_handler, args)
Where:
- q is a reference to the queue itself
- enqueue adds a new task to the queue
- task_handler is the name of the function we want to call
- args is any arguments to pass to the function
Any tasks in the queue will be executed sequentially until all tasks are complete.
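To make the idea concrete, here’s a minimal in-process sketch of the same pattern using only the standard library. It is not how RQ works internally (RQ stores jobs in Redis and runs them in a separate worker process); the thread-based worker, the None sentinel and the short sleep are stand-ins chosen for illustration.

```python
import queue
import threading
import time


def task_handler(name):
    """Stand-in for a slow job; the short sleep simulates work."""
    time.sleep(0.1)
    return f"{name} done"


q = queue.Queue()  # plays the role of the task queue
results = []       # the worker appends results here in completion order


def worker():
    """Pull jobs off the queue one at a time until a None sentinel arrives."""
    while True:
        job = q.get()
        if job is None:
            break
        func, args = job
        results.append(func(*args))


t = threading.Thread(target=worker)
t.start()

start = time.perf_counter()
for name in ("first", "second", "third"):
    q.put((task_handler, (name,)))  # returns straight away, like q.enqueue
enqueue_time = time.perf_counter() - start

q.put(None)  # tell the worker to stop once the queue is drained
t.join()

print(f"Enqueued 3 jobs in {enqueue_time:.4f}s")
print(results)
```

Putting the jobs on the queue returns almost instantly, while the worker runs them one after another in the order they were added.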
The message broker is the middleman between our application and our workers, delivering messages when we want to schedule a task in the queue.
Redis is a fast, in-memory database which we’ll use as our broker, so you’ll need to install it.
Installing Redis
We’re not going to cover installing Redis in this article. However, some points to note:
- If you’re on Windows 10, consider using the Windows Subsystem for Linux or Docker
- If you’re on Mac, install Redis using Homebrew
- If you’re on Linux, build Redis from source or install it using your distro’s package manager
We’re using the Windows Subsystem for Linux running Ubuntu 18.04, where Redis is easily installed with:
sudo apt install redis-server
Here’s a great link to an article on Digital Ocean for setting up Redis on Ubuntu 18.04 if you’d like it to run as a service:
https://www.digitalocean.com/community/tutorials/how-to-install-and-secure-redis-on-ubuntu-18-04
Otherwise just start Redis in a new terminal with the following command:
redis-server
Note - We haven’t set up any authentication on our installation of Redis as it’s only for local development. If you’re running in production, you’ll want to enable password auth.
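If you do enable a password later, one way to keep it out of the source code is to read the connection details from environment variables. The sketch below is only an illustration: the variable names REDIS_HOST, REDIS_PORT and REDIS_PASSWORD are hypothetical, and the resulting dict is meant to be passed to redis.Redis(), which accepts host, port and password keyword arguments.

```python
import os


def redis_settings(env=os.environ):
    """Build keyword arguments for redis.Redis() from environment variables.

    The variable names are assumptions made for this sketch.
    """
    return {
        "host": env.get("REDIS_HOST", "localhost"),
        "port": int(env.get("REDIS_PORT", "6379")),
        "password": env.get("REDIS_PASSWORD"),  # None means no authentication
    }


# A plain dict stands in for the real environment in this demonstration
settings = redis_settings({"REDIS_PASSWORD": "s3cret"})
print(settings["host"], settings["port"])

# In the app this could be used as: r = redis.Redis(**redis_settings())
```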
The Flask app
In this example, we’re going to build a simple application, allowing a user to submit a URL via a form.
We’ll create a task handler function which we’ll use to fetch the HTML of the URL and count the occurrences of every word on the page.
But before we do that, let’s create a simple example first.
To get things going, we’re going to create a new virtual environment and install the dependencies:
python -m venv env && source env/bin/activate
We’re going to install flask, redis, rq, beautifulsoup4 and lxml (the parser BeautifulSoup uses later in tasks.py) with pip:
pip install flask redis rq beautifulsoup4 lxml
We’re going to create a simple, single file application in a file called run.py.
First up, we need a few imports:
run.py
from flask import Flask, request
import redis
from rq import Queue
import time
We’re importing time to simulate some delay in our background task.
Next, we’ll create the Flask app variable and set up our Redis instance and task queue object:
app = Flask(__name__)
r = redis.Redis()
q = Queue(connection=r)
Let’s create a very simple function that will handle a task. It takes an argument (n) and returns its length after a simulated delay.
As you can see, it’s just a normal Python function!
def background_task(n):
    """ Function that returns len(n) and simulates a delay """
    delay = 2
    print("Task running")
    print(f"Simulating a {delay} second delay")
    time.sleep(delay)
    print(len(n))
    print("Task complete")
    return len(n)
Finally, we’ll create a route which looks for a query string with n as the parameter.
@app.route("/task")
def index():
    if request.args.get("n"):
        job = q.enqueue(background_task, request.args.get("n"))
        return f"Task ({job.id}) added to queue at {job.enqueued_at}"
    return "No value for n provided"

if __name__ == "__main__":
    app.run()
If n is provided in the URL, it will add a task to the queue, with the value for n as the function argument.
We add a task to a queue with:
q.enqueue(function_name, args)
In this case, we’ve stored the return value in a variable called job.
We now have access to the job object. You’ll notice we use job.id and job.enqueued_at, which return a unique task id and the date and time when the task was enqueued.
Some other interesting attributes of the job object include:
job.status
job.func_name
job.args
job.kwargs
job.result
job.enqueued_at
job.started_at
job.ended_at
job.exc_info
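As a rough sketch of how these attributes might be used, the helper below gathers a few of them into a plain dict, which is handy for logging or returning as JSON. The helper names job_summary and fetch_job_summary are made up for this example. Job.fetch is RQ’s way of looking a job up by its id, and it needs rq installed and a running Redis server, so the demonstration at the bottom uses a stand-in object instead.

```python
from types import SimpleNamespace


def job_summary(job):
    """Collect a few of the job attributes listed above into a plain dict."""
    return {
        "id": job.id,
        "func_name": job.func_name,
        "args": list(job.args),
        "result": job.result,  # None until the worker has finished the job
    }


def fetch_job_summary(job_id, connection):
    """Look a job up by id and summarise it (requires rq and Redis)."""
    from rq.job import Job  # imported here so the sketch loads without rq
    return job_summary(Job.fetch(job_id, connection=connection))


# Stand-in object, used only to demonstrate job_summary without Redis
fake_job = SimpleNamespace(
    id="abc123", func_name="run.background_task", args=("100",), result=3
)
print(job_summary(fake_job))
```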
Before you run the app, you need to start the rq worker.
Open a new terminal (in the same directory as run.py) and start the worker with:
rq worker
You’ll see something like:
20:55:23 RQ worker 'rq:worker:jnwt.4968' started, version 0.13.0
20:55:23 *** Listening on default...
20:55:23 Cleaning registries for queue: default
Start the Flask app with:
export FLASK_APP=run.py
export FLASK_ENV=development
flask run
Go to /task?n=100 in your browser and keep an eye on your terminal running the rq worker process.
You should see something similar to this in your browser:
Task (7dc516bb-7720-446a-acce-cbb272d7f598) added to queue at 2019-03-08 22:03:15.936306
In your terminal, you should see:
22:03:18 default: Job OK (86a8479a-e02d-4aca-8466-6df47fa69efe)
22:03:18 Result is kept for 500 seconds
22:03:18 default: run.background_task('100') (7dc516bb-7720-446a-acce-cbb272d7f598)
Task running
Simulating a 2 second delay
3
Task complete
22:03:20 default: Job OK (7dc516bb-7720-446a-acce-cbb272d7f598)
22:03:20 Result is kept for 500 seconds
You’ll notice, the app returns a response immediately while the task runs in the background.
Go ahead and refresh the URL a few times to add multiple tasks to the queue!
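If you’re wondering about the 3 in the worker output: values read from the query string arrive as strings, so the task measures the length of "100" rather than working with the number 100. A quick way to check this is to call the function directly, without a queue. In this sketch the delay parameter is an addition (not part of the original function) so the demo doesn’t have to wait.

```python
import time


def background_task(n, delay=2):
    """Same logic as the task above; `delay` is added so the demo can skip the wait."""
    time.sleep(delay)
    return len(n)


# request.args.get("n") returns a string, so the task receives "100", not 100
result = background_task("100", delay=0)
print(result)
```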
Example 2
Moving on to a more practical example of using the rq task queue.
We’re going to rearrange our application structure to the following:
app
├── app
│   ├── __init__.py
│   ├── tasks.py
│   ├── templates
│   │   └── add_task.html
│   └── views.py
└── run.py
Rather than writing our task functions in the same file as our views, we’re going to separate them out into their own file called tasks.py.
run.py is the entry point to our application:
run.py
from app import app

if __name__ == "__main__":
    app.run()
__init__.py is going to bring our app together as a package and initialize some key components:
__init__.py
from flask import Flask
import redis
from rq import Queue
app = Flask(__name__)
r = redis.Redis()
q = Queue(connection=r)
from app import views
from app import tasks
We start out by importing the required packages and creating the app variable.
- r = redis.Redis() creates our Redis connection
- q = Queue(connection=r) creates our task queue
There are lots of things we can do with queues, but for now we’re just going to keep it simple and create a single queue.
Lastly, we import views and tasks from app, where views.py contains our application routes and tasks.py contains our tasks. These imports sit at the bottom of the file because views.py itself imports app, r and q from this package, so those objects must already exist.
In views.py, for now we’re just going to import a few objects from our app package, along with a couple of imports from flask, and render a template. We’ll come back to it shortly:
views.py
from app import app
from app import r
from app import q
from flask import render_template, request
@app.route("/add-task", methods=["GET", "POST"])
def add_task():
    return render_template("add_task.html")
tasks.py is going to contain all of our task handler functions. We’re going to create a function that counts the occurrences of each word at a given URL:
tasks.py
from urllib import request
from bs4 import BeautifulSoup
import lxml  # not used directly; BeautifulSoup's "lxml" parser needs it installed
import time

def count_words(url):
    print(f"Counting words at {url}")
    start = time.time()
    r = request.urlopen(url)
    soup = BeautifulSoup(r.read().decode(), "lxml")
    # Join the text of every <p> element into one string
    paragraphs = " ".join([p.text for p in soup.find_all("p")])
    # Map each distinct word to the number of times it appears
    word_count = dict()
    for word in paragraphs.split():
        if word not in word_count:
            word_count[word] = 1
        else:
            word_count[word] += 1
    end = time.time()
    time_elapsed = end - start  # time.time() returns seconds
    print(word_count)
    print(f"Total words: {len(word_count)}")  # number of distinct words
    print(f"Time elapsed: {time_elapsed} s")
    return len(word_count)
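The counting loop can also be written with collections.Counter from the standard library. The sketch below is an alternative, not the article’s code: it works on a plain string rather than a fetched page (the function name count_words_in_text and the sample sentence are made up), and it shows the difference between the number of distinct words, which is what len() of the dictionary gives, and the total number of words.

```python
from collections import Counter


def count_words_in_text(text):
    """Return a Counter mapping each whitespace-separated word to its frequency."""
    return Counter(text.split())


sample = "the quick brown fox jumps over the lazy dog the end"
counts = count_words_in_text(sample)

print(counts.most_common(1))  # the most frequent word and its count
print(len(counts))            # number of distinct words
print(sum(counts.values()))   # total number of words
```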
add_task.html is the HTML template rendered by the add_task view.
The first container holds a single form element, allowing us to submit a URL. The second container displays some information about the current tasks.
We’re also using Bootstrap 4 for our CSS.
add_task.html
<!doctype html>
<html lang="en">
<head>
  <!-- Required meta tags -->
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
  <!-- Bootstrap CSS -->
  <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
  <title>Job queue</title>
</head>
<body>
  <div class="container">
    <div class="row">
      <div class="col">
        <h5 class="mt-3">Word counter</h5>
        <div class="card mt-3">
          <div class="card-body">
            <form action="/add-task">
              <div class="form-group">
                <label>Word count</label>
                <input type="text" class="form-control" name="url" placeholder="Enter URL" required>
                {% if message %}
                <small id="emailHelp" class="form-text text-muted">{{ message }}</small>
                {% endif %}
              </div>
              <button type="submit" class="btn btn-primary">Submit</button>
            </form>
          </div>
        </div>
      </div>
    </div>
  </div>
  <div class="container">
    <div class="row">
      <div class="col">
        <h5 class="mt-3 mb-3">Job queue</h5>
        {% if jobs %}
        {% for job in jobs %}
        <div class="card mb-3">
          <div class="card-body">
            <h6>{{ job.func_name }}</h6>
            <p>Args: {{ job.args }}</p>
            <small class="text-muted d-block">Job ID: {{ job.id }}</small>
            <small class="text-muted d-block">Status: {{ job.status }}</small>
            <small class="text-muted d-block">Created at: {{ job.created_at.strftime('%a, %d %b %Y %H:%M:%S') }}</small>
            <small class="text-muted d-block">Enqueued at: {{ job.enqueued_at.strftime('%a, %d %b %Y %H:%M:%S') }}</small>
          </div>
        </div>
        {% endfor %}
        {% else %}
        <p>No jobs in the queue</p>
        {% endif %}
      </div>
    </div>
  </div>
</body>
</html>
Adding tasks
Before we run our app, we need to import the count_words function.
We’re also importing strftime from time, although the view below formats dates with the datetime object’s own strftime method, so this import is optional:
views.py
from app.tasks import count_words
from time import strftime
We’ll now finish off the route:
views.py
@app.route("/add-task", methods=["GET", "POST"])
def add_task():
    jobs = q.jobs  # Get a list of jobs in the queue
    message = None
    if request.args:  # Only run if a query string is sent in the request
        url = request.args.get("url")  # Gets the URL coming in as a query string
        task = q.enqueue(count_words, url)  # Send a job to the task queue
        jobs = q.jobs  # Get a list of jobs in the queue
        q_len = len(q)  # Get the queue length
        message = f"Task queued at {task.enqueued_at.strftime('%a, %d %b %Y %H:%M:%S')}. {q_len} jobs queued"
    return render_template("add_task.html", message=message, jobs=jobs)
The form has no method attribute, so the browser submits it as a GET request and the data comes in as a query string. We use request.args.get("url") to create a variable with the form value.
- q.jobs returns a list of any current jobs in the task queue
- task = q.enqueue(count_words, url) adds the job to the queue, with count_words as the function and url as the argument
- len(q) returns the number of jobs in the queue
- message is just a formatted string containing some information about the task and length of the queue
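The view hands whatever the user typed straight to the worker, which then passes it to urlopen. As a hedged suggestion rather than part of the original app, a small check like the one below could be run in the view before calling q.enqueue. The helper name is_http_url is made up for this sketch, and it only looks at the shape of the URL, not whether the page exists.

```python
from urllib.parse import urlparse


def is_http_url(url):
    """Return True only for absolute http(s) URLs that include a host."""
    if not url:
        return False
    parsed = urlparse(url)
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


print(is_http_url("https://pythonise.com/feed/flask/the-flask-request-object"))
print(is_http_url("file:///etc/passwd"))
print(is_http_url("not a url"))
```

In the view this could be used as a guard: enqueue only when is_http_url(url) is true, and otherwise set message to an error string.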
Running the app
Before we run the app itself, we need to start the RQ worker.
The worker is a process that runs independently from our application and communicates with the message broker.
When a new task is added to the queue, the worker will carry out the task!
To start the rq worker, open a new terminal and, from the same directory as run.py, run the following:
rq worker
You should see something like:
20:55:23 RQ worker 'rq:worker:jnwt.4968' started, version 0.13.0
20:55:23 *** Listening on default...
20:55:23 Cleaning registries for queue: default
We’re now ready to run our app. Start it as you normally would any other Flask app:
export FLASK_APP=run.py
export FLASK_ENV=development
flask run
Head over to /add-task and submit a URL, keeping an eye on the terminal window running the worker.
Depending on the URL, you should see something like:
20:19:44 default: app.tasks.count_words('https://pythonise.com/feed/flask/the-flask-request-object') (b29f3903-bd7d-4a10-a76f-f7586bb5bc37)
Counting words at https://pythonise.com/feed/flask/the-flask-request-object
This should be followed by a dictionary containing each word and its number of occurrences, finishing up with:
Total words: 455
Time elapsed: 0.8473403453826904 s
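A note on the timing figure: the difference between two time.time() calls is measured in seconds, so a value of about 0.85 corresponds to roughly 850 milliseconds. The sketch below shows one way to report both units. It uses time.perf_counter, which is intended for measuring short durations, and the sum over a range is just a stand-in for the real work.

```python
import time

start = time.perf_counter()
total = sum(range(100_000))  # stand-in for the real work
elapsed_s = time.perf_counter() - start

elapsed_ms = elapsed_s * 1000  # convert seconds to milliseconds
print(f"Time elapsed: {elapsed_s:.4f} s ({elapsed_ms:.1f} ms)")
```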
Wrapping up
This was a gentle introduction to task queues using rq and Redis. There are lots of extra goodies that we haven’t covered!
Head over to the rq docs to learn more.
Last modified · 14 Mar 2019