Custom Optimization Strategy Guide
Overview
This guide demonstrates how to create a custom optimization strategy in Atlas by solving a real-world challenge: optimizing inventory levels across a distribution network spanning 10 US states. We’ll build a strategy that balances inventory holding costs against stockout risks while considering transportation constraints, demand uncertainty, and service level requirements.
Business Problem - US Distribution Network Inventory Optimization
The Challenge
A retail company operates:
10 Distribution Centers (DCs) across the US
500+ Retail Stores served by these DCs
5,000+ SKUs with varying demand patterns
$50M+ in inventory at any given time
Current pain points:
High inventory carrying costs (20% of inventory value annually)
Frequent stockouts in high-demand regions (5-8% lost sales)
Inefficient inter-DC transfers
Poor demand forecast accuracy (±30% error)
Business Objectives
Minimize Total Cost: Inventory holding + transportation + stockout costs
Maintain Service Levels: 98% product availability across all stores
Optimize Network Flow: Efficient routing from DCs to stores
Balance Risk: Account for demand uncertainty and supply disruptions
Strategy Design
Core Components
Our custom strategy will integrate:
Multi-Echelon Inventory Model: Optimize across DC and store levels
Demand Forecasting: Incorporate uncertainty and seasonality
Network Flow Optimization: Minimize transportation costs
Safety Stock Calculation: Buffer against demand variability
Dynamic Rebalancing: Move inventory based on regional demand
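The safety stock component can be illustrated with the textbook normal-demand formula, safety stock = z * sigma * sqrt(lead time). This is a minimal sketch under stated assumptions, not the calculation Atlas performs: the function name `safety_stock` and its parameters are hypothetical, per-period demand is assumed independent and normally distributed, and the lead time is assumed fixed.

```python
from math import sqrt
from statistics import NormalDist


def safety_stock(demand_std, lead_time_periods, service_level):
    """Safety stock for a target cycle service level (hypothetical helper).

    Assumes independent, normally distributed per-period demand and a
    fixed lead time expressed in the same periods as demand_std.
    """
    if not 0.0 < service_level < 1.0:
        raise ValueError("service_level must be strictly between 0 and 1")
    # Safety factor: the standard normal quantile for the service target
    z = NormalDist().inv_cdf(service_level)
    return z * demand_std * sqrt(lead_time_periods)


# Weekly demand std of 40 units and a 2-week lead time
for target in (0.90, 0.95, 0.99):
    print(target, round(safety_stock(40.0, 2.0, target), 1))
```

Raising the service target increases the buffer faster than linearly, which is why the highest-priority categories tend to dominate safety stock investment.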
Mathematical Framework
The strategy optimizes:
Decision Variables: Inventory levels at each DC, reorder points, transfer quantities
Objective Function: Total cost (holding + transport + stockout)
Constraints: DC capacity, service levels, lead times, budget
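One way to write this framework down, consistent with the cost terms used in the implementation steps of this guide, is shown here. The symbols are assumptions introduced for illustration: s_ip is the base stock of product p at DC i, t_ijp the transfer quantity from DC i to DC j, c_p the unit cost, h the holding rate, pi the stockout penalty multiplier, mu_ip the mean demand, D_ip the random demand, r the rate per pound-mile, d_ij the distance, w_p the unit weight, v_p the unit volume, K_i the DC capacity, B the inventory budget, and alpha_p the service target.

```latex
\begin{aligned}
\min_{s,\,t}\quad
  & \sum_{i}\sum_{p} h\, c_p\, s_{ip}
  + \sum_{i}\sum_{p} \pi\, c_p\, \mu_{ip}\, \Pr(D_{ip} > s_{ip})
  + \sum_{i}\sum_{j \neq i}\sum_{p} r\, d_{ij}\, w_p\, t_{ijp} \\
\text{s.t.}\quad
  & \Pr(D_{ip} \le s_{ip}) \ge \alpha_p && \forall i, p \\
  & \sum_{p} v_p\, s_{ip} \le K_i && \forall i \\
  & \sum_{i}\sum_{p} c_p\, s_{ip} \le B \\
  & s_{ip} \ge 0, \quad t_{ijp} \ge 0 && \forall i, j, p
\end{aligned}
```

The implementation in this guide solves the problem in two stages rather than jointly: base stock levels first (holding and stockout terms), then transfers (transport term) given those levels.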
Implementation Guide
Step 1: Define the Strategy Interface
from atlas.strategies import BaseStrategy
from atlas.optimization import OptimizationResult
import numpy as np
import pandas as pd


class DistributionNetworkStrategy(BaseStrategy):
    """
    Custom strategy for multi-echelon inventory optimization
    across a US distribution network.
    """

    def __init__(self, config):
        """
        Initialize strategy with network configuration.

        Args:
            config: Dictionary containing:
                - dc_locations: List of distribution center records
                  (each with id, state, lat, lon)
                - store_assignments: Mapping of stores to DCs
                - product_catalog: SKU information
                - cost_parameters: Holding, transport, stockout costs
                - service_targets: Required service levels
        """
        super().__init__(config)
        # DC records are dicts and therefore unhashable, so the rest of the
        # strategy keys everything by DC id. The full records stay available
        # by id for helpers such as _get_distance.
        self.dc_records = {dc['id']: dc for dc in config['dc_locations']}
        self.dc_locations = list(self.dc_records)
        self.store_assignments = config['store_assignments']
        self.products = config['product_catalog']
        self.costs = config['cost_parameters']
        self.service_targets = config['service_targets']

        # Initialize network graph
        self._build_network_graph()

        # Set up demand forecasting
        self._initialize_demand_models()
Step 2: Implement Core Optimization Logic
    def optimize(self, current_state, constraints, objectives):
        """
        Execute the inventory optimization strategy.

        Args:
            current_state: Current inventory levels and positions
            constraints: Business constraints (capacity, budget, etc.)
            objectives: Optimization objectives and weights

        Returns:
            OptimizationResult with recommended actions
        """
        # Step 1: Forecast demand by region and product
        demand_forecast = self._forecast_demand(
            horizon=self.config['planning_horizon'],
            confidence_level=0.95
        )

        # Step 2: Calculate optimal base stock levels
        base_stock_levels = self._optimize_base_stock(
            demand_forecast=demand_forecast,
            service_targets=self.service_targets,
            constraints=constraints
        )

        # Step 3: Determine reorder points and quantities
        # The forecast is keyed by DC, then SKU, so collect the per-SKU
        # standard deviations instead of indexing 'std_dev' at the top level
        demand_variability = {
            dc: {sku: stats['std_dev'] for sku, stats in by_sku.items()}
            for dc, by_sku in demand_forecast.items()
        }
        reorder_plan = self._calculate_reorder_points(
            base_stock=base_stock_levels,
            lead_times=self.config['lead_times'],
            demand_variability=demand_variability
        )

        # Step 4: Optimize inter-DC transfers
        transfer_plan = self._optimize_transfers(
            current_inventory=current_state['inventory_levels'],
            target_levels=base_stock_levels,
            transport_costs=self.costs['transportation']
        )

        # Step 5: Generate deployment schedule
        deployment = self._create_deployment_schedule(
            reorder_plan=reorder_plan,
            transfer_plan=transfer_plan,
            constraints=constraints
        )

        # Step 6: Calculate expected outcomes
        outcomes = self._simulate_outcomes(
            deployment=deployment,
            demand_scenarios=self._generate_demand_scenarios()
        )

        return OptimizationResult(
            actions=deployment,
            expected_cost=outcomes['total_cost'],
            service_level=outcomes['service_level'],
            risk_metrics=outcomes['risk_analysis']
        )
Step 3: Implement Demand Forecasting
    def _forecast_demand(self, horizon, confidence_level):
        """
        Forecast demand by DC, product, and time period.
        """
        forecasts = {}
        for dc in self.dc_locations:
            dc_forecast = {}
            # Get stores served by this DC
            served_stores = self.store_assignments[dc]

            for product in self.products:
                # Aggregate historical demand from served stores
                historical_demand = self._get_historical_demand(
                    stores=served_stores,
                    product=product,
                    periods=52  # 52 weeks of history
                )

                # Apply time series model with seasonality
                model = self._build_demand_model(
                    historical_demand,
                    seasonality=product.get('seasonal_pattern', 'none')
                )

                # Generate forecast with prediction intervals
                point_forecast = model.predict(horizon)
                prediction_interval = model.prediction_interval(
                    confidence_level=confidence_level
                )

                dc_forecast[product['sku']] = {
                    'mean': point_forecast,
                    'lower': prediction_interval[0],
                    'upper': prediction_interval[1],
                    'std_dev': self._calculate_demand_std(historical_demand)
                }

            forecasts[dc] = dc_forecast

        return forecasts
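The forecasting step relies on a model object that exposes `predict(horizon)` and `prediction_interval(confidence_level)`, but the guide does not show one. The sketch below is a deliberately simple stand-in, assuming a flat mean forecast with a symmetric normal interval. The class name `MeanDemandModel` is hypothetical, and the model ignores the seasonality that a production model would need.

```python
from statistics import NormalDist, mean, stdev


class MeanDemandModel:
    """Hypothetical stand-in for the object returned by _build_demand_model."""

    def __init__(self, history):
        if len(history) < 2:
            raise ValueError("need at least two observations")
        self.level = mean(history)
        self.sigma = stdev(history)  # sample standard deviation
        self.horizon = 1

    def predict(self, horizon):
        """Flat point forecast: the historical mean for every future period."""
        self.horizon = horizon
        return [self.level] * horizon

    def prediction_interval(self, confidence_level=0.95):
        """Symmetric normal interval around the flat forecast, floored at 0."""
        z = NormalDist().inv_cdf(0.5 + confidence_level / 2)
        lower = [max(0.0, self.level - z * self.sigma)] * self.horizon
        upper = [self.level + z * self.sigma] * self.horizon
        return lower, upper


weekly_units = [120, 135, 128, 110, 142, 131, 125, 138]
model = MeanDemandModel(weekly_units)
point = model.predict(4)
lower, upper = model.prediction_interval(0.95)
print(point[0], round(lower[0], 1), round(upper[0], 1))
```

Returning the interval as a `(lower, upper)` pair matches how `_forecast_demand` indexes `prediction_interval[0]` and `prediction_interval[1]`.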
Step 4: Optimize Base Stock Levels
    def _optimize_base_stock(self, demand_forecast, service_targets, constraints):
        """
        Calculate optimal inventory levels for each DC-product combination.
        """
        from scipy.optimize import minimize

        n_dcs = len(self.dc_locations)
        n_products = len(self.products)
        # Decision variables: base stock level for each DC-product,
        # flattened row-major (DC index first, then product index)
        n_vars = n_dcs * n_products

        def objective_function(x):
            """Total cost: holding + expected stockout cost"""
            inventory_levels = x.reshape(n_dcs, n_products)
            total_cost = 0
            for i, dc in enumerate(self.dc_locations):
                for j, product in enumerate(self.products):
                    level = inventory_levels[i, j]

                    # Holding cost
                    holding_cost = (
                        level *
                        product['unit_cost'] *
                        self.costs['holding_rate']
                    )

                    # Expected stockout cost
                    demand_dist = demand_forecast[dc][product['sku']]
                    stockout_prob = self._calculate_stockout_probability(
                        inventory_level=level,
                        demand_mean=demand_dist['mean'],
                        demand_std=demand_dist['std_dev']
                    )
                    stockout_cost = (
                        stockout_prob *
                        demand_dist['mean'] *
                        product['unit_cost'] *
                        self.costs['stockout_penalty']
                    )

                    total_cost += holding_cost + stockout_cost
            return total_cost

        # Constraints (SLSQP treats 'ineq' as fun(x) >= 0)
        constraint_list = []

        # Service level constraints; i and j are bound as default arguments
        # so each closure keeps its own DC-product pair
        for i, dc in enumerate(self.dc_locations):
            for j, product in enumerate(self.products):
                def service_constraint(x, i=i, j=j):
                    level = x.reshape(n_dcs, n_products)[i, j]
                    demand_dist = demand_forecast[self.dc_locations[i]][self.products[j]['sku']]
                    service_level = 1 - self._calculate_stockout_probability(
                        inventory_level=level,
                        demand_mean=demand_dist['mean'],
                        demand_std=demand_dist['std_dev']
                    )
                    return service_level - service_targets.get(
                        self.products[j]['category'],
                        0.95
                    )

                constraint_list.append({
                    'type': 'ineq',
                    'fun': service_constraint
                })

        # Capacity constraints
        for i, dc in enumerate(self.dc_locations):
            def capacity_constraint(x, i=i):
                dc_inventory = x.reshape(n_dcs, n_products)[i, :]
                total_volume = sum(
                    dc_inventory[j] * self.products[j]['unit_volume']
                    for j in range(n_products)
                )
                return constraints['dc_capacity'][self.dc_locations[i]] - total_volume

            constraint_list.append({
                'type': 'ineq',
                'fun': capacity_constraint
            })

        # Budget constraint
        def budget_constraint(x):
            # i % n_products recovers the product index from the flat index
            total_inventory_value = sum(
                x[i] * self.products[i % n_products]['unit_cost']
                for i in range(n_vars)
            )
            return constraints['inventory_budget'] - total_inventory_value

        constraint_list.append({
            'type': 'ineq',
            'fun': budget_constraint
        })

        # Solve optimization
        initial_guess = self._generate_initial_solution(demand_forecast)
        result = minimize(
            objective_function,
            initial_guess,
            method='SLSQP',
            bounds=[(0, None)] * n_vars,  # inventory levels cannot be negative
            constraints=constraint_list,
            options={'maxiter': 1000}
        )

        # Do not return levels from a solve that did not converge
        if not result.success:
            raise RuntimeError(f"Base stock optimization failed: {result.message}")

        return result.x.reshape(n_dcs, n_products)
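The objective and the service constraints both call `_calculate_stockout_probability`, which the guide leaves undefined. A minimal sketch follows, assuming demand over the review period is normally distributed. The function names are hypothetical, and a skewed or intermittent demand pattern would call for a different distribution.

```python
from statistics import NormalDist


def stockout_probability(inventory_level, demand_mean, demand_std):
    """P(demand > inventory_level) under a normal demand assumption."""
    if demand_std <= 0:
        # Deterministic demand: stock out only if demand exceeds inventory
        return 1.0 if demand_mean > inventory_level else 0.0
    return 1.0 - NormalDist(demand_mean, demand_std).cdf(inventory_level)


def base_stock_for_service(demand_mean, demand_std, service_level):
    """Smallest level whose in-stock probability meets the target."""
    if demand_std <= 0:
        return demand_mean
    return NormalDist(demand_mean, demand_std).inv_cdf(service_level)


level = base_stock_for_service(demand_mean=100.0, demand_std=20.0, service_level=0.95)
print(round(level, 1), round(stockout_probability(level, 100.0, 20.0), 3))
```

The closed-form level from `base_stock_for_service` is also a natural starting point for the solver, since it already satisfies each service constraint before capacity and budget are applied.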
Step 5: Optimize Network Transfers
    def _optimize_transfers(self, current_inventory, target_levels, transport_costs):
        """
        Determine optimal inter-DC transfers to rebalance inventory.
        """
        from scipy.optimize import linprog

        n_dcs = len(self.dc_locations)
        n_products = len(self.products)

        # Decision variables: transfer[from_dc, to_dc, product],
        # flattened as from_dc * n_dcs * n_products + to_dc * n_products + product
        n_vars = n_dcs * n_dcs * n_products

        # Objective: minimize transportation cost
        c = []
        for from_dc in range(n_dcs):
            for to_dc in range(n_dcs):
                for product in range(n_products):
                    if from_dc == to_dc:
                        c.append(0)  # No cost for not transferring
                    else:
                        # Cost based on distance and product weight
                        distance = self._get_distance(
                            self.dc_locations[from_dc],
                            self.dc_locations[to_dc]
                        )
                        weight = self.products[product]['weight']
                        cost_per_mile = transport_costs['per_pound_mile']
                        c.append(distance * weight * cost_per_mile)

        # Constraints
        A_eq = []
        b_eq = []

        # Conservation constraints: net flow equals target - current.
        # Transfers only move stock between DCs, so the problem is feasible
        # only when target and current totals match for each product.
        for dc in range(n_dcs):
            for product in range(n_products):
                constraint = [0] * n_vars

                # Outflows from this DC (this DC is the source)
                for to_dc in range(n_dcs):
                    if to_dc != dc:
                        idx = dc * n_dcs * n_products + to_dc * n_products + product
                        constraint[idx] = -1

                # Inflows to this DC (this DC is the destination)
                for from_dc in range(n_dcs):
                    if from_dc != dc:
                        idx = from_dc * n_dcs * n_products + dc * n_products + product
                        constraint[idx] = 1

                A_eq.append(constraint)
                b_eq.append(
                    target_levels[dc, product] - current_inventory[dc, product]
                )

        # Bounds: non-negative transfers, limited by current inventory
        bounds = []
        for from_dc in range(n_dcs):
            for to_dc in range(n_dcs):
                for product in range(n_products):
                    if from_dc == to_dc:
                        bounds.append((0, 0))
                    else:
                        max_transfer = current_inventory[from_dc, product]
                        bounds.append((0, max_transfer))

        # Solve
        result = linprog(
            c=c,
            A_eq=A_eq,
            b_eq=b_eq,
            bounds=bounds,
            method='highs'
        )

        # Parse results into transfer plan (empty if the solve failed)
        transfer_plan = []
        if result.success:
            transfers = result.x.reshape(n_dcs, n_dcs, n_products)
            for from_dc in range(n_dcs):
                for to_dc in range(n_dcs):
                    for product in range(n_products):
                        quantity = transfers[from_dc, to_dc, product]
                        if quantity > 0.1:  # Threshold to avoid tiny transfers
                            transfer_plan.append({
                                'from': self.dc_locations[from_dc],
                                'to': self.dc_locations[to_dc],
                                'product': self.products[product]['sku'],
                                'quantity': int(quantity),
                                'cost': quantity * c[from_dc * n_dcs * n_products +
                                                     to_dc * n_products + product]
                            })

        return transfer_plan
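The transfer cost depends on `_get_distance`, which is not defined in this guide. One plausible implementation, sketched here as an assumption, is the great-circle (haversine) distance between DC coordinates. The function name `haversine_miles` is hypothetical, the records use the `lat` and `lon` keys from the configuration example in this guide, and real road mileage will be longer than the great-circle figure.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_MILES = 3958.8  # mean Earth radius


def haversine_miles(origin, destination):
    """Great-circle distance in miles between two records with lat/lon keys."""
    lat1, lon1 = radians(origin['lat']), radians(origin['lon'])
    lat2, lon2 = radians(destination['lat']), radians(destination['lon'])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    # Haversine formula: a is the squared half-chord length between the points
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * asin(sqrt(a))


dc_ca = {'id': 'DC_CA', 'lat': 34.0522, 'lon': -118.2437}
dc_tx = {'id': 'DC_TX', 'lat': 32.7767, 'lon': -96.7970}
print(round(haversine_miles(dc_ca, dc_tx)))
```

Because the distance is symmetric, a 10-DC network needs only 45 distinct pairs, so the full matrix can be computed once and cached when the network graph is built.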
Step 6: Handle Uncertainty and Risk
    def _simulate_outcomes(self, deployment, demand_scenarios):
        """
        Monte Carlo simulation to assess strategy performance under uncertainty.
        """
        n_simulations = 1000
        results = {
            'service_levels': [],
            'total_costs': [],
            'stockout_events': []
        }

        for sim in range(n_simulations):
            # Sample demand scenario
            demand_scenario = self._sample_demand_scenario()

            # Simulate inventory dynamics
            inventory_trace = self._simulate_inventory_dynamics(
                initial_state=deployment['initial_inventory'],
                reorder_plan=deployment['reorder_plan'],
                transfer_plan=deployment['transfer_plan'],
                demand=demand_scenario,
                lead_times=self._sample_lead_times()
            )

            # Calculate metrics
            service_level = self._calculate_service_level(inventory_trace)
            total_cost = self._calculate_total_cost(inventory_trace)
            stockouts = self._identify_stockout_events(inventory_trace)

            results['service_levels'].append(service_level)
            results['total_costs'].append(total_cost)
            results['stockout_events'].extend(stockouts)

        # Compute the cost tail once instead of re-evaluating the percentile
        # for every element. Using >= keeps the tail non-empty.
        costs = np.asarray(results['total_costs'])
        var_95 = np.percentile(costs, 95)
        tail_costs = costs[costs >= var_95]

        # Aggregate results
        return {
            'service_level': {
                'mean': np.mean(results['service_levels']),
                # Lower-tail values: the service level met or exceeded in
                # 95% (p95) and 99% (p99) of simulations
                'p95': np.percentile(results['service_levels'], 5),
                'p99': np.percentile(results['service_levels'], 1)
            },
            'total_cost': {
                'mean': np.mean(costs),
                'std': np.std(costs),
                'p95': var_95
            },
            'risk_analysis': {
                # Average number of stockout events per simulation run
                'stockout_risk': len(results['stockout_events']) / n_simulations,
                'value_at_risk': var_95,
                'conditional_value_at_risk': tail_costs.mean()
            }
        }
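The two tail metrics in the risk analysis can be checked by hand on a small sample. The sketch below uses the nearest-rank definition of value at risk rather than the interpolated percentile that NumPy computes by default, so results can differ slightly on small samples. The function names are hypothetical.

```python
import math


def value_at_risk(costs, confidence=0.95):
    """Cost level not exceeded in `confidence` of scenarios (nearest rank)."""
    if not costs:
        raise ValueError("costs must not be empty")
    ordered = sorted(costs)
    # round() guards against float noise such as 0.57 * 100 = 56.99999999999999
    rank = math.ceil(round(confidence * len(ordered), 9))  # 1-based rank
    return ordered[max(rank, 1) - 1]


def conditional_value_at_risk(costs, confidence=0.95):
    """Mean cost over the scenarios at or above the VaR threshold."""
    threshold = value_at_risk(costs, confidence)
    tail = [c for c in costs if c >= threshold]
    return sum(tail) / len(tail)


# 100 equally likely scenario costs: 1, 2, ..., 100 (in $ thousands)
scenario_costs = list(range(1, 101))
print(value_at_risk(scenario_costs), conditional_value_at_risk(scenario_costs))
```

Value at risk reports where the bad tail starts, while conditional value at risk reports how bad that tail is on average, so the second figure is never smaller than the first.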
Step 7: Create Performance Monitoring
    def monitor_performance(self, actual_data, predictions):
        """
        Track strategy performance and adapt parameters.
        """
        metrics = {
            'forecast_accuracy': self._calculate_forecast_accuracy(
                actual_data['demand'],
                predictions['demand_forecast']
            ),
            'service_achievement': self._calculate_service_achievement(
                actual_data['stockouts'],
                self.service_targets
            ),
            'cost_variance': self._calculate_cost_variance(
                actual_data['costs'],
                predictions['expected_cost']
            ),
            'network_efficiency': self._calculate_network_efficiency(
                actual_data['transfers'],
                actual_data['transportation_costs']
            )
        }

        # Adaptive learning
        if metrics['forecast_accuracy'] < 0.8:
            self._retrain_demand_models(actual_data)

        if metrics['service_achievement'] < 0.95:
            self._adjust_safety_stock_parameters()

        return metrics
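The retraining trigger compares `forecast_accuracy` against 0.8, but the guide does not say how accuracy is measured. One common choice, shown here as an assumption rather than as the Atlas definition, is 1 minus the weighted absolute percentage error (WAPE), which stays well behaved when individual periods have zero demand. The function name is hypothetical.

```python
def forecast_accuracy(actual, forecast):
    """Accuracy as 1 - WAPE, floored at 0 (hypothetical metric definition)."""
    if len(actual) != len(forecast):
        raise ValueError("actual and forecast must have the same length")
    total_actual = sum(abs(a) for a in actual)
    if total_actual == 0:
        raise ValueError("accuracy is undefined when total actual demand is zero")
    abs_error = sum(abs(a - f) for a, f in zip(actual, forecast))
    return max(0.0, 1.0 - abs_error / total_actual)


actual_units = [100, 120, 80]
forecast_units = [90, 130, 100]
print(round(forecast_accuracy(actual_units, forecast_units), 3))
```

Weighting by total demand means high-volume SKUs dominate the score, which suits a cost-driven objective. A per-SKU average would instead give slow movers equal influence.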
Integration with Atlas
Step 1: Register the Strategy
from atlas import StrategyRegistry

# Register the custom strategy
StrategyRegistry.register(
    name='distribution_network_optimization',
    strategy_class=DistributionNetworkStrategy,
    description='Multi-echelon inventory optimization for US distribution',
    version='1.0.0'
)
Step 2: Configure the Strategy
# Define network configuration
config = {
    'dc_locations': [
        {'id': 'DC_CA', 'state': 'CA', 'lat': 34.0522, 'lon': -118.2437},
        {'id': 'DC_TX', 'state': 'TX', 'lat': 32.7767, 'lon': -96.7970},
        {'id': 'DC_FL', 'state': 'FL', 'lat': 28.5383, 'lon': -81.3792},
        {'id': 'DC_NY', 'state': 'NY', 'lat': 40.7128, 'lon': -74.0060},
        {'id': 'DC_IL', 'state': 'IL', 'lat': 41.8781, 'lon': -87.6298},
        {'id': 'DC_WA', 'state': 'WA', 'lat': 47.6062, 'lon': -122.3321},
        {'id': 'DC_GA', 'state': 'GA', 'lat': 33.7490, 'lon': -84.3880},
        {'id': 'DC_OH', 'state': 'OH', 'lat': 39.9612, 'lon': -82.9988},
        {'id': 'DC_CO', 'state': 'CO', 'lat': 39.7392, 'lon': -104.9903},
        {'id': 'DC_NC', 'state': 'NC', 'lat': 35.2271, 'lon': -80.8431}
    ],
    'store_assignments': load_store_assignments(),
    'product_catalog': load_product_catalog(),
    'cost_parameters': {
        'holding_rate': 0.20,  # 20% annual holding cost
        'stockout_penalty': 2.5,  # 2.5x unit cost for stockouts
        'transportation': {
            'per_pound_mile': 0.15,
            'fixed_cost': 250
        }
    },
    'service_targets': {
        'high_priority': 0.99,
        'standard': 0.95,
        'low_priority': 0.90
    },
    'planning_horizon': 52,  # weeks
    'lead_times': {
        'supplier_to_dc': 14,  # days
        'dc_to_dc': 3,  # days
        'dc_to_store': 1  # days
    }
}

# Create strategy instance
strategy = DistributionNetworkStrategy(config)
Step 3: Execute Optimization
from atlas import OptimizationEngine

# Create optimization engine with the custom strategy
engine = OptimizationEngine(strategy=strategy)

# Define current state
current_state = {
    'inventory_levels': load_current_inventory(),
    'in_transit': load_in_transit_inventory(),
    'open_orders': load_open_orders()
}

# Define constraints
constraints = {
    'inventory_budget': 50_000_000,  # $50M total inventory
    'dc_capacity': {
        'DC_CA': 100_000,  # cubic feet
        'DC_TX': 150_000,
        'DC_FL': 80_000,
        'DC_NY': 120_000,
        'DC_IL': 130_000,
        'DC_WA': 90_000,
        'DC_GA': 110_000,
        'DC_OH': 100_000,
        'DC_CO': 85_000,
        'DC_NC': 95_000
    },
    'max_transfers_per_week': 50,
    'min_shipment_size': 1000  # pounds
}

# Define objectives
objectives = {
    'minimize_total_cost': 0.6,
    'maximize_service_level': 0.3,
    'minimize_transfers': 0.1
}

# Run optimization
result = engine.optimize(
    current_state=current_state,
    constraints=constraints,
    objectives=objectives
)

# Review results. expected_cost and service_level are the summary dicts
# built by _simulate_outcomes, so report their means; the transfers live
# under the 'transfer_plan' key of the deployment.
print(f"Expected total cost: ${result.expected_cost['mean']:,.2f}")
print(f"Expected service level: {result.service_level['mean']:.1%}")
print(f"Number of transfers: {len(result.actions['transfer_plan'])}")
print(f"Risk metrics: {result.risk_metrics}")
Testing and Validation
Unit Testing
import unittest

import numpy as np
from atlas.testing import StrategyTestCase


class TestDistributionNetworkStrategy(StrategyTestCase):

    def setUp(self):
        # test_config is a fixture defined elsewhere in the test suite
        self.strategy = DistributionNetworkStrategy(test_config)

    def test_demand_forecasting(self):
        """Test demand forecasting accuracy"""
        historical_data = generate_test_demand_data()
        forecast = self.strategy._forecast_demand(
            horizon=4,
            confidence_level=0.95
        )

        # Check forecast structure
        self.assertEqual(len(forecast), 10)  # 10 DCs

        # Check forecast reasonableness
        for dc in forecast:
            for product in forecast[dc]:
                self.assertGreater(forecast[dc][product]['mean'], 0)
                self.assertLess(
                    forecast[dc][product]['lower'],
                    forecast[dc][product]['upper']
                )

    def test_base_stock_optimization(self):
        """Test base stock calculation"""
        demand_forecast = generate_test_forecast()
        base_stock = self.strategy._optimize_base_stock(
            demand_forecast=demand_forecast,
            service_targets={'standard': 0.95},
            constraints={
                'inventory_budget': 1_000_000,
                # The capacity constraints read this key; test_dc_capacity
                # is a hypothetical fixture mapping each DC to its capacity
                'dc_capacity': test_dc_capacity
            }
        )

        # Check all products have base stock
        self.assertEqual(base_stock.shape[0], 10)  # DCs
        self.assertGreater(base_stock.min(), 0)

    def test_transfer_optimization(self):
        """Test inter-DC transfer logic"""
        current = np.random.randint(0, 1000, (10, 100))
        # Shuffle the DC rows so per-product totals match the current stock;
        # independent random targets would make the transfer LP infeasible
        # and the checks below would pass vacuously on an empty plan
        target = np.random.permutation(current)

        transfers = self.strategy._optimize_transfers(
            current_inventory=current,
            target_levels=target,
            transport_costs={'per_pound_mile': 0.15}
        )

        # Verify transfer feasibility
        for transfer in transfers:
            from_idx = self._get_dc_index(transfer['from'])
            product_idx = self._get_product_index(transfer['product'])
            self.assertLessEqual(
                transfer['quantity'],
                current[from_idx, product_idx]
            )
Integration Testing
def test_end_to_end_optimization():
    """Test complete optimization workflow"""
    # Setup test scenario
    test_scenario = create_test_scenario(
        n_dcs=10,
        n_products=100,
        n_stores=500,
        demand_pattern='seasonal'
    )

    # Run optimization
    result = run_optimization_test(
        scenario=test_scenario,
        strategy=DistributionNetworkStrategy,
        iterations=10
    )

    # Validate results
    assert result.service_level['mean'] >= 0.95
    assert result.total_cost['mean'] <= test_scenario.budget
    assert all(transfer['quantity'] > 0 for transfer in result.transfers)
Performance Testing
import time


def benchmark_strategy_performance():
    """Benchmark strategy performance at scale"""
    benchmarks = []

    for scale in [100, 500, 1000, 5000]:  # Number of SKUs
        start_time = time.time()

        config = generate_scaled_config(n_products=scale)
        strategy = DistributionNetworkStrategy(config)
        result = strategy.optimize(
            current_state=generate_scaled_state(scale),
            constraints=standard_constraints,
            objectives=standard_objectives
        )

        execution_time = time.time() - start_time

        benchmarks.append({
            'scale': scale,
            'execution_time': execution_time,
            'memory_usage': get_memory_usage(),
            'solution_quality': result.expected_cost
        })

    return benchmarks
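The benchmark calls `get_memory_usage()`, which is not defined in this guide. One way to obtain a comparable figure with only the standard library is `tracemalloc`, sketched below. The wrapper name `measure_peak_memory` is hypothetical, and `tracemalloc` only sees allocations made through Python's memory allocator, so memory held by native solver code may be undercounted.

```python
import tracemalloc


def measure_peak_memory(func, *args, **kwargs):
    """Run func and return (result, peak_bytes) as traced by tracemalloc."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        # get_traced_memory() returns (current_bytes, peak_bytes)
        _, peak_bytes = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak_bytes


def build_cost_vector(n_vars):
    """Toy workload standing in for a strategy run."""
    return [0.0] * n_vars


vector, peak = measure_peak_memory(build_cost_vector, 100_000)
print(len(vector), peak > 0)
```

Measuring the peak around each `strategy.optimize` call, rather than sampling once afterwards, captures the transient constraint matrices that dominate memory in the transfer step.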
Deployment Considerations
Configuration Management
# config/distribution_strategy.yaml
strategy:
  name: distribution_network_optimization
  version: 1.0.0

network:
  dc_locations: !include dc_locations.yaml
  store_mappings: !include store_mappings.yaml

parameters:
  planning_horizon: 52
  reorder_frequency: weekly
  safety_stock_method: dynamic

costs:
  holding_rate: 0.20
  stockout_penalty: 2.5
  transportation:
    ltl_rate: 0.15  # per pound-mile
    ftl_rate: 2.50  # per mile

optimization:
  solver: SLSQP
  max_iterations: 1000
  tolerance: 1e-6

monitoring:
  metrics_frequency: daily
  alert_thresholds:
    service_level: 0.93
    forecast_accuracy: 0.75
Monitoring and Alerts
from atlas.monitoring import StrategyMonitor

monitor = StrategyMonitor(
    strategy='distribution_network_optimization',
    metrics=[
        'service_level',
        'total_cost',
        'forecast_accuracy',
        'transfer_efficiency'
    ],
    alert_channels=['email', 'slack']
)

# Set up alerts
monitor.add_alert(
    condition='service_level < 0.95',
    severity='high',
    message='Service level below target'
)

monitor.add_alert(
    condition='total_cost > budget * 1.1',
    severity='critical',
    message='Cost overrun exceeding 10%'
)
Continuous Improvement
from atlas.learning import StrategyLearner

learner = StrategyLearner(strategy)

# Learn from historical performance
learner.train(
    historical_data=load_historical_results(),
    performance_metrics=['cost', 'service_level'],
    parameter_ranges={
        'safety_stock_multiplier': (1.0, 3.0),
        'reorder_threshold': (0.1, 0.5),
        'transfer_threshold': (100, 1000)
    }
)

# Apply learned parameters
optimized_params = learner.get_optimal_parameters()
strategy.update_parameters(optimized_params)
Results and Benefits
Expected Outcomes
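The figures below are indicative targets for this kind of strategy rather than measured results; actual outcomes depend on data quality, forecast accuracy, and network structure: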
Cost Reduction: 15-25% reduction in total inventory costs
Service Improvement: 2-5% increase in product availability
Network Efficiency: 30% reduction in inter-DC transfers
Inventory Optimization: 20% reduction in working capital
Key Success Factors
Data Quality: Accurate demand history and lead times
Model Calibration: Regular updating of forecast models
Change Management: Gradual rollout with monitoring
Stakeholder Buy-in: Clear communication of benefits
Conclusion
This custom optimization strategy demonstrates how Atlas enables sophisticated, domain-specific optimization while maintaining clean abstractions and reusable patterns. The distribution network optimization strategy can be adapted for various industries and scaled from small regional networks to global supply chains.
Key takeaways:
Atlas’s flexibility allows encoding complex business logic
Custom strategies can leverage multiple optimization techniques
Uncertainty and risk can be explicitly modeled
Performance scales well with proper design
Continuous learning improves results over time
For more information on building custom strategies, see the Atlas Strategy Development Guide.