harvard-artifacts-collection-data-engineering

End-to-end data engineering and analytics application for Harvard Art Museums API with ETL pipelines, SQL analytics, and Streamlit visualization

Skill file

Preview skill file
---
name: harvard-artifacts-collection-data-engineering
description: End-to-end data engineering and analytics application for Harvard Art Museums API with ETL pipelines, SQL analytics, and Streamlit visualization
triggers:
  - build an ETL pipeline for museum artifact data
  - connect to Harvard Art Museums API
  - create a data engineering pipeline with Streamlit
  - analyze Harvard museum collection with SQL
  - set up artifact collection data warehouse
  - visualize museum data with interactive dashboards
  - implement batch data loading from Harvard API
  - query and analyze art museum metadata
---

# Harvard Artifacts Collection Data Engineering

> Skill by [ara.so](https://ara.so) — Data Skills collection.

This project provides an end-to-end data engineering and analytics application built on the Harvard Art Museums API. It demonstrates real-world ETL pipelines, SQL database design, analytical queries, and interactive visualization using Streamlit. The architecture follows: API → ETL → SQL → Analytics → Visualization.

## What This Project Does

- **API Integration**: Fetches artifact data from Harvard Art Museums API with pagination and rate limiting
- **ETL Pipeline**: Extracts, transforms, and loads nested JSON into relational database tables
- **SQL Database**: Stores structured data across `artifactmetadata`, `artifactmedia`, and `artifactcolors` tables
- **Analytics**: Executes 20+ predefined SQL queries for insights
- **Visualization**: Interactive dashboards using Plotly and Streamlit

## Installation

```bash
# Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

# Install dependencies
pip install -r requirements.txt
```

**Required packages:**
```txt
streamlit
pandas
requests
mysql-connector-python
plotly
```

## Configuration

### Environment Variables

Set up your configuration before running:

```bash
# Harvard Art Museums API Key (get from https://www.harvardartmuseums.org/collections/api)
export HARVARD_API_KEY="your_api_key_here"

# Database credentials
export DB_HOST="your_database_host"
export DB_PORT="3306"
export DB_USER="your_database_user"
export DB_PASSWORD="your_database_password"
export DB_NAME="harvard_artifacts"
```

### Database Setup

Create the required tables:

```sql
CREATE DATABASE harvard_artifacts;
USE harvard_artifacts;

CREATE TABLE artifactmetadata (
    id INT PRIMARY KEY,
    title VARCHAR(500),
    culture VARCHAR(255),
    century VARCHAR(100),
    classification VARCHAR(255),
    division VARCHAR(255),
    department VARCHAR(255),
    technique VARCHAR(500),
    period VARCHAR(255),
    dated VARCHAR(255),
    url TEXT,
    lastupdate DATETIME
);

CREATE TABLE artifactmedia (
    media_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    baseimageurl TEXT,
    iiifbaseuri TEXT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);

CREATE TABLE artifactcolors (
    color_id INT AUTO_INCREMENT PRIMARY KEY,
    artifact_id INT,
    color VARCHAR(50),
    spectrum VARCHAR(50),
    percentage FLOAT,
    FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(id)
);
```

## Key Components and Usage

### 1. API Data Extraction

```python
import requests
import os

class HarvardAPIClient:
    def __init__(self):
        self.api_key = os.getenv('HARVARD_API_KEY')
        self.base_url = "https://api.harvardartmuseums.org/object"
    
    def fetch_artifacts(self, page=1, size=100):
        """Fetch artifacts with pagination"""
        params = {
            'apikey': self.api_key,
            'page': page,
            'size': size,
            'hasimage': 1  # Only artifacts with images
        }
        response = requests.get(self.base_url, params=params)
        response.raise_for_status()
        return response.json()
    
    def fetch_multiple_pages(self, num_pages=10):
        """Fetch multiple pages of artifacts"""
        all_artifacts = []
        for page in range(1, num_pages + 1):
            data = self.fetch_artifacts(page=page)
            all_artifacts.extend(data.get('records', []))
        return all_artifacts
```

### 2. ETL Pipeline Implementation

```python
import pandas as pd
import mysql.connector
from typing import List, Dict

class ArtifactETL:
    def __init__(self, db_config):
        self.db_config = db_config
        self.conn = None
        
    def connect_db(self):
        """Establish database connection"""
        self.conn = mysql.connector.connect(
            host=self.db_config['host'],
            port=self.db_config['port'],
            user=self.db_config['user'],
            password=self.db_config['password'],
            database=self.db_config['database']
        )
        return self.conn
    
    def transform_artifacts(self, raw_data: List[Dict]) -> pd.DataFrame:
        """Transform raw JSON to structured DataFrame"""
        artifacts = []
        for item in raw_data:
            artifact = {
                'id': item.get('id'),
                'title': item.get('title', '')[:500],
                'culture': item.get('culture', ''),
                'century': item.get('century', ''),
                'classification': item.get('classification', ''),
                'division': item.get('division', ''),
                'department': item.get('department', ''),
                'technique': item.get('technique', ''),
                'period': item.get('period', ''),
                'dated': item.get('dated', ''),
                'url': item.get('url', ''),
                'lastupdate': item.get('lastupdate', '')
            }
            artifacts.append(artifact)
        return pd.DataFrame(artifacts)
    
    def transform_media(self, raw_data: List[Dict]) -> pd.DataFrame:
        """Extract media information"""
        media_records = []
        for item in raw_data:
            artifact_id = item.get('id')
            if 'primaryimageurl' in item:
                media_records.append({
                    'artifact_id': artifact_id,
                    'baseimageurl': item.get('primaryimageurl', ''),
                    'iiifbaseuri': item.get('iiifbaseuri', '')
                })
        return pd.DataFrame(media_records)
    
    def transform_colors(self, raw_data: List[Dict]) -> pd.DataFrame:
        """Extract color information"""
        color_records = []
        for item in raw_data:
            artifact_id = item.get('id')
            colors = item.get('colors', [])
            for color in colors:
                color_records.append({
                    'artifact_id': artifact_id,
                    'color': color.get('color', ''),
                    'spectrum': color.get('spectrum', ''),
                    'percentage': color.get('percent', 0)
                })
        return pd.DataFrame(color_records)
    
    def load_data(self, df: pd.DataFrame, table_name: str):
        """Batch insert data into SQL table"""
        cursor = self.conn.cursor()
        
        # Prepare column names and placeholders
        cols = ','.join(df.columns)
        placeholders = ','.join(['%s'] * len(df.columns))
        
        sql = f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders})"
        
        # Convert DataFrame to list of tuples
        data = [tuple(row) for row in df.values]
        
        # Execute batch insert
        cursor.executemany(sql, data)
        self.conn.commit()
        cursor.close()
        
        return len(data)
```

### 3. Complete ETL Workflow

```python
def run_etl_pipeline():
    """Execute complete ETL pipeline"""
    # Configuration
    db_config = {
        'host': os.getenv('DB_HOST'),
        'port': int(os.getenv('DB_PORT', 3306)),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASSWORD'),
        'database': os.getenv('DB_NAME')
    }
    
    # Initialize clients
    api_client = HarvardAPIClient()
    etl = ArtifactETL(db_config)
    etl.connect_db()
    
    # Extract
    print("Extracting data from API...")
    raw_artifacts = api_client.fetch_multiple_pages(num_pages=5)
    print(f"Extracted {len(raw_artifacts)} artifacts")
    
    # Transform
    print("Transforming data...")
    df_artifacts = etl.transform_artifacts(raw_artifacts)
    df_media = etl.transform_media(raw_artifacts)
    df_colors = etl.transform_colors(raw_artifacts)
    
    # Load
    print("Loading data to database...")
    etl.load_data(df_artifacts, 'artifactmetadata')
    etl.load_data(df_media, 'artifactmedia')
    etl.load_data(df_colors, 'artifactcolors')
    
    print("ETL pipeline completed successfully!")
    etl.conn.close()
```

### 4. SQL Analytics Queries

```python
class ArtifactAnalytics:
    def __init__(self, db_config):
        self.db_config = db_config
    
    def execute_query(self, query: str) -> pd.DataFrame:
        """Execute SQL query and return DataFrame"""
        conn = mysql.connector.connect(**self.db_config)
        df = pd.read_sql(query, conn)
        conn.close()
        return df
    
    def get_artifacts_by_culture(self):
        """Get artifact distribution by culture"""
        query = """
        SELECT culture, COUNT(*) as artifact_count
        FROM artifactmetadata
        WHERE culture IS NOT NULL AND culture != ''
        GROUP BY culture
        ORDER BY artifact_count DESC
        LIMIT 20;
        """
        return self.execute_query(query)
    
    def get_artifacts_by_century(self):
        """Get artifact distribution by century"""
        query = """
        SELECT century, COUNT(*) as count
        FROM artifactmetadata
        WHERE century IS NOT NULL AND century != ''
        GROUP BY century
        ORDER BY count DESC;
        """
        return self.execute_query(query)
    
    def get_color_distribution(self):
        """Get color usage across artifacts"""
        query = """
        SELECT color, COUNT(*) as frequency, AVG(percentage) as avg_percentage
        FROM artifactcolors
        GROUP BY color
        ORDER BY frequency DESC
        LIMIT 15;
        """
        return self.execute_query(query)
    
    def get_artifacts_with_media(self):
        """Get artifacts with and without media"""
        query = """
        SELECT 
            CASE WHEN m.artifact_id IS NOT NULL THEN 'With Media' ELSE 'Without Media' END as media_status,
            COUNT(*) as count
        FROM artifactmetadata a
        LEFT JOIN artifactmedia m ON a.id = m.artifact_id
        GROUP BY media_status;
        """
        return self.execute_query(query)
```

### 5. Streamlit Dashboard

```python
import streamlit as st
import plotly.express as px

def create_dashboard():
    st.title("Harvard Art Museums Analytics Dashboard")
    
    # Sidebar for navigation
    st.sidebar.header("Navigation")
    option = st.sidebar.selectbox(
        "Choose Analysis",
        ["Overview", "Culture Analysis", "Century Distribution", "Color Patterns", "Media Analysis"]
    )
    
    # Initialize analytics
    db_config = {
        'host': os.getenv('DB_HOST'),
        'port': int(os.getenv('DB_PORT', 3306)),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASSWORD'),
        'database': os.getenv('DB_NAME')
    }
    analytics = ArtifactAnalytics(db_config)
    
    if option == "Culture Analysis":
        st.header("Artifacts by Culture")
        df = analytics.get_artifacts_by_culture()
        
        # Display data table
        st.dataframe(df)
        
        # Create visualization
        fig = px.bar(df, x='culture', y='artifact_count', 
                     title='Top 20 Cultures by Artifact Count')
        st.plotly_chart(fig)
    
    elif option == "Century Distribution":
        st.header("Artifacts by Century")
        df = analytics.get_artifacts_by_century()
        
        st.dataframe(df)
        
        fig = px.bar(df, x='century', y='count',
                     title='Artifact Distribution Across Centuries')
        st.plotly_chart(fig)
    
    elif option == "Color Patterns":
        st.header("Color Usage Analysis")
        df = analytics.get_color_distribution()
        
        st.dataframe(df)
        
        fig = px.bar(df, x='color', y='frequency',
                     title='Most Common Colors in Artifacts')
        st.plotly_chart(fig)

if __name__ == "__main__":
    create_dashboard()
```

## Running the Application

```bash
# Start the Streamlit dashboard
streamlit run app.py

# Run ETL pipeline separately (if needed)
python etl_pipeline.py
```

## Common Patterns

### Incremental Data Loading

```python
def incremental_load(last_update_date):
    """Load only new or updated artifacts"""
    query = f"""
    SELECT id FROM artifactmetadata 
    WHERE lastupdate > '{last_update_date}'
    """
    # Fetch only updated records from API
    # Update existing records in database
```

### Error Handling in ETL

```python
def safe_etl_run():
    try:
        run_etl_pipeline()
    except requests.HTTPError as e:
        print(f"API Error: {e}")
    except mysql.connector.Error as e:
        print(f"Database Error: {e}")
    except Exception as e:
        print(f"Unexpected Error: {e}")
```

## Troubleshooting

**API Rate Limiting**: Add delays between requests
```python
import time
time.sleep(0.5)  # 500ms delay between API calls
```

**Database Connection Issues**: Verify credentials and network access
```python
# Test connection
try:
    conn = mysql.connector.connect(**db_config)
    print("Database connection successful")
    conn.close()
except Exception as e:
    print(f"Connection failed: {e}")
```

**Missing Data Fields**: Handle null values during transformation
```python
artifact = {
    'title': item.get('title', 'Unknown')[:500],
    'culture': item.get('culture') or 'Not Specified'
}
```

**Memory Issues with Large Datasets**: Use chunking
```python
chunk_size = 1000
for i in range(0, len(data), chunk_size):
    chunk = data[i:i+chunk_size]
    etl.load_data(pd.DataFrame(chunk), 'artifactmetadata')
```

Source

Creator's repository · aradotso/data-skills

View on GitHub

Security

Security checks in progress
Results will appear here once audits complete
What this skill can do
Reads your filesConnects to the internetRuns code on your machine
Checked by 3 independent security firms
Does it try to trick the AI?Not yet checkedPending · Gen Agent Trust Hub
Does it sneak in hidden code?Not yet checkedPending · Socket
Does it have known bugs?Not yet checkedPending · Snyk