---
name: retail-etl-medallion-pipeline
description: End-to-end retail ETL pipeline using Medallion Architecture (Bronze/Silver/Gold) with TSQL, PySpark, and Airflow for inventory, sales, and supplier data processing
triggers:
  - build a retail data warehouse with medallion architecture
  - create bronze silver gold layers for retail analytics
  - set up retail ETL pipeline with inventory tracking
  - implement medallion architecture for sales data
  - process retail data with bronze silver gold pattern
  - design data warehouse for hypermarket or retail business
  - transform retail sales and inventory data by layers
  - orchestrate retail ETL with airflow and spark
---

# Retail ETL Medallion Pipeline Skill

> Skill by [ara.so](https://ara.so) — Data Skills collection.

## Overview

This project implements a production-grade **Medallion Architecture** ETL pipeline for retail/hypermarket data, handling complex business logic like inventory shrinkage, meat/poultry recipe conversions, supplier rebate tiers, and multi-branch sales consolidation. The architecture follows three data quality layers:

- **Bronze Layer**: Raw data ingestion from CSV sources (sales, stock, products)
- **Silver Layer**: Cleaned, standardized, and business-rule-applied data
- **Gold Layer**: Aggregated, analytics-ready dimensional models

The pipeline processes:
- Multi-branch sales transactions (Alex, Cairo, Giza)
- Product catalogs with recipe/yield conversions
- Stock/inventory tracking across locations
- Supplier rebate calculations

## Project Structure

```
Retail-Data-Warehouse/
├── data_source/              # Raw CSV files (CRM/ERP exports)
│   ├── 000.Hypermarket Products.csv
│   ├── 001-003.*.Branch Sales.csv
│   └── 004-006.*.Stock.csv
├── sql_scripts/              # TSQL stored procedures for each layer
│   ├── 00_create_database_and_schemas.sql
│   ├── 01-04_bronze_*.sql
│   ├── 05-08_silver_*.sql
│   └── 09-12_gold_*.sql
├── BI_Team_Analysis/         # Power BI dashboards
└── docker-compose.yml        # SQL Server container setup
```

## Installation & Setup

### 1. Infrastructure Setup (SQL Server)

Using Docker:

```bash
# Start SQL Server container
docker-compose up -d

# Verify container is running
docker ps | grep sqlserver
```

Or use an existing SQL Server instance (2017+).

### 2. Database Initialization

```bash
# Connect to SQL Server and create database structure
sqlcmd -S localhost -U sa -P "$SQL_SA_PASSWORD" -i sql_scripts/00_create_database_and_schemas.sql
```

This creates:
- Database: `RetailDataWarehouse`
- Schemas: `bronze`, `silver`, `gold`, `staging`

### 3. Load Raw Data to Bronze Layer

Place the CSV files where the SQL Server process can read them (in the Docker setup, `./data_source` is mounted at `/data`), then run:

```sql
-- Execute Bronze layer ingestion procedures
EXEC bronze.usp_LoadProducts;
EXEC bronze.usp_LoadSales;
EXEC bronze.usp_LoadStock;
```

Or execute all Bronze scripts sequentially:

```bash
for script in sql_scripts/0[1-4]_bronze_*.sql; do
    # -b returns a non-zero exit code on T-SQL errors, so a failed script stops the loop
    sqlcmd -S localhost -U sa -P "$SQL_SA_PASSWORD" -b -i "$script" || { echo "Failed: $script" >&2; break; }
done
```

## Key Architecture Patterns

### Bronze Layer (Raw Ingestion)

**Purpose**: Land raw data with minimal transformation. Add audit columns only.

```sql
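-- Example: Bronze Products Table Structure
CREATE TABLE bronze.Products (
    ProductID INT,
    ProductName NVARCHAR(255),
    Category NVARCHAR(100),
    SubCategory NVARCHAR(100),
    UnitPrice DECIMAL(10,2),
    SupplierID INT,
    RecipeYield DECIMAL(5,2),  -- For meat/poultry conversions
    LoadTimestamp DATETIME2 DEFAULT GETDATE(),
    SourceFile NVARCHAR(500)
);
GO

-- BULK INSERT maps file fields to columns by position, so a CSV holding only the
-- seven source columns is loaded through a view that hides the two audit columns.
CREATE VIEW bronze.vw_ProductsLoad
AS
SELECT ProductID, ProductName, Category, SubCategory, UnitPrice, SupplierID, RecipeYield
FROM bronze.Products;
GO

-- Bronze Load Pattern
CREATE PROCEDURE bronze.usp_LoadProducts
AS
BEGIN
    SET NOCOUNT ON;
    TRUNCATE TABLE bronze.Products;
    
    BULK INSERT bronze.vw_ProductsLoad
    FROM '/data/000.Hypermarket Products.csv'
    WITH (
        FORMAT = 'CSV',            -- SQL Server 2017+: handles quoted fields containing commas
        FIELDTERMINATOR = ',',
        ROWTERMINATOR = '\n',      -- BULK INSERT reads '\n' as CRLF; use '0x0a' for LF-only files
        FIRSTROW = 2,
        ERRORFILE = '/logs/products_errors.txt'  -- folder must exist; fails if the file already exists
    );
    
    -- Add audit metadata
    UPDATE bronze.Products
    SET LoadTimestamp = GETDATE(),
        SourceFile = '000.Hypermarket Products.csv';
END;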
```
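The Python tasks in the stack (the PySpark/Airflow parts named in the description) can apply the same Bronze rule with a small function. This is a minimal sketch, not code from the repository. `load_bronze` is a hypothetical helper, and it assumes the CSV's first line is a header, the equivalent of `FIRSTROW = 2`. It uses the standard-library `csv` module, so quoted fields such as a product name containing a comma stay intact.

```python
import csv
import io
from datetime import datetime, timezone
from typing import Dict, List, Optional


def load_bronze(csv_text: str, source_file: str,
                loaded_at: Optional[datetime] = None) -> List[Dict[str, str]]:
    """Land CSV rows unchanged (values stay strings) and add the two audit columns."""
    timestamp = (loaded_at or datetime.now(timezone.utc)).isoformat()
    # DictReader consumes the header line (like FIRSTROW = 2) and honours quoted fields
    reader = csv.DictReader(io.StringIO(csv_text))
    rows: List[Dict[str, str]] = []
    for record in reader:
        row = dict(record)                 # no casting, trimming or filtering in Bronze
        row["LoadTimestamp"] = timestamp   # audit column
        row["SourceFile"] = source_file    # audit column
        rows.append(row)
    return rows
```

Every value stays a string, so type casting is deferred to the Silver layer. This matches the Bronze purpose of landing raw data with audit columns only.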

### Silver Layer (Cleaned & Standardized)

**Purpose**: Apply data quality rules, deduplication, and business transformations.

```sql
-- Example: Silver Sales with Business Rules
CREATE PROCEDURE silver.usp_TransformSales
AS
BEGIN
    TRUNCATE TABLE silver.Sales;
    
    INSERT INTO silver.Sales (
        SaleID,
        BranchID,
        ProductID,
        SaleDate,
        Quantity,
        UnitPrice,
        TotalAmount,
        AdjustedQuantity,  -- Recipe conversion applied
        DataQualityScore
    )
    SELECT 
        s.SaleID,
        s.BranchID,
        s.ProductID,
        CAST(s.SaleDate AS DATE) AS SaleDate,
        s.Quantity,
        s.UnitPrice,
        s.Quantity * s.UnitPrice AS TotalAmount,
        -- Apply recipe yield (same categories as dbo.fn_ApplyRecipeYield)
        CASE 
            WHEN p.Category IN ('Meat & Poultry', 'Seafood')
            THEN s.Quantity * ISNULL(p.RecipeYield, 1.0)
            ELSE s.Quantity
        END AS AdjustedQuantity,
        -- Data quality scoring
        CASE 
            WHEN s.Quantity > 0 AND s.UnitPrice > 0 THEN 100
            WHEN s.Quantity IS NULL OR s.UnitPrice IS NULL THEN 0
            ELSE 50
        END AS DataQualityScore
    FROM bronze.Sales s
    INNER JOIN bronze.Products p ON s.ProductID = p.ProductID
    WHERE s.Quantity > 0  -- Filter invalid records
      AND s.SaleDate >= DATEADD(YEAR, -2, GETDATE());  -- Keep 2 years
END;
```

**Key Silver Transformations**:
- Date standardization
- Recipe yield conversions for perishables
- Duplicate removal
- Null handling and imputation
- Data quality scoring
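The Silver rules can be unit-tested outside SQL Server with a plain-Python mirror. The sketch below covers:

- duplicate removal;
- the `Quantity > 0` filter;
- `TotalAmount`;
- the `DataQualityScore` branches from `silver.usp_TransformSales`.

Keeping the first row per `SaleID` is an assumption, because the SQL example does not show how duplicates are resolved. `quality_score` and `to_silver` are hypothetical names. Rows are dicts keyed by the Silver column names, with `None` standing in for SQL `NULL`.

```python
from decimal import Decimal
from typing import Dict, List, Optional


def quality_score(quantity: Optional[Decimal], unit_price: Optional[Decimal]) -> int:
    """Same branch order as the DataQualityScore CASE in silver.usp_TransformSales."""
    if quantity is not None and unit_price is not None and quantity > 0 and unit_price > 0:
        return 100
    if quantity is None or unit_price is None:
        return 0
    return 50


def to_silver(bronze_sales: List[Dict]) -> List[Dict]:
    """Keep the first row per SaleID, drop rows without a positive Quantity, add derived columns."""
    seen_ids = set()
    cleaned: List[Dict] = []
    for row in bronze_sales:
        if row["SaleID"] in seen_ids:      # duplicate removal (assumed: first row wins)
            continue
        seen_ids.add(row["SaleID"])
        qty, price = row.get("Quantity"), row.get("UnitPrice")
        if qty is None or qty <= 0:        # WHERE s.Quantity > 0 (NULL fails it too)
            continue
        cleaned.append({
            **row,
            "TotalAmount": qty * price if price is not None else None,  # NULL * x is NULL
            "DataQualityScore": quality_score(qty, price),
        })
    return cleaned
```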

### Gold Layer (Analytics-Ready Aggregates)

**Purpose**: Create dimensional models and pre-aggregated metrics for BI tools.

```sql
-- Example: Gold Inventory Turnover Metrics
-- Sales and stock are aggregated separately before joining; joining raw rows would
-- repeat each sale once per stock row for the same product and branch.
CREATE PROCEDURE gold.usp_BuildInventoryMetrics
AS
BEGIN
    SET NOCOUNT ON;
    TRUNCATE TABLE gold.InventoryTurnover;
    
    WITH SalesAgg AS (
        SELECT
            ProductID,
            BranchID,
            YEAR(SaleDate)  AS SalesYear,
            MONTH(SaleDate) AS SalesMonth,
            SUM(AdjustedQuantity) AS TotalSalesQty
        FROM silver.Sales
        GROUP BY ProductID, BranchID, YEAR(SaleDate), MONTH(SaleDate)
    ),
    StockAgg AS (
        SELECT
            ProductID,
            BranchID,
            AVG(CAST(StockQuantity AS DECIMAL(18,2))) AS AvgStockLevel,  -- avoid integer averaging
            SUM(ExpectedStock) AS TotalExpected,
            SUM(StockQuantity) AS TotalActual
        FROM silver.Stock
        GROUP BY ProductID, BranchID
    )
    INSERT INTO gold.InventoryTurnover (
        ProductID,
        ProductName,
        Category,
        BranchID,
        Year,            -- stored with Month so that Jan 2023 and Jan 2024 stay separate
        Month,
        TotalSalesQty,
        AvgStockLevel,
        TurnoverRatio,
        ShrinkagePercent,
        ReorderAlert
    )
    SELECT 
        p.ProductID,
        p.ProductName,
        p.Category,
        sa.BranchID,
        sa.SalesYear,
        sa.SalesMonth,
        sa.TotalSalesQty,
        st.AvgStockLevel,
        -- Turnover = Sales / Avg Stock
        CASE 
            WHEN st.AvgStockLevel > 0 
            THEN sa.TotalSalesQty / st.AvgStockLevel
            ELSE 0
        END AS TurnoverRatio,
        -- Shrinkage = (Expected - Actual) / Expected
        CASE 
            WHEN st.TotalExpected > 0
            THEN ((st.TotalExpected - st.TotalActual) * 100.0) / st.TotalExpected
            ELSE 0
        END AS ShrinkagePercent,
        -- No stock on hand -> reorder; turnover < 2 -> slow-moving inventory
        CASE 
            WHEN ISNULL(st.AvgStockLevel, 0) <= 0 THEN 'Reorder Needed'
            WHEN sa.TotalSalesQty / st.AvgStockLevel < 2 THEN 'Slow-Moving'
            ELSE 'OK'
        END AS ReorderAlert
    FROM SalesAgg sa
    INNER JOIN silver.Products p ON sa.ProductID = p.ProductID
    LEFT JOIN StockAgg st ON sa.ProductID = st.ProductID AND sa.BranchID = st.BranchID;
END;
```

**Gold Layer Tables**:
- `InventoryTurnover`: Stock efficiency metrics
- `SalesPerformance`: Revenue aggregates by branch/category
- `SupplierRebates`: Tiered rebate calculations
- `ProductMargins`: Profit analysis dimensions
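Turnover is only meaningful if each side is aggregated once. In this minimal Python sketch, sales are summed and stock snapshots are averaged before the two are combined. `turnover_ratios` is a hypothetical name. It keys by product and branch only; adding year and month to the key works the same way.

Take two sales rows of 5 units and two stock snapshots of 4 and 6. The ratio is 10 / 5 = 2. Joining the raw rows first would pair every sale with every snapshot and report 20 / 5 = 4.

```python
from collections import defaultdict
from decimal import Decimal
from typing import DefaultDict, Dict, Iterable, List, Tuple

Key = Tuple[int, int]  # (ProductID, BranchID)


def turnover_ratios(sales: Iterable[Tuple[int, int, Decimal]],
                    stock: Iterable[Tuple[int, int, Decimal]]) -> Dict[Key, Decimal]:
    """TurnoverRatio = total sold / average stock, each side aggregated on its own first."""
    sold: DefaultDict[Key, Decimal] = defaultdict(Decimal)
    for product_id, branch_id, qty in sales:
        sold[(product_id, branch_id)] += qty

    snapshots: DefaultDict[Key, List[Decimal]] = defaultdict(list)
    for product_id, branch_id, qty in stock:
        snapshots[(product_id, branch_id)].append(qty)

    ratios: Dict[Key, Decimal] = {}
    for key, total_sold in sold.items():
        levels = snapshots.get(key, [])
        avg_stock = sum(levels, Decimal(0)) / len(levels) if levels else Decimal(0)
        # ELSE 0 branch: no stock rows, or a non-positive average
        ratios[key] = total_sold / avg_stock if avg_stock > 0 else Decimal(0)
    return ratios
```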

## Complete Pipeline Execution

### Manual Execution (Sequential)

```sql
-- 1. Bronze: Load raw data
EXEC bronze.usp_LoadProducts;
EXEC bronze.usp_LoadSales;
EXEC bronze.usp_LoadStock;

-- 2. Silver: Apply transformations
EXEC silver.usp_TransformProducts;
EXEC silver.usp_TransformSales;
EXEC silver.usp_TransformStock;

-- 3. Gold: Build analytics aggregates
EXEC gold.usp_BuildInventoryMetrics;
EXEC gold.usp_BuildSalesPerformance;
EXEC gold.usp_CalculateSupplierRebates;

-- 4. Verify row counts
SELECT 'Bronze Products' AS Layer, COUNT(*) AS [RowCount] FROM bronze.Products
UNION ALL
SELECT 'Silver Products', COUNT(*) FROM silver.Products
UNION ALL
SELECT 'Gold Inventory', COUNT(*) FROM gold.InventoryTurnover;
```

### Automated Pipeline Script

```bash
#!/bin/bash
# run_etl_pipeline.sh

set -euo pipefail

SQL_SERVER="${SQL_SERVER:-localhost}"
SQL_USER="${SQL_USER:-sa}"
SQL_PASSWORD="${SQL_PASSWORD:?SQL_PASSWORD must be set}"
SQL_DATABASE="${SQL_DATABASE:-RetailDataWarehouse}"

# Run one stored procedure. -b makes sqlcmd exit non-zero on T-SQL errors,
# so `set -e` stops the pipeline instead of continuing with stale data.
run_proc() {
    echo "  -> $1"
    sqlcmd -S "$SQL_SERVER" -U "$SQL_USER" -P "$SQL_PASSWORD" -d "$SQL_DATABASE" -b -Q "EXEC $1;"
}

echo "Starting Retail ETL Pipeline..."

# Bronze Layer
echo "[1/3] Loading Bronze Layer..."
run_proc bronze.usp_LoadProducts
run_proc bronze.usp_LoadSales
run_proc bronze.usp_LoadStock

# Silver Layer
echo "[2/3] Transforming Silver Layer..."
run_proc silver.usp_TransformProducts
run_proc silver.usp_TransformSales
run_proc silver.usp_TransformStock

# Gold Layer
echo "[3/3] Building Gold Layer..."
run_proc gold.usp_BuildInventoryMetrics
run_proc gold.usp_BuildSalesPerformance
run_proc gold.usp_CalculateSupplierRebates

echo "Pipeline completed successfully!"
```
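The description mentions Airflow orchestration. The layer ordering itself can be sketched in plain Python before it is wired into a DAG. `PIPELINE` and `run_pipeline` are hypothetical names, and the procedure names are taken from this document. `execute` is injected, so it can wrap a real database call (for example a function that runs the statement on a pyodbc cursor) or be a fake in tests. As in the shell script, the first failure stops the run.

```python
from typing import Callable, List, Tuple

# Layer order and procedure names as used elsewhere in this document
PIPELINE: List[Tuple[str, List[str]]] = [
    ("Bronze", ["bronze.usp_LoadProducts", "bronze.usp_LoadSales", "bronze.usp_LoadStock"]),
    ("Silver", ["silver.usp_TransformProducts", "silver.usp_TransformSales",
                "silver.usp_TransformStock"]),
    ("Gold", ["gold.usp_BuildInventoryMetrics", "gold.usp_BuildSalesPerformance",
              "gold.usp_CalculateSupplierRebates"]),
]


def run_pipeline(execute: Callable[[str], None],
                 log: Callable[[str], None] = print) -> List[str]:
    """Run every procedure in layer order; an exception from `execute` stops the run."""
    done: List[str] = []
    for step, (layer, procedures) in enumerate(PIPELINE, start=1):
        log(f"[{step}/{len(PIPELINE)}] {layer}")
        for proc in procedures:
            execute(f"EXEC {proc};")
            done.append(proc)
    return done
```

In an Airflow DAG, each procedure would more naturally be its own task, so that a failed step can be retried without rerunning the Bronze loads. That wiring is not shown here.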

## Configuration

### Environment Variables

```bash
# .env file for pipeline configuration
SQL_SERVER=localhost
SQL_USER=sa
SQL_PASSWORD=${SQL_SA_PASSWORD}
SQL_DATABASE=RetailDataWarehouse

# Data source paths
DATA_SOURCE_PATH=/path/to/data_source
LOGS_PATH=/var/log/retail-etl

# Airflow (if using orchestration)
AIRFLOW_HOME=/opt/airflow
AIRFLOW__CORE__DAGS_FOLDER=${AIRFLOW_HOME}/dags
```

### Docker Compose Configuration

```yaml
version: '3.8'
services:
  sqlserver:
    image: mcr.microsoft.com/mssql/server:2019-latest
    environment:
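      ACCEPT_EULA: "Y"
      MSSQL_SA_PASSWORD: ${SQL_SA_PASSWORD}  # SA_PASSWORD is the older, deprecated name
      MSSQL_PID: Developer
    ports:
      - "1433:1433"
    volumes:
      - ./data_source:/data
      - ./sql_scripts:/scripts
      - ./logs:/logs  # ERRORFILE target used by the Bronze load examples
      - sqlserver_data:/var/opt/mssql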
    restart: unless-stopped

volumes:
  sqlserver_data:
```

## Business Logic Examples

### Recipe Conversion for Meat Products

```sql
-- Handle meat/poultry yield conversions
-- Example: 1kg raw chicken → 0.65kg cooked meat
CREATE FUNCTION dbo.fn_ApplyRecipeYield(
    @Quantity DECIMAL(10,2),
    @RecipeYield DECIMAL(5,2),
    @Category NVARCHAR(100)
)
RETURNS DECIMAL(10,2)
AS
BEGIN
    DECLARE @AdjustedQty DECIMAL(10,2);
    
    IF @Category IN ('Meat & Poultry', 'Seafood')
        SET @AdjustedQty = @Quantity * ISNULL(@RecipeYield, 1.0);
    ELSE
        SET @AdjustedQty = @Quantity;
    
    RETURN @AdjustedQty;
END;
```
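A Python mirror of `dbo.fn_ApplyRecipeYield` is handy for computing expected values in tests. This sketch rests on two assumptions:

- Quantities already carry two decimal places.
- SQL Server rounds half away from zero (Python's `ROUND_HALF_UP`) when the product is stored in `DECIMAL(10,2)`.

`apply_recipe_yield` is a hypothetical name.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional

YIELD_CATEGORIES = {"Meat & Poultry", "Seafood"}  # same list as the SQL function
TWO_PLACES = Decimal("0.01")                      # scale of DECIMAL(10,2)


def apply_recipe_yield(quantity: Decimal, recipe_yield: Optional[Decimal],
                       category: str) -> Decimal:
    """Python mirror of dbo.fn_ApplyRecipeYield, rounded to two decimal places."""
    if category in YIELD_CATEGORIES:
        factor = recipe_yield if recipe_yield is not None else Decimal("1.0")  # ISNULL(..., 1.0)
        adjusted = quantity * factor
    else:
        adjusted = quantity
    return adjusted.quantize(TWO_PLACES, rounding=ROUND_HALF_UP)
```

One difference to keep in mind: the set lookup is case-sensitive. A case-insensitive collation (the common SQL Server default) would also match `'seafood'`.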

### Supplier Rebate Tiers

```sql
-- Calculate tiered rebates per supplier from the sales value of its products
-- (silver.Sales.TotalAmount serves as the volume measure)
CREATE PROCEDURE gold.usp_CalculateSupplierRebates
AS
BEGIN
    SET NOCOUNT ON;
    TRUNCATE TABLE gold.SupplierRebates;  -- full rebuild, so re-runs do not append duplicates

    WITH SupplierTotals AS (
        SELECT
            p.SupplierID,
            SUM(s.TotalAmount) AS TotalPurchaseAmount
        FROM silver.Sales s
        INNER JOIN silver.Products p ON s.ProductID = p.ProductID
        GROUP BY p.SupplierID
    ),
    Tiered AS (
        SELECT
            SupplierID,
            TotalPurchaseAmount,
            CASE 
                WHEN TotalPurchaseAmount >= 100000 THEN 'Platinum'
                WHEN TotalPurchaseAmount >= 50000 THEN 'Gold'
                WHEN TotalPurchaseAmount >= 25000 THEN 'Silver'
                ELSE 'Bronze'
            END AS RebateTier,
            CASE 
                WHEN TotalPurchaseAmount >= 100000 THEN 5.0
                WHEN TotalPurchaseAmount >= 50000 THEN 3.0
                WHEN TotalPurchaseAmount >= 25000 THEN 1.5
                ELSE 0.0
            END AS RebatePercent
        FROM SupplierTotals
    )
    INSERT INTO gold.SupplierRebates (
        SupplierID,
        TotalPurchaseAmount,
        RebateTier,
        RebatePercent,
        RebateAmount
    )
    SELECT
        SupplierID,
        TotalPurchaseAmount,
        RebateTier,
        RebatePercent,
        TotalPurchaseAmount * RebatePercent / 100.0 AS RebateAmount  -- derived from the percent once
    FROM Tiered;
END;
```
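Keeping the tiers in one table avoids maintaining three parallel CASE expressions. This minimal sketch copies the thresholds and percentages from the procedure. `TIERS` and `rebate_for` are hypothetical names. Tiers are checked from highest to lowest with `>=`, so a total of exactly 100,000 is Platinum and 49,999.99 is Silver.

```python
from decimal import Decimal
from typing import List, Tuple

# (threshold, tier, percent), checked from the highest threshold down
TIERS: List[Tuple[Decimal, str, Decimal]] = [
    (Decimal("100000"), "Platinum", Decimal("5.0")),
    (Decimal("50000"), "Gold", Decimal("3.0")),
    (Decimal("25000"), "Silver", Decimal("1.5")),
]


def rebate_for(total: Decimal) -> Tuple[str, Decimal, Decimal]:
    """Return (tier, percent, rebate amount) for a supplier's total."""
    for threshold, tier, percent in TIERS:
        if total >= threshold:
            return tier, percent, total * percent / 100
    return "Bronze", Decimal("0.0"), Decimal("0")
```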

### Inventory Shrinkage Detection

```sql
-- Identify products with abnormal shrinkage
SELECT 
    p.ProductName,
    p.Category,
    st.BranchID,
    st.ExpectedStock,
    st.StockQuantity AS ActualStock,
    calc.ShrinkagePercent
FROM silver.Stock st
INNER JOIN silver.Products p ON st.ProductID = p.ProductID
-- Compute the percentage once; NULLIF guards the division even if the optimizer
-- evaluates it before the ExpectedStock > 0 filter
CROSS APPLY (
    SELECT ((st.ExpectedStock - st.StockQuantity) * 100.0) / NULLIF(st.ExpectedStock, 0) AS ShrinkagePercent
) AS calc
WHERE st.ExpectedStock > 0
  AND calc.ShrinkagePercent > 5.0  -- >5% shrinkage threshold
ORDER BY calc.ShrinkagePercent DESC;
```
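The shrinkage rule reduces to a small function whose edge cases are easy to test:

- Exactly 5% is not flagged, because the query uses `>`.
- Negative shrinkage (more stock than expected) is never flagged.
- Rows with a non-positive `ExpectedStock` are excluded.

`shrinkage_percent` and `is_abnormal` are hypothetical names in this sketch.

```python
from decimal import Decimal
from typing import Optional


def shrinkage_percent(expected: Decimal, actual: Decimal) -> Optional[Decimal]:
    """(Expected - Actual) * 100 / Expected, or None when Expected is not positive."""
    if expected <= 0:
        return None  # the query filters these rows out with ExpectedStock > 0
    return (expected - actual) * 100 / expected


def is_abnormal(expected: Decimal, actual: Decimal,
                threshold: Decimal = Decimal("5.0")) -> bool:
    """True when shrinkage strictly exceeds the threshold, matching '> 5.0'."""
    pct = shrinkage_percent(expected, actual)
    return pct is not None and pct > threshold
```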

## Data Quality Checks

### Validation Queries

```sql
-- Check for duplicate sales records
SELECT SaleID, COUNT(*) AS Duplicates
FROM bronze.Sales
GROUP BY SaleID
HAVING COUNT(*) > 1;

-- Validate price consistency
SELECT 
    p.ProductID,
    p.ProductName,
    COUNT(DISTINCT s.UnitPrice) AS PriceVariations
FROM silver.Products p
INNER JOIN silver.Sales s ON p.ProductID = s.ProductID
GROUP BY p.ProductID, p.ProductName
HAVING COUNT(DISTINCT s.UnitPrice) > 3;  -- More than 3 price points

-- Check for negative stock
SELECT ProductID, BranchID, StockQuantity
FROM silver.Stock
WHERE StockQuantity < 0;

-- Data completeness metrics
SELECT 
    'Products' AS TableName,
    COUNT(*) AS TotalRows,
    SUM(CASE WHEN ProductName IS NULL THEN 1 ELSE 0 END) AS NullProductNames,
    SUM(CASE WHEN UnitPrice IS NULL THEN 1 ELSE 0 END) AS NullPrices
FROM silver.Products;
```
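The same checks can run on extracted rows before they reach SQL Server, for example inside a Python ingestion task. This is a sketch with hypothetical function names. Rows are dicts keyed by column name, and `None` plays the role of SQL `NULL`.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List


def duplicate_keys(rows: Iterable[Dict[str, Any]], key: str = "SaleID") -> Dict[Any, int]:
    """Keys appearing more than once, with counts (GROUP BY ... HAVING COUNT(*) > 1)."""
    counts = Counter(row[key] for row in rows)
    return {k: n for k, n in counts.items() if n > 1}


def negative_stock(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Rows with StockQuantity < 0; None (NULL) never matches, as in the WHERE clause."""
    return [r for r in rows if r.get("StockQuantity") is not None and r["StockQuantity"] < 0]


def null_counts(rows: List[Dict[str, Any]], columns: List[str]) -> Dict[str, int]:
    """Per-column count of missing values, like SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)."""
    return {col: sum(1 for r in rows if r.get(col) is None) for col in columns}
```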

## Troubleshooting

### Common Issues

**Issue**: BULK INSERT fails with permission error
```sql
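-- Cause 1: the login lacks bulk-load rights. BULK INSERT and OPENROWSET(BULK ...) both
-- need ADMINISTER BULK OPERATIONS (or sysadmin) plus INSERT on the target table, e.g. (run in master):
--   GRANT ADMINISTER BULK OPERATIONS TO [etl_login];  -- [etl_login] is a placeholder
-- Cause 2: the SQL Server service account cannot read the file. With the Docker setup,
-- check that ./data_source (mounted at /data) is readable inside the container.
--
-- OPENROWSET(BULK ...) is an alternative load syntax, not a way around missing rights:
INSERT INTO bronze.Products (ProductID, ProductName, Category, SubCategory, UnitPrice, SupplierID, RecipeYield)
SELECT * FROM OPENROWSET(
    BULK '/data/000.Hypermarket Products.csv',
    FORMATFILE = '/data/products_format.xml',  -- must describe the seven source columns
    FIRSTROW = 2,
    ERRORFILE = '/logs/errors.txt'
) AS DataFile;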
```

**Issue**: Recipe yield conversions producing NULL values
```sql
-- Check for missing RecipeYield in Products table
SELECT ProductID, ProductName, Category, RecipeYield
FROM bronze.Products
WHERE Category IN ('Meat & Poultry', 'Seafood')
  AND RecipeYield IS NULL;

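-- Fix: default the yield in the Silver transform, e.g. ISNULL(RecipeYield, 1.0) AS RecipeYield
-- inside silver.usp_TransformProducts. An UPDATE on bronze.Products does not last:
-- bronze.usp_LoadProducts truncates and reloads that table on every run.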
```

**Issue**: Silver layer procedure times out on large datasets
```sql
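-- Solution: process SaleID ranges in a WHILE loop so each INSERT commits a smaller batch
CREATE PROCEDURE silver.usp_TransformSalesBatch
    @BatchSize INT = 10000
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @MinID INT, @MaxID INT;

    TRUNCATE TABLE silver.Sales;  -- full reload, as in silver.usp_TransformSales
    
    SELECT @MinID = MIN(SaleID), @MaxID = MAX(SaleID) FROM bronze.Sales;
    
    -- An empty bronze.Sales leaves both variables NULL, so the loop is skipped
    WHILE @MinID <= @MaxID
    BEGIN
        -- Column list and SELECT as in silver.usp_TransformSales
        INSERT INTO silver.Sales (...)
        SELECT ...
        FROM bronze.Sales
        WHERE SaleID BETWEEN @MinID AND (@MinID + @BatchSize - 1);
        
        SET @MinID = @MinID + @BatchSize;
    END;
END;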
```
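The loop's ranges are easy to reason about in isolation. This minimal Python sketch (hypothetical `batch_ranges`) yields the same inclusive `BETWEEN` bounds. Consecutive ranges neither overlap nor skip IDs, and gaps in `SaleID` only make some batches smaller. An empty table (NULL `MIN`/`MAX`) produces no batches.

```python
from typing import Iterator, Optional, Tuple


def batch_ranges(min_id: Optional[int], max_id: Optional[int],
                 batch_size: int = 10000) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) pairs matching WHERE SaleID BETWEEN start AND end."""
    if min_id is None or max_id is None:  # MIN/MAX over an empty table return NULL
        return
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    start = min_id
    while start <= max_id:
        yield start, start + batch_size - 1
        start += batch_size
```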

**Issue**: Gold aggregates not updating incrementally
```sql
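-- Solution: Implement incremental load with watermark
CREATE TABLE gold.ETL_Watermark (
    TableName NVARCHAR(100) PRIMARY KEY,
    LastProcessedDate DATETIME2 NOT NULL
);

-- Seed an early watermark so the first run rebuilds everything
INSERT INTO gold.ETL_Watermark (TableName, LastProcessedDate)
VALUES ('InventoryMetrics', '1900-01-01');
GO

-- Assumes gold.InventoryTurnover stores a Year column next to Month
CREATE PROCEDURE gold.usp_IncrementalInventoryMetrics
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;  -- roll back the whole refresh on any error

    DECLARE @RunStart DATETIME2 = SYSDATETIME();  -- taken before reading Silver
    DECLARE @LastRun DATETIME2, @FromMonth DATE;

    SELECT @LastRun = LastProcessedDate FROM gold.ETL_Watermark WHERE TableName = 'InventoryMetrics';

    -- Rebuild whole months, starting with the month that contains the watermark
    SET @FromMonth = DATEFROMPARTS(YEAR(@LastRun), MONTH(@LastRun), 1);

    BEGIN TRANSACTION;

    DELETE FROM gold.InventoryTurnover
    WHERE DATEFROMPARTS(Year, Month, 1) >= @FromMonth;  -- year and month compared together
    
    INSERT INTO gold.InventoryTurnover (...)
    SELECT ...
    FROM silver.Sales
    WHERE SaleDate >= @FromMonth;
    
    -- Update watermark
    UPDATE gold.ETL_Watermark 
    SET LastProcessedDate = @RunStart
    WHERE TableName = 'InventoryMetrics';

    COMMIT TRANSACTION;
END;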
```
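The key point of the watermark delete is comparing year and month together. This sketch assumes the Gold table stores a `Year` alongside `Month`. `rebuild_start` and `months_to_rebuild` are hypothetical names.

Take a watermark of 20 November 2024. A month-only test (`Month >= 11`) would also delete November and December 2023. A refresh that reads only recent sales would never rebuild those rows.

```python
from datetime import date, datetime
from typing import Iterable, List, Tuple

YearMonth = Tuple[int, int]  # (Year, Month) as stored in the Gold table (assumed)


def rebuild_start(last_run: datetime) -> date:
    """First day of the month containing the watermark."""
    return date(last_run.year, last_run.month, 1)


def months_to_rebuild(last_run: datetime, existing: Iterable[YearMonth]) -> List[YearMonth]:
    """(Year, Month) keys at or after the watermark's month; tuples compare year first."""
    start = (last_run.year, last_run.month)
    return sorted(ym for ym in existing if ym >= start)
```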

### Performance Optimization

```sql
-- Add indexes for Bronze layer queries
CREATE CLUSTERED INDEX IX_Sales_SaleID ON bronze.Sales(SaleID);
CREATE NONCLUSTERED INDEX IX_Sales_ProductID ON bronze.Sales(ProductID);
CREATE NONCLUSTERED INDEX IX_Sales_SaleDate ON bronze.Sales(SaleDate);

-- Partition Gold tables by month for faster queries
CREATE PARTITION FUNCTION pf_MonthPartition (INT)
AS RANGE RIGHT FOR VALUES (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);

CREATE PARTITION SCHEME ps_MonthPartition
AS PARTITION pf_MonthPartition ALL TO ([PRIMARY]);

CREATE TABLE gold.InventoryTurnover (
    ...
    Month INT
) ON ps_MonthPartition(Month);

-- Enable query store for performance monitoring
ALTER DATABASE RetailDataWarehouse SET QUERY_STORE = ON;
```
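`RANGE RIGHT` is easy to misread, so here is a small Python model of how a value maps to a partition number. T-SQL returns the same number from `$PARTITION.pf_MonthPartition(Month)`. `partition_number` is a hypothetical helper. Twelve boundary values create 13 partitions. With `RANGE RIGHT`, month 1 lands in partition 2, and partition 1 (values below 1) stays empty.

```python
from bisect import bisect_left, bisect_right
from typing import Sequence


def partition_number(value: int, boundaries: Sequence[int], range_right: bool = True) -> int:
    """1-based partition for `value`, given sorted boundary values."""
    # RANGE RIGHT: a boundary value belongs to the partition on its right.
    # RANGE LEFT:  a boundary value belongs to the partition on its left.
    idx = bisect_right(boundaries, value) if range_right else bisect_left(boundaries, value)
    return idx + 1
```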

## Integration with BI Tools

### Power BI Connection

```sql
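-- Create view optimized for Power BI
CREATE VIEW gold.vw_SalesDashboard AS
SELECT 
    s.SaleDate,
    p.ProductName,
    p.Category,
    b.BranchName,
    s.Quantity,
    s.UnitPrice,
    s.TotalAmount,
    i.TurnoverRatio,
    i.ShrinkagePercent
FROM silver.Sales s
INNER JOIN silver.Products p ON s.ProductID = p.ProductID
INNER JOIN silver.Branches b ON s.BranchID = b.BranchID
-- Match each sale to its own month's metrics (one InventoryTurnover row per month);
-- also compare Year if the table stores it. LEFT JOIN keeps sales without metrics.
LEFT JOIN gold.InventoryTurnover i
       ON i.ProductID = s.ProductID
      AND i.BranchID = s.BranchID
      AND i.Month = MONTH(s.SaleDate);
GO

-- Grant read-only access to a BI login. CREATE USER ... WITH PASSWORD only works in a
-- contained database, so create a login first. Run through sqlcmd so $(BI_SERVICE_PASSWORD)
-- is substituted (from -v or the environment); a '${...}' string would be stored literally.
-- Ownership chaining lets the view read silver tables while both schemas share an owner.
CREATE LOGIN [bi_service] WITH PASSWORD = '$(BI_SERVICE_PASSWORD)';
CREATE USER [bi_service] FOR LOGIN [bi_service];
GRANT SELECT ON SCHEMA::gold TO [bi_service];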
```

## Monitoring & Logging

```sql
-- Create audit log table
CREATE TABLE dbo.ETL_AuditLog (
    LogID INT IDENTITY(1,1) PRIMARY KEY,
    ProcedureName NVARCHAR(255),
    LayerName NVARCHAR(50),
    StartTime DATETIME2,
    EndTime DATETIME2,
    RowsProcessed INT,
    Status NVARCHAR(50),
    ErrorMessage NVARCHAR(MAX)
);
GO

-- Example audit logging in procedures
CREATE PROCEDURE silver.usp_TransformSalesWithLogging
AS
BEGIN
    DECLARE @StartTime DATETIME2 = GETDATE();
    DECLARE @RowCount INT;
    
    BEGIN TRY
        -- Transform logic
        INSERT INTO silver.Sales (...) SELECT ...;
        SET @RowCount = @@ROWCOUNT;
        
        -- Log success
        INSERT INTO dbo.ETL_AuditLog (ProcedureName, LayerName, StartTime, EndTime, RowsProcessed, Status)
        VALUES ('usp_TransformSales', 'Silver', @StartTime, GETDATE(), @RowCount, 'Success');
    END TRY
    BEGIN CATCH
        -- Log failure
        INSERT INTO dbo.ETL_AuditLog (ProcedureName, LayerName, StartTime, EndTime, Status, ErrorMessage)
        VALUES ('usp_TransformSales', 'Silver', @StartTime, GETDATE(), 'Failed', ERROR_MESSAGE());
        
        THROW;
    END CATCH;
END;
```
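Python tasks in the pipeline (PySpark jobs or Airflow callables) can follow the same TRY/CATCH pattern. This minimal sketch uses a hypothetical `audited` context manager. It builds a record with the `dbo.ETL_AuditLog` column names and appends it to a list; writing the record to SQL Server is left out. As with `THROW`, the original exception is re-raised after logging.

```python
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List


@contextmanager
def audited(log: List[Dict[str, Any]], procedure: str, layer: str) -> Iterator[Dict[str, Any]]:
    """Append one audit record per step; failures are logged, then re-raised."""
    entry: Dict[str, Any] = {
        "ProcedureName": procedure,
        "LayerName": layer,
        "StartTime": datetime.now(timezone.utc),
        "RowsProcessed": None,
    }
    try:
        yield entry                      # the caller sets entry["RowsProcessed"]
        entry["Status"] = "Success"
    except Exception as exc:
        entry["Status"] = "Failed"
        entry["ErrorMessage"] = str(exc)
        raise                            # like THROW in the CATCH block
    finally:
        entry["EndTime"] = datetime.now(timezone.utc)
        log.append(entry)
```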

This skill provides comprehensive guidance for implementing and extending the Retail ETL Medallion Pipeline with real-world business logic and production-ready patterns.

Source: creator's repository, aradotso/data-skills
