---
name: retail-etl-pipeline-medallion
description: End-to-end retail ETL pipeline using PySpark, SQL Server, and Medallion Architecture (Bronze/Silver/Gold layers) for data warehousing
triggers:
  - set up a retail ETL pipeline with medallion architecture
  - build a bronze silver gold data warehouse for retail
  - create an ETL pipeline for retail sales and inventory data
  - implement medallion architecture for retail analytics
  - process retail data through bronze silver and gold layers
  - design a data pipeline for retail inventory and sales tracking
  - build a data warehouse following medallion architecture pattern
  - create stored procedures for retail ETL transformations
---

# Retail ETL Pipeline - Medallion Architecture Skill

> Skill by [ara.so](https://ara.so) — Data Skills collection

## Overview

The Retail ETL Pipeline project implements a complete data engineering solution for retail operations using the **Medallion Architecture** pattern (Bronze → Silver → Gold layers). It handles complex retail scenarios including:

- **Inventory shrinkage** resolution
- **Recipe conversions** for meat/poultry products
- **Supplier rebate tier** tracking
- **Multi-branch sales** consolidation
- **Stock level** management across locations

The pipeline processes raw CSV data from CRM/ERP systems through three progressive quality layers, ultimately delivering a "Single Version of Truth" for business intelligence.

## Architecture Layers

### Bronze Layer (Raw Ingestion)
- Raw data ingestion from CSV files
- Minimal transformation, preserving source format
- Audit columns: `_loaded_at`, `_source_file`

### Silver Layer (Cleaned & Standardized)
- Data type enforcement
- Deduplication
- Standardization (dates, currencies, product codes)
- Business rule validation

### Gold Layer (Business-Ready Analytics)
- Aggregated metrics
- Calculated KPIs (inventory turnover, shrinkage %)
- Dimensional models for BI tools
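
The three layers above can be illustrated with a minimal, dependency-free Python sketch. The sample rows, the function names (`to_bronze`, `to_silver`, `to_gold`), and the file name `products.csv` are hypothetical and only stand in for the real tables; the cleaning rules loosely follow the Silver rules described in this guide.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical raw rows, as they might arrive from a CSV export (all strings).
RAW_ROWS = [
    {"product_id": "1", "product_name": " milk ", "category": "Dairy", "unit_price": "2.50"},
    {"product_id": "1", "product_name": " milk ", "category": "Dairy", "unit_price": "2.50"},
    {"product_id": "2", "product_name": "beef", "category": "", "unit_price": "-4.00"},
]


def to_bronze(rows, source_file):
    """Bronze: keep values exactly as received and only add audit columns."""
    loaded_at = datetime.now(timezone.utc).isoformat()
    return [dict(r, _loaded_at=loaded_at, _source_file=source_file) for r in rows]


def to_silver(bronze_rows):
    """Silver: enforce types, standardize text, clamp bad prices, deduplicate."""
    seen, cleaned = set(), []
    for r in bronze_rows:
        product_id = int(r["product_id"])
        if product_id in seen:  # keep the first row per product_id
            continue
        seen.add(product_id)
        cleaned.append({
            "product_id": product_id,
            "product_name": r["product_name"].strip().upper(),
            "category": r["category"] or "UNCATEGORIZED",  # empty or missing
            "unit_price": max(float(r["unit_price"]), 0.0),  # negative -> 0
        })
    return cleaned


def to_gold(silver_rows):
    """Gold: aggregate to a business-facing metric (products per category)."""
    counts = defaultdict(int)
    for r in silver_rows:
        counts[r["category"]] += 1
    return dict(counts)


bronze = to_bronze(RAW_ROWS, "products.csv")
silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)
```

Each function only reads the output of the previous layer, which is the property the SQL scripts and stored procedures below rely on.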

## Installation & Setup

### Prerequisites

Required tools:

- Docker and Docker Compose
- SQL Server 2019+
- Python 3.8+
- PySpark 3.x
- Apache Airflow (optional, for orchestration)

### Infrastructure Setup

```bash
# Clone the repository
git clone https://github.com/EsraaSolimanMubarak/Retail-ETL-Pipeline.git
cd Retail-ETL-Pipeline

# Start SQL Server container
docker-compose up -d

# Wait for SQL Server to be ready
docker logs -f retail-sql-server
```

### Database Initialization

```bash
# Connect to SQL Server and run schema setup
sqlcmd -S localhost,1433 -U sa -P "${SQL_SERVER_PASSWORD}" \
  -i sql_scripts/00_create_database_and_schemas.sql
```

## Key SQL Scripts Execution Order

The pipeline consists of 13+ SQL scripts that must be executed in the order of their numeric prefixes. The main ones are:

```bash
# 1. Create database and schemas
sql_scripts/00_create_database_and_schemas.sql

# 2. Bronze layer ingestion
sql_scripts/01_bronze_products.sql
sql_scripts/02_bronze_sales.sql
sql_scripts/03_bronze_stock.sql

# 3. Silver layer transformations
sql_scripts/04_silver_products.sql
sql_scripts/05_silver_sales.sql
sql_scripts/06_silver_stock.sql

# 4. Gold layer aggregations
sql_scripts/07_gold_sales_summary.sql
sql_scripts/08_gold_inventory_metrics.sql
sql_scripts/09_gold_product_performance.sql

# 5. Rebuild pipeline (if needed)
sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
```
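
Because the order is encoded in each file's numeric prefix, a small helper can sort the scripts and prepare one `sqlcmd` invocation per file. This is only a sketch: `script_order` and `build_commands` are hypothetical names, and it assumes the password is supplied through the `SQLCMDPASSWORD` environment variable rather than on the command line.

```python
import re
from pathlib import PurePosixPath


def script_order(path):
    """Sort key: the numeric prefix of the file name (07 in 07_gold_x.sql)."""
    match = re.match(r"(\d+)_", PurePosixPath(path).name)
    if match is None:
        raise ValueError(f"no numeric prefix in {path!r}")
    return int(match.group(1))


def build_commands(paths, server="localhost,1433", user="sa"):
    """Return one sqlcmd argument list per script, in execution order.

    -b makes sqlcmd exit with an error code when a batch fails, so a
    caller can stop before running later layers on top of a failed one.
    """
    return [
        ["sqlcmd", "-S", server, "-U", user, "-b", "-i", p]
        for p in sorted(paths, key=script_order)
    ]


scripts = [
    "sql_scripts/04_silver_products.sql",
    "sql_scripts/00_create_database_and_schemas.sql",
    "sql_scripts/07_gold_sales_summary.sql",
    "sql_scripts/01_bronze_products.sql",
]
commands = build_commands(scripts)
for cmd in commands:
    print(" ".join(cmd))
```

Each argument list can then be passed to `subprocess.run(cmd, check=True)` so that the first failing script stops the run.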

## Core ETL Patterns

### Pattern 1: Bronze Layer Ingestion (Raw CSV → SQL)

```sql
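-- Example: Bronze Products Table
CREATE TABLE bronze.products (
    product_id INT,
    product_name NVARCHAR(255),
    category NVARCHAR(100),
    unit_price DECIMAL(10,2),
    supplier_id INT,
    _loaded_at DATETIME2 DEFAULT GETDATE(),
    _source_file NVARCHAR(500)
);
GO

-- BULK INSERT maps CSV fields to columns by position, so load through a view
-- that exposes only the source columns. This assumes the CSV holds exactly
-- these five fields in this order; vw_products_load is an illustrative name.
-- _loaded_at is then filled by its default.
CREATE VIEW bronze.vw_products_load AS
SELECT product_id, product_name, category, unit_price, supplier_id
FROM bronze.products;
GO

-- Bulk insert from CSV
BULK INSERT bronze.vw_products_load
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    TABLOCK
);

-- Record which file the new rows came from
UPDATE bronze.products
SET _source_file = '000.Hypermarket Products.csv'
WHERE _source_file IS NULL;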
```

### Pattern 2: Silver Layer Cleansing

```sql
-- Example: Silver Products with Data Quality Rules
CREATE PROCEDURE silver.usp_transform_products
AS
BEGIN
    TRUNCATE TABLE silver.products;
    
    INSERT INTO silver.products (
        product_id,
        product_name,
        category,
        unit_price,
        supplier_id,
        is_active,
        processed_at
    )
    SELECT DISTINCT
        product_id,
        UPPER(TRIM(product_name)) AS product_name,
        COALESCE(category, 'UNCATEGORIZED') AS category,
        CASE 
            WHEN unit_price < 0 THEN 0 
            ELSE unit_price 
        END AS unit_price,
        supplier_id,
        1 AS is_active,
        GETDATE() AS processed_at
    FROM bronze.products
    WHERE product_id IS NOT NULL
      AND product_name IS NOT NULL;
END;
```

### Pattern 3: Gold Layer Aggregations

```sql
-- Example: Sales Summary by Branch and Product
CREATE PROCEDURE gold.usp_sales_summary
AS
BEGIN
    TRUNCATE TABLE gold.sales_summary;
    
    INSERT INTO gold.sales_summary (
        branch_name,
        product_category,
        total_quantity_sold,
        total_revenue,
        avg_unit_price,
        transaction_count,
        report_date
    )
    SELECT 
        s.branch_name,
        p.category AS product_category,
        SUM(s.quantity) AS total_quantity_sold,
        SUM(s.quantity * s.unit_price) AS total_revenue,
        AVG(s.unit_price) AS avg_unit_price,
        COUNT(DISTINCT s.transaction_id) AS transaction_count,
        CAST(GETDATE() AS DATE) AS report_date
    FROM silver.sales s
    INNER JOIN silver.products p ON s.product_id = p.product_id
    GROUP BY s.branch_name, p.category;
END;
```

### Pattern 4: Inventory Shrinkage Calculation

```sql
-- Complex business logic: Detect inventory discrepancies
CREATE PROCEDURE gold.usp_inventory_shrinkage
AS
BEGIN
    WITH StockLevels AS (
        SELECT 
            product_id,
            branch_name,
            SUM(quantity_on_hand) AS current_stock
        FROM silver.stock
        GROUP BY product_id, branch_name
    ),
    ExpectedStock AS (
        -- Select and group by the StockLevels keys: with a LEFT JOIN the
        -- sales-side columns are NULL for products that have no sales
        SELECT 
            sl.product_id,
            sl.branch_name,
            sl.current_stock - COALESCE(SUM(s.quantity), 0) AS expected_stock
        FROM StockLevels sl
        LEFT JOIN silver.sales s 
            ON sl.product_id = s.product_id 
            AND sl.branch_name = s.branch_name
        GROUP BY sl.product_id, sl.branch_name, sl.current_stock
    )
    INSERT INTO gold.inventory_shrinkage (
        product_id,
        branch_name,
        current_stock,
        expected_stock,
        shrinkage_qty,
        shrinkage_percent,
        calculated_at
    )
    SELECT 
        sl.product_id,
        sl.branch_name,
        sl.current_stock,
        es.expected_stock,
        sl.current_stock - es.expected_stock AS shrinkage_qty,
        CASE 
            WHEN es.expected_stock > 0 
            THEN ((sl.current_stock - es.expected_stock) * 100.0 / es.expected_stock)
            ELSE 0 
        END AS shrinkage_percent,
        GETDATE() AS calculated_at
    FROM StockLevels sl
    INNER JOIN ExpectedStock es 
        ON sl.product_id = es.product_id 
        AND sl.branch_name = es.branch_name;
END;
```
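
To check the numbers by hand, the arithmetic of the procedure can be mirrored for a single product and branch. This is a sketch with a hypothetical function name and made-up inputs. Note that with these definitions `shrinkage_qty` works out to the units sold, so a real deployment would probably compare against an independent physical count; the source does not show such a column.

```python
def shrinkage_metrics(current_stock, units_sold):
    """Mirror the arithmetic of gold.usp_inventory_shrinkage for one row."""
    expected_stock = current_stock - units_sold
    shrinkage_qty = current_stock - expected_stock
    # Same guard as the SQL CASE: never divide by a zero or negative base
    if expected_stock > 0:
        shrinkage_percent = shrinkage_qty * 100.0 / expected_stock
    else:
        shrinkage_percent = 0
    return {
        "expected_stock": expected_stock,
        "shrinkage_qty": shrinkage_qty,
        "shrinkage_percent": shrinkage_percent,
    }


example = shrinkage_metrics(current_stock=100, units_sold=20)
print(example)
```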

## PySpark Integration (Optional)

For large-scale data processing, integrate PySpark for Silver/Gold transformations:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum()

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RetailETL-Silver") \
    .config("spark.sql.warehouse.dir", "/data/warehouse") \
    .getOrCreate()

# Read Bronze layer (Parquet or Delta)
bronze_sales_df = spark.read.parquet("/data/bronze/sales/")

# Silver transformations: deduplicate, drop non-positive quantities,
# clamp negative prices to 0, and derive the line total
silver_sales_df = bronze_sales_df \
    .dropDuplicates(["transaction_id"]) \
    .filter(F.col("quantity") > 0) \
    .withColumn("unit_price", 
                F.when(F.col("unit_price") < 0, 0).otherwise(F.col("unit_price"))) \
    .withColumn("total_amount", F.col("quantity") * F.col("unit_price"))

# Write to Silver layer
silver_sales_df.write \
    .mode("overwrite") \
    .partitionBy("branch_name", "sale_date") \
    .parquet("/data/silver/sales/")

# Gold aggregations
# Assumes the sales data already carries a product_category column;
# if it does not, join to the Silver products data on product_id first.
gold_sales_summary = silver_sales_df \
    .groupBy("branch_name", "product_category") \
    .agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("unit_price").alias("avg_unit_price"),
        F.count("transaction_id").alias("transaction_count")
    )

gold_sales_summary.write \
    .mode("overwrite") \
    .parquet("/data/gold/sales_summary/")
```

## Airflow DAG Example

Orchestrate the entire pipeline with Apache Airflow:

```python
from airflow import DAG
# Newer releases of the Microsoft SQL Server provider deprecate MsSqlOperator
# in favour of SQLExecuteQueryOperator from airflow.providers.common.sql
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'retail_etl_medallion',
    default_args=default_args,
    description='Daily Retail ETL Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # Bronze ingestion
    ingest_products = MsSqlOperator(
        task_id='bronze_ingest_products',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/01_bronze_products.sql',
    )
    
    ingest_sales = MsSqlOperator(
        task_id='bronze_ingest_sales',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/02_bronze_sales.sql',
    )
    
    ingest_stock = MsSqlOperator(
        task_id='bronze_ingest_stock',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/03_bronze_stock.sql',
    )
    
    # Silver transformations
    transform_products = MsSqlOperator(
        task_id='silver_transform_products',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_products;',
    )
    
    transform_sales = MsSqlOperator(
        task_id='silver_transform_sales',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_sales;',
    )
    
    # Gold aggregations
    aggregate_sales_summary = MsSqlOperator(
        task_id='gold_sales_summary',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_sales_summary;',
    )
    
    aggregate_inventory_shrinkage = MsSqlOperator(
        task_id='gold_inventory_shrinkage',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_inventory_shrinkage;',
    )
    
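    # Define dependencies: each task waits only on the tables it reads
    ingest_products >> transform_products
    ingest_sales >> transform_sales
    [transform_products, transform_sales] >> aggregate_sales_summary
    # usp_inventory_shrinkage reads silver.sales and silver.stock; no Silver
    # stock task is defined in this example, so it waits on the Bronze stock load
    [transform_sales, ingest_stock] >> aggregate_inventory_shrinkage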
```

## Configuration

### Environment Variables

```bash
# SQL Server connection
export SQL_SERVER_HOST=localhost
export SQL_SERVER_PORT=1433
export SQL_SERVER_USER=sa
export SQL_SERVER_PASSWORD=${YOUR_SECURE_PASSWORD}
export SQL_SERVER_DATABASE=RetailDataWarehouse

# Data paths
export DATA_SOURCE_PATH=/data_source
export BRONZE_LAYER_PATH=/data/bronze
export SILVER_LAYER_PATH=/data/silver
export GOLD_LAYER_PATH=/data/gold

# Airflow (if used)
export AIRFLOW_CONN_RETAIL_SQL_SERVER="mssql://${SQL_SERVER_USER}:${SQL_SERVER_PASSWORD}@${SQL_SERVER_HOST}:${SQL_SERVER_PORT}/${SQL_SERVER_DATABASE}"
```
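
Airflow connection URIs are parsed as URLs, so a password containing characters such as `@`, `/`, or `:` needs percent-encoding before it is placed in `AIRFLOW_CONN_RETAIL_SQL_SERVER`. The helper below is a minimal sketch of that step; `build_mssql_uri` is a hypothetical name, and the defaults simply repeat the values shown above.

```python
import os
from urllib.parse import quote


def build_mssql_uri(env=os.environ):
    """Assemble the mssql:// URI with percent-encoded credentials."""
    user = quote(env["SQL_SERVER_USER"], safe="")
    password = quote(env["SQL_SERVER_PASSWORD"], safe="")
    host = env.get("SQL_SERVER_HOST", "localhost")
    port = env.get("SQL_SERVER_PORT", "1433")
    database = quote(env.get("SQL_SERVER_DATABASE", "RetailDataWarehouse"), safe="")
    return f"mssql://{user}:{password}@{host}:{port}/{database}"


# Made-up credentials, for illustration only
sample_uri = build_mssql_uri({
    "SQL_SERVER_USER": "sa",
    "SQL_SERVER_PASSWORD": "p@ss/w:rd",
})
print(sample_uri)
```

The printed value can be exported as `AIRFLOW_CONN_RETAIL_SQL_SERVER` in place of the hand-assembled string.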

### Docker Compose Configuration

```yaml
# docker-compose.yml
version: '3.8'

services:
  sqlserver:
    image: mcr.microsoft.com/mssql/server:2019-latest
    container_name: retail-sql-server
    environment:
      - ACCEPT_EULA=Y
      - SA_PASSWORD=${SQL_SERVER_PASSWORD}
      - MSSQL_PID=Developer
    ports:
      - "1433:1433"
    volumes:
      - ./data_source:/data_source
      - ./sql_scripts:/sql_scripts
      - sqlserver_data:/var/opt/mssql

volumes:
  sqlserver_data:
```

## Common Patterns & Use Cases

### Use Case 1: Multi-Branch Sales Consolidation

```sql
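-- Combine sales from Alex, Cairo, and Giza branches
-- UNION ALL with SELECT * requires the three branch tables to share the same
-- column layout and to have no branch_name column of their own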
CREATE VIEW gold.vw_consolidated_sales AS
SELECT 
    'Alexandria' AS branch_name,
    * 
FROM bronze.alex_sales
UNION ALL
SELECT 
    'Cairo' AS branch_name,
    * 
FROM bronze.cairo_sales
UNION ALL
SELECT 
    'Giza' AS branch_name,
    * 
FROM bronze.giza_sales;
```

### Use Case 2: Recipe Yield Tracking (Meat/Poultry)

```sql
-- Track conversion rates for processed meat products
CREATE TABLE silver.recipe_conversions (
    recipe_id INT PRIMARY KEY,
    raw_product_id INT,
    finished_product_id INT,
    conversion_ratio DECIMAL(5,2), -- e.g., 0.85 (15% waste)
    effective_date DATE
);

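-- Calculate actual yield vs. expected.
-- Each side is aggregated before joining so that multiple sales rows and
-- multiple stock rows do not multiply each other's totals.
WITH RawInput AS (
    SELECT product_id, SUM(quantity) AS raw_quantity
    FROM silver.sales
    GROUP BY product_id
),
FinishedStock AS (
    SELECT product_id, SUM(quantity_on_hand) AS finished_quantity
    FROM silver.stock
    GROUP BY product_id
)
SELECT 
    rc.recipe_id,
    ri.raw_quantity * rc.conversion_ratio AS expected_yield,
    fs.finished_quantity AS actual_yield,
    fs.finished_quantity - ri.raw_quantity * rc.conversion_ratio AS yield_variance
FROM silver.recipe_conversions rc
INNER JOIN RawInput ri ON rc.raw_product_id = ri.product_id
INNER JOIN FinishedStock fs ON rc.finished_product_id = fs.product_id;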
```

### Use Case 3: Supplier Rebate Tier Tracking

```sql
-- Track purchase volume for supplier rebate calculations
CREATE TABLE gold.supplier_rebate_tiers (
    supplier_id INT,
    total_purchases DECIMAL(15,2),
    rebate_tier VARCHAR(20),
    rebate_percentage DECIMAL(5,2),
    calculated_at DATETIME2
);

INSERT INTO gold.supplier_rebate_tiers
SELECT 
    p.supplier_id,
    SUM(s.quantity * s.unit_price) AS total_purchases,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 'Platinum'
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 'Gold'
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 'Silver'
        ELSE 'Bronze'
    END AS rebate_tier,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 5.0
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 3.5
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 2.0
        ELSE 1.0
    END AS rebate_percentage,
    GETDATE() AS calculated_at
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY p.supplier_id;
```
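
The tier boundaries in the `CASE` expressions are strict (`>`), which is easy to get wrong at the edges. The sketch below restates the same thresholds in Python so the boundary behaviour can be checked in isolation; `rebate_tier` and the constant names are hypothetical.

```python
from decimal import Decimal

# (exclusive lower bound, tier name, rebate percentage), highest tier first.
# Values copied from the CASE expressions above.
REBATE_TIERS = [
    (Decimal("100000"), "Platinum", Decimal("5.0")),
    (Decimal("50000"), "Gold", Decimal("3.5")),
    (Decimal("25000"), "Silver", Decimal("2.0")),
]
DEFAULT_TIER = ("Bronze", Decimal("1.0"))


def rebate_tier(total):
    """Return (tier name, rebate percentage) for a purchase total."""
    total = Decimal(str(total))
    for threshold, name, percentage in REBATE_TIERS:
        if total > threshold:  # strict comparison, as in the SQL
            return name, percentage
    return DEFAULT_TIER


for amount in ("100000.01", "100000", "25000", "0"):
    print(amount, rebate_tier(amount))
```

A total of exactly 100000 therefore lands in Gold, not Platinum, matching the SQL.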

## Troubleshooting

### Issue 1: CSV Bulk Insert Fails

```sql
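-- Check that the file is visible inside the container. xp_cmdshell is not
-- supported by SQL Server on Linux, so run this from the host shell instead:
--   docker exec retail-sql-server ls -l /data_source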

-- Use ERRORFILE to capture bad rows
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    ERRORFILE = '/data_source/errors/products_errors.csv',
    MAXERRORS = 10
);
```

### Issue 2: Duplicate Records in Silver Layer

```sql
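-- Add deduplication logic with ROW_NUMBER.
-- Columns are listed explicitly: SELECT * would also pass the helper column rn
-- and the Bronze audit columns to the INSERT. The column list is an assumption
-- based on the sales columns used elsewhere in this guide.
WITH DeduplicatedSales AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY transaction_id 
            ORDER BY _loaded_at DESC
        ) AS rn
    FROM bronze.sales
)
INSERT INTO silver.sales (transaction_id, product_id, branch_name, quantity, unit_price)
SELECT transaction_id, product_id, branch_name, quantity, unit_price
FROM DeduplicatedSales
WHERE rn = 1;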
```
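
The same keep-the-latest-row-per-key rule can be sketched in plain Python, which is handy for unit-testing the rule before it is applied to real tables. `keep_latest` and the sample rows are hypothetical; timestamps are ISO-8601 strings so that string comparison matches chronological order.

```python
def keep_latest(rows, key="transaction_id", order_by="_loaded_at"):
    """Keep, for each key value, the row with the greatest order_by value."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())


bronze_sales = [
    {"transaction_id": 1, "_loaded_at": "2025-01-01T10:00:00", "quantity": 2},
    {"transaction_id": 1, "_loaded_at": "2025-01-02T10:00:00", "quantity": 3},
    {"transaction_id": 2, "_loaded_at": "2025-01-01T11:00:00", "quantity": 1},
]
deduplicated = keep_latest(bronze_sales)
print(deduplicated)
```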

### Issue 3: Performance Optimization

```sql
-- Create indexes on frequently joined columns
CREATE NONCLUSTERED INDEX idx_sales_product_id 
    ON silver.sales(product_id) INCLUDE (quantity, unit_price);

CREATE NONCLUSTERED INDEX idx_stock_product_branch 
    ON silver.stock(product_id, branch_name) INCLUDE (quantity_on_hand);

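-- Partition large tables by date: the function defines the boundaries,
-- the scheme maps the resulting partitions to filegroups
CREATE PARTITION FUNCTION pf_sales_date (DATE)
AS RANGE RIGHT FOR VALUES (
    '2025-01-01', '2025-02-01', '2025-03-01', '2025-04-01'
);

-- ps_sales_date is an illustrative name; all partitions go to PRIMARY here
-- for simplicity, which is an assumption to adjust per environment
CREATE PARTITION SCHEME ps_sales_date
AS PARTITION pf_sales_date ALL TO ([PRIMARY]);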
```

### Issue 4: Rebuild Entire Pipeline

```bash
# Use the rebuild script to reset and reprocess all layers
sqlcmd -S "${SQL_SERVER_HOST},${SQL_SERVER_PORT}" \
  -U "${SQL_SERVER_USER}" -P "${SQL_SERVER_PASSWORD}" \
  -d "${SQL_SERVER_DATABASE}" \
  -i sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
```

## Testing Data Quality

```sql
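-- Data quality checks for Silver layer.
-- UNION ALL takes its column names from the first SELECT, so both branches
-- use generic names; the trailing comments say what each one counts.
SELECT 'Products' AS table_name,
    COUNT(*) AS total_records,
    COUNT(DISTINCT product_id) AS unique_keys,
    SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) AS invalid_values,         -- negative prices
    SUM(CASE WHEN product_name IS NULL THEN 1 ELSE 0 END) AS integrity_failures -- null names
FROM silver.products

UNION ALL

-- SQL Server does not allow a subquery inside an aggregate, so orphaned
-- product ids are detected with a LEFT JOIN instead of NOT IN (SELECT ...)
SELECT 'Sales' AS table_name,
    COUNT(*) AS total_records,
    COUNT(DISTINCT s.transaction_id) AS unique_keys,
    SUM(CASE WHEN s.quantity <= 0 THEN 1 ELSE 0 END) AS invalid_values,        -- non-positive quantities
    SUM(CASE WHEN p.product_id IS NULL THEN 1 ELSE 0 END) AS integrity_failures -- orphaned product ids
FROM silver.sales s
LEFT JOIN (SELECT DISTINCT product_id FROM silver.products) p
    ON s.product_id = p.product_id;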
```
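
For pipelines that stage data in Python before loading it, the same checks can be expressed over lists of dictionaries. This is a sketch under the assumption that rows carry the column names used in the query above; `silver_quality_report` and the sample data are hypothetical.

```python
def silver_quality_report(products, sales):
    """Compute the Silver-layer checks for in-memory product and sales rows."""
    product_ids = {p["product_id"] for p in products}
    return {
        "products": {
            "total_records": len(products),
            "unique_products": len(product_ids),
            "negative_prices": sum(1 for p in products if p["unit_price"] < 0),
            "null_names": sum(1 for p in products if p["product_name"] is None),
        },
        "sales": {
            "total_records": len(sales),
            "unique_transactions": len({s["transaction_id"] for s in sales}),
            "invalid_quantity": sum(1 for s in sales if s["quantity"] <= 0),
            "orphaned_products": sum(
                1 for s in sales if s["product_id"] not in product_ids
            ),
        },
    }


sample_products = [
    {"product_id": 1, "product_name": "MILK", "unit_price": 2.5},
    {"product_id": 2, "product_name": None, "unit_price": -1.0},
]
sample_sales = [
    {"transaction_id": 10, "product_id": 1, "quantity": 3},
    {"transaction_id": 11, "product_id": 99, "quantity": 0},
]
report = silver_quality_report(sample_products, sample_sales)
print(report)
```

A run can then fail fast when any count other than the totals is non-zero.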

## Best Practices

1. **Always process through layers sequentially**: Bronze → Silver → Gold
2. **Use stored procedures** for reusable transformations
3. **Add audit columns** (`_loaded_at`, `_source_file`, `processed_at`)
4. **Implement idempotency**: Truncate-and-load or upsert patterns
5. **Partition large tables** by date or branch for performance
6. **Create comprehensive indexes** on join and filter columns
7. **Use CTEs** for complex business logic readability
8. **Test data quality** at each layer transition
9. **Version control** all SQL scripts and configurations
10. **Monitor pipeline execution** with Airflow or equivalent orchestrator
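
Practice 4 (idempotency) is the one most easily verified in code: running a load twice should leave the target in the same state as running it once. The sketch below demonstrates truncate-and-load with Python's built-in `sqlite3` module as a stand-in for SQL Server; the table names are simplified, and `DELETE` replaces `TRUNCATE TABLE`, which SQLite does not have.

```python
import sqlite3


def load_silver_products(conn):
    """Rebuild silver_products from bronze_products in one transaction."""
    with conn:  # commits on success, rolls back if any statement fails
        conn.execute("DELETE FROM silver_products")  # stand-in for TRUNCATE
        conn.execute(
            """
            INSERT INTO silver_products (product_id, product_name)
            SELECT DISTINCT product_id, UPPER(TRIM(product_name))
            FROM bronze_products
            WHERE product_id IS NOT NULL AND product_name IS NOT NULL
            """
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bronze_products (product_id INTEGER, product_name TEXT)")
conn.execute("CREATE TABLE silver_products (product_id INTEGER, product_name TEXT)")
conn.executemany(
    "INSERT INTO bronze_products VALUES (?, ?)",
    [(1, " milk "), (1, " milk "), (2, "beef"), (None, "orphan")],
)

query = "SELECT product_id, product_name FROM silver_products ORDER BY product_id"
load_silver_products(conn)
first_run = conn.execute(query).fetchall()
load_silver_products(conn)  # second run must not add duplicates
second_run = conn.execute(query).fetchall()
print(first_run == second_run, second_run)
```

Wrapping the delete and the insert in a single transaction also means a failed load leaves the previous Silver data in place instead of an empty table.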

Source

Creator's repository · aradotso/data-skills
