Introduction to Data Engineering — Part 3

In Part 2 we introduced an API that we can send user creation data to, and talked about running queries on the raw data that is created by that API. There are some problems with that as an approach, which are going to be tackled in this article. Specifically we’ll look at sending events rather than relying on the transactional system’s database, look at some of the problems we can run into by doing that and look at potential solutions. As ever, the code can be found here.

Previously it was discussed that we could copy our API’s database into a read-only system for BI-like queries, which is what some organisations have done. Here we’re going to skip any practical aspect of that, because it’s not enormously interesting and is something people are moving away from anyway. Instead we’re going to look at how to introduce events to our API. In the previous article we ended up with a class describing the data required to insert a user:

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(100), unique=True)
    country_code = Column(String(2))
    created_at = Column(DateTime)

    def __init__(self, username: str, country_code: str, created_at: datetime):
        self.username = username
        self.country_code = country_code
        self.created_at = created_at

Which is lovely and clean, allowing us to create some nice examples. However, the real world is a bit more of a mess, so the artificial cleanliness of our example might not hold up. In the real world we might add columns that currently have no analytical value, or split the user insert into multiple tables, among other things. In the world of read-only replicas for BI, this would mean that the data isn’t prepared well for analytics. If we want control over these kinds of things it makes sense to work with the API owner to craft events of defined analytical value.

To demonstrate this, we can make some simple changes to our API code:

# Changes to user class
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(100), unique=True)
    country_code = Column(String(2))
    created_at = Column(DateTime)
    is_a_dog = Column(Boolean)
    api_version = Column(String(20))

    def __init__(self, username: str, country_code: str, created_at: datetime, is_a_dog: bool, api_version: str = "0.0.1"):
        self.username = username
        self.country_code = country_code
        self.created_at = created_at
        self.is_a_dog = is_a_dog
        self.api_version = api_version

# Changes to API route
@app.route("/users", methods=["POST"])
def user():
    data = request.json
    try:
        new_user = User(
            username=data["username"],
            country_code=data["country_code"],
            created_at=datetime.strptime(data["created_at"], "%Y-%m-%d %H:%M:%S"),
            is_a_dog=data.get("is_a_dog", False),
        )
    except (KeyError, ValueError):
        return Response(status=400)
    # Persist the user as before, then log the event we want to collect
    session.add(new_user)
    session.commit()
    app.logger.info({
        "username": data["username"],
        "country_code": data["country_code"],
        "created_at": datetime.strptime(data["created_at"], "%Y-%m-%d %H:%M:%S"),
    })
    return Response(status=200)

So we’ve added some columns we don’t care about to the class, namely is_a_dog and api_version. And then we’ve changed the API route to reflect the change to the table and also log the data that we want to collect. We can test this by sending data to our API, as we did previously, but what we actually care about is the data we sent to the logger, as this is our route to extracting defined events (although I should point out that logging real data via your logger is a bad idea, because it makes it very easy to save personal information to an insecure location). In the git repository for this article you’ll find everything you need to get this API running with Docker Compose; once you’re up and running you can send some test data using:

curl --header "Content-Type: application/json" --request POST --data '{"username": "jtr", "country_code": "BE", "created_at": "2021-01-01 00:00:00"}' http://localhost:5000/users

Which comes up in the container logs (I’m running the stack in the foreground; if you’re running it with the detached flag then you can look at these logs with docker logs).

We can also check that the database holds more data than we want:

docker exec -it db psql --username test
select * from users;
 id | username | country_code |     created_at      | is_a_dog | api_version
----+----------+--------------+---------------------+----------+-------------
  1 | jtr      | BE           | 2021-01-01 00:00:00 | f        | 0.0.1

So we’ve established that we can tell our API to send events that are different from the requirements of the API’s transaction. This opens up a route to getting data in a near real-time manner, as opposed to the data lag that we would experience with a read-only replica. However, in order to achieve this we need to take our events and make them queryable, which is the hard part. The rest of this article will look at capturing and storing the events, and the next article will look at how we could query the data we’ve just captured.

In our example we did the simplest (and worst) possible thing, and that was to use the API’s logging setup to capture our events. This was used as an example, but is clearly not going to cut it, so what’s the alternative? To start with we need to store these events, so let’s write them to a file:

# Replacing the logger with a write to a file - second_example.py
with open("events.json", "a") as file:
    json.dump({
        "username": data["username"],
        "country_code": data["country_code"],
        # Keep created_at as the incoming string so it serialises to JSON
        "created_at": data["created_at"],
    }, file)
    file.write("\n")

We can start this stack with Docker Compose and send more data at it. Once we’ve sent some data we can take a look in the container to see what the file contains:

# Send some fake data
curl --header "Content-Type: application/json" --request POST --data '{"username": "fake_user", "country_code": "BE", "created_at": "2021-01-01 00:00:00"}' http://localhost:5000/users
# Ask the api if it has stored that data
docker exec api cat events.json
> {"username": "fake_user", "country_code": "BE", "created_at": "2021-01-01 00:00:00"}

If you do this with multiple calls to the API then you will create a multiline JSON file, which could be ingested later.
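To make that concrete, here is a minimal sketch of such an ingestion step, assuming the newline-delimited events.json format shown above (read_events is a hypothetical helper, not part of the repository):

```python
import json

def read_events(path: str) -> list[dict]:
    """Read a newline-delimited JSON file back into a list of event dicts."""
    events = []
    with open(path) as file:
        for line in file:
            line = line.strip()
            if line:  # skip any blank lines
                events.append(json.loads(line))
    return events
```

Each line is a complete JSON document, so a downstream process can parse the file one event at a time without loading it all into memory.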

There is a problem with this though, and that’s its scalability. Opening and closing a file with every API call isn’t going to scale well performance-wise, and we need some way to cycle through the files we are writing to. What we want is a system that stores a series of events in a file and then moves on to a new file; by doing this we will have a set of files that we can treat as immutable and that are restricted in size. These are conditions that we will try to facilitate because they will benefit our downstream processes.
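To illustrate those conditions, here is a hypothetical buffering writer that flushes a batch of events to a fresh, timestamped file once a size limit is reached. This is a sketch only, not the approach we’ll actually adopt:

```python
import json
import time

class RotatingEventWriter:
    """Buffer events in memory and flush them to a fresh file every
    `max_events`, so each closed file is immutable and bounded in size."""

    def __init__(self, max_events: int = 1000):
        self.max_events = max_events
        self.buffer = []

    def write(self, event: dict):
        self.buffer.append(event)
        if len(self.buffer) >= self.max_events:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        # Timestamped name so each flushed file is unique and never reopened
        path = f"events-{time.time_ns()}.json"
        with open(path, "w") as file:
            for event in self.buffer:
                file.write(json.dumps(event) + "\n")
        self.buffer = []
```

Even this sketch leaves open questions (what if the process dies with a full buffer? what flushes a quiet stream?), which is part of why we’ll reach for a dedicated system instead.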

In order to create a system like that we will move away from writing these events to a local file and look to offload these events to a separate process. To achieve this we will be using Kafka, which is a data streaming system that offers queues that store data waiting to be read by another process. It follows a producer-consumer pattern, where producers (like our API) put data onto a queue and consumers (which we are yet to explore) read data off of the queue, which is exactly what we want to do. A production Kafka deployment will include a cluster of multiple nodes, giving us redundancy within the cluster and therefore offering some guarantees against data loss, but for a local deployment this won’t be explored.
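The producer-consumer pattern itself can be sketched with nothing but Python’s standard library, using an in-memory queue to stand in for a Kafka topic:

```python
import queue
import threading

events = queue.Queue()  # stands in for a Kafka topic

def producer():
    # The API's role: put events onto the queue
    for i in range(3):
        events.put(f"event-{i}")
    events.put(None)  # sentinel: no more data

def consumer(out: list):
    # The downstream process's role: read events off the queue
    while True:
        msg = events.get()
        if msg is None:
            break
        out.append(msg)

received = []
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=(received,))
t1.start(); t2.start()
t1.join(); t2.join()
print(received)  # → ['event-0', 'event-1', 'event-2']
```

The point of the pattern is that the two sides are decoupled: the producer doesn’t wait for the consumer, and the queue absorbs the difference in their speeds. Kafka adds persistence, partitioning and replication on top of this idea.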

To introduce Kafka we’ll use another stack and do some manual testing with it. You can bring this stack up with Docker Compose. By this point you might have noticed that we’re using a single Python environment for all of the examples, and we therefore have many packages installed that aren’t required for some of the examples but are for others; we’re ignoring that for the ease of having examples in one place.

If you’ve had a look at the compose file you may have noticed that the Kafka container is joined by a ZooKeeper container. Kafka is designed to run in a cluster, which poses a set of problems around how the nodes interact with each other and how they manage being a cluster; things like sharing configuration and syncing data for redundancy have to be managed. Kafka achieves this (at least for the moment) by offloading that responsibility to ZooKeeper, which is all we need to know for now. There are also a number of environment variables assigned to each of the containers which we won’t explore.

To send data to Kafka we need to set up a topic: this is just a channel that we can send data to, and a nice way to separate different types of data. Once the stack is up, you can create a topic manually like this:

docker exec kafka kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181

We’ll round off this part of the introduction by simply showing how to send data to a topic and then read it back; we’ll continue to look at Kafka in the next one.

Now that we have a topic, we can start to use it, so we’ll set up a consumer for our topic (leave this running in a Python shell):

from confluent_kafka import Consumer

c = Consumer({'bootstrap.servers': 'localhost', 'group.id': 'group'})
c.subscribe(['test'])
while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue
    print(msg.value().decode('utf-8'))

Now when we send data to that Kafka topic, this consumer will print it out (just like how we used the logging module when looking at the API). So let’s send some data to our topic (in a different Python shell to the consumer):

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost'})
p.produce('test', 'First Message!'.encode('utf-8'))
p.flush()

And you should now see data printed out in our consumer shell session.

Conclusion and next steps

What we haven’t seen here is the real benefit of Kafka: we’ve just sent data to a service and then been able to read that data back, which is everything that our previous API already offered. However, we can see that we will be able to offload the writing of a logged event to Kafka, which is something we need to explore further.
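One small piece we can pin down already: when the API does produce events to Kafka, each event has to be serialized to bytes first. A minimal sketch, assuming JSON-encoded events with the schema we’ve been using (serialize_event is a hypothetical helper, not part of the repository):

```python
import json
from datetime import datetime

def serialize_event(username: str, country_code: str,
                    created_at: datetime, api_version: str = "0.0.1") -> bytes:
    """Encode a user-created event as UTF-8 JSON, ready to hand to p.produce()."""
    event = {
        "username": username,
        "country_code": country_code,
        "created_at": created_at.strftime("%Y-%m-%d %H:%M:%S"),
        "api_version": api_version,
    }
    return json.dumps(event).encode("utf-8")
```

With something like this in place, replacing the file write in the API route becomes a single produce call on the serialized payload.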

In the next part we’ll look at some of the important ideas that make Kafka valuable as a technology, and use those to store events in a manner that is more in line with the expectations of an engineer in the big data space. If you want to do some reading to get ahead of the game, you should look here for use cases of Kafka and here to read about the parquet data format (which will be an introduction to column stores).
