Introduction to Data Engineering — Part 2

Joseph Thickpenny Ryan
9 min read · Jun 4, 2021

A technical starting point for Data Engineering. Here we’ll look at an example of how a traditional BI workflow might look, and then discuss how Data Engineering handles the same problem when scale and immediacy become an issue for the traditional approach. The example has us looking at an API written in Python and backed by Postgres. We’ll use this to examine how to get data out of your system in a batch manner, and discuss the merits of that approach. Code is available here.

Making Our API

The first thing we need is an API, and we want it to do some actions that have interesting things to measure. To do that we’ll use Flask, which is a simple web framework. This is a small example that will let us add a user to a database; it lacks basically everything you would expect in production (like error handling, a way to migrate the database, or a WSGI server), but we don’t care about those things, so this is our starting point. It closely follows the pattern suggested by the Flask project here.

# first_example.py
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from flask import Flask, request, Response

Base = declarative_base()
engine = create_engine("sqlite:///test.db")
db_session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine)
)


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(100), unique=True)

    def __init__(self, username: str):
        self.username = username


Base.metadata.create_all(bind=engine)

app = Flask(__name__)


@app.route("/users", methods=["POST"])
def user():
    data = request.json
    db_session.add(User(username=data["username"]))
    db_session.commit()
    return Response(status=200)


app.run()

If we want to create a user then we can post to that endpoint like so:

# curl your way to victory
curl --header "Content-Type: application/json" --request POST --data '{"username": "jtr"}' http://localhost:5000/users

This example is provided as a minimalist representation of a backend API and serves as a reminder of how data is created in the first place. If you’re completely new to software development then this will probably be new to you, so I’d suggest going through the Flask Mega-Tutorial to get to grips with some of the ideas here. I have also written about some of the difficulties in getting into Data Engineering as someone who is completely new to the tech industry, which might provide some extra insight.

As you may have noticed, this article promises Postgres and has so far delivered SQLite. This was done to introduce the API on its own, give some people a stopping-off point if they need to do further reading, and avoid launching straight into docker-compose.

Having a SQLite database is fine for testing, but it’s not what you’re going to see in the real world, and while the code written for this set of articles doesn’t aim to be production ready, we are still trying to look at real-world examples. It therefore makes sense to have the API backed by Postgres, and for this we’ll start using containers so that we have an actual running service that we can query later.

To make our queries interesting later on we need to change our expected data, because just having a table of usernames isn’t very interesting. The amended table class looks like this (full code, including the new imports, here):

# A more interesting user
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(100), unique=True)
    country_code = Column(String(2))
    created_at = Column(DateTime)

    def __init__(
        self,
        username: str,
        country_code: str,
        created_at: datetime
    ):
        self.username = username
        self.country_code = country_code
        self.created_at = created_at

Now we can run a few interesting queries about user creation demographics over time.

Containers come into play when we want to isolate our services; in this case we can use them to run a specific Postgres version without installing that version onto our base machine. As part of the definition of our Postgres service, we can provide environment variables to create a user and a database; we’ll need these for our SQLAlchemy connection string.

# Snippet from the docker-compose.yml file
services:
  db:
    container_name: db
    image: postgres:11-alpine
    environment:
      - POSTGRES_USER=test
      - POSTGRES_PASSWORD=test
      - POSTGRES_DB=test

Setting all of those environment variables to test is obviously not something you should do in production, but it works for us here. The change in connection string is in the Python file:

# Connection string change in our Flask app
engine = create_engine("postgresql+psycopg2://test:test@db/test")
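
If you did want to keep real credentials out of the code, one option is to read them from the environment rather than hard-coding them. The sketch below is not part of second_example.py; it assumes the same POSTGRES_* variables from the compose file are also passed to the api container:

# A minimal sketch, not part of the example repo: read connection details
# from environment variables (assumed to be set on the api container too)
import os

from sqlalchemy import create_engine

db_user = os.environ.get("POSTGRES_USER", "test")
db_password = os.environ.get("POSTGRES_PASSWORD", "test")
db_name = os.environ.get("POSTGRES_DB", "test")

engine = create_engine(f"postgresql+psycopg2://{db_user}:{db_password}@db/{db_name}")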

To gather the new data from our API requests, we’ll need to amend the handler to take the new data as defined in the User class:

# Updated handler - see second_example.py for the full code
@app.route("/users", methods=["POST"])
def user():
    data = request.json
    db_session.add(
        User(
            username=data["username"],
            country_code=data["country_code"],
            created_at=datetime.strptime(data["created_at"], "%Y-%m-%d %H:%M:%S"),
        )
    )
    db_session.commit()
    return Response(status=200)

Note here that we’re allowing the sender to dictate both country_code and created_at, which we likely wouldn’t want to do in the real world, but here we are going to exploit that to create some data that is more interesting to query.
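
For contrast, a stricter handler might look something like the sketch below, where the server decides created_at itself and rejects unknown country codes. This is purely illustrative and deliberately not what second_example.py does; it reuses app, request, Response, db_session and User from the example, and both the /strict-users route and the allow-list are made up:

# Illustrative only: a handler that doesn't trust the client for these fields
from datetime import datetime

ALLOWED_COUNTRIES = {"BE", "DE", "GB"}  # hypothetical allow-list


@app.route("/strict-users", methods=["POST"])
def strict_user():
    data = request.json
    if data.get("country_code") not in ALLOWED_COUNTRIES:
        return Response(status=400)
    db_session.add(
        User(
            username=data["username"],
            country_code=data["country_code"],
            created_at=datetime.utcnow(),  # server-side timestamp, not client supplied
        )
    )
    db_session.commit()
    return Response(status=200)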

The last piece of this particular puzzle is to introduce containers to isolate our services; if you aren’t familiar with containers then now is a good time to check out Docker. The docker-compose.yml file that we’ll use defines both the api and the db, where the api requires building and the db doesn’t:

version: "3.8"services:
db:
container_name: db
image: postgres:11-alpine
environment:
- POSTGRES_USER=test
- POSTGRES_PASSWORD=test
- POSTGRES_DB=test
api:
container_name: api
build: .
ports:
- "5000:5000"
depends_on:
- db

When we define a build in the docker-compose.yml file we point at a build context; in this case we’re using ., so at build time Docker will look in the current directory for a Dockerfile, which is itself a definition of the environment required to run our api. We’re also using Poetry, so this is handled in the Dockerfile but may be new to some Python users.

FROM python:3.7

ENV POETRY_VERSION=1.1.6
RUN python -m pip install "poetry==$POETRY_VERSION"

WORKDIR /app
COPY poetry.lock pyproject.toml /app/
RUN poetry config virtualenvs.create false \
    && poetry install --no-dev --no-interaction --no-ansi
COPY second_example.py /app/

ENTRYPOINT ["python", "second_example.py"]

Now we can put up our API, send some data to it and query that data.

Using Our API

Working with docker-compose is relatively straightforward; for our purposes all we need to do is run docker-compose up and it will take the definition in docker-compose.yml and bring it into existence. You’ll see the stdout of all your containers in a single terminal, which can be a little overwhelming but is completely fine. Later we can use the -d flag to detach and hide the output, but then we have to request logs from Docker using docker logs -f <container_name>.
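
For reference, the commands look roughly like this (the api container name comes from the docker-compose.yml above, and --build simply rebuilds the image if the code has changed):

# Foreground: logs from both containers interleaved in one terminal
docker-compose up --build

# Detached: no output, so follow a single container's logs separately
docker-compose up -d
docker logs -f api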

Now that the stack is up you can post user data to the endpoint. It’s worth remembering that we haven’t added any error handling or anything else that would help if the input data strays from the happy path, so for now we should play nice and send the expected payload to our API:

curl --header "Content-Type: application/json" --request POST --data '{"username": "jtr", "country_code": "BE", "created_at": "2021-01-01 00:00:00"}' http://localhost:5000/users

I’m not from Belgium, but my API can’t prove that, so as far as it’s concerned I’m now Belgian (we’ll keep the article in English to avoid embarrassment though). We can check this information in the database:

docker exec -it db psql --username test

select * from users;

 id | username | country_code |     created_at      
----+----------+--------------+---------------------
  1 | jtr      | BE           | 2021-01-01 00:00:00

So now we have a mock user-creation API that we can send expected payloads to in order to create some data we can query later, and by having created_at and country_code columns we can do some vaguely interesting queries. However, we can’t do interesting queries unless we have significantly more records. Generating more records is easy enough, but we probably don’t want to be hand-crafting hundreds of curl commands. We can send requests using the Python standard library, and we can craft some interesting data while we do it:

# Using uuid for unique usernames, and adding some randomness to make queries interesting
import urllib.request
import uuid
import json
import random

endpoint = "http://localhost:5000/users"

for id in [uuid.uuid4() for _ in range(0, 1000)]:
    req = urllib.request.Request(endpoint)
    req.add_header("Content-Type", "application/json")
    data = json.dumps(
        {
            "username": str(id),
            "country_code": random.choices(["BE", "DE", "GB"])[0],
            "created_at": f"2020-09-{random.choices(['01', '12', '19'])[0]} {random.choices(['11', '13', '20'])[0]}:01:00",
        }
    )
    resp = urllib.request.urlopen(req, data.encode("utf-8"))
    if resp.status != 200:
        print(resp.reason)
        break

Now we have some fake data collected and in a state that we can query, which means we can run a few queries.

# Count the total number of users
select count(*) from users;

 count 
-------
  1000

# Count the total number of users per country
select country_code, count(*) as total from users group by country_code;

 country_code | total
--------------+-------
 BE           |   295
 GB           |   345
 DE           |   360

# Count the total number of users created per given date
select date(created_at), count(*) as total from users group by date(created_at);

    date    | total
------------+-------
 2020-09-12 |   319
 2020-09-01 |   325
 2020-09-19 |   356

# Count the total number of users created per given date, with missing dates filled in
with dates as (
    select generate_series(
        (select min(created_at) from users),
        (select max(created_at) from users),
        interval '1 day'
    )::date as date
), counts as (
    select date(created_at) as date, count(*) as total
    from users
    group by date(created_at)
)
select d.date, coalesce(c.total, 0)
from dates d
left join counts c on d.date = c.date;

    date    | coalesce
------------+----------
 2020-09-01 |      325
 2020-09-02 |        0
 2020-09-03 |        0
 2020-09-04 |        0
 2020-09-05 |        0
 2020-09-06 |        0
 2020-09-07 |        0
 2020-09-08 |        0
 2020-09-09 |        0
 2020-09-10 |        0
 2020-09-11 |        0
 2020-09-12 |      319
 2020-09-13 |        0
 2020-09-14 |        0
 2020-09-15 |        0
 2020-09-16 |        0
 2020-09-17 |        0
 2020-09-18 |        0
 2020-09-19 |      356

With these queries we can start to see how we could run analytical workloads against our database. However, there are a few problems with this that we aren’t going to replicate with our local Docker setup. Primarily, we are running queries against the database that backs our API; this adds pressure to our live system and will eventually cause performance issues. Given that the API is supposed to be our live system, i.e. our gateway to revenue generation, we want to avoid performance degradation. Historically this has led to companies investing in read-only replicas of their live platforms, so that analytical queries can be run without impacting the live product. This still puts the data in an OLTP system, where we would likely want an OLAP system (or some other data warehousing solution) for better query performance.
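
In SQLAlchemy terms, the read-only replica approach is just a second engine pointed at a different host. The sketch below is an assumption rather than part of our setup: our docker-compose stack has no replica, and db-replica is a made-up hostname standing in for one:

# Sketch only: db-replica is a hypothetical read-only copy of the primary
from sqlalchemy import create_engine, text

primary = create_engine("postgresql+psycopg2://test:test@db/test")          # backs the API
replica = create_engine("postgresql+psycopg2://test:test@db-replica/test")  # for analytical queries

with replica.connect() as conn:
    rows = conn.execute(
        text("select country_code, count(*) as total from users group by country_code")
    ).fetchall()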

With any data that we produce there will be an amount of time before it is available to query. For a system as described above, where we have a read-only replica, it depends entirely on how often the replication is configured to occur. A common pattern is to do the replication overnight, when the system is likely under less pressure and nobody is at work waiting to run queries. In this case there is a 24-hour waiting period to get answers, which might not be acceptable.

Problems And Solutions

So we have two problems: the first is our potential data latency of 24 hours, and the second is that our choice of technology isn’t ideal for large analytical queries.

To address the data latency, and also to potentially ignore unwanted columns in the data, we can move to an event-driven system that utilises a queueing technology like Kafka. Here we can craft custom events that are emitted at commit time; these are sent to the queue and picked up by a separate process. The events can then be processed and loaded into a different system, which removes the query burden from the live system and gives us the opportunity to control (as best we can) our own data latency.
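
As a rough sketch of the shape this takes on the API side: the broker address, topic name and the kafka-python client are all assumptions here, and User and db_session are borrowed from second_example.py. This is an illustration, not the implementation we’ll build later:

# Sketch only: emit a trimmed-down event after the database commit
import json
from datetime import datetime

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)


def create_user(data):
    user = User(
        username=data["username"],
        country_code=data["country_code"],
        created_at=datetime.strptime(data["created_at"], "%Y-%m-%d %H:%M:%S"),
    )
    db_session.add(user)
    db_session.commit()
    # Only the fields the downstream consumer cares about go into the event
    producer.send(
        "user_created",  # made-up topic name
        {
            "username": user.username,
            "country_code": user.country_code,
            "created_at": data["created_at"],
        },
    )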

To address the technology choice, we want a system that’s more tailored to big data, utilising column-oriented data formats that lend themselves to large datasets. These work far better for the types of analytical queries we want to run than the OLTP system that serves our API.
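
Parquet is one common example of such a column-oriented format. A tiny sketch using pandas (with pyarrow installed) shows the idea; the records and file path are made up, and this isn’t tied to the pipeline we’ll build:

# Sketch: land some user records in Parquet and read back a single column
import pandas as pd

events = pd.DataFrame(
    [
        {"username": "jtr", "country_code": "BE", "created_at": "2021-01-01 00:00:00"},
        {"username": "abc", "country_code": "DE", "created_at": "2021-01-02 12:00:00"},
    ]
)
events["created_at"] = pd.to_datetime(events["created_at"])
events.to_parquet("users.parquet", index=False)

# A columnar engine only needs to scan the country_code column for this query
counts = pd.read_parquet("users.parquet", columns=["country_code"])["country_code"].value_counts()
print(counts)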

Conclusions And The Future

We’ve covered a little bit of history and an example of data generation, to show how data could come into existence and how utilising that data can be difficult due to performance and data latency.

The next steps are to look at using a queueing system and events to remove the pressure on the live system and allow us to craft the data we want, avoiding any extra data columns that aren’t useful to us.
