16  Data Pipelines


Goals


A data pipeline is a process in which data is taken from raw sources, cleaned and transformed, and then stored in a format more convenient for the intended application.

Motivation

You are part of a team that will be doing an analysis of your city’s budget over the past twenty years.

You file a request under your city’s open data law and are provided with twenty files. They have substantial differences across the years:

Year Changes
2002 XLS file (old MS Excel format). Three columns: amount, purpose, and department. department is restricted to four upper-case characters (EDUC, SFTY, PARK, ROAD, etc.).
2009 Format switches to XLSX (modern Excel format).
2012 Most department names change to follow a new standard.
2014 A subdepartment column is added.
2016 The amount field is removed and replaced with amount_proposed and amount_actual. Prior years provided only actual amounts.
2021 Format switches to a sqlite3 file; the shift to releasing the raw data splits it across three tables with additional metadata.

One way to look at this problem might be to write the functions:

  • ingest_2002_2008
  • ingest_2009_2011
  • ingest_2012_2013
  • ingest_2014_2015
  • ingest_2016_2020
  • ingest_2021_

But as you do, you realize that these functions have more in common than not.

For instance, no matter which file format you load the data from, you are cleaning the purpose data with the same regular expression. There are 2-3 variations of the department logic, and perhaps you begin to break this down into smaller helper functions:

  • load_xls
  • load_xlsx
  • load_from_sql
  • clean_purpose
  • get_department_pre_2012
  • get_department_post_2012
  • get_department_with_subdepartment
  • etc.

Now your ingest_* functions begin to look like a series of calls to helper functions:

def ingest_2009_2011(filename, year):
    # use appropriate loader
    data = load_xlsx(filename)

    # process all records identically
    for row in data:
        row["purpose"] = clean_purpose(row["purpose"])
        row["department"] = get_department_pre_2012(row["department"])

        # perhaps some custom fix for a particular year
        if year == 2011:
            # glitch in OCR data where b00 should read 600
            row["amount"] = row["amount"].replace("b00", "600")

        # write to database or file
        save_cleaned_row(row)

These functions form data pipelines: they transform the data from its varied formats into a common representation suitable for analysis.

While this is perfectly acceptable code for many cases, it still feels somewhat redundant.

We are going to take a look at some other ways of writing code, with a focus on composing small robust functions into data pipelines.

Why small functions?

Small functions are more reusable. Consider a hypothetical str.capitalize_and_strip() versus str.strip().capitalize().

We can gain the functionality of the more complex function by composing two or more general-purpose functions.
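
For example, composing the two built-in string methods already gives us the combined behavior, with no special-purpose method required:

text = "   the city budget   "

# composing two small, general functions gives the behavior
# of a hypothetical capitalize_and_strip()
print(text.strip().capitalize())  # "The city budget"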

Also, these small functions that perform one task are far easier to test. This means a program composed of small functions will be less prone to bugs.

Unix Pipelines

This philosophy of small pieces of code that do one thing well is a key part of what is known as the “Unix philosophy”. Unix is an old operating system whose design remains incredibly influential today. Modern macOS and Linux are both largely “Unix-compatible”; more specifically, they largely follow a specification known as POSIX.

What about Windows?

Windows does not have a Unix lineage the way that macOS and Linux do, which is why many things differ significantly between Windows and the other two.

Modern Windows systems have adopted similar concepts, but with different syntax and names in most cases.

A key feature of Unix is the pipe. Pipes take the output of a program and make it the input of another.

Command line programs operate on special “files” called standard input and standard output.

When you print, standard output is where output goes by default; input and other functions that wait for user input read from standard input. (There is also a standard error stream that allows separating error output from output intended for the user; that use will be out of scope for today.)
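
In Python terms, these streams are available through the sys module; print and input are convenient wrappers around them:

import sys

# print() writes to sys.stdout by default
sys.stdout.write("hello\n")

# input() reads a line from sys.stdin (and strips the trailing newline)
line = sys.stdin.readline()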

We can demonstrate this with an example. Let’s say you have a scraper that runs for a long time, and you have it print out 404 errors when it encounters a missing page.

$ python scrape.py
fetching page 0 ...
HTTP 404 while fetching page  0
fetching page 1 ...
HTTP 200 from page 1
fetching page 2 ...
HTTP 200 from page 2
fetching page 3 ...
HTTP 200 from page 3
fetching page 4 ...
HTTP 200 from page 4
...
fetching page 9995 ...
HTTP 200 from page 9995
fetching page 9996 ...
HTTP 200 from page 9996
fetching page 9997 ...
HTTP 200 from page 9997
fetching page 9998 ...
HTTP 200 from page 9998
fetching page 9999 ...
HTTP 200 from page 9999

If you only wanted to see the 404s, you might think you need to comment out the other print statements in the program.

Instead, we will pipe the output of this program through another program, grep. grep takes input one line at a time and only prints lines that match a pattern, which can be a regular expression or a simple string.

$ python scrape.py | grep "404"
HTTP 404 while fetching page  0
HTTP 404 while fetching page  111
HTTP 404 while fetching page  222
HTTP 404 while fetching page  333
fetching page 404 ...
HTTP 200 from page 404
HTTP 404 while fetching page  444
HTTP 404 while fetching page  555
HTTP 404 while fetching page  666
HTTP 404 while fetching page  777
...
HTTP 404 while fetching page  9213
HTTP 404 while fetching page  9324
fetching page 9404 ...
HTTP 200 from page 9404
HTTP 404 while fetching page  9435
HTTP 404 while fetching page  9546
HTTP 404 while fetching page  9657
HTTP 404 while fetching page  9768
HTTP 404 while fetching page  9879
HTTP 404 while fetching page  9990
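
Because pipes simply pass text along, our own Python programs can take part in a pipeline too. Here is a minimal sketch of a grep-like filter (a hypothetical script we'll call filter_404.py) that reads standard input one line at a time and writes matching lines to standard output:

# filter_404.py -- a minimal grep-like filter (hypothetical example)
import sys

for line in sys.stdin:          # read standard input one line at a time
    if "404" in line:           # keep only lines mentioning 404
        sys.stdout.write(line)  # pass them along on standard output

Running python scrape.py | python filter_404.py would behave much like the grep example above.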

We can chain together many commands. For example, to count the unique words in this file:

$ cat index.qmd | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20
  20 to
  18 a
  15 and
  13 the
  13 in
  11 you
  11 will
  11 this
   9 we
   9 is
   7 your
   7 with
   6 programming
   6 not
   6 it
   5 topic
   5 that
   5 on
   5 of
   5 course

To break this down one step at a time:

  • cat index.qmd simply outputs the contents of the given file to the terminal.
  • tr -s '[:space:]' '\n' uses the translate command to replace runs of whitespace with newlines, giving one word per line.
  • sort sorts the lines alphabetically, so repeated words end up next to each other.
  • uniq -c collapses adjacent duplicate lines and counts them (which is why the input must be sorted first).
  • sort -nr sorts again, this time numerically and in reverse order, so the most frequent words come first.
  • head -n 20 keeps only the first 20 lines.

The Unix philosophy states:

  • Write programs that do one thing and do it well.
  • Write programs to work together.
  • Write programs to handle text streams, because that is a universal interface.

This is exactly what these programs do. Each expects a stream of text and outputs a stream of text, which allows them to be used together and recombined in many ways. We can learn from this for our own pipelines.

Source: Basics of the Unix Philosophy

Shell Scripts

The command line interface you’ve been using is itself a programming language.

You’re likely using bash, zsh, or powershell, each of which can be used to write short scripts. We call these shell scripts.

At their simplest, a shell script can be a list of commands that you’d like to run repeatedly.

We could take the command from before and make it into a shell script. Assume we write this to “count_v1.sh”:

# count_v1.sh
cat index.qmd | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20

This will save us from having to type the command every time. We can now run “sh count_v1.sh” and it will give us the same output as before.

We can make these scripts more complex. Perhaps we want to make it possible to run this on any file, not just “index.qmd”.

The special variable “$1” holds the first command-line argument, so we’d update our script:

# count_v2.sh
cat "$1" | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20

Now running sh count_v2.sh index.qmd will give us the same output as before, but the script can be used on other files.

We are using the program sh to run our script; more commonly, we make these scripts executable.

This takes two steps:

First, the script must begin with what is known as a shebang line, which tells the operating system what program to invoke to run the script.

This line begins with the shebang #! followed by the path to the program, often /bin/sh or /bin/bash.

#!/bin/sh
# count_v3.sh
cat "$1" | tr -s '[:space:]' '\n' | sort | uniq -c | sort -nr | head -n 20

Second, we let the operating system know that the file is safe to execute:

chmod a+x count_v3.sh

Now we can run the script as its own program, without preceding it with sh:

$ ./count_v3.sh filename.txt

We can use this on Python files too:

#!/usr/bin/env python3

# save this in test_py_shebang.py

print("hello from Python!")
hello from Python!
$ chmod a+x test_py_shebang.py
$ ./test_py_shebang.py
hello from Python!

Makefiles/Justfiles

Another approach to this problem is the Makefile. Makefiles are used by a program called make to run sets of commands with dependencies between them.

.PHONY: setup clean scrape clean_data visualize

setup:
    uv sync

clean:
    echo "Cleaning environment and artifacts..."
    rm -rf .venv artifacts/*

scrape: setup
    uv run python -m myproj.scrape

clean_data: scrape
    uv run python -m myproj.clean
    uv run python -m myproj.merge

visualize: clean_data
    uv run python -m myproj.visualize

If these commands were in a Makefile, one could run make scrape and the scrapers would run.

The sections have the format:

target_name: dependency_names
  commands

make target will first run the target’s dependencies and then the commands beneath it.

This is a very useful concept, but make is an old program that has some quirks that many find frustrating. You may have noticed we also need to add a special .PHONY target because make expects target names to be files (it was originally used to compile programs where each step would result in an output file). This is one example of make’s history making it less user-friendly.

just is a popular modern alternative to make. It offers similar but simplified syntax: no need for things like .PHONY, and much better error messages. One disadvantage is that just will not be installed by default on many systems, whereas make is almost always available.

Back to Python

While writing small composable programs is still a very useful technique, we are going to take some of these ideas with us back into Python itself.

Instead of small composable programs, we turn our attention back to our data pipeline and recognize that, with a bit more work, we could turn our code into small composable functions.

The first thing we’ll need to do is adopt a common interface. The “text stream” is Unix’s; ours can be better suited to our program.

In our case, our program was primarily designed to operate upon individual expenditures, so let’s make a NamedTuple to represent those:

from typing import NamedTuple

class Expenditure(NamedTuple):
    year: int
    amount: float
    purpose: str
    department: str
    subdepartment: str = ""

We account for all of the fields that we plan to use.

We then revisit our functions from before and make sure that they all deal with Expenditure:

  • load_xls(Path) -> list[Expenditure]
  • load_xlsx(Path) -> list[Expenditure]
  • load_from_sql(Path) -> list[Expenditure]

The loading functions now all return the same type. In years where a field was missing, we provide a default so that we have a uniform type across all years.
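
As a sketch of what one of these loaders might look like, here is a possible load_from_sql; the table and column names are hypothetical, and the real 2021 data is split across three tables, so the actual query would involve joins:

import sqlite3
from pathlib import Path

def load_from_sql(path: Path) -> list[Expenditure]:
    # hypothetical schema: a single "expenditures" table with these columns
    conn = sqlite3.connect(path)
    rows = conn.execute(
        "SELECT year, amount_actual, purpose, department, subdepartment"
        " FROM expenditures"
    ).fetchall()
    conn.close()
    # each row's columns line up with Expenditure's fields
    return [Expenditure(*row) for row in rows]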

Whereas the helper functions previously took dictionaries, floats, strings, and so on, we will now make them all take a full Expenditure, even if they only operate on one field (a sketch of one such helper follows this list):

  • clean_purpose(Expenditure) -> Expenditure
  • get_department_pre_2012(Expenditure) -> Expenditure
  • get_department_post_2012(Expenditure) -> Expenditure
  • get_department_with_subdepartment(Expenditure) -> Expenditure
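
For instance, clean_purpose might look something like this sketch (the exact cleaning rule here is hypothetical):

import re

def clean_purpose(e: Expenditure) -> Expenditure:
    # hypothetical rule: collapse runs of whitespace and strip the ends
    cleaned = re.sub(r"\s+", " ", e.purpose).strip()
    # _replace returns a new Expenditure rather than modifying in place
    return e._replace(purpose=cleaned)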

By having their input and output types match, we can chain these together in any order. Our ingest function could be rewritten:

def fix_2011_ocr(e: Expenditure) -> Expenditure:
    # we move custom logic into additional pipeline steps
    if e.year == 2011:
        # glitch in OCR data where b00 should read 600
        # (amount may still be a raw string here, so normalize via str())
        return Expenditure(
            year=e.year,
            amount=float(str(e.amount).replace("b00", "600")),
            purpose=e.purpose,
            department=e.department,
            subdepartment=e.subdepartment,
        )
    # all other records pass through unchanged
    return e

def ingest_2009_2011(filename, year):
    expenditures = load_xlsx(filename)
    for exp in expenditures:
        # these could be written one line at a time, or composed together like this
        new_exp = fix_2011_ocr(clean_purpose(get_department_pre_2012(exp)))
        save_cleaned_row(new_exp)

Functional Programming

This idea of composable functions is a key component of functional programming. Functional programming is a programming paradigm, an alternative to object-oriented programming. Some of the oldest programming languages, like LISP, favor this approach.

Some key tenets of functional programming include:

  1. Immutability - Data should not be modified in place, instead functions should return modified copies. This avoids ambiguity & bugs caused by in-place modification.
  2. Pure functions - Functions should be simple, and given the same inputs return the same outputs with no side-effects on the rest of the program. This aids in testability & composability.
  3. First-class functions - Functions themselves can be used as variables, including arguments to other functions.
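
As a quick illustration of the third point, functions in Python are values like any other; they can be stored in variables and passed to other functions:

def shout(text: str) -> str:
    return text.upper()

def apply_twice(func, value):
    # func is an ordinary argument: a function passed as a value
    return func(func(value))

print(apply_twice(shout, "hi"))  # "HI"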

Our example demonstrates the first two of these points; let’s see how first-class functions might apply:

At this point, the difference between our various ingest_* functions is just which loading function to use, then which