Custom Optimization Strategy Guide

Overview

This guide demonstrates how to create a custom optimization strategy in Atlas by solving a real-world challenge: optimizing inventory levels across a distribution network spanning 10 US states. We’ll build a strategy that balances inventory holding costs against stockout risks while considering transportation constraints, demand uncertainty, and service level requirements.

Business Problem - US Distribution Network Inventory Optimization

The Challenge

A retail company operates:

  • 10 Distribution Centers (DCs) across the US

  • 500+ Retail Stores served by these DCs

  • 5,000+ SKUs with varying demand patterns

  • $50M+ in inventory at any given time

Current pain points:

  • High inventory carrying costs (20% of inventory value annually)

  • Frequent stockouts in high-demand regions (5-8% lost sales)

  • Inefficient inter-DC transfers

  • Poor demand forecast accuracy (±30% error)

Business Objectives

  1. Minimize Total Cost: Inventory holding + transportation + stockout costs

  2. Maintain Service Levels: 98% product availability across all stores

  3. Optimize Network Flow: Efficient routing from DCs to stores

  4. Balance Risk: Account for demand uncertainty and supply disruptions

Strategy Design

Core Components

Our custom strategy will integrate:

  1. Multi-Echelon Inventory Model: Optimize across DC and store levels

  2. Demand Forecasting: Incorporate uncertainty and seasonality

  3. Network Flow Optimization: Minimize transportation costs

  4. Safety Stock Calculation: Buffer against demand variability

  5. Dynamic Rebalancing: Move inventory based on regional demand

Mathematical Framework

The strategy optimizes:

  • Decision Variables: Inventory levels at each DC, reorder points, transfer quantities

  • Objective Function: Total cost (holding + transport + stockout)

  • Constraints: DC capacity, service levels, lead times, budget

Implementation Guide

Step 1: Define the Strategy Interface

from atlas.strategies import BaseStrategy
from atlas.optimization import OptimizationResult
import numpy as np
import pandas as pd

class DistributionNetworkStrategy(BaseStrategy):
    """
    Custom strategy for multi-echelon inventory optimization
    across a US distribution network.
    """
    
    def __init__(self, config):
        """
        Initialize strategy with network configuration.
        
        Args:
            config: Dictionary containing:
                - dc_locations: List of distribution center locations
                - store_assignments: Mapping of stores to DCs
                - product_catalog: SKU information
                - cost_parameters: Holding, transport, stockout costs
                - service_targets: Required service levels
        """
        super().__init__(config)
        self.dc_locations = config['dc_locations']
        self.store_assignments = config['store_assignments']
        self.products = config['product_catalog']
        self.costs = config['cost_parameters']
        self.service_targets = config['service_targets']
        
        # Initialize network graph
        self._build_network_graph()
        
        # Set up demand forecasting
        self._initialize_demand_models()

Step 2: Implement Core Optimization Logic

    def optimize(self, current_state, constraints, objectives):
        """
        Execute the inventory optimization strategy.
        
        Args:
            current_state: Current inventory levels and positions
            constraints: Business constraints (capacity, budget, etc.)
            objectives: Optimization objectives and weights
            
        Returns:
            OptimizationResult with recommended actions
        """
        # Step 1: Forecast demand by region and product
        demand_forecast = self._forecast_demand(
            horizon=self.config['planning_horizon'],
            confidence_level=0.95
        )
        
        # Step 2: Calculate optimal base stock levels
        base_stock_levels = self._optimize_base_stock(
            demand_forecast=demand_forecast,
            service_targets=self.service_targets,
            constraints=constraints
        )
        
        # Step 3: Determine reorder points and quantities
        reorder_plan = self._calculate_reorder_points(
            base_stock=base_stock_levels,
            lead_times=self.config['lead_times'],
            demand_variability=demand_forecast['std_dev']
        )
        
        # Step 4: Optimize inter-DC transfers
        transfer_plan = self._optimize_transfers(
            current_inventory=current_state['inventory_levels'],
            target_levels=base_stock_levels,
            transport_costs=self.costs['transportation']
        )
        
        # Step 5: Generate deployment schedule
        deployment = self._create_deployment_schedule(
            reorder_plan=reorder_plan,
            transfer_plan=transfer_plan,
            constraints=constraints
        )
        
        # Step 6: Calculate expected outcomes
        outcomes = self._simulate_outcomes(
            deployment=deployment,
            demand_scenarios=self._generate_demand_scenarios()
        )
        
        return OptimizationResult(
            actions=deployment,
            expected_cost=outcomes['total_cost'],
            service_level=outcomes['service_level'],
            risk_metrics=outcomes['risk_analysis']
        )

Step 3: Implement Demand Forecasting

    def _forecast_demand(self, horizon, confidence_level):
        """
        Forecast demand by DC, product, and time period.
        """
        forecasts = {}
        
        for dc in self.dc_locations:
            dc_forecast = {}
            
            # Get stores served by this DC
            served_stores = self.store_assignments[dc]
            
            for product in self.products:
                # Aggregate historical demand from served stores
                historical_demand = self._get_historical_demand(
                    stores=served_stores,
                    product=product,
                    periods=52  # 52 weeks of history
                )
                
                # Apply time series model with seasonality
                model = self._build_demand_model(
                    historical_demand,
                    seasonality=product.get('seasonal_pattern', 'none')
                )
                
                # Generate forecast with prediction intervals
                point_forecast = model.predict(horizon)
                prediction_interval = model.prediction_interval(
                    confidence_level=confidence_level
                )
                
                dc_forecast[product['sku']] = {
                    'mean': point_forecast,
                    'lower': prediction_interval[0],
                    'upper': prediction_interval[1],
                    'std_dev': self._calculate_demand_std(historical_demand)
                }
            
            forecasts[dc] = dc_forecast
        
        return forecasts

Step 4: Optimize Base Stock Levels

    def _optimize_base_stock(self, demand_forecast, service_targets, constraints):
        """
        Calculate optimal inventory levels for each DC-product combination.
        """
        from scipy.optimize import minimize
        
        # Decision variables: base stock level for each DC-product
        n_vars = len(self.dc_locations) * len(self.products)
        
        def objective_function(x):
            """Total cost: holding + expected stockout cost"""
            inventory_levels = x.reshape(len(self.dc_locations), len(self.products))
            
            total_cost = 0
            for i, dc in enumerate(self.dc_locations):
                for j, product in enumerate(self.products):
                    level = inventory_levels[i, j]
                    
                    # Holding cost
                    holding_cost = (
                        level * 
                        product['unit_cost'] * 
                        self.costs['holding_rate']
                    )
                    
                    # Expected stockout cost
                    demand_dist = demand_forecast[dc][product['sku']]
                    stockout_prob = self._calculate_stockout_probability(
                        inventory_level=level,
                        demand_mean=demand_dist['mean'],
                        demand_std=demand_dist['std_dev']
                    )
                    
                    stockout_cost = (
                        stockout_prob * 
                        demand_dist['mean'] * 
                        product['unit_cost'] * 
                        self.costs['stockout_penalty']
                    )
                    
                    total_cost += holding_cost + stockout_cost
            
            return total_cost
        
        # Constraints
        constraint_list = []
        
        # Service level constraints
        for i, dc in enumerate(self.dc_locations):
            for j, product in enumerate(self.products):
                def service_constraint(x, i=i, j=j):
                    level = x.reshape(len(self.dc_locations), len(self.products))[i, j]
                    demand_dist = demand_forecast[self.dc_locations[i]][self.products[j]['sku']]
                    
                    service_level = 1 - self._calculate_stockout_probability(
                        inventory_level=level,
                        demand_mean=demand_dist['mean'],
                        demand_std=demand_dist['std_dev']
                    )
                    
                    return service_level - service_targets.get(
                        self.products[j]['category'], 
                        0.95
                    )
                
                constraint_list.append({
                    'type': 'ineq',
                    'fun': service_constraint
                })
        
        # Capacity constraints
        for i, dc in enumerate(self.dc_locations):
            def capacity_constraint(x, i=i):
                dc_inventory = x.reshape(len(self.dc_locations), len(self.products))[i, :]
                total_volume = sum(
                    dc_inventory[j] * self.products[j]['unit_volume']
                    for j in range(len(self.products))
                )
                return constraints['dc_capacity'][self.dc_locations[i]] - total_volume
            
            constraint_list.append({
                'type': 'ineq',
                'fun': capacity_constraint
            })
        
        # Budget constraint
        def budget_constraint(x):
            total_inventory_value = sum(
                x[i] * self.products[i % len(self.products)]['unit_cost']
                for i in range(n_vars)
            )
            return constraints['inventory_budget'] - total_inventory_value
        
        constraint_list.append({
            'type': 'ineq',
            'fun': budget_constraint
        })
        
        # Solve optimization
        initial_guess = self._generate_initial_solution(demand_forecast)
        
        result = minimize(
            objective_function,
            initial_guess,
            method='SLSQP',
            constraints=constraint_list,
            options={'maxiter': 1000}
        )
        
        return result.x.reshape(len(self.dc_locations), len(self.products))

Step 5: Optimize Network Transfers

    def _optimize_transfers(self, current_inventory, target_levels, transport_costs):
        """
        Determine optimal inter-DC transfers to rebalance inventory.
        """
        from scipy.optimize import linprog
        
        n_dcs = len(self.dc_locations)
        n_products = len(self.products)
        
        # Decision variables: transfer[from_dc, to_dc, product]
        n_vars = n_dcs * n_dcs * n_products
        
        # Objective: minimize transportation cost
        c = []
        for from_dc in range(n_dcs):
            for to_dc in range(n_dcs):
                for product in range(n_products):
                    if from_dc == to_dc:
                        c.append(0)  # No cost for not transferring
                    else:
                        # Cost based on distance and product weight
                        distance = self._get_distance(
                            self.dc_locations[from_dc],
                            self.dc_locations[to_dc]
                        )
                        weight = self.products[product]['weight']
                        cost_per_mile = transport_costs['per_pound_mile']
                        c.append(distance * weight * cost_per_mile)
        
        # Constraints
        A_eq = []
        b_eq = []
        
        # Conservation constraints: net flow equals target - current
        for dc in range(n_dcs):
            for product in range(n_products):
                constraint = [0] * n_vars
                
                # Outflows from this DC
                for to_dc in range(n_dcs):
                    if to_dc != dc:
                        idx = from_dc * n_dcs * n_products + to_dc * n_products + product
                        constraint[idx] = -1
                
                # Inflows to this DC
                for from_dc in range(n_dcs):
                    if from_dc != dc:
                        idx = from_dc * n_dcs * n_products + dc * n_products + product
                        constraint[idx] = 1
                
                A_eq.append(constraint)
                b_eq.append(
                    target_levels[dc, product] - current_inventory[dc, product]
                )
        
        # Bounds: non-negative transfers, limited by current inventory
        bounds = []
        for from_dc in range(n_dcs):
            for to_dc in range(n_dcs):
                for product in range(n_products):
                    if from_dc == to_dc:
                        bounds.append((0, 0))
                    else:
                        max_transfer = current_inventory[from_dc, product]
                        bounds.append((0, max_transfer))
        
        # Solve
        result = linprog(
            c=c,
            A_eq=A_eq,
            b_eq=b_eq,
            bounds=bounds,
            method='highs'
        )
        
        # Parse results into transfer plan
        transfer_plan = []
        if result.success:
            transfers = result.x.reshape(n_dcs, n_dcs, n_products)
            
            for from_dc in range(n_dcs):
                for to_dc in range(n_dcs):
                    for product in range(n_products):
                        quantity = transfers[from_dc, to_dc, product]
                        if quantity > 0.1:  # Threshold to avoid tiny transfers
                            transfer_plan.append({
                                'from': self.dc_locations[from_dc],
                                'to': self.dc_locations[to_dc],
                                'product': self.products[product]['sku'],
                                'quantity': int(quantity),
                                'cost': quantity * c[from_dc * n_dcs * n_products + 
                                                   to_dc * n_products + product]
                            })
        
        return transfer_plan

Step 6: Handle Uncertainty and Risk

    def _simulate_outcomes(self, deployment, demand_scenarios):
        """
        Monte Carlo simulation to assess strategy performance under uncertainty.
        """
        n_simulations = 1000
        results = {
            'service_levels': [],
            'total_costs': [],
            'stockout_events': []
        }
        
        for sim in range(n_simulations):
            # Sample demand scenario
            demand_scenario = self._sample_demand_scenario()
            
            # Simulate inventory dynamics
            inventory_trace = self._simulate_inventory_dynamics(
                initial_state=deployment['initial_inventory'],
                reorder_plan=deployment['reorder_plan'],
                transfer_plan=deployment['transfer_plan'],
                demand=demand_scenario,
                lead_times=self._sample_lead_times()
            )
            
            # Calculate metrics
            service_level = self._calculate_service_level(inventory_trace)
            total_cost = self._calculate_total_cost(inventory_trace)
            stockouts = self._identify_stockout_events(inventory_trace)
            
            results['service_levels'].append(service_level)
            results['total_costs'].append(total_cost)
            results['stockout_events'].extend(stockouts)
        
        # Aggregate results
        return {
            'service_level': {
                'mean': np.mean(results['service_levels']),
                'p95': np.percentile(results['service_levels'], 5),
                'p99': np.percentile(results['service_levels'], 1)
            },
            'total_cost': {
                'mean': np.mean(results['total_costs']),
                'std': np.std(results['total_costs']),
                'p95': np.percentile(results['total_costs'], 95)
            },
            'risk_analysis': {
                'stockout_risk': len(results['stockout_events']) / n_simulations,
                'value_at_risk': np.percentile(results['total_costs'], 95),
                'conditional_value_at_risk': np.mean([
                    c for c in results['total_costs'] 
                    if c > np.percentile(results['total_costs'], 95)
                ])
            }
        }

Step 7: Create Performance Monitoring

    def monitor_performance(self, actual_data, predictions):
        """
        Track strategy performance and adapt parameters.
        """
        metrics = {
            'forecast_accuracy': self._calculate_forecast_accuracy(
                actual_data['demand'],
                predictions['demand_forecast']
            ),
            'service_achievement': self._calculate_service_achievement(
                actual_data['stockouts'],
                self.service_targets
            ),
            'cost_variance': self._calculate_cost_variance(
                actual_data['costs'],
                predictions['expected_cost']
            ),
            'network_efficiency': self._calculate_network_efficiency(
                actual_data['transfers'],
                actual_data['transportation_costs']
            )
        }
        
        # Adaptive learning
        if metrics['forecast_accuracy'] < 0.8:
            self._retrain_demand_models(actual_data)
        
        if metrics['service_achievement'] < 0.95:
            self._adjust_safety_stock_parameters()
        
        return metrics

Integration with Atlas

Step 1: Register the Strategy

from atlas import StrategyRegistry

# Register the custom strategy
StrategyRegistry.register(
    name='distribution_network_optimization',
    strategy_class=DistributionNetworkStrategy,
    description='Multi-echelon inventory optimization for US distribution',
    version='1.0.0'
)

Step 2: Configure the Strategy

# Define network configuration
config = {
    'dc_locations': [
        {'id': 'DC_CA', 'state': 'CA', 'lat': 34.0522, 'lon': -118.2437},
        {'id': 'DC_TX', 'state': 'TX', 'lat': 32.7767, 'lon': -96.7970},
        {'id': 'DC_FL', 'state': 'FL', 'lat': 28.5383, 'lon': -81.3792},
        {'id': 'DC_NY', 'state': 'NY', 'lat': 40.7128, 'lon': -74.0060},
        {'id': 'DC_IL', 'state': 'IL', 'lat': 41.8781, 'lon': -87.6298},
        {'id': 'DC_WA', 'state': 'WA', 'lat': 47.6062, 'lon': -122.3321},
        {'id': 'DC_GA', 'state': 'GA', 'lat': 33.7490, 'lon': -84.3880},
        {'id': 'DC_OH', 'state': 'OH', 'lat': 39.9612, 'lon': -82.9988},
        {'id': 'DC_CO', 'state': 'CO', 'lat': 39.7392, 'lon': -104.9903},
        {'id': 'DC_NC', 'state': 'NC', 'lat': 35.2271, 'lon': -80.8431}
    ],
    'store_assignments': load_store_assignments(),
    'product_catalog': load_product_catalog(),
    'cost_parameters': {
        'holding_rate': 0.20,  # 20% annual holding cost
        'stockout_penalty': 2.5,  # 2.5x unit cost for stockouts
        'transportation': {
            'per_pound_mile': 0.15,
            'fixed_cost': 250
        }
    },
    'service_targets': {
        'high_priority': 0.99,
        'standard': 0.95,
        'low_priority': 0.90
    },
    'planning_horizon': 52,  # weeks
    'lead_times': {
        'supplier_to_dc': 14,  # days
        'dc_to_dc': 3,        # days
        'dc_to_store': 1      # days
    }
}

# Create strategy instance
strategy = DistributionNetworkStrategy(config)

Step 3: Execute Optimization

from atlas import OptimizationEngine

# Create optimization engine with the custom strategy
engine = OptimizationEngine(strategy=strategy)

# Define current state
current_state = {
    'inventory_levels': load_current_inventory(),
    'in_transit': load_in_transit_inventory(),
    'open_orders': load_open_orders()
}

# Define constraints
constraints = {
    'inventory_budget': 50_000_000,  # $50M total inventory
    'dc_capacity': {
        'DC_CA': 100_000,  # cubic feet
        'DC_TX': 150_000,
        'DC_FL': 80_000,
        'DC_NY': 120_000,
        'DC_IL': 130_000,
        'DC_WA': 90_000,
        'DC_GA': 110_000,
        'DC_OH': 100_000,
        'DC_CO': 85_000,
        'DC_NC': 95_000
    },
    'max_transfers_per_week': 50,
    'min_shipment_size': 1000  # pounds
}

# Define objectives
objectives = {
    'minimize_total_cost': 0.6,
    'maximize_service_level': 0.3,
    'minimize_transfers': 0.1
}

# Run optimization
result = engine.optimize(
    current_state=current_state,
    constraints=constraints,
    objectives=objectives
)

# Review results
print(f"Expected total cost: ${result.expected_cost:,.2f}")
print(f"Expected service level: {result.service_level:.1%}")
print(f"Number of transfers: {len(result.actions['transfers'])}")
print(f"Risk metrics: {result.risk_metrics}")

Testing and Validation

Unit Testing

import unittest
from atlas.testing import StrategyTestCase

class TestDistributionNetworkStrategy(StrategyTestCase):
    def setUp(self):
        self.strategy = DistributionNetworkStrategy(test_config)
    
    def test_demand_forecasting(self):
        """Test demand forecasting accuracy"""
        historical_data = generate_test_demand_data()
        forecast = self.strategy._forecast_demand(
            horizon=4,
            confidence_level=0.95
        )
        
        # Check forecast structure
        self.assertEqual(len(forecast), 10)  # 10 DCs
        
        # Check forecast reasonableness
        for dc in forecast:
            for product in forecast[dc]:
                self.assertGreater(forecast[dc][product]['mean'], 0)
                self.assertLess(
                    forecast[dc][product]['lower'],
                    forecast[dc][product]['upper']
                )
    
    def test_base_stock_optimization(self):
        """Test base stock calculation"""
        demand_forecast = generate_test_forecast()
        base_stock = self.strategy._optimize_base_stock(
            demand_forecast=demand_forecast,
            service_targets={'standard': 0.95},
            constraints={'inventory_budget': 1_000_000}
        )
        
        # Check all products have base stock
        self.assertEqual(base_stock.shape[0], 10)  # DCs
        self.assertGreater(base_stock.min(), 0)
    
    def test_transfer_optimization(self):
        """Test inter-DC transfer logic"""
        current = np.random.randint(0, 1000, (10, 100))
        target = np.random.randint(0, 1000, (10, 100))
        
        transfers = self.strategy._optimize_transfers(
            current_inventory=current,
            target_levels=target,
            transport_costs={'per_pound_mile': 0.15}
        )
        
        # Verify transfer feasibility
        for transfer in transfers:
            from_idx = self._get_dc_index(transfer['from'])
            product_idx = self._get_product_index(transfer['product'])
            self.assertLessEqual(
                transfer['quantity'],
                current[from_idx, product_idx]
            )

Integration Testing

def test_end_to_end_optimization():
    """Test complete optimization workflow"""
    # Setup test scenario
    test_scenario = create_test_scenario(
        n_dcs=10,
        n_products=100,
        n_stores=500,
        demand_pattern='seasonal'
    )
    
    # Run optimization
    result = run_optimization_test(
        scenario=test_scenario,
        strategy=DistributionNetworkStrategy,
        iterations=10
    )
    
    # Validate results
    assert result.service_level['mean'] >= 0.95
    assert result.total_cost['mean'] <= test_scenario.budget
    assert all(transfer['quantity'] > 0 for transfer in result.transfers)

Performance Testing

def benchmark_strategy_performance():
    """Benchmark strategy performance at scale"""
    benchmarks = []
    
    for scale in [100, 500, 1000, 5000]:  # Number of SKUs
        start_time = time.time()
        
        config = generate_scaled_config(n_products=scale)
        strategy = DistributionNetworkStrategy(config)
        
        result = strategy.optimize(
            current_state=generate_scaled_state(scale),
            constraints=standard_constraints,
            objectives=standard_objectives
        )
        
        execution_time = time.time() - start_time
        
        benchmarks.append({
            'scale': scale,
            'execution_time': execution_time,
            'memory_usage': get_memory_usage(),
            'solution_quality': result.expected_cost
        })
    
    return benchmarks

Deployment Considerations

Configuration Management

# config/distribution_strategy.yaml
strategy:
  name: distribution_network_optimization
  version: 1.0.0
  
network:
  dc_locations: !include dc_locations.yaml
  store_mappings: !include store_mappings.yaml
  
parameters:
  planning_horizon: 52
  reorder_frequency: weekly
  safety_stock_method: dynamic
  
costs:
  holding_rate: 0.20
  stockout_penalty: 2.5
  transportation:
    ltl_rate: 0.15  # per pound-mile
    ftl_rate: 2.50  # per mile
    
optimization:
  solver: SLSQP
  max_iterations: 1000
  tolerance: 1e-6
  
monitoring:
  metrics_frequency: daily
  alert_thresholds:
    service_level: 0.93
    forecast_accuracy: 0.75

Monitoring and Alerts

from atlas.monitoring import StrategyMonitor

monitor = StrategyMonitor(
    strategy='distribution_network_optimization',
    metrics=[
        'service_level',
        'total_cost',
        'forecast_accuracy',
        'transfer_efficiency'
    ],
    alert_channels=['email', 'slack']
)

# Set up alerts
monitor.add_alert(
    condition='service_level < 0.95',
    severity='high',
    message='Service level below target'
)

monitor.add_alert(
    condition='total_cost > budget * 1.1',
    severity='critical',
    message='Cost overrun exceeding 10%'
)

Continuous Improvement

from atlas.learning import StrategyLearner

learner = StrategyLearner(strategy)

# Learn from historical performance
learner.train(
    historical_data=load_historical_results(),
    performance_metrics=['cost', 'service_level'],
    parameter_ranges={
        'safety_stock_multiplier': (1.0, 3.0),
        'reorder_threshold': (0.1, 0.5),
        'transfer_threshold': (100, 1000)
    }
)

# Apply learned parameters
optimized_params = learner.get_optimal_parameters()
strategy.update_parameters(optimized_params)

Results and Benefits

Expected Outcomes

Based on typical implementations, this strategy delivers:

  1. Cost Reduction: 15-25% reduction in total inventory costs

  2. Service Improvement: 2-5% increase in product availability

  3. Network Efficiency: 30% reduction in inter-DC transfers

  4. Inventory Optimization: 20% reduction in working capital

Key Success Factors

  1. Data Quality: Accurate demand history and lead times

  2. Model Calibration: Regular updating of forecast models

  3. Change Management: Gradual rollout with monitoring

  4. Stakeholder Buy-in: Clear communication of benefits

Conclusion

This custom optimization strategy demonstrates how Atlas enables sophisticated, domain-specific optimization while maintaining clean abstractions and reusable patterns. The distribution network optimization strategy can be adapted for various industries and scaled from small regional networks to global supply chains.

Key takeaways:

  1. Atlas’s flexibility allows encoding complex business logic

  2. Custom strategies can leverage multiple optimization techniques

  3. Uncertainty and risk can be explicitly modeled

  4. Performance scales well with proper design

  5. Continuous learning improves results over time

For more information on building custom strategies, see the Atlas Strategy Development Guide.