Introduction to Data Engineering — Part 4

Joseph Thickpenny Ryan
7 min readJul 11, 2021

In part 3 we introduced Kafka, but didn’t explore it very deeply. In this part we will be looking at how we can utilise Kafka for event collection, with specific interest on the size of our event log outputs. We will also talk about how to land data off the queue, and what consideration you might have when doing so. Code for this series can be found here.

Batching Data

For this part of the series you’ll be happy to hear that we’ll be working with a single docker-compose file, so you won’t have to jump between stacks! You’ll also notice that this is the same as the final stack from part 3, which is a very simple single node Kafka setup.

To start an exploration of Kafka we need to think about what might make a good data streaming solution. In the previous part we sent and received single events to and from our Kafka topic, but this means that for every event we will making a request to our Kafka cluster. If we were able to request more than one event per trip then we could may fewer requests and therefore spend less time asking for data.

In the previous part we used the poll() method to gather a single event, but there is another method available where we can specify both a maximum number of messages to gather and a wait timeout to reach that number of messages, which is consume() (found in the documentation here). Let’s start our compose stack and see what we can do with our new batching information: docker-compose up -d.

We’re best off using two terminals here, one for producer work and one for consumer work. In our producer we’ll be doing the Kafka management and data producing work, in the consumer we’ll be consuming the data and doing some basic processing on it. We can create a topic using Python:

from confluent_kafka.admin import AdminClient, NewTopic# Create an admin client and use that to create a topic
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
admin_client.create_topics([NewTopic("batch", 1, 1)])
# > {'batch': <Future at 0x7f82fc750fd0 state=running>}
# Check what topics are available
admin_client.list_topics().topics
# > {'__confluent.support.metrics': TopicMetadata(__confluent.support.metrics, 1 partitions), 'batch': TopicMetadata(batch, 1 partitions)}

We may as well set up our producer while we’re here:

from confluent_kafka import Producerproducer = Producer({'bootstrap.servers': 'localhost:9092'})

We know that we can produce events quite quickly, but how we control and land that data is the part that we’re really interested in, so let’s set up a consumer and look at how we’re going to do that:

from confluent_kafka import Consumerconsumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test'})
consumer.subscribe(["batch"])
while True:
# set maximum number of messages to be 5 and a wait timeout of 10s
messages = consumer.consume(num_messages=5, timeout=10)
print(f"Total number of messages: {len(messages)}")
for message in messages:
print(message.value().decode("utf-8"))

Back over with our producer we can now start to send messages at our queue and observe the results, you should see that before we’ve sent any messages you will just get a printout every 10 seconds saying that 0 messages have been gathered. To start with, let’s send a single message:

producer.produce("batch", "first".encode("utf-8"))

And you should now see that reflected in your consumer terminal, which will produce a print of the total messages received and then the message itself before returning to printing 0 messages. Let’s send more messages:

for x in range(0, 5):
producer.produce("batch", str(x).encode("utf-8"))

Depends on how you catch your consumer (although it’s quite unlikely because we didn’t add a sleep), you might not have got all 5 messages in a single consume, but you’ll still have messages in your consumer terminal that contains more that a single message being gathered.

We can make this a little more interesting by sending more messages and adding a delay between them:

import time
from random import randint
for x in range(0, 20):
producer.produce("batch", str(x).encode("utf-8"))
time.sleep(randint(1, 5))

Our consumer has a wait timeout of 10 seconds, and we have added a sleep that uses the random package to add a bit of randomness, meaning that we’ll get batches of uneven numbers. In my cases, the batches were 3, 3, 3, 5, 2, 2 and 2. This is what happens when we are hitting the wait timeout configured in our consumer, but what happens when we hit the message limit instead? To test that we need to send more messages and remove the sleep:

for x in range(0, 100):
producer.produce("batch", str(x).encode("utf-8"))

What you will see is that the consumer will have printed 20 lots of 5 messages, because that’s the maximum number of messages allowed per batch. So, what have we shown here?

  1. We can read off our queue in defined maximum batch numbers
  2. We can force reads off our queue given a maximum wait timeout

This gives us a pair of guarantees, the first point means that we can land data in chunks of a rough maximum size which is very useful for downstream data ingestion processes. The second point means that we can define a maximum amount of time that data could be on the queue, which is to say that we can define a maximum delay in collecting data from the queue even if there isn’t lots of data being sent to us.

Landing Data

Up to now we actually haven’t done anything with the data, so we should probably start writing it out somewhere. While we’re at it, we may as well make our messages a little bit more interesting than a looped number. We’re going to send json blobs, so we need to change our consumer to read the new data:

import jsonwhile True:
messages = consumer.consume(num_messages=5, timeout=10)
print(f"Total number of messages: {len(messages)}")
for message in messages:
print(json.loads(message.value()))

And this is what our new and interesting data looks like:

"""
dataclasses are fairly new to python, they give us lots of nice things for free but I won't be
exploring them here
"""
from dataclasses import dataclass, asdict
import json
from datetime import datetime
import uuid
@dataclass
class ChangeBalance:
user_id: str
area_id: str
event_timestamp: str
balance: int
producer.produce(
"batch",
json.dumps(
asdict(
ChangeBalance(
user_id=str(uuid.uuid4()),
area_id=str(uuid.uuid4()),
# timezones are important, let's keep everything utc
event_timestamp=datetime.utcnow().iso_format(),
balance=-10,
)
)
),
)

And again, you’ll see that we are logging out the data that got sent to our consumer, but this time we sent json in an interesting format. (A nice point, but something we’re not going to explore, is that at the producer level we could do some data validation. If we provide an API to people who wish to send us data, do the validation there before forwarding that data on to Kafka then we can offer an interface whereby people will get immediate feedback if the data they have send is wrong in some manner.) Next, we can change our consumer to save the data into a file for us:

while True:
messages = consumer.consume(num_messages=5, timeout=10)
print(f"Total number of messages: {len(messages)}")
# Only write a file if there's data to write
if len(messages) > 0:
# Use "a" here for append, if we use "w" then we risk overriding a file
with open(f"batch_{datetime.utcnow().isoformat()}.json", "a") as file:
for message in messages:
json.dump(json.loads(message.value()), file)
# Remember to end with a newline!
file.write("\n")

Now we can throw some data at it and see what we get out:

def make_change_balance(
user_id: str = str(uuid.uuid4()),
area_id: str = str(uuid.uuid4()),
timestamp: str = datetime.utcnow().isoformat(),
balance: int = 10,
):
return ChangeBalance(
user_id=user_id,
area_id=area_id,
# timezones are important, let's keep everything utc
event_timestamp=timestamp,
balance=balance,
)
# Send a single item, this should write 1 record to its own file
producer.produce("batch", json.dumps(asdict(make_change_balance(user_id="different"))))
# Sleep just to make sure that we are only going to write to a single file
# (remember that the timeout on the consumer is set to 10 seconds)
time.sleep(11)
# Send lots of data, this should write 20 files because we consume in batches of 5
for x in range(0, 100):
producer.produce("batch", json.dumps(asdict(make_change_balance())))

We’ve done two things there, sent a single event and waited for the timeout to run out (which causes a single record to be written to a single file) and then sent multiple events and written multiple files that have the maximum number of records that the consume reads per batch.

There are many things that are missing from this if you wanted to do this in a production setting, especially error handling, dealing with failures in your infrastructure and offset management. All of these are their own topics, you should seek out additional resources for these topics. The topic that I want to look at here is downstream consumers of the data we have just landed, and how they can dictate your deployment decisions.

In this example we’ve deliberately chosen a consumer batch size of 5 events, and a timeout of 10 seconds. These settings allow us to quickly explore how the system works, but they’re probably not very good in production, but why is that? Well, it’s what has already been mentioned: the consumers of our landed data. The data we land off the Kafka queue has to undergo further work before it can be queried by Data Scientists (DS) or Data Analysts (DA) via a SQL interface, this means that we need to make good decisions landing our data so that we don’t cause problems later down the stream.

This is all well and good, but what do we actually have to care about? Most of the time it’s going to boil down to the on disk size of your data. If DS and DA are going to query the data it will be landing in some kind of data warehousing solution. There are several to choose, but with the rise of the “cloud data warehouse” and the accessibility of documentation associated with these I’ll provide examples of file size demands from AWS’s Redshift and Snowflake. These both offer services where the size of the file is important in leveraging them efficiently. In the open source world a nice example of file size importance is when you look at Apache Hive and best practices for querying data on HDFS using it, where better query performance can come from making your file size close to the HDFS block size.

Conclusion And The Final Part

We can customise our Kafka consumer settings to ensure clever batching of received events, these settings were arbitrary in our example but in practice will be defined by the processes that use the data downstream.

In the final part we’ll produce some dummy data again and look at how we can make it queryable using some open source technologies.

--

--

Joseph Thickpenny Ryan

Data Engineer. Double barrelled surname without a hyphen, it’s a problem I didn’t choose.