---
name: harvard-art-museums-data-engineering-pipeline
description: Build end-to-end ETL pipelines and analytics dashboards using the Harvard Art Museums API with Python, SQL, and Streamlit
triggers:
  - how do I build a data pipeline with Harvard Art Museums API
  - set up ETL for museum artifacts collection
  - create analytics dashboard for Harvard Art Museums data
  - extract and transform Harvard artifacts API data
  - build Streamlit app for museum collection analytics
  - query and visualize Harvard Art Museums database
  - implement SQL analytics for artifact collections
  - design relational schema for museum API data
---

# Harvard Art Museums Data Engineering Pipeline

> Skill by [ara.so](https://ara.so) — Data Skills collection.

## Overview

This project demonstrates an end-to-end data engineering workflow that:
- Extracts artifact data from the Harvard Art Museums API with pagination and request throttling
- Transforms nested JSON into normalized relational tables
- Loads data into MySQL/TiDB Cloud databases
- Executes analytical SQL queries for insights
- Visualizes results through interactive Streamlit dashboards

The application showcases real-world ETL patterns, database design, and data visualization techniques used in analytics engineering roles.

## Installation

```bash
# Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

# Install dependencies
pip install -r requirements.txt

# Create .env file for configuration
echo "HARVARD_API_KEY=your_api_key_here" > .env
echo "DB_HOST=your_db_host" >> .env
echo "DB_USER=your_db_user" >> .env
echo "DB_PASSWORD=your_db_password" >> .env
echo "DB_NAME=harvard_artifacts" >> .env
```
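Because `os.getenv` returns `None` for a missing key, a typo in `.env` tends to surface later as a confusing authentication or connection error. A minimal sketch of a start-up check, assuming the five variable names written above (`missing_settings` and `REQUIRED_SETTINGS` are hypothetical helpers, not part of the repository):

```python
import os

# Names match the variables written to .env above.
REQUIRED_SETTINGS = ('HARVARD_API_KEY', 'DB_HOST', 'DB_USER', 'DB_PASSWORD', 'DB_NAME')


def missing_settings(env=None):
    """Return the required settings that are unset or empty.

    `env` defaults to os.environ; pass a dict to check another source (or to test).
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]
```

Call it right after `load_dotenv()` and stop early if the returned list is non-empty.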

### Required Dependencies

```txt
streamlit>=1.28.0
pandas>=2.0.0
requests>=2.31.0
mysql-connector-python>=8.1.0
plotly>=5.17.0
python-dotenv>=1.0.0
```

## Configuration

### API Key Setup

Get your API key from [Harvard Art Museums API](https://www.harvardartmuseums.org/collections/api):

```python
import os
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.getenv('HARVARD_API_KEY')
BASE_URL = 'https://api.harvardartmuseums.org/object'
```

### Database Configuration

```python
import mysql.connector
from dotenv import load_dotenv
import os

load_dotenv()

db_config = {
    'host': os.getenv('DB_HOST'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'database': os.getenv('DB_NAME')
}

connection = mysql.connector.connect(**db_config)
cursor = connection.cursor()
```
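TiDB Cloud clusters commonly listen on port 4000 rather than MySQL's default 3306 and may require TLS; the config above sets neither. A sketch that reads both from the environment, where `DB_PORT`, `DB_SSL_CA`, and `build_db_config` are assumed names (only `port`, `ssl_ca`, and `ssl_verify_identity` are standard `mysql.connector.connect()` arguments):

```python
import os


def build_db_config(env=None):
    """Hypothetical helper: build keyword arguments for mysql.connector.connect().

    DB_PORT and DB_SSL_CA are assumed variable names, not part of the .env above.
    """
    env = os.environ if env is None else env
    config = {
        'host': env.get('DB_HOST'),
        'user': env.get('DB_USER'),
        'password': env.get('DB_PASSWORD'),
        'database': env.get('DB_NAME'),
        # MySQL's default port is 3306; TiDB Cloud clusters commonly use 4000.
        'port': int(env.get('DB_PORT') or 3306),
    }
    ssl_ca = env.get('DB_SSL_CA')
    if ssl_ca:
        # Verify the server certificate against this CA bundle and check the host name.
        config['ssl_ca'] = ssl_ca
        config['ssl_verify_identity'] = True
    return config
```

Usage: `connection = mysql.connector.connect(**build_db_config())`. Take the actual host, port, and CA path from your cluster's connection settings.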

## Database Schema Design

### Create Tables

```python
def create_tables(cursor):
    """Create normalized relational schema for artifacts data"""
    
    # Artifact metadata table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactmetadata (
            id INT PRIMARY KEY,
            title VARCHAR(500),
            culture VARCHAR(255),
            century VARCHAR(100),
            classification VARCHAR(255),
            department VARCHAR(255),
            dated VARCHAR(255),
            objectnumber VARCHAR(100),
            url TEXT,
            lastupdate DATETIME,
            INDEX idx_culture (culture),
            INDEX idx_century (century),
            INDEX idx_classification (classification)
        )
    """)
    
    # Artifact media table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactmedia (
            media_id INT AUTO_INCREMENT PRIMARY KEY,
            artifact_id INT,
            baseimageurl TEXT,
            primaryimageurl TEXT,
            totalpageviews INT,
            totaluniquepageviews INT,
            FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
                ON DELETE CASCADE,
            INDEX idx_artifact (artifact_id)
        )
    """)
    
    # Artifact colors table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactcolors (
            color_id INT AUTO_INCREMENT PRIMARY KEY,
            artifact_id INT,
            color VARCHAR(50),
            spectrum VARCHAR(50),
            hue VARCHAR(50),
            percent FLOAT,
            FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
                ON DELETE CASCADE,
            INDEX idx_artifact (artifact_id),
            INDEX idx_color (color)
        )
    """)
```
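The `ON DELETE CASCADE` clauses mean that deleting an artifact also deletes its media and color rows, and the foreign keys reject child rows that point at unknown artifacts. The sketch below demonstrates both behaviors with an in-memory SQLite database as a stand-in so it runs without a server. It is an illustration, not the MySQL schema itself: SQLite uses `AUTOINCREMENT` and `?` placeholders, and enforces foreign keys only after `PRAGMA foreign_keys = ON`. If you load into TiDB, check that your version enforces foreign keys.

```python
import sqlite3

# In-memory SQLite stand-in for the MySQL tables above (demo only).
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA foreign_keys = ON')  # SQLite ignores FKs unless this is set
conn.execute('CREATE TABLE artifactmetadata (id INTEGER PRIMARY KEY, title TEXT)')
conn.execute("""
    CREATE TABLE artifactcolors (
        color_id INTEGER PRIMARY KEY AUTOINCREMENT,
        artifact_id INTEGER REFERENCES artifactmetadata(id) ON DELETE CASCADE,
        color TEXT,
        percent REAL
    )
""")

conn.execute("INSERT INTO artifactmetadata VALUES (1, 'Vase')")
conn.executemany(
    'INSERT INTO artifactcolors (artifact_id, color, percent) VALUES (?, ?, ?)',
    [(1, '#c8c8c8', 0.6), (1, '#323232', 0.4)],
)

# Deleting the parent artifact removes its color rows automatically.
conn.execute('DELETE FROM artifactmetadata WHERE id = 1')
remaining = conn.execute('SELECT COUNT(*) FROM artifactcolors').fetchone()[0]
print(remaining)  # 0

# A color row for an artifact that does not exist is rejected.
rejected = False
try:
    conn.execute("INSERT INTO artifactcolors (artifact_id, color, percent) VALUES (99, '#ffffff', 1.0)")
except sqlite3.IntegrityError as exc:
    rejected = True
    print(f'Rejected: {exc}')  # Rejected: FOREIGN KEY constraint failed
```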

## ETL Pipeline Implementation

### Extract: Fetch Data from API

```python
import requests
import time

# BASE_URL comes from the API Key Setup snippet

def fetch_artifacts(api_key, num_pages=5, page_size=100):
    """Extract artifacts page by page, pausing between requests to throttle API load"""
    artifacts = []
    
    for page in range(1, num_pages + 1):
        params = {
            'apikey': api_key,
            'size': page_size,
            'page': page
        }
        
        try:
            response = requests.get(BASE_URL, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
            
            records = data.get('records', [])
            if not records:
                # No records means we are past the last page; stop early
                break
            artifacts.extend(records)
            print(f"Fetched page {page}: {len(records)} records")
            
        except requests.exceptions.RequestException as e:
            # Skip the failed page and continue with the next one
            print(f"Error fetching page {page}: {e}")
        
        # Fixed pause after every request, including failed ones (simple throttling)
        time.sleep(0.5)
    
    return artifacts
```
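`fetch_artifacts` requests a fixed `num_pages`. Harvard Art Museums API responses also carry an `info` object describing the result set, including a `pages` count, so a loop can stop at the real last page. A sketch assuming that `info.pages` field; `iter_pages` is a hypothetical helper, and `fetch_page` is any callable that returns the decoded JSON for a page number, which keeps the loop easy to test with fake data:

```python
from typing import Callable, Iterator, List, Optional


def iter_pages(fetch_page: Callable[[int], dict],
               max_pages: Optional[int] = None) -> Iterator[List[dict]]:
    """Yield each page's 'records' list until info['pages'] (or max_pages) is reached.

    fetch_page(page_number) must return the decoded JSON body for that page.
    """
    page = 1
    while True:
        data = fetch_page(page)
        yield data.get('records', [])
        # Assumed response shape: {'info': {'pages': <total pages>, ...}, 'records': [...]}
        total_pages = data.get('info', {}).get('pages', page)
        if page >= total_pages or (max_pages is not None and page >= max_pages):
            break
        page += 1
```

In the pipeline, `fetch_page` could wrap `requests.get(BASE_URL, params={'apikey': API_KEY, 'size': 100, 'page': p}, timeout=30).json()`, with the same `time.sleep` pause between calls.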

### Transform: Process Nested JSON

```python
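import pandas as pd
from datetime import datetime, timezone

def _text(record, key, max_len):
    """Return record[key] as a string cut to max_len; missing or None becomes ''"""
    return str(record.get(key) or '')[:max_len]

def _parse_lastupdate(value):
    """Convert an API timestamp to a UTC 'YYYY-MM-DD HH:MM:SS' string for DATETIME.

    Assumes the '2024-01-15T04:33:52-0500' style; returns None if it doesn't parse.
    """
    if not value:
        return None
    try:
        parsed = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S%z')
    except ValueError:
        return None
    return parsed.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

def transform_artifacts(artifacts):
    """Transform nested JSON into normalized dataframes"""
    
    metadata_records = []
    media_records = []
    color_records = []
    
    for artifact in artifacts:
        artifact_id = artifact.get('id')
        if artifact_id is None:
            continue  # Without an id the record cannot be keyed or linked
        
        # Extract metadata; the API can return null for many fields,
        # and slicing None would raise TypeError, hence _text()
        metadata_records.append({
            'id': artifact_id,
            'title': _text(artifact, 'title', 500),
            'culture': _text(artifact, 'culture', 255),
            'century': _text(artifact, 'century', 100),
            'classification': _text(artifact, 'classification', 255),
            'department': _text(artifact, 'department', 255),
            'dated': _text(artifact, 'dated', 255),
            'objectnumber': _text(artifact, 'objectnumber', 100),
            'url': artifact.get('url') or '',
            'lastupdate': _parse_lastupdate(artifact.get('lastupdate'))
        })
        
        # Extract media information; baseimageurl often sits inside the
        # 'images' list, so fall back to the first image when it is not top level
        images = artifact.get('images') or []
        first_image = images[0] if images else {}
        media_records.append({
            'artifact_id': artifact_id,
            'baseimageurl': artifact.get('baseimageurl') or first_image.get('baseimageurl') or '',
            'primaryimageurl': artifact.get('primaryimageurl') or '',
            'totalpageviews': artifact.get('totalpageviews') or 0,
            'totaluniquepageviews': artifact.get('totaluniquepageviews') or 0
        })
        
        # Extract colors: one row per entry in the artifact's colors list
        for color_data in artifact.get('colors') or []:
            color_records.append({
                'artifact_id': artifact_id,
                'color': _text(color_data, 'color', 50),
                'spectrum': _text(color_data, 'spectrum', 50),
                'hue': _text(color_data, 'hue', 50),
                'percent': color_data.get('percent') or 0.0
            })
    
    df_metadata = pd.DataFrame(metadata_records)
    df_media = pd.DataFrame(media_records)
    df_colors = pd.DataFrame(color_records)
    
    return df_metadata, df_media, df_colors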
```
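The colors step is a one-to-many flattening: each artifact contributes one row per entry in its `colors` list. pandas can do the same with `pd.json_normalize`, shown here as an alternative sketch rather than the project's method (`colors_frame` is a hypothetical name; the column list mirrors the `artifactcolors` table):

```python
import pandas as pd

COLOR_COLUMNS = ['artifact_id', 'color', 'spectrum', 'hue', 'percent']


def colors_frame(artifacts):
    """One row per color entry, tagged with its parent artifact id."""
    # record_path raises KeyError when a record lacks the key, so keep only
    # artifacts that actually have a non-empty colors list.
    with_colors = [a for a in artifacts if a.get('colors')]
    if not with_colors:
        return pd.DataFrame(columns=COLOR_COLUMNS)
    # meta copies the parent 'id' onto every color row; meta_prefix renames it
    df = pd.json_normalize(with_colors, record_path='colors',
                           meta=['id'], meta_prefix='artifact_')
    # reindex fixes the column order and drops extras such as css3
    return df.reindex(columns=COLOR_COLUMNS)
```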

### Load: Insert into Database

```python
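# Column order must match the INSERT statements below
METADATA_COLUMNS = ['id', 'title', 'culture', 'century', 'classification',
                    'department', 'dated', 'objectnumber', 'url', 'lastupdate']
MEDIA_COLUMNS = ['artifact_id', 'baseimageurl', 'primaryimageurl',
                 'totalpageviews', 'totaluniquepageviews']
COLOR_COLUMNS = ['artifact_id', 'color', 'spectrum', 'hue', 'percent']

def load_to_database(df_metadata, df_media, df_colors, connection):
    """Load transformed data in one transaction; re-running a batch does not duplicate rows"""
    if df_metadata.empty:
        print("Nothing to load")
        return
    
    cursor = connection.cursor()
    
    try:
        # Upsert metadata, refreshing every column for artifacts already present
        metadata_query = """
            INSERT INTO artifactmetadata 
            (id, title, culture, century, classification, department, 
             dated, objectnumber, url, lastupdate)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            title=VALUES(title), culture=VALUES(culture), century=VALUES(century),
            classification=VALUES(classification), department=VALUES(department),
            dated=VALUES(dated), objectnumber=VALUES(objectnumber),
            url=VALUES(url), lastupdate=VALUES(lastupdate)
        """
        cursor.executemany(metadata_query, df_metadata[METADATA_COLUMNS].values.tolist())
        
        # Media and color rows use surrogate keys, so clear existing rows for
        # these artifacts first; otherwise every re-run appends duplicates
        artifact_ids = [(int(i),) for i in df_metadata['id'].unique()]
        cursor.executemany("DELETE FROM artifactmedia WHERE artifact_id = %s", artifact_ids)
        cursor.executemany("DELETE FROM artifactcolors WHERE artifact_id = %s", artifact_ids)
        
        # Insert media
        media_query = """
            INSERT INTO artifactmedia 
            (artifact_id, baseimageurl, primaryimageurl, 
             totalpageviews, totaluniquepageviews)
            VALUES (%s, %s, %s, %s, %s)
        """
        cursor.executemany(media_query, df_media[MEDIA_COLUMNS].values.tolist())
        
        # Insert colors
        if not df_colors.empty:
            color_query = """
                INSERT INTO artifactcolors 
                (artifact_id, color, spectrum, hue, percent)
                VALUES (%s, %s, %s, %s, %s)
            """
            cursor.executemany(color_query, df_colors[COLOR_COLUMNS].values.tolist())
        
        connection.commit()
        print(f"Loaded {len(df_metadata)} artifacts successfully")
        
    except Exception as e:
        connection.rollback()
        print(f"Error loading data: {e}")
        raise
    finally:
        cursor.close()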
```
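`values.tolist()` passes values through unchanged, and a float `NaN` (which pandas introduces when a numeric column has gaps) is not SQL `NULL`; sending it can make the insert fail or store the wrong value. A hedged sketch of a row builder that maps every missing value to `None`; `to_db_rows` is a hypothetical helper:

```python
import pandas as pd


def to_db_rows(df, columns):
    """Return a list of tuples for executemany(), with missing values as None.

    Iterating DataFrame columns typically yields Python scalars (int, float, str),
    and pd.isna() catches None, NaN and NaT so they reach the driver as NULL.
    """
    return [
        tuple(None if pd.isna(value) else value for value in row)
        for row in df[columns].itertuples(index=False, name=None)
    ]
```

Usage: `cursor.executemany(color_query, to_db_rows(df_colors, ['artifact_id', 'color', 'spectrum', 'hue', 'percent']))`.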

## Analytical SQL Queries

### Pre-built Analytics Queries

```python
ANALYTICS_QUERIES = {
    "Artifacts by Culture": """
        SELECT culture, COUNT(*) as artifact_count
        FROM artifactmetadata
        WHERE culture IS NOT NULL AND culture != ''
        GROUP BY culture
        ORDER BY artifact_count DESC
        LIMIT 15
    """,
    
    "Artifacts by Century": """
        SELECT century, COUNT(*) as count
        FROM artifactmetadata
        WHERE century IS NOT NULL AND century != ''
        GROUP BY century
        ORDER BY count DESC
        LIMIT 10
    """,
    
    "Top Classifications": """
        SELECT classification, COUNT(*) as total
        FROM artifactmetadata
        WHERE classification IS NOT NULL AND classification != ''
        GROUP BY classification
        ORDER BY total DESC
        LIMIT 12
    """,
    
    "Color Distribution": """
        SELECT color, COUNT(*) as usage_count, AVG(percent) as avg_percent
        FROM artifactcolors
        WHERE color IS NOT NULL AND color != ''
        GROUP BY color
        ORDER BY usage_count DESC
        LIMIT 20
    """,
    
    "Media Availability": """
        SELECT 
            COUNT(*) as total_artifacts,
            SUM(CASE WHEN primaryimageurl IS NOT NULL AND primaryimageurl != '' 
                THEN 1 ELSE 0 END) as with_images,
            ROUND(100.0 * SUM(CASE WHEN primaryimageurl IS NOT NULL 
                AND primaryimageurl != '' THEN 1 ELSE 0 END) / COUNT(*), 2) 
                as image_coverage_percent
        FROM artifactmedia
    """,
    
    "Department Distribution": """
        SELECT department, COUNT(*) as count
        FROM artifactmetadata
        WHERE department IS NOT NULL AND department != ''
        GROUP BY department
        ORDER BY count DESC
    """,
    
    "Popular Artifacts": """
        SELECT m.title, m.culture, m.century, 
               a.totalpageviews, a.totaluniquepageviews
        FROM artifactmetadata m
        JOIN artifactmedia a ON m.id = a.artifact_id
        WHERE a.totalpageviews > 0
        ORDER BY a.totalpageviews DESC
        LIMIT 20
    """
}

def execute_analytics_query(query_name, connection):
    """Execute analytical query and return results"""
    cursor = connection.cursor()
    
    try:
        query = ANALYTICS_QUERIES[query_name]
        cursor.execute(query)
        
        columns = [desc[0] for desc in cursor.description]
        results = cursor.fetchall()
        
        df = pd.DataFrame(results, columns=columns)
        return df
        
    except Exception as e:
        print(f"Query error: {e}")
        return pd.DataFrame()
    finally:
        cursor.close()
```
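When testing the dashboard without a database, or checking that a query returns what you expect, the same aggregate can be recomputed in pandas from the transformed metadata DataFrame. A sketch mirroring "Artifacts by Culture" (`culture_counts` is a hypothetical helper; artifacts with equal counts may be ordered differently than in MySQL):

```python
import pandas as pd


def culture_counts(df_metadata, limit=15):
    """pandas equivalent of the 'Artifacts by Culture' query."""
    cultures = df_metadata['culture']
    # Same filter as the SQL: drop NULLs and empty strings
    cultures = cultures[cultures.notna() & (cultures != '')]
    # value_counts sorts by count, descending, like ORDER BY artifact_count DESC
    return (cultures.value_counts()
            .head(limit)
            .rename_axis('culture')
            .reset_index(name='artifact_count'))
```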

## Streamlit Dashboard

### Main Application

```python
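import mysql.connector
import pandas as pd
import plotly.express as px
import streamlit as st

# app.py is assumed to also define API_KEY, db_config, fetch_artifacts,
# transform_artifacts, load_to_database, ANALYTICS_QUERIES and
# execute_analytics_query from the sections above.

def main():
    st.set_page_config(
        page_title="Harvard Artifacts Analytics",
        page_icon="🏛️",
        layout="wide"
    )
    
    st.title("🏛️ Harvard Art Museums - Data Analytics Dashboard")
    
    # Sidebar for navigation
    page = st.sidebar.selectbox(
        "Choose a page",
        ["Data Collection", "SQL Analytics", "Visualizations"]
    )
    
    if page == "Data Collection":
        show_data_collection_page()
    elif page == "SQL Analytics":
        show_analytics_page()
    else:
        show_visualization_page()

def show_data_collection_page():
    """ETL pipeline execution page"""
    st.header("📥 Data Collection & ETL")
    
    num_pages = st.number_input("Number of pages to fetch", 
                                  min_value=1, max_value=20, value=5)
    
    if st.button("Start ETL Pipeline"):
        with st.spinner("Fetching data from API..."):
            artifacts = fetch_artifacts(API_KEY, num_pages=num_pages)
        if not artifacts:
            st.error("No artifacts fetched; check the API key and network access")
            return
        st.success(f"Fetched {len(artifacts)} artifacts")
        
        with st.spinner("Transforming data..."):
            df_meta, df_media, df_colors = transform_artifacts(artifacts)
        st.success("Data transformed successfully")
        
        with st.spinner("Loading to database..."):
            connection = mysql.connector.connect(**db_config)
            try:
                load_to_database(df_meta, df_media, df_colors, connection)
            finally:
                connection.close()
        st.success("Data loaded to database")
        
        st.dataframe(df_meta.head(10))

def show_analytics_page():
    """SQL analytics execution page"""
    st.header("📊 SQL Analytics")
    
    query_name = st.selectbox(
        "Select Analytics Query",
        list(ANALYTICS_QUERIES.keys())
    )
    
    if st.button("Execute Query"):
        connection = mysql.connector.connect(**db_config)
        df_result = execute_analytics_query(query_name, connection)
        connection.close()
        
        if not df_result.empty:
            st.dataframe(df_result)
            
            # Auto-generate a bar chart for (label, value) results
            if len(df_result.columns) == 2:
                fig = px.bar(df_result, 
                            x=df_result.columns[0], 
                            y=df_result.columns[1],
                            title=query_name)
                st.plotly_chart(fig, use_container_width=True)
        else:
            st.warning("No results found")

def show_visualization_page():
    """Chart a query whose first column is a label and second a number"""
    st.header("📈 Visualizations")
    
    query_name = st.selectbox("Select dataset", list(ANALYTICS_QUERIES.keys()))
    chart_type = st.radio("Chart type", ["Bar", "Pie"], horizontal=True)
    
    connection = mysql.connector.connect(**db_config)
    try:
        df_result = execute_analytics_query(query_name, connection)
    finally:
        connection.close()
    
    # Only (label, numeric value) results can be charted this way
    chartable = (
        not df_result.empty
        and len(df_result.columns) >= 2
        and pd.api.types.is_numeric_dtype(df_result.iloc[:, 1])
    )
    if not chartable:
        st.info("This query's result is not shaped for a simple chart")
        return
    
    label_col, value_col = df_result.columns[0], df_result.columns[1]
    if chart_type == "Bar":
        fig = px.bar(df_result, x=label_col, y=value_col, title=query_name)
    else:
        fig = px.pie(df_result, names=label_col, values=value_col, title=query_name)
    st.plotly_chart(fig, use_container_width=True)

if __name__ == "__main__":
    main()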
```

### Run the Application

```bash
streamlit run app.py
```

## Common Patterns

### Error Handling with Retry Logic

```python
from functools import wraps
import time

import requests

def retry_on_failure(max_retries=3, delay=2):
    """Decorator for API calls with retry logic"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.RequestException as e:
                    if attempt == max_retries - 1:
                        raise
                    print(f"Attempt {attempt + 1} failed: {e}")
                    time.sleep(delay * (attempt + 1))
            return None
        return wrapper
    return decorator

@retry_on_failure(max_retries=3)
def fetch_with_retry(url, params):
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()
```

### Incremental Data Loading

```python
import requests

# BASE_URL comes from the API Key Setup snippet

def get_last_update_timestamp(connection):
    """Get the latest artifact timestamp from database"""
    cursor = connection.cursor()
    cursor.execute("""
        SELECT MAX(lastupdate) FROM artifactmetadata
    """)
    result = cursor.fetchone()
    cursor.close()
    return result[0] if result[0] else None

def fetch_incremental_artifacts(api_key, last_update):
    """Fetch new/updated artifacts (first page of results only, newest first)"""
    params = {
        'apikey': api_key,
        'size': 100,
        'sort': 'lastupdate',
        'sortorder': 'desc'
    }
    
    if last_update:
        # Check the API docs that 'after' filters on lastupdate; if it does not,
        # filter the returned records against last_update client-side
        params['after'] = last_update
    
    response = requests.get(BASE_URL, params=params, timeout=30)
    response.raise_for_status()
    return response.json().get('records', [])
```
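Because this fetch sorts by `lastupdate` descending, the returned records can also be filtered client-side against the stored watermark, which works whether or not the API honors `after`. A sketch, assuming `lastupdate` strings look like `2024-01-15T04:33:52-0500` and that the database holds naive datetimes in UTC; `filter_newer` is a hypothetical helper:

```python
from datetime import datetime, timezone


def filter_newer(records, watermark):
    """Keep records whose 'lastupdate' is strictly later than `watermark`.

    Assumes lastupdate strings like '2024-01-15T04:33:52-0500' and a naive
    datetime watermark that represents UTC. Records without a parseable
    timestamp are skipped.
    """
    if watermark is None:
        return list(records)  # First run: everything counts as new
    cutoff = watermark.replace(tzinfo=timezone.utc)
    newer = []
    for record in records:
        try:
            updated = datetime.strptime(record.get('lastupdate') or '', '%Y-%m-%dT%H:%M:%S%z')
        except ValueError:
            continue
        # Comparing aware datetimes accounts for the API's UTC offset
        if updated > cutoff:
            newer.append(record)
    return newer
```

Since results arrive newest first, a paging loop could also stop as soon as a page yields fewer new records than it contains.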

### Data Quality Checks

```python
def validate_artifact_data(df):
    """Perform data quality checks"""
    checks = {
        'missing_ids': df['id'].isna().sum(),
        'duplicate_ids': df['id'].duplicated().sum(),
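        # transform_artifacts may store missing titles as '' rather than NULL
        'missing_titles': (df['title'].isna() | (df['title'] == '')).sum(),
        # Missing or unparseable timestamps
        'invalid_dates': df['lastupdate'].isna().sum()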
    }
    
    for check, count in checks.items():
        if count > 0:
            print(f"⚠️ Data Quality Issue: {check} = {count}")
    
    return all(count == 0 for count in checks.values())
```
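Because `artifactmedia` and `artifactcolors` declare foreign keys, a single child row pointing at an unknown artifact makes the whole load fail and roll back. Checking for such rows in pandas first gives a clearer message. A sketch (`orphaned_child_rows` is a hypothetical helper):

```python
import pandas as pd


def orphaned_child_rows(df_parent, df_child, parent_key='id', child_key='artifact_id'):
    """Return rows of df_child whose foreign key has no match in df_parent."""
    return df_child[~df_child[child_key].isin(df_parent[parent_key])]
```

Usage: `orphans = orphaned_child_rows(df_metadata, df_media)`, then report and drop them if `orphans` is not empty.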

## Troubleshooting

### API Rate Limiting

```python
# Issue: 429 Too Many Requests
# Solution: Implement exponential backoff

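import time

import requests
from requests.exceptions import HTTPError

def fetch_with_backoff(url, params, max_retries=5):
    for retry in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            return response.json()
        except HTTPError as e:
            if e.response.status_code == 429:
                # Exponential backoff: waits 2, 3, 5, 9, 17 seconds for retry = 0..4
                wait_time = (2 ** retry) + 1
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
    # Fail loudly instead of silently returning None
    raise RuntimeError(f"Still rate limited after {max_retries} attempts: {url}")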
```
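Some APIs send a `Retry-After` header with a 429 response; whether the Harvard Art Museums API does is not confirmed here. A sketch of a delay calculator that honors a numeric header when present and otherwise uses capped exponential backoff plus random jitter, so several clients do not retry in lockstep; `backoff_delay` is a hypothetical helper:

```python
import random
from typing import Optional


def backoff_delay(retry: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0, jitter: float = 0.5) -> float:
    """Seconds to wait before the next attempt.

    Uses a numeric Retry-After value if given; otherwise base * 2**retry,
    capped at `cap`, plus up to `jitter` seconds of random jitter.
    """
    if retry_after is not None:
        try:
            return min(float(retry_after), cap)
        except ValueError:
            pass  # Retry-After may also be an HTTP date; fall back to backoff
    return min(base * (2 ** retry), cap) + random.uniform(0, jitter)
```

Usage inside the 429 branch: `time.sleep(backoff_delay(retry, e.response.headers.get('Retry-After')))`.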

### Database Connection Issues

```python
# Issue: Lost connection to MySQL server
# Solution: Connection pooling

from mysql.connector import pooling

db_pool = pooling.MySQLConnectionPool(
    pool_name="harvard_pool",
    pool_size=5,
    pool_reset_session=True,
    **db_config
)

def get_connection():
    return db_pool.get_connection()
```

### Memory Issues with Large Datasets

```python
# Issue: Memory error with large API responses
# Solution: Process data in chunks

def process_artifacts_in_chunks(artifacts, chunk_size=100):
    """Process artifacts in batches to manage memory"""
    for i in range(0, len(artifacts), chunk_size):
        chunk = artifacts[i:i + chunk_size]
        df_meta, df_media, df_colors = transform_artifacts(chunk)
        
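        connection = get_connection()
        try:
            load_to_database(df_meta, df_media, df_colors, connection)
        finally:
            # With a pool, close() hands the connection back instead of closing it
            connection.close()
        
        # Drop references so these DataFrames can be garbage-collected
        del df_meta, df_media, df_colors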
```
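Slicing a list still requires the full `artifacts` list in memory. When records arrive from a generator instead (for example, one API page at a time), a small batching helper holds only one chunk at a time. A sketch; `chunked` is a hypothetical helper, and on Python 3.12+ `itertools.batched` does the same job, yielding tuples:

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of up to `size` items, pulling from the iterable lazily."""
    if size < 1:
        raise ValueError('size must be at least 1')
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk
```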

### Empty or Malformed API Responses

```python
def safe_extract_value(data, key, default=''):
    """Safely extract values from nested JSON"""
    try:
        value = data.get(key, default)
        return value if value is not None else default
    except (AttributeError, TypeError):
        return default

# Usage inside the transform loop, where `artifact` is one API record
metadata = {
    'id': safe_extract_value(artifact, 'id', 0),
    'title': safe_extract_value(artifact, 'title', 'Unknown')[:500],
    'culture': safe_extract_value(artifact, 'culture', 'Unknown')[:255]
}
```
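`safe_extract_value` only looks one level deep, while some values sit further down, for example image details inside the `images` list (check a sample response for the exact shape). A sketch of a path-based variant; `safe_get` is a hypothetical helper:

```python
def safe_get(data, *path, default=''):
    """Walk dict keys / list indexes in `path`; return `default` on a miss or a None value."""
    current = data
    for step in path:
        try:
            current = current[step]
        except (KeyError, IndexError, TypeError):
            # Missing key, short list, or a None/non-container along the way
            return default
    return default if current is None else current
```

Usage: `safe_get(artifact, 'images', 0, 'baseimageurl')`.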

## Performance Optimization

### Batch Insert Optimization

```python
def optimized_batch_insert(df, table_name, connection, batch_size=1000):
    """Insert data in optimized batches"""
    cursor = connection.cursor()
    
    for start in range(0, len(df), batch_size):
        batch = df.iloc[start:start + batch_size]
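        columns = ', '.join(batch.columns)
        placeholders = ', '.join(['%s'] * len(batch.columns))
        # Name the columns explicitly so tables with AUTO_INCREMENT keys
        # (artifactmedia, artifactcolors) receive values in the right positions
        query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"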
        
        cursor.executemany(query, batch.values.tolist())
        connection.commit()
    
    cursor.close()
```
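`table_name` and the column names are interpolated into the SQL string, and identifiers cannot be passed as `%s` parameters. A sketch that validates and backtick-quotes them before building the statement, while values still go through placeholders; `build_insert` is a hypothetical helper:

```python
import re


def build_insert(table_name, columns):
    """Build a parameterized INSERT with an explicit, validated column list."""
    for name in (table_name, *columns):
        # fullmatch (not match with $) so a trailing newline is also rejected
        if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', name):
            raise ValueError(f'Unsafe SQL identifier: {name!r}')
    column_list = ', '.join(f'`{c}`' for c in columns)
    placeholders = ', '.join(['%s'] * len(columns))
    return f'INSERT INTO `{table_name}` ({column_list}) VALUES ({placeholders})'
```

Usage: `cursor.executemany(build_insert(table_name, list(batch.columns)), batch.values.tolist())`.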

This skill walks through building an ETL pipeline and analytics dashboard on the Harvard Art Museums API, from extraction and transformation through loading into MySQL/TiDB, SQL analytics, and Streamlit visualization.

Source: creator's repository, aradotso/data-skills
