harvard-art-museums-data-engineering

Build end-to-end data pipelines with Harvard Art Museums API, ETL workflows, SQL analytics, and Streamlit dashboards

---
name: harvard-art-museums-data-engineering
description: Build end-to-end data pipelines with Harvard Art Museums API, ETL workflows, SQL analytics, and Streamlit dashboards
triggers:
  - build a data pipeline for museum artifacts
  - create ETL workflow with Harvard Art Museums API
  - set up analytics dashboard for art collection data
  - analyze museum artifacts with SQL queries
  - build Streamlit app for art museum data
  - create data engineering pipeline for cultural heritage data
  - extract and visualize Harvard Art Museums data
  - design relational database for artifact metadata
---

# Harvard Art Museums Data Engineering & Analytics

> Skill by [ara.so](https://ara.so) — Data Skills collection.

This skill walks through building a data engineering pipeline on the Harvard Art Museums API: extracting artifact records, normalizing them into relational tables, running SQL analytics, and exploring the results in an interactive Streamlit dashboard.

## What It Does

- **API Integration**: Fetches artifact data from the Harvard Art Museums API with pagination and rate limiting
- **ETL Pipeline**: Transforms nested JSON into normalized relational tables
- **Database Design**: Creates properly structured MySQL/TiDB tables with foreign keys
- **SQL Analytics**: Executes 20+ analytical queries for insights
- **Visualization**: Builds interactive Streamlit dashboards with Plotly charts

## Installation

```bash
# Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

# Install dependencies
pip install -r requirements.txt

# Or install the required packages directly
pip install streamlit pandas requests mysql-connector-python plotly python-dotenv
```

## Configuration

### Environment Variables

Create a `.env` file in the project root:

```bash
# Harvard Art Museums API
HARVARD_API_KEY=your_api_key_here

# Database Configuration
DB_HOST=your_database_host
DB_PORT=3306
DB_USER=your_db_username
DB_PASSWORD=your_db_password
DB_NAME=harvard_artifacts
```
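
A missing variable otherwise tends to surface later as an opaque connection or authentication error. The following is a minimal sketch of a startup check, assuming the variable names from the `.env` example above; `missing_settings` and `REQUIRED_SETTINGS` are hypothetical names, not part of the repository.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variable names taken from the .env example above. DB_PORT is left out
# because the connection code falls back to 3306 when it is unset.
REQUIRED_SETTINGS = ("HARVARD_API_KEY", "DB_HOST", "DB_USER", "DB_PASSWORD", "DB_NAME")


def missing_settings(
    required: Iterable[str] = REQUIRED_SETTINGS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names of required settings that are unset or blank."""
    source = os.environ if env is None else env
    return [name for name in required if not (source.get(name) or "").strip()]
```

Calling `missing_settings()` right after `load_dotenv()` and stopping when the list is non-empty gives a clearer failure than a refused database connection.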

### Get Harvard API Key

1. Visit https://www.harvardartmuseums.org/collections/api
2. Request an API key (free)
3. Add the key to your `.env` file as `HARVARD_API_KEY`

### Database Setup

```python
import mysql.connector
from dotenv import load_dotenv
import os

load_dotenv()

# Connect to database
conn = mysql.connector.connect(
    host=os.getenv('DB_HOST'),
    port=int(os.getenv('DB_PORT', 3306)),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_NAME')
)
```

## Project Structure

```
├── app.py                 # Main Streamlit application
├── etl_pipeline.py        # ETL logic for data extraction/transformation
├── database.py            # Database connection and operations
├── queries.py             # SQL analytics queries
├── requirements.txt       # Python dependencies
└── .env                   # Environment configuration
```

## Core Components

### 1. API Data Extraction

```python
import time
from typing import Dict, List

import requests

def fetch_artifacts(api_key: str, size: int = 100, page: int = 1) -> Dict:
    """Fetch one page of artifacts from the Harvard Art Museums API"""
    url = "https://api.harvardartmuseums.org/object"
    params = {
        'apikey': api_key,
        'size': size,
        'page': page,
        'hasimage': 1  # Only artifacts with images
    }

    # A timeout keeps a stalled connection from hanging the pipeline
    response = requests.get(url, params=params, timeout=10)
    response.raise_for_status()
    return response.json()

def collect_artifacts(api_key: str, total_records: int = 500) -> List[Dict]:
    """Collect multiple pages of artifact data"""
    artifacts = []
    page = 1
    page_size = 100

    while len(artifacts) < total_records:
        data = fetch_artifacts(api_key, size=page_size, page=page)
        records = data.get('records', [])

        # An empty page means there is nothing left to fetch
        if not records:
            break

        artifacts.extend(records)
        page += 1

        # Pause between requests to respect rate limits
        time.sleep(0.5)

    # The last page may overshoot, so trim to the requested count
    return artifacts[:total_records]
```
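
The loop above stops only when a page comes back empty, which costs one extra request at the end of the collection. A sketch of an alternative that also stops on the last page is shown here. It assumes the response carries an `info` object with a `pages` count alongside `records`; treat that layout as an assumption to confirm against a live response. `iter_records` is a hypothetical helper that takes the page-fetching function as an argument so it can be exercised without network access.

```python
from typing import Callable, Dict, Iterator, List


def iter_records(
    fetch_page: Callable[[int], Dict],
    max_records: int,
) -> Iterator[Dict]:
    """Yield records page by page until max_records or the last page is reached.

    fetch_page(page) is expected to return a dict shaped like
    {"info": {"pages": 3}, "records": [...]}. The "info" layout is an
    assumption about the API response, not something this sketch verifies.
    """
    yielded = 0
    page = 1
    while yielded < max_records:
        data = fetch_page(page)
        records: List[Dict] = data.get("records") or []
        if not records:
            return
        for record in records:
            if yielded >= max_records:
                return
            yield record
            yielded += 1
        # Stop without another request when this was the final page
        total_pages = (data.get("info") or {}).get("pages")
        if total_pages is not None and page >= total_pages:
            return
        page += 1
```

With the function from this section it could be called as `iter_records(lambda p: fetch_artifacts(api_key, page=p), 500)`; any pause between requests would need to live inside the callable.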

### 2. Data Transformation

```python
from typing import Dict, List

import pandas as pd

def transform_artifact_metadata(artifacts: List[Dict]) -> pd.DataFrame:
    """Transform raw API data into metadata DataFrame"""
    metadata = []
    
    for artifact in artifacts:
        metadata.append({
            'objectid': artifact.get('objectid'),
            'title': artifact.get('title'),
            'culture': artifact.get('culture'),
            'period': artifact.get('period'),
            'century': artifact.get('century'),
            'classification': artifact.get('classification'),
            'medium': artifact.get('medium'),
            'dated': artifact.get('dated'),
            'department': artifact.get('department'),
            'division': artifact.get('division'),
            'creditline': artifact.get('creditline'),
            'accessionyear': artifact.get('accessionyear')
        })
    
    return pd.DataFrame(metadata)

def transform_artifact_media(artifacts: List[Dict]) -> pd.DataFrame:
    """Extract media/image information"""
    media_records = []
    
    for artifact in artifacts:
        objectid = artifact.get('objectid')
        # `or []` covers both a missing key and an explicit null value
        images = artifact.get('images') or []
        
        for img in images:
            media_records.append({
                'objectid': objectid,
                'imageid': img.get('imageid'),
                'baseimageurl': img.get('baseimageurl'),
                'width': img.get('width'),
                'height': img.get('height'),
                'format': img.get('format')
            })
    
    return pd.DataFrame(media_records)

def transform_artifact_colors(artifacts: List[Dict]) -> pd.DataFrame:
    """Extract color information"""
    color_records = []
    
    for artifact in artifacts:
        objectid = artifact.get('objectid')
        # `or []` covers both a missing key and an explicit null value
        colors = artifact.get('colors') or []
        
        for color in colors:
            color_records.append({
                'objectid': objectid,
                'color': color.get('color'),
                'spectrum': color.get('spectrum'),
                'hue': color.get('hue'),
                'percent': color.get('percent')
            })
    
    return pd.DataFrame(color_records)
```
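
To make the target shape concrete, here is a small worked example of the same parent/child split on a hand-written record. It is a sketch using only the standard library; `split_record` is a hypothetical helper and the sample values are illustrative, not real API output.

```python
from typing import Dict, List, Tuple


def split_record(artifact: Dict) -> Tuple[Dict, List[Dict]]:
    """Split one nested artifact into a parent row and its child color rows."""
    parent = {"objectid": artifact.get("objectid"), "title": artifact.get("title")}
    children = [
        {
            "objectid": artifact.get("objectid"),  # repeated key links child to parent
            "hue": color.get("hue"),
            "percent": color.get("percent"),
        }
        # `or []` handles a missing key as well as an explicit null
        for color in (artifact.get("colors") or [])
    ]
    return parent, children


# Hand-written sample record; the values are illustrative only.
sample = {
    "objectid": 1001,
    "title": "Sample bowl",
    "colors": [
        {"hue": "Brown", "percent": 0.6},
        {"hue": "Grey", "percent": 0.4},
    ],
}

parent_row, color_rows = split_record(sample)
print(parent_row)   # one row for the metadata table
print(color_rows)   # two rows for the colors table, both carrying objectid 1001
```

Each nested list becomes its own table keyed by `objectid`, which is what lets the later queries join media and colors back to the metadata.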

### 3. Database Schema Creation

```python
def create_database_schema(conn):
    """Create normalized database tables"""
    cursor = conn.cursor()
    
    # Artifact Metadata Table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactmetadata (
            objectid INT PRIMARY KEY,
            title TEXT,
            culture VARCHAR(255),
            period VARCHAR(255),
            century VARCHAR(100),
            classification VARCHAR(255),
            medium TEXT,
            dated VARCHAR(255),
            department VARCHAR(255),
            division VARCHAR(255),
            creditline TEXT,
            accessionyear INT
        )
    """)
    
    # Artifact Media Table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactmedia (
            id INT AUTO_INCREMENT PRIMARY KEY,
            objectid INT,
            imageid INT,
            baseimageurl TEXT,
            width INT,
            height INT,
            format VARCHAR(50),
            FOREIGN KEY (objectid) REFERENCES artifactmetadata(objectid)
        )
    """)
    
    # Artifact Colors Table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS artifactcolors (
            id INT AUTO_INCREMENT PRIMARY KEY,
            objectid INT,
            color VARCHAR(50),
            spectrum VARCHAR(50),
            hue VARCHAR(50),
            percent FLOAT,
            FOREIGN KEY (objectid) REFERENCES artifactmetadata(objectid)
        )
    """)
    
    conn.commit()
    cursor.close()
```

### 4. Batch Data Loading

```python
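def load_dataframe_to_table(df: pd.DataFrame, table_name: str, conn):
    """Batch insert DataFrame into SQL table"""
    if df.empty:
        return

    cursor = conn.cursor()

    # Prepare INSERT statement. The table and column names are interpolated
    # directly, so they must come from trusted code, never from user input.
    columns = ', '.join(df.columns)
    placeholders = ', '.join(['%s'] * len(df.columns))
    query = f"INSERT IGNORE INTO {table_name} ({columns}) VALUES ({placeholders})"

    # Convert to plain Python objects and map NaN to None (SQL NULL);
    # NaN and NumPy scalar types may not bind cleanly as query parameters.
    clean = df.astype(object).where(df.notna(), None)
    data = list(clean.itertuples(index=False, name=None))

    # Execute batch insert
    cursor.executemany(query, data)
    conn.commit()
    cursor.close()

    # INSERT IGNORE skips rows with duplicate keys, so fewer rows may be stored
    print(f"Submitted {len(df)} records to {table_name}")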
```
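
For large DataFrames, sending every row in a single `executemany` call may produce a statement that exceeds server-side packet limits. One way around this, sketched here under that assumption, is to split the rows into fixed-size batches first. `iter_batches` is a hypothetical helper and the default batch size is arbitrary.

```python
from typing import Iterator, List, Sequence, Tuple


def iter_batches(rows: Sequence[Tuple], batch_size: int = 1000) -> Iterator[List[Tuple]]:
    """Yield consecutive slices of rows, each at most batch_size long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])
```

Inside the loader, `for batch in iter_batches(data, 500): cursor.executemany(query, batch)` would replace the single call. Committing once after the loop keeps the load atomic, while committing per batch limits how much work a failure rolls back.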

### 5. Analytics Queries

```python
# Sample analytical queries
ANALYTICS_QUERIES = {
    "Artifacts by Century": """
        SELECT century, COUNT(*) as artifact_count
        FROM artifactmetadata
        WHERE century IS NOT NULL
        GROUP BY century
        ORDER BY artifact_count DESC
        LIMIT 10
    """,
    
    "Top Cultures": """
        SELECT culture, COUNT(*) as count
        FROM artifactmetadata
        WHERE culture IS NOT NULL
        GROUP BY culture
        ORDER BY count DESC
        LIMIT 15
    """,
    
    "Department Distribution": """
        SELECT department, COUNT(*) as total_artifacts
        FROM artifactmetadata
        WHERE department IS NOT NULL
        GROUP BY department
        ORDER BY total_artifacts DESC
    """,
    
    "Color Spectrum Analysis": """
        SELECT spectrum, COUNT(*) as occurrence, 
               AVG(percent) as avg_percent
        FROM artifactcolors
        WHERE spectrum IS NOT NULL
        GROUP BY spectrum
        ORDER BY occurrence DESC
    """,
    
    "Media Format Distribution": """
        SELECT format, COUNT(*) as image_count,
               AVG(width) as avg_width, AVG(height) as avg_height
        FROM artifactmedia
        WHERE format IS NOT NULL
        GROUP BY format
        ORDER BY image_count DESC
    """,
    
    "Artifacts with Multiple Images": """
        SELECT a.title, a.culture, COUNT(m.imageid) as image_count
        FROM artifactmetadata a
        JOIN artifactmedia m ON a.objectid = m.objectid
        GROUP BY a.objectid, a.title, a.culture
        HAVING image_count > 3
        ORDER BY image_count DESC
        LIMIT 20
    """
}

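def execute_query(conn, query: str) -> pd.DataFrame:
    """Execute SQL query and return results as DataFrame"""
    # pandas documents support for SQLAlchemy connectables and sqlite3; a raw
    # mysql.connector connection generally works but can emit a UserWarning
    return pd.read_sql(query, conn)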
```
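
A query can be sanity-checked offline before it is pointed at the real database. The sketch below runs the "Artifacts by Century" query against an in-memory SQLite table with hand-written rows. SQLite is swapped in for MySQL purely for convenience, so queries that rely on MySQL-specific behavior may act differently; the table here contains only the columns the query needs.

```python
import sqlite3

# In-memory stand-in for the artifactmetadata table (two columns only)
test_conn = sqlite3.connect(":memory:")
test_conn.execute(
    "CREATE TABLE artifactmetadata (objectid INTEGER PRIMARY KEY, century TEXT)"
)
test_conn.executemany(
    "INSERT INTO artifactmetadata (objectid, century) VALUES (?, ?)",
    [(1, "19th century"), (2, "19th century"), (3, "20th century"), (4, None)],
)

century_query = """
    SELECT century, COUNT(*) AS artifact_count
    FROM artifactmetadata
    WHERE century IS NOT NULL
    GROUP BY century
    ORDER BY artifact_count DESC
    LIMIT 10
"""

# The row with a NULL century is filtered out before grouping
century_rows = test_conn.execute(century_query).fetchall()
print(century_rows)
test_conn.close()
```

The same pattern works for the other aggregate queries as long as the test table includes the columns they reference.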

### 6. Streamlit Dashboard

```python
import os

import mysql.connector
import plotly.express as px
import streamlit as st
from dotenv import load_dotenv

load_dotenv()

st.set_page_config(page_title="Harvard Art Museums Analytics", layout="wide")

st.title("🎨 Harvard Art Museums - Data Analytics Dashboard")

# Database connection
@st.cache_resource
def get_database_connection():
    return mysql.connector.connect(
        host=os.getenv('DB_HOST'),
        port=int(os.getenv('DB_PORT', 3306)),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        database=os.getenv('DB_NAME')
    )

conn = get_database_connection()

# Sidebar for query selection
st.sidebar.header("Analytics Queries")
query_name = st.sidebar.selectbox(
    "Select Analysis",
    list(ANALYTICS_QUERIES.keys())
)

# Execute selected query
if st.sidebar.button("Run Query"):
    with st.spinner("Executing query..."):
        query = ANALYTICS_QUERIES[query_name]
        results = execute_query(conn, query)
        
        st.subheader(f"Results: {query_name}")
        st.dataframe(results)
        
        # Auto-generate visualization
        if len(results.columns) >= 2:
            fig = px.bar(
                results,
                x=results.columns[0],
                y=results.columns[1],
                title=query_name
            )
            st.plotly_chart(fig, use_container_width=True)

# ETL Pipeline Section
st.sidebar.header("ETL Pipeline")
if st.sidebar.button("Run ETL"):
    api_key = os.getenv('HARVARD_API_KEY')
    
    with st.spinner("Fetching artifacts..."):
        artifacts = collect_artifacts(api_key, total_records=100)
        st.success(f"Fetched {len(artifacts)} artifacts")
    
    with st.spinner("Transforming data..."):
        df_metadata = transform_artifact_metadata(artifacts)
        df_media = transform_artifact_media(artifacts)
        df_colors = transform_artifact_colors(artifacts)
        st.success("Data transformation complete")
    
    with st.spinner("Loading to database..."):
        load_dataframe_to_table(df_metadata, 'artifactmetadata', conn)
        load_dataframe_to_table(df_media, 'artifactmedia', conn)
        load_dataframe_to_table(df_colors, 'artifactcolors', conn)
        st.success("Data loaded successfully!")
```

## Running the Application

```bash
# Start the Streamlit dashboard
streamlit run app.py

# Access at http://localhost:8501
```

## Common Patterns

### Pattern 1: Incremental Data Collection

```python
def get_latest_objectid(conn) -> int:
    """Get the highest objectid already in database"""
    cursor = conn.cursor()
    cursor.execute("SELECT MAX(objectid) FROM artifactmetadata")
    result = cursor.fetchone()
    cursor.close()
    return result[0] or 0

def incremental_etl(api_key: str, conn):
    """Only fetch new artifacts"""
    latest_id = get_latest_objectid(conn)
    
    # Fetch with filter
    url = "https://api.harvardartmuseums.org/object"
    params = {
        'apikey': api_key,
        'size': 100,
        'q': f'objectid:>{latest_id}'
    }
    
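    response = requests.get(url, params=params, timeout=10)
    response.raise_for_status()
    # Only the first page (up to 100 records) of matches is fetched per run
    new_artifacts = response.json().get('records', [])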
    
    # Transform and load only new records
    if new_artifacts:
        df = transform_artifact_metadata(new_artifacts)
        load_dataframe_to_table(df, 'artifactmetadata', conn)
```

### Pattern 2: Error Handling for API Calls

```python
import time

import requests
from requests.exceptions import RequestException

def robust_fetch_artifacts(api_key: str, max_retries: int = 3):
    """Fetch with retry logic"""
    for attempt in range(max_retries):
        try:
            response = requests.get(
                "https://api.harvardartmuseums.org/object",
                params={'apikey': api_key, 'size': 100},
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(wait_time)
            else:
                # Chain the original error so the traceback keeps the root cause
                raise RuntimeError(f"Failed after {max_retries} attempts: {e}") from e
```
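
The retry loop above doubles the wait without an upper bound, and every client retries on the same schedule. A common refinement, sketched here as a variation rather than something the project implements, is to cap the delay and optionally add random jitter. `backoff_delays` is a hypothetical helper that only computes the schedule, so it can be checked without sleeping.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait before each retry: capped exponential growth, optionally jittered."""
    delays = []
    for attempt in range(max_retries - 1):  # no wait after the final attempt
        delay = min(cap, base * (2 ** attempt))
        if rng is not None:
            # Jitter: pick a random wait between 0 and the computed delay
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays
```

Without an `rng`, `backoff_delays(3)` reproduces the 1 s and 2 s waits of the loop above; passing `random.Random()` spreads retries from concurrent workers so they do not hit the API at the same instant.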

### Pattern 3: Data Quality Validation

```python
def validate_artifacts(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and validate artifact data"""
    # Remove duplicates
    df = df.drop_duplicates(subset=['objectid'])
    
    # Fill missing values
    df['culture'] = df['culture'].fillna('Unknown')
    df['century'] = df['century'].fillna('Undated')
    
    # Data type conversions
    df['accessionyear'] = pd.to_numeric(df['accessionyear'], errors='coerce')
    
    # Filter invalid records
    df = df[df['objectid'].notna()]
    
    return df
```

## Troubleshooting

### Issue: API Rate Limiting

```python
# Add delays between requests
import time

for page in range(1, 10):
    data = fetch_artifacts(api_key, page=page)
    time.sleep(1)  # 1 second delay
```

### Issue: Database Connection Timeout

```python
# Use connection pooling
from mysql.connector import pooling

db_pool = pooling.MySQLConnectionPool(
    pool_name="harvard_pool",
    pool_size=5,
    host=os.getenv('DB_HOST'),
    port=int(os.getenv('DB_PORT', 3306)),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_NAME')
)

conn = db_pool.get_connection()
```

### Issue: Large Dataset Memory Errors

```python
# Process in chunks
def chunked_etl(api_key: str, conn, chunk_size: int = 100):
    """Process data in manageable chunks"""
    page = 1

    while True:
        # fetch_artifacts returns the full JSON response; the rows are under 'records'
        data = fetch_artifacts(api_key, size=chunk_size, page=page)
        artifacts = data.get('records', [])

        # An empty page means every record has been processed
        if not artifacts:
            break

        # Transform and load chunk
        df = transform_artifact_metadata(artifacts)
        load_dataframe_to_table(df, 'artifactmetadata', conn)

        page += 1
```

### Issue: NULL Values in Foreign Keys

```python
# Ensure parent records exist before inserting children
def safe_load_media(df_media: pd.DataFrame, conn):
    """Only load media for existing artifacts"""
    cursor = conn.cursor()
    cursor.execute("SELECT objectid FROM artifactmetadata")
    valid_ids = {row[0] for row in cursor.fetchall()}
    cursor.close()
    
    # Filter media records
    df_filtered = df_media[df_media['objectid'].isin(valid_ids)]
    load_dataframe_to_table(df_filtered, 'artifactmedia', conn)
```

## Key Commands

```bash
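# Run full ETL pipeline (assumes etl_pipeline.py defines a run_full_etl() entry point)
python -c "from etl_pipeline import run_full_etl; run_full_etl()"

# Launch dashboard
streamlit run app.py

# Run a specific query. execute_query(conn, query) takes a connection first;
# get_connection is a hypothetical helper assumed to live in database.py
python -c "from database import get_connection; from queries import execute_query; print(execute_query(get_connection(), 'SELECT * FROM artifactmetadata LIMIT 10'))"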
```

Together, these components cover extraction, transformation, loading, SQL analytics, and visualization for Harvard Art Museums collection data.

Source

Creator's repository · aradotso/data-skills
