harvard-artifacts-data-engineering-pipeline

Build ETL pipelines and analytics dashboards for Harvard Art Museums API using Python, SQL, and Streamlit

Skill file

Preview skill file
---
name: harvard-artifacts-data-engineering-pipeline
description: End-to-end data engineering pipeline for Harvard Art Museums API with ETL, SQL analytics, and Streamlit visualization
triggers:
  - build a data pipeline from Harvard Art Museums API
  - create ETL workflow for museum artifact data
  - set up Harvard artifacts analytics dashboard
  - extract and analyze Harvard Art Museums collection data
  - build Streamlit app for art museum data visualization
  - implement SQL analytics for Harvard artifacts
  - create museum data engineering pipeline
  - analyze Harvard Art Museums API data
---

# Harvard Artifacts Data Engineering Pipeline

> Skill by [ara.so](https://ara.so) — Data Skills collection.

This skill enables AI agents to work with the Harvard-Artifacts-Collection-Data-Engineering-Analytics-App, an end-to-end data engineering solution that extracts artifact data from the Harvard Art Museums API, performs ETL operations, stores data in SQL databases, and provides interactive analytics dashboards using Streamlit.

## What This Project Does

The Harvard Artifacts Data Engineering Pipeline demonstrates a complete data engineering workflow:

1. **API Integration**: Connects to Harvard Art Museums API with proper authentication and pagination
2. **ETL Pipeline**: Extracts artifact metadata, transforms nested JSON into relational format, loads into SQL
3. **SQL Analytics**: Runs 20+ predefined analytical queries on artifact collections
4. **Interactive Visualization**: Displays results through Streamlit dashboards with Plotly charts

The pipeline handles artifact metadata, media/image information, and color data across three normalized database tables.

## Installation

### Prerequisites

- Python 3.8+
- MySQL or TiDB Cloud database instance
- Harvard Art Museums API key (get from: https://www.harvardartmuseums.org/collections/api)

### Setup

```bash
# Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

# Install dependencies
pip install -r requirements.txt

# Required packages
pip install streamlit pandas requests mysql-connector-python plotly python-dotenv
```

### Configuration

Create a `.env` file or use Streamlit secrets for configuration:

```bash
# .env file
HARVARD_API_KEY=your_api_key_here
DB_HOST=your_database_host
DB_PORT=3306
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts
```

Or configure via Streamlit secrets (`.streamlit/secrets.toml`):

```toml
[database]
host = "your_database_host"
port = 3306
user = "your_db_user"
password = "your_db_password"
database = "harvard_artifacts"

[api]
harvard_api_key = "your_api_key_here"
```

## Database Schema

The project uses three main tables with proper relational structure:

```sql
-- Artifact Metadata Table
CREATE TABLE artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(200),
    century VARCHAR(100),
    classification VARCHAR(200),
    department VARCHAR(200),
    division VARCHAR(200),
    dated VARCHAR(200),
    technique VARCHAR(500),
    medium VARCHAR(500),
    dimensions VARCHAR(500),
    accession_number VARCHAR(100),
    credit_line TEXT
);

-- Artifact Media Table
CREATE TABLE artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    primary_image_url TEXT,
    image_count INT DEFAULT 0,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

-- Artifact Colors Table
CREATE TABLE artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    percentage FLOAT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
```

## Key API Integration Patterns

### Connecting to Harvard Art Museums API

```python
import requests
import os

def fetch_artifacts(api_key, page=1, size=100):
    """Fetch artifacts from Harvard Art Museums API with pagination"""
    base_url = "https://api.harvardartmuseums.org/object"
    
    params = {
        'apikey': api_key,
        'page': page,
        'size': size,
        'hasimage': 1  # Only artifacts with images
    }
    
    response = requests.get(base_url, params=params)
    response.raise_for_status()
    
    return response.json()

# Usage
api_key = os.getenv('HARVARD_API_KEY')
data = fetch_artifacts(api_key, page=1, size=100)

print(f"Total records: {data['info']['totalrecords']}")
print(f"Total pages: {data['info']['pages']}")
print(f"Records fetched: {len(data['records'])}")
```

### Handling Pagination

```python
def fetch_all_artifacts(api_key, max_pages=10):
    """Fetch multiple pages of artifacts"""
    all_artifacts = []
    
    for page in range(1, max_pages + 1):
        try:
            data = fetch_artifacts(api_key, page=page, size=100)
            all_artifacts.extend(data['records'])
            
            print(f"Fetched page {page}/{max_pages}")
            
            # Respect rate limits
            import time
            time.sleep(0.5)
            
        except Exception as e:
            print(f"Error on page {page}: {e}")
            break
    
    return all_artifacts
```

## ETL Pipeline Implementation

### Extract Phase

```python
import pandas as pd

def extract_artifact_data(artifact):
    """Extract relevant fields from artifact JSON"""
    return {
        'id': artifact.get('id'),
        'title': artifact.get('title'),
        'culture': artifact.get('culture'),
        'century': artifact.get('century'),
        'classification': artifact.get('classification'),
        'department': artifact.get('department'),
        'division': artifact.get('division'),
        'dated': artifact.get('dated'),
        'technique': artifact.get('technique'),
        'medium': artifact.get('medium'),
        'dimensions': artifact.get('dimensions'),
        'accession_number': artifact.get('accessionyear'),
        'credit_line': artifact.get('creditline')
    }

def extract_media_data(artifact):
    """Extract media/image information"""
    return {
        'artifact_id': artifact.get('id'),
        'primary_image_url': artifact.get('primaryimageurl'),
        'image_count': artifact.get('totalpageviews', 0)
    }

def extract_color_data(artifact):
    """Extract color information"""
    colors = []
    if 'colors' in artifact and artifact['colors']:
        for color in artifact['colors']:
            colors.append({
                'artifact_id': artifact.get('id'),
                'color': color.get('color'),
                'percentage': color.get('percent')
            })
    return colors
```

### Transform Phase

```python
def transform_artifacts_to_dataframes(artifacts):
    """Transform list of artifacts into structured DataFrames"""
    
    metadata_list = []
    media_list = []
    colors_list = []
    
    for artifact in artifacts:
        # Extract metadata
        metadata_list.append(extract_artifact_data(artifact))
        
        # Extract media
        media_list.append(extract_media_data(artifact))
        
        # Extract colors
        colors_list.extend(extract_color_data(artifact))
    
    # Create DataFrames
    df_metadata = pd.DataFrame(metadata_list)
    df_media = pd.DataFrame(media_list)
    df_colors = pd.DataFrame(colors_list)
    
    # Clean and validate
    df_metadata['title'] = df_metadata['title'].fillna('Unknown')
    df_metadata = df_metadata.drop_duplicates(subset=['id'])
    
    return df_metadata, df_media, df_colors
```

### Load Phase

```python
import mysql.connector
from mysql.connector import Error

def create_database_connection(config):
    """Create MySQL database connection"""
    try:
        connection = mysql.connector.connect(
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database']
        )
        return connection
    except Error as e:
        print(f"Database connection error: {e}")
        return None

def load_dataframe_to_sql(df, table_name, connection, if_exists='append'):
    """Load DataFrame into SQL table with batch inserts"""
    cursor = connection.cursor()
    
    # Prepare insert statement
    columns = ', '.join(df.columns)
    placeholders = ', '.join(['%s'] * len(df.columns))
    
    insert_query = f"""
        INSERT INTO {table_name} ({columns})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE id=id
    """
    
    # Batch insert
    data_tuples = [tuple(row) for row in df.values]
    
    try:
        cursor.executemany(insert_query, data_tuples)
        connection.commit()
        print(f"Loaded {len(data_tuples)} rows into {table_name}")
    except Error as e:
        connection.rollback()
        print(f"Error loading data: {e}")
    finally:
        cursor.close()

# Complete ETL pipeline
def run_etl_pipeline(api_key, db_config, num_pages=5):
    """Execute full ETL pipeline"""
    
    # Extract
    print("Extracting data from API...")
    artifacts = fetch_all_artifacts(api_key, max_pages=num_pages)
    
    # Transform
    print("Transforming data...")
    df_metadata, df_media, df_colors = transform_artifacts_to_dataframes(artifacts)
    
    # Load
    print("Loading data to database...")
    connection = create_database_connection(db_config)
    
    if connection:
        load_dataframe_to_sql(df_metadata, 'artifactmetadata', connection)
        load_dataframe_to_sql(df_media, 'artifactmedia', connection)
        load_dataframe_to_sql(df_colors, 'artifactcolors', connection)
        
        connection.close()
        print("ETL pipeline completed successfully!")
```

## SQL Analytics Queries

### Common Analytical Queries

```python
# Sample analytical queries
ANALYTICAL_QUERIES = {
    "artifacts_by_culture": """
        SELECT culture, COUNT(*) as count
        FROM artifactmetadata
        WHERE culture IS NOT NULL
        GROUP BY culture
        ORDER BY count DESC
        LIMIT 20
    """,
    
    "artifacts_by_century": """
        SELECT century, COUNT(*) as count
        FROM artifactmetadata
        WHERE century IS NOT NULL
        GROUP BY century
        ORDER BY count DESC
    """,
    
    "artifacts_with_images": """
        SELECT 
            CASE WHEN image_count > 0 THEN 'With Images' ELSE 'Without Images' END as status,
            COUNT(*) as count
        FROM artifactmedia
        GROUP BY status
    """,
    
    "top_colors": """
        SELECT color, COUNT(*) as frequency, AVG(percentage) as avg_percentage
        FROM artifactcolors
        GROUP BY color
        ORDER BY frequency DESC
        LIMIT 15
    """,
    
    "department_distribution": """
        SELECT department, COUNT(*) as count
        FROM artifactmetadata
        WHERE department IS NOT NULL
        GROUP BY department
        ORDER BY count DESC
    """,
    
    "classification_breakdown": """
        SELECT classification, COUNT(*) as count
        FROM artifactmetadata
        WHERE classification IS NOT NULL
        GROUP BY classification
        ORDER BY count DESC
        LIMIT 20
    """
}

def execute_analytical_query(connection, query_name):
    """Execute a predefined analytical query"""
    cursor = connection.cursor(dictionary=True)
    
    try:
        cursor.execute(ANALYTICAL_QUERIES[query_name])
        results = cursor.fetchall()
        return pd.DataFrame(results)
    except Error as e:
        print(f"Query error: {e}")
        return None
    finally:
        cursor.close()
```

## Streamlit Application Structure

```python
import streamlit as st
import plotly.express as px

def main():
    st.set_page_config(
        page_title="Harvard Artifacts Analytics",
        page_icon="🏛️",
        layout="wide"
    )
    
    st.title("🏛️ Harvard Art Museums Analytics Dashboard")
    
    # Sidebar configuration
    with st.sidebar:
        st.header("Configuration")
        
        # API Key input
        api_key = st.text_input("Harvard API Key", type="password",
                               value=os.getenv('HARVARD_API_KEY', ''))
        
        # Database config
        db_config = {
            'host': st.text_input("DB Host", value=os.getenv('DB_HOST', 'localhost')),
            'port': st.number_input("DB Port", value=3306),
            'user': st.text_input("DB User", value=os.getenv('DB_USER', '')),
            'password': st.text_input("DB Password", type="password",
                                     value=os.getenv('DB_PASSWORD', '')),
            'database': st.text_input("DB Name", value=os.getenv('DB_NAME', 'harvard_artifacts'))
        }
        
        # ETL Controls
        st.header("ETL Pipeline")
        num_pages = st.slider("Pages to fetch", 1, 20, 5)
        
        if st.button("Run ETL Pipeline"):
            with st.spinner("Running ETL..."):
                run_etl_pipeline(api_key, db_config, num_pages)
                st.success("ETL completed!")
    
    # Main content tabs
    tab1, tab2, tab3 = st.tabs(["📊 Analytics", "🔍 Query Explorer", "📈 Visualizations"])
    
    with tab1:
        display_analytics_dashboard(db_config)
    
    with tab2:
        display_query_explorer(db_config)
    
    with tab3:
        display_visualizations(db_config)

def display_analytics_dashboard(db_config):
    """Display predefined analytics"""
    st.header("Quick Analytics")
    
    connection = create_database_connection(db_config)
    if not connection:
        st.error("Database connection failed")
        return
    
    col1, col2 = st.columns(2)
    
    with col1:
        st.subheader("Artifacts by Culture")
        df = execute_analytical_query(connection, "artifacts_by_culture")
        if df is not None:
            fig = px.bar(df, x='culture', y='count', title="Top Cultures")
            st.plotly_chart(fig, use_container_width=True)
    
    with col2:
        st.subheader("Artifacts by Century")
        df = execute_analytical_query(connection, "artifacts_by_century")
        if df is not None:
            fig = px.bar(df, x='century', y='count', title="Distribution by Century")
            st.plotly_chart(fig, use_container_width=True)
    
    connection.close()

def display_query_explorer(db_config):
    """Custom SQL query interface"""
    st.header("Custom Query Explorer")
    
    query = st.text_area("Enter SQL Query", height=150,
                         value="SELECT * FROM artifactmetadata LIMIT 10")
    
    if st.button("Execute Query"):
        connection = create_database_connection(db_config)
        if connection:
            try:
                df = pd.read_sql(query, connection)
                st.dataframe(df, use_container_width=True)
                st.download_button("Download CSV", 
                                  df.to_csv(index=False),
                                  "query_results.csv")
            except Exception as e:
                st.error(f"Query error: {e}")
            finally:
                connection.close()

if __name__ == "__main__":
    main()
```

## Running the Application

```bash
# Set environment variables
export HARVARD_API_KEY="your_api_key"
export DB_HOST="your_host"
export DB_USER="your_user"
export DB_PASSWORD="your_password"
export DB_NAME="harvard_artifacts"

# Run Streamlit app
streamlit run app.py

# Access at http://localhost:8501
```

## Common Patterns

### Rate Limiting for API Calls

```python
import time
from functools import wraps

def rate_limit(calls_per_second=2):
    """Decorator to rate limit API calls"""
    min_interval = 1.0 / calls_per_second
    last_called = [0.0]
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = min_interval - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            result = func(*args, **kwargs)
            last_called[0] = time.time()
            return result
        return wrapper
    return decorator

@rate_limit(calls_per_second=2)
def fetch_artifacts_rate_limited(api_key, page):
    return fetch_artifacts(api_key, page)
```

### Error Handling and Retry Logic

```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def fetch_with_retry(api_key, page):
    """Fetch artifacts with automatic retry on failure"""
    response = requests.get(
        "https://api.harvardartmuseums.org/object",
        params={'apikey': api_key, 'page': page, 'size': 100}
    )
    response.raise_for_status()
    return response.json()
```

### Incremental Data Loading

```python
def get_last_artifact_id(connection):
    """Get the highest artifact ID already in database"""
    cursor = connection.cursor()
    cursor.execute("SELECT MAX(id) FROM artifactmetadata")
    result = cursor.fetchone()
    cursor.close()
    return result[0] if result[0] else 0

def incremental_etl(api_key, db_config):
    """Load only new artifacts since last ETL run"""
    connection = create_database_connection(db_config)
    last_id = get_last_artifact_id(connection)
    
    # Fetch only artifacts with id > last_id
    new_artifacts = fetch_artifacts_after_id(api_key, last_id)
    
    # Transform and load
    df_metadata, df_media, df_colors = transform_artifacts_to_dataframes(new_artifacts)
    load_dataframe_to_sql(df_metadata, 'artifactmetadata', connection)
    load_dataframe_to_sql(df_media, 'artifactmedia', connection)
    load_dataframe_to_sql(df_colors, 'artifactcolors', connection)
    
    connection.close()
```

## Troubleshooting

### API Key Issues

```python
# Verify API key is working
def test_api_connection(api_key):
    """Test Harvard API connection"""
    try:
        response = requests.get(
            "https://api.harvardartmuseums.org/object",
            params={'apikey': api_key, 'size': 1}
        )
        response.raise_for_status()
        print("✓ API connection successful")
        return True
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            print("✗ Invalid API key")
        else:
            print(f"✗ API error: {e}")
        return False
```

### Database Connection Issues

```python
def diagnose_database_connection(config):
    """Diagnose database connection problems"""
    try:
        connection = mysql.connector.connect(
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            connect_timeout=10
        )
        print("✓ Database connection successful")
        connection.close()
        return True
    except Error as e:
        print(f"✗ Database error: {e}")
        if "Access denied" in str(e):
            print("  Check username and password")
        elif "Unknown database" in str(e):
            print("  Database does not exist - create it first")
        elif "Can't connect" in str(e):
            print("  Check host and port")
        return False
```

### Memory Management for Large Datasets

```python
def batch_process_artifacts(api_key, db_config, batch_size=100):
    """Process artifacts in batches to manage memory"""
    connection = create_database_connection(db_config)
    page = 1
    
    while True:
        # Fetch batch
        data = fetch_artifacts(api_key, page=page, size=batch_size)
        
        if not data['records']:
            break
        
        # Process and load immediately
        df_metadata, df_media, df_colors = transform_artifacts_to_dataframes(data['records'])
        load_dataframe_to_sql(df_metadata, 'artifactmetadata', connection)
        load_dataframe_to_sql(df_media, 'artifactmedia', connection)
        load_dataframe_to_sql(df_colors, 'artifactcolors', connection)
        
        print(f"Processed batch {page}")
        page += 1
    
    connection.close()
```

This skill provides comprehensive guidance for building and working with the Harvard Artifacts data engineering pipeline, from API integration through ETL to analytics and visualization.

Source

Creator's repository · aradotso/data-skills

View on GitHub

Security

Security checks in progress
Results will appear here once audits complete
What this skill can do
Reads your filesConnects to the internetRuns code on your machine
Checked by 3 independent security firms
Does it try to trick the AI?Not yet checkedPending · Gen Agent Trust Hub
Does it sneak in hidden code?Not yet checkedPending · Socket
Does it have known bugs?Not yet checkedPending · Snyk