## Goals
A data pipeline is a process where data is taken from raw sources, cleaned and transformed, and then stored in a format more convenient for the intended application.
You are part of a team that will be doing an analysis of your city’s budget over the past twenty years.
You file a request under your city’s open data law and are provided with twenty files. They have substantial differences across the years:
| Year | Changes |
|---|---|
| 2002 | XLS file (old MS Excel format). Three columns: `amount`, `purpose`, and `department`. `department` restricted to 4 upper-case characters (EDUC, SFTY, PARK, ROAD, etc.). |
| 2009 | Format switches to XLSX (modern Excel format). |
| 2012 | Most department names changed to follow a new standard. |
| 2014 | `subdepartment` column added. |
| 2016 | `amount` field removed and replaced with `amount_proposed` and `amount_actual`. Prior years provided only actual amounts. |
| 2021 | Now a sqlite3 file; the shift to releasing the raw data splits the data across 3 tables with additional metadata. |
One way to look at this problem might be to write the functions:

```python
ingest_2002_2008
ingest_2009_2011
ingest_2012_2013
ingest_2014_2015
ingest_2016_2020
ingest_2021_
```
But as you do, you realize that these functions have more in common than not.
For instance, no matter which file format you load the data from, you are cleaning the purpose data with the same regular expression. There are 2-3 variations of the department logic, and perhaps you begin to break this down into smaller helper functions:
```python
load_xls
load_xlsx
load_from_sql
clean_purpose
get_department_pre_2012
get_department_post_2012
get_department_with_subdepartment
```
Now your `ingest_*` functions begin to look like a series of calls to helper functions:
```python
def ingest_2009_2011(filename, year):
    # use appropriate loader
    data = load_xlsx(filename)

    # process all records identically
    for row in data:
        row["purpose"] = clean_purpose(row["purpose"])
        row["department"] = get_department_pre_2012(row["department"])

        # perhaps some custom fix for a particular year
        if year == 2011:
            # glitch in OCR data where b00 means 600
            row["amount"] = row["amount"].replace("b00", "600")

        # write to database or file
        save_cleaned_row(row)
```
These functions form data pipelines: they transform the data from the varied formats into a common representation suitable for analysis.

While this is perfectly acceptable code for many cases, it still feels somewhat redundant.

We are going to take a look at some other ways of writing code, with a focus on composing small, robust functions into data pipelines.
Small functions are more reusable. Consider a hypothetical `str.capitalize_and_strip()` vs. `str.strip().capitalize()`: we can gain the functionality of the more complex method by composing two more general ones.
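For instance:

```python
messy = "  hello WORLD  "
# two general-purpose methods composed, instead of one specialized method
print(messy.strip().capitalize())  # prints "Hello world"
```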
Also, these small functions that perform one task are far easier to test. This means a program composed of small functions will be less prone to bugs.
This philosophy of small pieces of code that do one thing well is a key part of what is known as the "Unix philosophy". Unix is an old operating system whose design remains incredibly influential today. Modern macOS and Linux are both largely "Unix-compatible"; more specifically, they largely conform to a specification known as POSIX.
Windows does not have a Unix lineage the way that MacOS and Linux do. This is why lots of things are significantly different between Windows and the others.
Modern Windows systems have adopted similar concepts, but with different syntax and names in most cases.
A key feature of Unix is the pipe. Pipes take the output of a program and make it the input of another.
Command line programs operate on special “files” called standard input and standard output.
When you `print`, standard output is where output goes by default; `input` and other functions that wait for user input read from standard input. (There is also a standard error stream that allows separating error output from output intended for a user. That use will be out of scope for today.)
We can demonstrate this with an example. Let’s say you have a scraper that runs for a long time, and you have it print out 404 errors when it encounters a missing page.
```
$ python scrape.py
fetching page 0 ...
HTTP 404 while fetching page 0
fetching page 1 ...
HTTP 200 from page 1
fetching page 2 ...
HTTP 200 from page 2
fetching page 3 ...
HTTP 200 from page 3
fetching page 4 ...
HTTP 200 from page 4
...
fetching page 9995 ...
HTTP 200 from page 9995
fetching page 9996 ...
HTTP 200 from page 9996
fetching page 9997 ...
HTTP 200 from page 9997
fetching page 9998 ...
HTTP 200 from page 9998
fetching page 9999 ...
HTTP 200 from page 9999
```
If you only wanted to see the 404s, you might think you need to comment out the other print statements in the program.
Instead, we will pipe the output of this program through another program, `grep`. `grep` takes input one line at a time and only prints lines that match a pattern, which can be a regular expression or a simple string.
```
$ python scrape.py | grep "404"
HTTP 404 while fetching page 0
HTTP 404 while fetching page 111
HTTP 404 while fetching page 222
HTTP 404 while fetching page 333
fetching page 404 ...
HTTP 200 from page 404
HTTP 404 while fetching page 444
HTTP 404 while fetching page 555
HTTP 404 while fetching page 666
HTTP 404 while fetching page 777
...
HTTP 404 while fetching page 9213
HTTP 404 while fetching page 9324
fetching page 9404 ...
HTTP 200 from page 9404
HTTP 404 while fetching page 9435
HTTP 404 while fetching page 9546
HTTP 404 while fetching page 9657
HTTP 404 while fetching page 9768
HTTP 404 while fetching page 9879
HTTP 404 while fetching page 9990
```
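Any program that reads standard input and writes standard output can take part in a pipeline. As a minimal sketch, here is a crude `grep`-alike (a hypothetical `only_404.py`, not part of the original example):

```python
# only_404.py -- print only the lines containing "404"
import sys

for line in sys.stdin:       # lines arrive from the previous program in the pipe
    if "404" in line:
        print(line, end="")  # matching lines go to standard output
```

Running `python scrape.py | python only_404.py` would behave much like the `grep` version above.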
We can chain together many commands. For example, to count the unique words in this file:
```
$ cat index.qmd | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20
  20 to
  18 a
  15 and
  13 the
  13 in
  11 you
  11 will
  11 this
   9 we
   9 is
   7 your
   7 with
   6 programming
   6 not
   6 it
   5 topic
   5 that
   5 on
   5 of
   5 course
```
To break this down one step at a time:

- `cat index.qmd` simply outputs the contents of the given file to the terminal.
- `tr -s '[:space:]' '\n'` uses the `tr` (translate) command to replace runs of whitespace with newlines, leaving one word per line.
- `sort` sorts a list of text when one word is given per line.
- `uniq -c` counts unique words (assuming sorted order, since it only collapses adjacent duplicates).
- `sort -nr` sorts the counts numerically in reverse, putting the most frequent words first.
- `head -n 20` truncates the output to the first 20 lines.

The Unix philosophy states:

> Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.

Which is what these programs do. Each expects a stream of text, and will output a stream of text. This allows them to be used together and recombined in many ways. We can learn from this for our own pipelines.

Source: Basics of the Unix Philosophy
The command line interface you've been using is itself a programming language. You're likely using `bash`, `zsh`, or `powershell`, each of which can be used to write short scripts. We call these shell scripts.
At their simplest, a shell script can be a list of commands that you’d like to run repeatedly.
We could take the command from before and make it into a shell script; assume we write this to `count_v1.sh`:

```sh
# count_v1.sh
cat index.qmd | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20
```

This will save us from having to type the command every time. We can now run `sh count_v1.sh` and it will give us the same output as before.
We can make these scripts more complex. Perhaps we want to make it possible to run this on any file, not just `index.qmd`. The special variable `$1` is the first parameter, so we'd update our script:

```sh
# count_v2.sh
cat "$1" | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20
```
Now running `sh count_v2.sh index.qmd` will give us the same output as before, but the script can be used on other files.

We are using the program `sh` to run our script; more commonly, we make these scripts executable.
This takes two steps:
First, the script must begin with what is known as a shebang line, which tells the operating system what program to invoke to run the script. This line begins with the shebang `#!` followed by the path to the program, often `/bin/sh` or `/bin/bash`.
```sh
#!/bin/sh
# count_v3.sh
cat "$1" | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20
```
Second, we let the operating system know that the file is safe to execute:
```sh
chmod a+x count_v3.sh
```
Now we can run the script as its own program, without preceding it with `sh`:

```
$ ./count_v3.sh filename.txt
```
We can use this on Python files too:

```python
#!/usr/bin/env python3
# save this in test_py_shebang.py
print("hello from Python!")
```

```
$ chmod a+x test_py_shebang.py
$ ./test_py_shebang.py
hello from Python!
```
Another approach to this problem is the `Makefile`. These are used by a program called `make` to run sets of commands with dependencies between them.
```make
.PHONY: setup clean scrape clean_data visualize

setup:
	uv sync

clean:
	echo "Cleaning environment and artifacts..."
	rm -rf .venv artifacts/*

scrape: setup
	uv run python -m myproj.scrape

clean_data: scrape
	uv run python -m myproj.clean
	uv run python -m myproj.merge

visualize: clean_data
	uv run python -m myproj.visualize
```
If these commands were in a `Makefile`, one could run `make scrape` and the scrapers would run.
The sections have the format:

```make
target_name: dependency_names
	commands
```

`make target` will first run the target's dependencies, and then the commands beneath it.
This is a very useful concept, but `make` is an old program that has some quirks that many find frustrating. You may have noticed we also need to add a special `.PHONY` target, because `make` expects target names to be files (it was originally used to compile programs, where each step would result in an output file). This is one example of `make`'s history making it less user-friendly.
`just` is a popular modern alternative to `make`. It offers similar but simplified syntax: no need for things like `.PHONY`, and much better error messages. One disadvantage is that `just` will not be installed by default on many systems, whereas `make` is almost always available.
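For comparison, here is a minimal `justfile` sketch covering two of the targets above (the recipe names and commands are carried over from the Makefile; note there is no `.PHONY`):

```just
# justfile
setup:
    uv sync

scrape: setup
    uv run python -m myproj.scrape
```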
While writing small composable programs is still a very useful technique, we are going to take some of these ideas with us back into Python itself.
Instead of small composable programs, we turn our attention back to our data pipeline and recognize that, with a bit more work, we could turn our code into small composable functions.

The first thing we'll need to do is adopt a common interface. The "text stream" is Unix's; ours can be better suited to our program.
In our case, our program was primarily designed to operate upon individual expenditures, so let's make a `NamedTuple` to represent those:
```python
from typing import NamedTuple

class Expenditure(NamedTuple):
    year: int
    amount: float
    purpose: str
    department: str
    subdepartment: str = ""
```
We account for all of the fields that we plan to use.
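For example, with illustrative values:

```python
e = Expenditure(year=2010, amount=1000.0, purpose="road repair", department="ROAD")
print(e.purpose)        # fields are accessed by name
print(e.subdepartment)  # fields not provided fall back to their default: ""
```

`NamedTuple` instances are also immutable, which will matter once our pipeline functions start returning modified copies rather than editing data in place.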
We then revisit our methods from before, and make sure that they are all dealing with `Expenditure`:
```python
load_xls(Path) -> list[Expenditure]
load_xlsx(Path) -> list[Expenditure]
load_from_sql(Path) -> list[Expenditure]
```
The loading functions now all return the same type. In the years/cases where data was missing, we provide a default so we can have a uniform type across all years.
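As an illustration, a loader might look something like the sketch below. It assumes the `openpyxl` library and a particular column order, and takes the year as an extra argument; none of these details come from the original, so treat them as placeholders:

```python
from pathlib import Path
from openpyxl import load_workbook  # assumed choice of XLSX library

def load_xlsx(path: Path, year: int) -> list[Expenditure]:
    # Sketch only: assumes each data row is (amount, purpose, department).
    # subdepartment is absent in these years, so it falls back to the default "".
    sheet = load_workbook(path).active
    return [
        Expenditure(year=year, amount=float(amount), purpose=purpose, department=department)
        for amount, purpose, department in sheet.iter_rows(min_row=2, values_only=True)
    ]
```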
Whereas previously the helper methods took dictionaries, floats, strings, etc., now we will make them all take the full expenditure, even if they only operate on one field:
```python
clean_purpose(Expenditure) -> Expenditure
get_department_pre_2012(Expenditure) -> Expenditure
get_department_post_2012(Expenditure) -> Expenditure
get_department_with_subdepartment(Expenditure) -> Expenditure
```
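As a sketch of what one of these might look like (the actual cleaning regex is not shown here, so the pattern below is a placeholder):

```python
import re

def clean_purpose(e: Expenditure) -> Expenditure:
    # placeholder cleanup: collapse runs of whitespace and strip the ends
    cleaned = re.sub(r"\s+", " ", e.purpose).strip()
    # _replace returns a modified copy; the original tuple is left untouched
    return e._replace(purpose=cleaned)
```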
By having their input and output types match, we can chain these together in any order. Our ingest function could be rewritten:
```python
def fix_2011_ocr(e: Expenditure) -> Expenditure:
    # we move custom logic into additional pipeline steps
    if e.year == 2011:
        # glitch in OCR data where b00 means 600
        return Expenditure(
            year=e.year,
            amount=float(e.amount.replace("b00", "600")),
            purpose=e.purpose,
            department=e.department,
            subdepartment=e.subdepartment,
        )
    # all other years pass through unchanged
    return e


def ingest_2009_2011(filename, year):
    expenditures = load_xlsx(filename)
    for exp in expenditures:
        # these could be written one line at a time, or composed together like this
        new_exp = fix_2011_ocr(clean_purpose(get_department_pre_2012(exp)))
        save_cleaned_row(new_exp)
```
This idea of composable functions is a key component of functional programming. Functional programming is an alternative paradigm like object-oriented programming. Some of the oldest programming languages, like LISP, favor this approach.
Some key tenets of functional programming include:

- Pure functions: the same inputs always produce the same outputs, with no side effects.
- Immutable data: rather than modifying data in place, functions return new, modified copies.
- First-class functions: functions can be stored in variables and passed around like any other value.

Our example demonstrates two of these points; let's see how first-class functions might apply:
At this point, the difference between our various `ingest_*` functions is just which loading function to use, then which