If you've ever found yourself manually downloading CSV files, copying data between spreadsheets, reformatting columns, and then pasting results into a report — you already have a data pipeline. It's just a painfully manual one.
A data pipeline is any process that moves data from point A to point B, transforming it along the way. The moment you automate that process — pulling data on a schedule, cleaning it without human intervention, and loading it where it needs to go — you free up hours of your week and eliminate an entire category of human error.
Python is the best tool for the job. It's free, readable, and has an enormous ecosystem of libraries built specifically for data work. In this guide, I'll walk you through building a complete automated data pipeline from scratch — even if you've never written one before.
What You'll Need
Before we dive in, make sure you have Python 3.8+ installed. Then install the libraries we'll use throughout this tutorial:
pip install pandas requests schedule openpyxl
Here's what each one does:
- pandas — the Swiss army knife for data manipulation: reading files, filtering rows, merging datasets, and writing output.
- requests — makes HTTP requests to pull data from APIs.
- schedule — a lightweight Python library for scheduling tasks to run at regular intervals.
- openpyxl — lets pandas read and write Excel files (.xlsx).
We'll also use sqlite3, logging, and json — all of which come built into Python, so no extra installation needed.
1 Extract — Pulling Data From Sources
Every pipeline starts with extraction: getting data from wherever it lives. The two most common sources I deal with in client projects are APIs and flat files (CSV, Excel, JSON). Let's handle both.
Pulling Data From an API
Most modern services expose a REST API you can query for data. Here's how to pull JSON data from an API and convert it into a pandas DataFrame:
import requests
import pandas as pd
def extract_from_api(url, headers=None, params=None):
    """Pull data from a REST API and return a DataFrame."""
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()  # Raise an error for bad status codes
    data = response.json()
    # Many APIs wrap results in a key like "results" or "data"
    if isinstance(data, dict) and "results" in data:
        data = data["results"]
    return pd.DataFrame(data)
# Example: fetch user data from a public API
df_users = extract_from_api("https://jsonplaceholder.typicode.com/users")
print(df_users.head())
The raise_for_status() call is important — it throws an exception if the API returns a 4xx or 5xx error, so your pipeline fails loudly instead of silently processing garbage data.
Reading From CSV and Excel Files
For local files, pandas makes extraction trivial:
import pandas as pd
# CSV file
df_csv = pd.read_csv("data/sales_report.csv")
# Excel file (requires openpyxl)
df_excel = pd.read_excel("data/inventory.xlsx", sheet_name="Sheet1")
# JSON file
df_json = pd.read_json("data/products.json")
# You can even read directly from a URL
df_remote = pd.read_csv("https://example.com/data/export.csv")
Pro tip: When reading CSV files from clients, always specify the encoding parameter if you're seeing garbled characters. Try encoding="utf-8" first, then encoding="latin-1" as a fallback.
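If you want that fallback to happen automatically, a small helper can try both encodings for you. Here's a minimal sketch (the read_client_csv name is just illustrative):

import pandas as pd

def read_client_csv(path):
    """Try UTF-8 first, then fall back to Latin-1 for legacy exports."""
    try:
        return pd.read_csv(path, encoding="utf-8")
    except UnicodeDecodeError:
        return pd.read_csv(path, encoding="latin-1")

df_csv = read_client_csv("data/sales_report.csv")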
2 Transform — Cleaning and Shaping the Data
Raw data is almost never ready to use. Column names are inconsistent, there are missing values, date formats are all over the place, and you need to join data from multiple sources. This is where pandas really shines.
Common Cleaning Operations
import pandas as pd
def transform_data(df):
    """Clean and transform raw data."""
    # Standardize column names: lowercase, replace spaces with underscores
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    # Remove duplicate rows
    df = df.drop_duplicates()
    # Drop rows where critical columns are missing
    df = df.dropna(subset=["email", "order_total"])
    # Fill non-critical missing values with defaults
    df["notes"] = df["notes"].fillna("N/A")
    # Convert data types
    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
    df["order_total"] = pd.to_numeric(df["order_total"], errors="coerce")
    # Strip whitespace from string columns
    string_cols = df.select_dtypes(include="object").columns
    df[string_cols] = df[string_cols].apply(lambda col: col.str.strip())
    # Filter: only keep orders from the last 90 days
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=90)
    df = df[df["order_date"] >= cutoff]
    return df
Merging Data From Multiple Sources
Often, useful insights come from combining datasets. For example, enriching order data with customer information:
# Merge orders with customer details
df_orders = pd.read_csv("data/orders.csv")
df_customers = pd.read_csv("data/customers.csv")
df_enriched = pd.merge(
    df_orders,
    df_customers,
    on="customer_id",  # The shared key
    how="left"         # Keep all orders, even if no customer match
)
# Add calculated columns
df_enriched["revenue_per_unit"] = (
    df_enriched["order_total"] / df_enriched["quantity"]
)
# Group and aggregate
summary = df_enriched.groupby("region").agg(
    total_revenue=("order_total", "sum"),
    avg_order=("order_total", "mean"),
    order_count=("order_id", "count")
).reset_index()
print(summary)
This is the kind of transformation that takes 30 minutes of manual spreadsheet work and replaces it with three seconds of code execution.
3 Load — Writing Data to Its Destination
Once your data is clean and shaped, you need to send it somewhere useful. Here are the most common destinations I build into client pipelines.
Writing to CSV and Excel
# Write to CSV
df.to_csv("output/cleaned_data.csv", index=False)
# Write to Excel with formatting
with pd.ExcelWriter("output/report.xlsx", engine="openpyxl") as writer:
    summary.to_excel(writer, sheet_name="Summary", index=False)
    df_enriched.to_excel(writer, sheet_name="Detail", index=False)
Loading Into a SQLite Database
For anything beyond a simple export, I recommend loading data into a database. SQLite is built into Python and requires zero setup — perfect for local pipelines and prototyping:
import sqlite3
import pandas as pd
def load_to_database(df, table_name, db_path="pipeline.db"):
    """Load a DataFrame into a SQLite database."""
    conn = sqlite3.connect(db_path)
    # "replace" drops and recreates the table each run
    # Use "append" if you want to add rows to an existing table
    df.to_sql(table_name, conn, if_exists="replace", index=False)
    # Verify the load
    row_count = pd.read_sql(
        f"SELECT COUNT(*) as cnt FROM {table_name}", conn
    ).iloc[0]["cnt"]
    print(f"Loaded {row_count} rows into '{table_name}'")
    conn.close()
# Usage
load_to_database(df_enriched, "orders")
load_to_database(summary, "order_summary")
Once the data is in SQLite, you can query it with standard SQL — which makes it easy to build dashboards, run ad-hoc analysis, or serve it to other tools.
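For example, once the tables above are loaded, pulling the top regions back out for a quick ad-hoc check takes only a few lines (table and column names follow the earlier examples):

import sqlite3
import pandas as pd

# Ad-hoc query against the order_summary table loaded above
conn = sqlite3.connect("pipeline.db")
top_regions = pd.read_sql(
    "SELECT region, total_revenue FROM order_summary ORDER BY total_revenue DESC LIMIT 5",
    conn
)
conn.close()
print(top_regions)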
Bonus: Writing to Google Sheets
If your stakeholders live in Google Sheets, you can push data there programmatically using the gspread library:
pip install gspread google-auth
import gspread
from google.oauth2.service_account import Credentials
import pandas as pd
def load_to_google_sheets(df, spreadsheet_name, worksheet_name):
    """Push a DataFrame to a Google Sheet."""
    scopes = ["https://www.googleapis.com/auth/spreadsheets"]
    creds = Credentials.from_service_account_file(
        "credentials.json", scopes=scopes
    )
    client = gspread.authorize(creds)
    sheet = client.open(spreadsheet_name).worksheet(worksheet_name)
    # Clear existing data and write fresh
    sheet.clear()
    sheet.update(
        [df.columns.values.tolist()] + df.values.tolist()
    )
    print(f"Updated '{worksheet_name}' in '{spreadsheet_name}'")
Pro tip: When loading to Google Sheets, always clear and rewrite rather than appending. It prevents data duplication and keeps the sheet in a predictable state.
4 Schedule — Running Your Pipeline on Autopilot
A pipeline that you have to remember to run manually isn't really automated. Let's fix that.
Option A: The schedule Library (Python-Native)
The schedule library is the simplest way to run Python functions on a timer. It's great for lightweight pipelines that run on a server or always-on machine:
import schedule
import time
def run_pipeline():
    """Execute the full ETL pipeline."""
    print(f"Pipeline started at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    # Extract
    df_raw = extract_from_api("https://api.example.com/sales")
    # Transform
    df_clean = transform_data(df_raw)
    # Load
    load_to_database(df_clean, "daily_sales")
    df_clean.to_csv("output/daily_sales.csv", index=False)
    print("Pipeline completed successfully.")

# Schedule the pipeline
schedule.every().day.at("06:00").do(run_pipeline)
schedule.every().monday.at("09:00").do(run_pipeline)  # Extra Monday run

# Keep the script running
print("Scheduler is running. Press Ctrl+C to stop.")
while True:
    schedule.run_pending()
    time.sleep(60)  # Check every 60 seconds
The schedule library is readable and intuitive. You can schedule by minutes, hours, days, or specific weekdays — all in plain English.
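For reference, a few other interval patterns from the same library, all standard schedule calls:

# Other ways to schedule the same job
schedule.every(15).minutes.do(run_pipeline)               # every 15 minutes
schedule.every().hour.do(run_pipeline)                    # at the top of every hour
schedule.every().wednesday.at("13:15").do(run_pipeline)   # Wednesdays at 1:15 PM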
Option B: Cron (Linux/Mac) or Task Scheduler (Windows)
For production pipelines, I usually prefer using the operating system's built-in scheduler. It's more reliable because it doesn't depend on a Python process staying alive.
On Linux or Mac, use cron. Open your crontab with crontab -e and add:
# Run the pipeline every day at 6:00 AM
0 6 * * * /usr/bin/python3 /home/user/pipeline/main.py >> /home/user/pipeline/logs/cron.log 2>&1
# Run every Monday at 9:00 AM
0 9 * * 1 /usr/bin/python3 /home/user/pipeline/main.py >> /home/user/pipeline/logs/cron.log 2>&1
On Windows, use Task Scheduler: create a new task, set the trigger to your desired schedule, and set the action to run python.exe with your script path as the argument.
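If you prefer the command line on Windows, the same daily run can usually be registered with schtasks (the interpreter and script paths below are placeholders; adjust them to match your machine):

schtasks /Create /TN "DataPipeline" /TR "C:\Python311\python.exe C:\pipeline\main.py" /SC DAILY /ST 06:00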
5 Error Handling and Logging — Building a Pipeline You Can Trust
A pipeline that runs silently is a pipeline that fails silently. You need two things: robust error handling so one bad API call doesn't crash the whole run, and logging so you can see exactly what happened after the fact.
Setting Up Logging
import logging
# Configure logging — writes to both console and file
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
Wrapping Steps in Try/Except
Never let a single failure take down the entire pipeline. Wrap each major step so you know exactly where things went wrong:
def run_pipeline():
    logger.info("Pipeline started.")

    # --- Extract ---
    try:
        df_raw = extract_from_api("https://api.example.com/sales")
        logger.info(f"Extracted {len(df_raw)} rows from API.")
    except requests.exceptions.RequestException as e:
        logger.error(f"Extraction failed: {e}")
        return  # Stop the pipeline — no data to process

    # --- Transform ---
    try:
        df_clean = transform_data(df_raw)
        logger.info(f"Transformed data: {len(df_clean)} rows after cleaning.")
    except Exception as e:
        logger.error(f"Transformation failed: {e}")
        return

    # --- Load ---
    try:
        load_to_database(df_clean, "daily_sales")
        df_clean.to_csv("output/daily_sales.csv", index=False)
        logger.info("Data loaded successfully.")
    except Exception as e:
        logger.error(f"Load failed: {e}")
        return

    logger.info("Pipeline completed successfully.")
With this structure, when something goes wrong at 3 AM, you open pipeline.log and immediately see which step failed and why. No guesswork.
Pro tip: For critical pipelines, add email or Slack notifications on failure. A simple SMTP call or webhook in the except block can save you from discovering a broken pipeline three days too late.
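As a sketch, a Slack alert through an incoming webhook can be just a few lines (the webhook URL is a placeholder you'd generate in Slack, and the notify_failure helper is an illustrative name):

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

def notify_failure(step, error):
    """Post a short failure message to a Slack channel via webhook."""
    try:
        requests.post(
            SLACK_WEBHOOK_URL,
            json={"text": f"Pipeline failed at {step}: {error}"},
            timeout=10
        )
    except requests.exceptions.RequestException:
        logger.warning("Could not send Slack alert.")

Call it from each except block right before the return, e.g. notify_failure("load", e).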
Complete End-to-End Example
Let's put everything together into a single, runnable pipeline script. This example pulls user data from a public API, cleans it, and loads it into both a CSV file and a SQLite database — with full logging and error handling.
"""
data_pipeline.py
A complete automated data pipeline with Extract, Transform, Load, and Scheduling.
"""
import os
import requests
import pandas as pd
import sqlite3
import logging
import schedule
import time
from datetime import datetime

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# --- Extract ---
def extract(url):
    """Pull data from an API and return a DataFrame."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    data = response.json()
    logger.info(f"Extracted {len(data)} records from {url}")
    return pd.DataFrame(data)

# --- Transform ---
def transform(df):
    """Clean and enrich the raw data."""
    # Standardize column names
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    # Drop duplicates
    df = df.drop_duplicates(subset=["id"])
    # Keep only the columns we need
    cols_to_keep = ["id", "name", "username", "email", "phone"]
    df = df[[c for c in cols_to_keep if c in df.columns]]
    # Clean email: lowercase and strip whitespace
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()
    # Add metadata
    df["loaded_at"] = datetime.now().isoformat()
    logger.info(f"Transformed data: {len(df)} rows, {len(df.columns)} columns")
    return df

# --- Load ---
def load_csv(df, filepath):
    """Write DataFrame to a CSV file."""
    # Make sure the destination folder exists before writing
    os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
    df.to_csv(filepath, index=False)
    logger.info(f"Saved {len(df)} rows to {filepath}")

def load_sqlite(df, table_name, db_path="pipeline.db"):
    """Write DataFrame to a SQLite database."""
    conn = sqlite3.connect(db_path)
    df.to_sql(table_name, conn, if_exists="replace", index=False)
    count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
    conn.close()
    logger.info(f"Loaded {count} rows into '{table_name}' ({db_path})")

# --- Pipeline ---
def run_pipeline():
    """Execute the full ETL pipeline."""
    logger.info("=" * 50)
    logger.info("Pipeline run started.")

    # Extract
    try:
        df_raw = extract("https://jsonplaceholder.typicode.com/users")
    except requests.exceptions.RequestException as e:
        logger.error(f"Extraction failed: {e}")
        return

    # Transform
    try:
        df_clean = transform(df_raw)
    except Exception as e:
        logger.error(f"Transformation failed: {e}")
        return

    # Load
    try:
        load_csv(df_clean, "output/users_clean.csv")
        load_sqlite(df_clean, "users")
    except Exception as e:
        logger.error(f"Load failed: {e}")
        return

    logger.info("Pipeline run completed successfully.")
    logger.info("=" * 50)

# --- Entry Point ---
if __name__ == "__main__":
    # Run once immediately
    run_pipeline()
    # Then schedule for daily runs
    schedule.every().day.at("06:00").do(run_pipeline)
    logger.info("Scheduler active. Press Ctrl+C to stop.")
    while True:
        schedule.run_pending()
        time.sleep(60)
Save this as data_pipeline.py, run it with python data_pipeline.py, and you have a working automated pipeline. It will execute once immediately, then again every day at 6 AM.
Tips for Taking This to Production
The script above is a solid starting point. Here's how to harden it for real-world use:
- Use virtual environments. Always run your pipeline in a venv to isolate dependencies. Create one with python -m venv venv and activate it before installing packages. This prevents version conflicts when you have multiple projects on the same machine.
- Store configuration separately. Never hardcode API URLs, database paths, or credentials in your script. Use a config.json or .env file and load values with json.load() or the python-dotenv library. This makes it easy to switch between dev and production environments without modifying code.
- Add data validation checkpoints. After each step, verify the data looks right. Check that row counts are within expected ranges, that required columns exist, and that key values are non-null. Fail fast if something looks off — it's better to stop the pipeline than to load bad data. (See the sketch after this list.)
- Implement retry logic. API calls fail. Networks time out. Use a library like tenacity or write a simple retry decorator to automatically retry failed requests 2-3 times with a short delay between attempts. (The sketch after this list includes one.)
- Monitor and alert. At minimum, check your pipeline.log daily. Better yet, send yourself a summary email or Slack message after each run. For critical pipelines, set up alerts that fire when a run fails or when data volumes look unusual.
- Version control your pipeline. Treat your pipeline code like any other software project. Keep it in Git, write meaningful commit messages, and tag releases. When something breaks in production, you want to be able to compare against the last known working version.
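Here's a minimal sketch of the validation and retry ideas above. The retry decorator and validate_dataframe helper are illustrative names, not part of the pipeline script, and the thresholds are placeholders you'd tune to your own data:

import functools
import logging
import time

logger = logging.getLogger(__name__)

def retry(times=3, delay=5):
    """Re-run a flaky step a few times before giving up."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == times:
                        raise
                    logger.warning(f"{func.__name__} failed (attempt {attempt}): {e}. Retrying in {delay}s.")
                    time.sleep(delay)
        return wrapper
    return decorator

def validate_dataframe(df, required_columns, min_rows=1):
    """Fail fast if the data doesn't look right."""
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    if len(df) < min_rows:
        raise ValueError(f"Expected at least {min_rows} rows, got {len(df)}")

Decorate the extract function with @retry(times=3, delay=10) and call validate_dataframe(df_raw, ["id", "email"]) right after extraction, before any transformation runs.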
The difference between a script on your laptop and a production pipeline is reliability. Logging, error handling, config management, and monitoring are what make the difference.
Need Help Building Your Pipeline?
If you've got data scattered across APIs, spreadsheets, and databases — and you want it flowing automatically into clean, usable reports — I can help. I've built Python data pipelines for businesses on Upwork that pull from CRMs, e-commerce platforms, ad networks, and internal tools, delivering clean data to dashboards and stakeholders on autopilot.
Whether you need a simple CSV-to-database ETL or a multi-source pipeline with scheduling and monitoring, I'll build it, document it, and hand it off so your team can maintain it.
Get in touch for a free consultation, or hire me on Upwork to get started right away.