---
name: harvard-artifacts-etl-analytics
description: Build ETL pipelines and analytics dashboards using the Harvard Art Museums API with Python, SQL, and Streamlit
triggers:
  - how do I build an ETL pipeline for Harvard Art Museums data
  - set up analytics dashboard with Harvard artifacts API
  - create data engineering project with museum artifacts
  - extract and analyze Harvard art collections data
  - build Streamlit app for Harvard museums API
  - set up SQL database for artifact metadata
  - visualize Harvard art museums data with Plotly
  - create end-to-end data pipeline for museum collections
---

# Harvard Artifacts ETL Analytics Skill

> Skill by [ara.so](https://ara.so) — Data Skills collection

This skill enables you to build end-to-end data engineering and analytics applications using the Harvard Art Museums API. It demonstrates real-world ETL pipelines, SQL database design, analytical queries, and interactive visualization using Streamlit.

## What This Project Does

The Harvard Artifacts Collection Data Engineering Analytics App provides:

- **API Integration**: Fetch artifact data from Harvard Art Museums API with pagination and rate limiting
- **ETL Pipeline**: Extract, transform, and load artifact metadata, media details, and color information
- **SQL Database**: Store structured data in MySQL/TiDB with proper relational design
- **Analytics**: Execute 20+ predefined SQL queries for insights
- **Visualization**: Interactive Plotly dashboards in Streamlit

**Architecture**: API → ETL → SQL → Analytics → Visualization
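The flow above can be sketched as a small orchestration function. This is a minimal illustration, not the project's actual entry point: `run_pipeline` and its parameters are hypothetical names, and the extract, transform, and load steps are passed in as plain callables so the sketch runs without an API key or a database.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def run_pipeline(
    extract: Callable[[], List[Record]],
    transforms: Dict[str, Callable[[List[Record]], List[Any]]],
    loaders: Dict[str, Callable[[List[Any]], None]],
) -> Dict[str, int]:
    """Run extract -> transform -> load and return rows loaded per table."""
    records = extract()
    loaded = {}
    for table_name, transform in transforms.items():
        rows = transform(records)
        loaders[table_name](rows)  # each table needs a matching loader
        loaded[table_name] = len(rows)
    return loaded


# In-memory stand-ins for the API and the database (hypothetical data)
store: Dict[str, List[Any]] = {}
summary = run_pipeline(
    extract=lambda: [{"id": 1, "title": "Vase"}, {"id": 2, "title": None}],
    transforms={
        "artifactmetadata": lambda recs: [
            (r["id"], r["title"] or "Untitled") for r in recs
        ]
    },
    loaders={
        "artifactmetadata": lambda rows: store.setdefault(
            "artifactmetadata", []
        ).extend(rows)
    },
)
print(summary)
```

Keeping each stage behind a callable makes it straightforward to swap the stand-ins for the real API client and database loader described in the sections below.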

## Installation

```bash
# Clone the repository
git clone https://github.com/Manali0711/Harvard-Artifacts-Collection-Data-Engineering-Analytics-App.git
cd Harvard-Artifacts-Collection-Data-Engineering-Analytics-App

# Install dependencies
pip install -r requirements.txt
```

**Required packages**:
```txt
streamlit
pandas
requests
mysql-connector-python
plotly
python-dotenv
```

## Configuration

### 1. Harvard Art Museums API Key

Get your API key from [Harvard Art Museums API](https://www.harvardartmuseums.org/collections/api).

Create a `.env` file in the project root, or set the variable in your environment:
```bash
HARVARD_API_KEY=your_api_key_here
```
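A missing key otherwise surfaces only later as a failed API request, so it can help to fail fast at startup. The helper below is a minimal sketch; `require_env` is a hypothetical name, not part of the project.

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable's value, or raise if it is unset or blank."""
    value = os.getenv(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; add it to .env or export it")
    return value


# Typical use, after load_dotenv() has run:
# api_key = require_env("HARVARD_API_KEY")
```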

### 2. Database Configuration

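Set up the MySQL or TiDB Cloud connection. The settings are read from environment variables (`DB_HOST`, `DB_USER`, `DB_PASSWORD`, `DB_NAME`, `DB_PORT`), which can live in the same `.env` file: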
```python
import os
from dotenv import load_dotenv

load_dotenv()

DB_CONFIG = {
    'host': os.getenv('DB_HOST', 'localhost'),
    'user': os.getenv('DB_USER', 'root'),
    'password': os.getenv('DB_PASSWORD'),
    'database': os.getenv('DB_NAME', 'harvard_artifacts'),
    'port': int(os.getenv('DB_PORT', 3306))
}
```

## Core Components

### 1. API Data Extraction

```python
import os
import time

import requests

class HarvardAPIClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.harvardartmuseums.org/object"
    
    def fetch_artifacts(self, size=100, page=1):
        """Fetch one page of artifacts"""
        params = {
            'apikey': self.api_key,
            'size': size,
            'page': page,
            'hasimage': 1  # Only artifacts with images
        }
        
        # A timeout keeps a stalled connection from hanging the pipeline
        response = requests.get(self.base_url, params=params, timeout=30)
        response.raise_for_status()
        return response.json()
    
    def fetch_all_artifacts(self, max_records=1000):
        """Fetch multiple pages of artifacts"""
        all_artifacts = []
        page = 1
        size = 100
        
        while len(all_artifacts) < max_records:
            data = self.fetch_artifacts(size=size, page=page)
            records = data.get('records', [])
            
            # An empty page means there is nothing left to fetch
            if not records:
                break
            
            all_artifacts.extend(records)
            page += 1
            
            # Pause between requests to respect rate limits
            time.sleep(0.5)
        
        return all_artifacts[:max_records]

# Usage
api_key = os.getenv('HARVARD_API_KEY')
client = HarvardAPIClient(api_key)
artifacts = client.fetch_all_artifacts(max_records=500)
```
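The loop above stops only when a page comes back empty, which costs one extra request at the end. A possible refinement is sketched below under two assumptions: that the response carries an `info` object whose `pages` field holds the total page count, and that a page-fetching callable is injected so the logic can be exercised without network access. `iter_records` and `fake_fetch` are hypothetical names.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_records(
    fetch_page: Callable[[int], Dict[str, Any]],
    max_records: int,
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until max_records or the last page is reached."""
    yielded = 0
    page = 1
    while yielded < max_records:
        data = fetch_page(page)
        records = data.get("records", [])
        if not records:
            return
        for record in records:
            if yielded >= max_records:
                return
            yield record
            yielded += 1
        # Assumed response shape: info.pages is the total number of pages
        total_pages = data.get("info", {}).get("pages")
        if total_pages is not None and page >= total_pages:
            return
        page += 1


# Stand-in for the API: two pages, three records in total
calls: List[int] = []


def fake_fetch(page: int) -> Dict[str, Any]:
    calls.append(page)
    pages = {1: [{"id": 1}, {"id": 2}], 2: [{"id": 3}]}
    return {"info": {"pages": 2}, "records": pages.get(page, [])}


ids = [record["id"] for record in iter_records(fake_fetch, max_records=10)]
print(ids, calls)
```

With the real client, the callable could be `lambda p: client.fetch_artifacts(size=100, page=p)`, with a `time.sleep` added inside it to keep the rate limiting.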

### 2. Data Transformation

```python
import pandas as pd

def transform_artifact_metadata(artifacts):
    """Transform artifacts into metadata DataFrame"""
    metadata = []
    
    for artifact in artifacts:
        metadata.append({
            'artifact_id': artifact.get('id'),
            'title': artifact.get('title', 'Untitled'),
            'culture': artifact.get('culture'),
            'century': artifact.get('century'),
            'classification': artifact.get('classification'),
            'department': artifact.get('department'),
            'dated': artifact.get('dated'),
            'medium': artifact.get('medium'),
            'dimensions': artifact.get('dimensions'),
            'creditline': artifact.get('creditline'),
            'division': artifact.get('division'),
            'objectnumber': artifact.get('objectnumber')
        })
    
    return pd.DataFrame(metadata)

def transform_artifact_media(artifacts):
    """Transform media information"""
    media_data = []
    
    for artifact in artifacts:
        artifact_id = artifact.get('id')
        images = artifact.get('images', [])
        
        for img in images:
            media_data.append({
                'artifact_id': artifact_id,
                'media_id': img.get('imageid'),
                'media_url': img.get('baseimageurl'),
                'media_type': 'image',
                'width': img.get('width'),
                'height': img.get('height')
            })
        
        # Handle primary image
        primary_image = artifact.get('primaryimageurl')
        if primary_image:
            media_data.append({
                'artifact_id': artifact_id,
                'media_id': f"{artifact_id}_primary",
                'media_url': primary_image,
                'media_type': 'primary_image',
                'width': None,
                'height': None
            })
    
    return pd.DataFrame(media_data)

def transform_artifact_colors(artifacts):
    """Transform color information"""
    color_data = []
    
    for artifact in artifacts:
        artifact_id = artifact.get('id')
        colors = artifact.get('colors', [])
        
        for color in colors:
            color_data.append({
                'artifact_id': artifact_id,
                'color_hex': color.get('hex'),
                'color_name': color.get('color'),
                'color_percent': color.get('percent'),
                'color_spectrum': color.get('spectrum')
            })
    
    return pd.DataFrame(color_data)
```
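`transform_artifact_media` adds a `primary_image` row in addition to the rows built from `images`. If the primary URL also appears in `images`, the same URL is stored twice for one artifact. The sketch below removes such repeats from a list of row dictionaries; `dedupe_media_rows` is a hypothetical helper, and whether duplicates occur in practice depends on the data returned for each record.

```python
from typing import Any, Dict, Iterable, List


def dedupe_media_rows(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep the first row for each (artifact_id, media_url) pair, preserving order."""
    seen = set()
    unique = []
    for row in rows:
        key = (row.get("artifact_id"), row.get("media_url"))
        if key in seen:
            continue
        seen.add(key)
        unique.append(row)
    return unique


rows = [
    {"artifact_id": 1, "media_url": "u1", "media_type": "image"},
    {"artifact_id": 1, "media_url": "u1", "media_type": "primary_image"},
    {"artifact_id": 2, "media_url": "u1", "media_type": "primary_image"},
]
unique_rows = dedupe_media_rows(rows)
```

Because the `image` rows are appended first, they win over the `primary_image` row and keep their width and height. On a DataFrame, `media_df.drop_duplicates(subset=["artifact_id", "media_url"])` has the same effect.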

### 3. Database Schema and Loading

```python
import mysql.connector
from mysql.connector import Error

class ArtifactDatabase:
    def __init__(self, config):
        self.config = config
        self.connection = None
    
    def connect(self):
        """Create database connection"""
        try:
            self.connection = mysql.connector.connect(**self.config)
            return self.connection
        except Error as e:
            print(f"Error connecting to database: {e}")
            return None
    
    def create_tables(self):
        """Create database schema"""
        cursor = self.connection.cursor()
        
        # Metadata table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS artifactmetadata (
                artifact_id INT PRIMARY KEY,
                title VARCHAR(500),
                culture VARCHAR(255),
                century VARCHAR(100),
                classification VARCHAR(255),
                department VARCHAR(255),
                dated VARCHAR(255),
                medium TEXT,
                dimensions VARCHAR(500),
                creditline TEXT,
                division VARCHAR(255),
                objectnumber VARCHAR(100)
            )
        """)
        
        # Media table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS artifactmedia (
                id INT AUTO_INCREMENT PRIMARY KEY,
                artifact_id INT,
                media_id VARCHAR(255),
                media_url TEXT,
                media_type VARCHAR(50),
                width INT,
                height INT,
                FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(artifact_id)
            )
        """)
        
        # Colors table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS artifactcolors (
                id INT AUTO_INCREMENT PRIMARY KEY,
                artifact_id INT,
                color_hex VARCHAR(10),
                color_name VARCHAR(100),
                color_percent FLOAT,
                color_spectrum VARCHAR(100),
                FOREIGN KEY (artifact_id) REFERENCES artifactmetadata(artifact_id)
            )
        """)
        
        self.connection.commit()
    
    def load_metadata(self, df):
        """Bulk insert metadata"""
        cursor = self.connection.cursor()
        
        insert_query = """
            INSERT INTO artifactmetadata 
            (artifact_id, title, culture, century, classification, department, 
             dated, medium, dimensions, creditline, division, objectnumber)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE title=VALUES(title)
        """
        
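        # Convert NaN to None so missing values are inserted as SQL NULL
        data = df.astype(object).where(df.notna(), None).values.tolist()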
        cursor.executemany(insert_query, data)
        self.connection.commit()
    
    def load_media(self, df):
        """Bulk insert media data"""
        cursor = self.connection.cursor()
        
        insert_query = """
            INSERT INTO artifactmedia 
            (artifact_id, media_id, media_url, media_type, width, height)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        
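        # width/height are None for primary images, which pandas stores as NaN;
        # convert NaN back to None so they are inserted as SQL NULL
        data = df.astype(object).where(df.notna(), None).values.tolist()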
        cursor.executemany(insert_query, data)
        self.connection.commit()
    
    def execute_query(self, query):
        """Execute analytical query"""
        cursor = self.connection.cursor(dictionary=True)
        cursor.execute(query)
        return cursor.fetchall()

# Usage
db = ArtifactDatabase(DB_CONFIG)
db.connect()
db.create_tables()

# Load transformed data
metadata_df = transform_artifact_metadata(artifacts)
media_df = transform_artifact_media(artifacts)
colors_df = transform_artifact_colors(artifacts)

db.load_metadata(metadata_df)
db.load_media(media_df)
```
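The usage above builds `colors_df`, but the class shown has no loader for the `artifactcolors` table, so the color queries would return nothing. A minimal sketch of such a loader follows. `load_colors` and `color_rows` are hypothetical names; the function takes any DB-API style connection plus a list of row dictionaries (for example `colors_df.to_dict("records")`), and the insert mirrors the schema created by `create_tables`.

```python
import math
from typing import Any, Dict, Iterable, List, Tuple

COLOR_COLUMNS = (
    "artifact_id", "color_hex", "color_name", "color_percent", "color_spectrum"
)

INSERT_COLORS = """
    INSERT INTO artifactcolors
    (artifact_id, color_hex, color_name, color_percent, color_spectrum)
    VALUES (%s, %s, %s, %s, %s)
"""


def _none_if_nan(value: Any) -> Any:
    """Map float NaN (how pandas stores missing numbers) to None."""
    return None if isinstance(value, float) and math.isnan(value) else value


def color_rows(records: Iterable[Dict[str, Any]]) -> List[Tuple[Any, ...]]:
    """Build insert tuples in the column order used by INSERT_COLORS."""
    return [
        tuple(_none_if_nan(rec.get(col)) for col in COLOR_COLUMNS)
        for rec in records
    ]


def load_colors(connection: Any, records: Iterable[Dict[str, Any]]) -> int:
    """Bulk insert color rows and return how many were sent."""
    rows = color_rows(records)
    if not rows:
        return 0
    cursor = connection.cursor()
    cursor.executemany(INSERT_COLORS, rows)
    connection.commit()
    cursor.close()
    return len(rows)


# Typical use with the objects defined above:
# load_colors(db.connection, colors_df.to_dict("records"))
```

Like `load_media`, this sketch appends rows on every run, so re-running the ETL for the same artifacts would insert duplicates unless the table is cleared first or given a unique key.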

### 4. Analytical SQL Queries

```python
# Sample analytical queries
ANALYTICAL_QUERIES = {
    "Artifacts by Culture": """
        SELECT culture, COUNT(*) as artifact_count
        FROM artifactmetadata
        WHERE culture IS NOT NULL
        GROUP BY culture
        ORDER BY artifact_count DESC
        LIMIT 15
    """,
    
    "Artifacts by Century": """
        SELECT century, COUNT(*) as count
        FROM artifactmetadata
        WHERE century IS NOT NULL
        GROUP BY century
        ORDER BY count DESC
        LIMIT 10
    """,
    
    "Department Distribution": """
        SELECT department, COUNT(*) as total_artifacts
        FROM artifactmetadata
        WHERE department IS NOT NULL
        GROUP BY department
        ORDER BY total_artifacts DESC
    """,
    
    "Top Colors Used": """
        SELECT color_name, COUNT(*) as usage_count,
               AVG(color_percent) as avg_percentage
        FROM artifactcolors
        WHERE color_name IS NOT NULL
        GROUP BY color_name
        ORDER BY usage_count DESC
        LIMIT 10
    """,
    
    "Media Availability": """
        SELECT 
            COUNT(DISTINCT artifact_id) as artifacts_with_media,
            COUNT(*) as total_media_items,
            AVG(width) as avg_width,
            AVG(height) as avg_height
        FROM artifactmedia
    """,
    
    "Classification Analysis": """
        SELECT classification, 
               COUNT(*) as count,
               COUNT(DISTINCT culture) as cultures
        FROM artifactmetadata
        WHERE classification IS NOT NULL
        GROUP BY classification
        ORDER BY count DESC
        LIMIT 20
    """
}
```
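Queries like these can be sanity-checked without a MySQL server by running them against an in-memory SQLite table. The sketch below does this for "Artifacts by Culture", assuming the SQL is portable between the two engines (true for this query, not guaranteed for all of them). `run_on_sample` and the sample rows are made up for illustration.

```python
import sqlite3

ARTIFACTS_BY_CULTURE = """
    SELECT culture, COUNT(*) as artifact_count
    FROM artifactmetadata
    WHERE culture IS NOT NULL
    GROUP BY culture
    ORDER BY artifact_count DESC
    LIMIT 15
"""


def run_on_sample(query, rows):
    """Run a query against an in-memory table seeded with (artifact_id, culture) rows."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE artifactmetadata "
            "(artifact_id INTEGER PRIMARY KEY, culture TEXT)"
        )
        conn.executemany("INSERT INTO artifactmetadata VALUES (?, ?)", rows)
        return conn.execute(query).fetchall()
    finally:
        conn.close()


sample = [(1, "Greek"), (2, "Greek"), (3, "Roman"), (4, None)]
result = run_on_sample(ARTIFACTS_BY_CULTURE, sample)
print(result)  # [('Greek', 2), ('Roman', 1)]
```

The row with a NULL culture is excluded by the `WHERE` clause, and the remaining groups come back ordered by count.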

### 5. Streamlit Dashboard

```python
import os

import streamlit as st
import plotly.express as px
import pandas as pd

def create_dashboard():
    st.set_page_config(page_title="Harvard Artifacts Analytics", layout="wide")
    
    st.title("🏛️ Harvard Art Museums Analytics Dashboard")
    
    # Sidebar configuration
    st.sidebar.header("Configuration")
    
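    # Database connection (connect() returns None on failure)
    db = ArtifactDatabase(DB_CONFIG)
    if db.connect() is None:
        st.error("Could not connect to the database. Check DB_CONFIG.")
        st.stop()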
    
    # Query selection
    query_name = st.sidebar.selectbox(
        "Select Analysis",
        list(ANALYTICAL_QUERIES.keys())
    )
    
    if st.sidebar.button("Run Query"):
        with st.spinner("Executing query..."):
            query = ANALYTICAL_QUERIES[query_name]
            results = db.execute_query(query)
            df = pd.DataFrame(results)
            
            # Display results
            st.subheader(f"📊 {query_name}")
            
            col1, col2 = st.columns([2, 1])
            
            with col1:
                st.dataframe(df, use_container_width=True)
            
            with col2:
                st.code(query, language="sql")
            
            # Auto-generate visualization
            if len(df.columns) >= 2:
                fig = px.bar(
                    df.head(15),
                    x=df.columns[0],
                    y=df.columns[1],
                    title=query_name,
                    labels={df.columns[0]: df.columns[0].replace('_', ' ').title()}
                )
                st.plotly_chart(fig, use_container_width=True)
    
    # ETL section
    st.sidebar.header("ETL Pipeline")
    
    num_records = st.sidebar.number_input("Records to fetch", 100, 5000, 500)
    
    if st.sidebar.button("Run ETL"):
        with st.spinner("Fetching data from API..."):
            api_key = os.getenv('HARVARD_API_KEY')
            client = HarvardAPIClient(api_key)
            artifacts = client.fetch_all_artifacts(max_records=num_records)
            
            st.success(f"Fetched {len(artifacts)} artifacts")
            
            # Transform
            metadata_df = transform_artifact_metadata(artifacts)
            media_df = transform_artifact_media(artifacts)
            colors_df = transform_artifact_colors(artifacts)
            
            # Load (create_tables is safe to repeat because of IF NOT EXISTS)
            db.create_tables()
            db.load_metadata(metadata_df)
            db.load_media(media_df)
            
            st.success("ETL completed successfully!")

if __name__ == "__main__":
    create_dashboard()
```
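The dashboard charts the first two result columns, which works for the grouped queries but not for a single-row summary such as "Media Availability", where both columns are counts. One way to decide whether a bar chart makes sense is sketched below. `pick_chart_columns` is a hypothetical helper that inspects the first row returned by `execute_query` (a list of dictionaries).

```python
from numbers import Number
from typing import Any, Dict, List, Optional, Tuple


def pick_chart_columns(rows: List[Dict[str, Any]]) -> Optional[Tuple[str, str]]:
    """Return (label_column, value_column) for a bar chart, or None if there is no such pair."""
    if not rows:
        return None
    first = rows[0]
    # First text column becomes the category axis
    label = next((k for k, v in first.items() if isinstance(v, str)), None)
    # First numeric column (bool excluded) becomes the value axis
    value = next(
        (
            k
            for k, v in first.items()
            if isinstance(v, Number) and not isinstance(v, bool)
        ),
        None,
    )
    if label is None or value is None:
        return None
    return label, value


grouped = pick_chart_columns([{"culture": "Greek", "artifact_count": 2}])
summary_only = pick_chart_columns(
    [{"artifacts_with_media": 3, "total_media_items": 5}]
)
```

In the dashboard, the chart would be drawn only when the helper returns a pair, with the two names passed as `x` and `y` to `px.bar`.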

## Running the Application

```bash
# Start Streamlit app
streamlit run app.py

# Access at http://localhost:8501
```

## Common Patterns

### Pattern 1: Incremental Data Loading

```python
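def get_last_artifact_id(db):
    """Get the last loaded artifact ID (0 if the table is empty)"""
    query = "SELECT MAX(artifact_id) as max_id FROM artifactmetadata"
    result = db.execute_query(query)
    # MAX() on an empty table returns one row containing NULL
    if result and result[0]['max_id'] is not None:
        return result[0]['max_id']
    return 0

def incremental_load(db, api_key, max_records=500):
    """Load only artifacts whose ID is greater than the last loaded ID"""
    last_id = get_last_artifact_id(db)
    client = HarvardAPIClient(api_key)
    
    # Client-side filter: this sketch does not assume the API can filter by ID,
    # and it will miss new artifacts whose IDs are lower than last_id
    artifacts = client.fetch_all_artifacts(max_records=max_records)
    new_artifacts = [a for a in artifacts if (a.get('id') or 0) > last_id]
    
    if new_artifacts:
        db.load_metadata(transform_artifact_metadata(new_artifacts))
    return len(new_artifacts)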
```

### Pattern 2: Data Quality Checks

```python
def validate_artifacts(df):
    """Validate artifact data quality"""
    issues = []
    
    # Check for missing IDs
    if df['artifact_id'].isnull().any():
        issues.append("Missing artifact IDs found")
    
    # Check for duplicates
    if df['artifact_id'].duplicated().any():
        issues.append("Duplicate artifact IDs found")
    
    # Check required fields (artifact_id is already checked above)
    required = ['title']
    for field in required:
        null_count = df[field].isnull().sum()
        if null_count > 0:
            issues.append(f"{null_count} records missing {field}")
    
    return issues
```

### Pattern 3: Batch Processing

```python
def batch_load_artifacts(db, artifacts, batch_size=100):
    """Load artifacts in batches for better performance"""
    metadata_df = transform_artifact_metadata(artifacts)
    
    for i in range(0, len(metadata_df), batch_size):
        batch = metadata_df.iloc[i:i+batch_size]
        db.load_metadata(batch)
        print(f"Loaded batch {i//batch_size + 1}")
```

## Troubleshooting

### API Rate Limiting
```python
import time
from requests.exceptions import HTTPError

def fetch_with_retry(client, max_retries=3, **kwargs):
    """Retry on HTTP 429 with exponential backoff; kwargs go to fetch_artifacts"""
    for attempt in range(max_retries):
        try:
            return client.fetch_artifacts(**kwargs)
        except HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
                time.sleep(wait_time)
            else:
                raise
    raise RuntimeError("Max retries exceeded")
```

### Database Connection Issues
```python
import time

def reconnect_database(db, max_attempts=3):
    for attempt in range(max_attempts):
        # connect() catches mysql.connector.Error itself and returns None on failure
        if db.connect() is not None:
            return True
        print(f"Connection attempt {attempt + 1} failed")
        time.sleep(5)
    return False
```

### Memory Management for Large Datasets
```python
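def stream_large_dataset(client, db, total_records=10000, chunk_size=100):
    """Process large datasets in chunks"""
    # chunk_size matches the page size used elsewhere; larger values may be capped by the API
    for offset in range(0, total_records, chunk_size):
        data = client.fetch_artifacts(size=chunk_size, page=offset//chunk_size + 1)
        # fetch_artifacts returns the full JSON response; artifacts are under 'records'
        artifacts = data.get('records', [])
        if not artifacts:
            break
        
        # Process and load immediately
        metadata_df = transform_artifact_metadata(artifacts)
        db.load_metadata(metadata_df)
        
        # Free memory
        del data, artifacts, metadata_df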
```

Together, these components cover the full path from API extraction to an interactive dashboard, and they can serve as a starting point for ETL pipelines over other museum collection APIs.
