Transforming Legacy ERP Sales Logs for Velocity Targeting
Legacy ERP systems rarely export sales history in a format optimized for modern warehouse slotting algorithms. Fixed-width dumps, inconsistent timestamp formats, and unnormalized SKU identifiers routinely corrupt velocity calculations, leading to misallocated pick faces and inflated travel distances. Transforming these logs into actionable velocity metrics requires strict schema validation, deterministic mapping, and resilient batch processing before data reaches the WMS. This workflow operates as a critical subsystem within Velocity Data Ingestion & WMS Sync Pipelines, where raw transactional data must survive extraction, normalization, and scoring without introducing latency or data drift into slotting recommendations.
1. Raw Log Ingestion & Encoding Normalization
Legacy exports frequently mix ASCII, ISO-8859-1, and UTF-8 encodings within the same file, causing silent character corruption that breaks downstream parsers. Implement a streaming ingestion layer that attempts UTF-8 first, falls back to latin-1, and logs encoding mismatches at the byte level. Because latin-1 assigns a character to every byte value, the fallback always decodes without error, so the logged mismatch is the only signal that a file may contain mis-decoded text. Read through a fixed-size buffer rather than loading entire files into memory. For CSV or pipe-delimited legacy formats, enforce strict column count validation per row. Rows with mismatched field counts should be routed to a quarantine directory with the original line number preserved for auditability.
Configure your ingestion worker with explicit newline handling. Legacy Windows-based ERPs often embed \r\n or stray carriage returns inside quoted fields. Strip non-printable control characters (\x00-\x1F except \n and \t) before tokenization. This prevents silent field shifts that misalign invoice dates with quantities. Following the quoting and line-break rules of RFC 4180 (Common Format and MIME Type for Comma-Separated Values (CSV) Files) keeps field boundary detection consistent; the RFC covers comma-delimited files only, so apply the same quoting rules explicitly when the export is pipe-delimited.
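```python
import logging
import re
from pathlib import Path
from typing import Generator, List, Tuple

logger = logging.getLogger(__name__)

# All C0 control characters except tab (\x09) and newline (\x0A).
CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0B-\x1F]")
ENCODINGS = ("utf-8", "latin-1")


def detect_encoding(file_path: Path, chunk_size: int = 8192) -> str:
    """Return the first candidate encoding that decodes the whole file."""
    for encoding in ENCODINGS:
        try:
            with open(file_path, "r", encoding=encoding, errors="strict", newline="\n") as f:
                while f.read(chunk_size):  # fixed-size reads keep memory flat
                    pass
            return encoding
        except UnicodeDecodeError as exc:
            logger.warning(f"{file_path.name} is not valid {encoding} ({exc}); trying next encoding")
    raise ValueError(f"Unable to decode {file_path.name} with supported encodings")


def stream_and_normalize(file_path: Path, chunk_size: int = 8192) -> Generator[Tuple[int, List[str]], None, None]:
    """Yields (line_number, fields) tuples with strict encoding fallback and control char stripping."""
    # Choose the encoding before yielding anything, so a decode error halfway
    # through the file cannot cause already-yielded rows to be emitted twice.
    encoding = detect_encoding(file_path, chunk_size)
    # newline="\n" ends rows on \n only, so a stray \r inside a field does not start a new row.
    with open(file_path, "r", encoding=encoding, errors="strict", newline="\n", buffering=chunk_size) as f:
        for line_num, raw_line in enumerate(f, start=1):
            # Removes \r and other control characters, then trims the trailing newline.
            cleaned = CONTROL_CHAR_RE.sub("", raw_line).strip()
            if not cleaned:
                continue
            # Split on pipe or comma based on first line heuristic (omitted for brevity)
            fields = cleaned.split("|")
            yield line_num, fields
```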
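The column-count check and quarantine routing described above could be sketched as a small filter over the `(line_number, fields)` tuples. This is a minimal sketch under assumptions: `split_valid_rows`, `expected_columns`, and the quarantine file layout are illustrative names and choices, not part of the original pipeline.

```python
import csv
import tempfile
from pathlib import Path
from typing import Iterable, Iterator, List, Tuple


def split_valid_rows(
    rows: Iterable[Tuple[int, List[str]]],
    expected_columns: int,
    quarantine_path: Path,
) -> Iterator[Tuple[int, List[str]]]:
    """Yield rows with the expected field count; append the rest to a quarantine file."""
    with open(quarantine_path, "a", encoding="utf-8", newline="") as quarantine:
        writer = csv.writer(quarantine, delimiter="|")
        for line_num, fields in rows:
            if len(fields) == expected_columns:
                yield line_num, fields
            else:
                # Source line number and observed field count come first for auditing.
                writer.writerow([line_num, len(fields), *fields])


if __name__ == "__main__":
    sample = [
        (1, ["T1", "SKU9", "4", "2024-10-01", "0"]),
        (2, ["T2", "SKU9", "4"]),  # too few fields, goes to quarantine
    ]
    with tempfile.TemporaryDirectory() as tmp:
        quarantine_file = Path(tmp) / "rejected_rows.psv"
        accepted = list(split_valid_rows(sample, expected_columns=5, quarantine_path=quarantine_file))
        print(accepted)
        print(quarantine_file.read_text(encoding="utf-8"))
```

Because the function is a generator, the quarantine file is closed only once the caller has consumed every row, so the output should be fully iterated before the quarantine file is inspected.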
2. Deterministic Schema Validation & SKU Harmonization
Velocity calculations fail when legacy ITEM_ID values do not map cleanly to WMS master SKUs. Implement a two-pass validation layer: first, enforce structural schema compliance (required fields, data types, date ranges); second, resolve SKU aliases against a maintained cross-reference table. Unmapped or deprecated SKUs must never default to a generic placeholder, as this artificially inflates velocity for slow-moving items.
The mapping layer should operate as a stateless lookup with an LRU cache for high-frequency SKUs. When a legacy ERP uses composite keys (e.g., WAREHOUSE_CODE + ITEM_ID), concatenate and hash them deterministically before matching. This aligns with established Sales History Data Mapping protocols, ensuring that only validated, WMS-compatible identifiers proceed to velocity aggregation. Quarantine unmatched records with explicit rejection codes (SKU_NOT_FOUND, DEPRECATED_ITEM, AMBIGUOUS_ALIAS) to enable rapid reconciliation by supply chain data stewards.
```python
import functools
import hashlib
import logging
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field, ValidationError

logger = logging.getLogger(__name__)


class SalesRecord(BaseModel):
    transaction_id: str
    sku_hash: str
    quantity: int = Field(gt=0)
    transaction_date: datetime
    is_return: bool = False


@functools.lru_cache(maxsize=10_000)
def resolve_sku_hash(warehouse_code: str, item_id: str) -> str:
    """Deterministic composite key hashing for WMS SKU alignment."""
    composite = f"{warehouse_code.strip().upper()}|{item_id.strip()}"
    return hashlib.sha256(composite.encode("utf-8")).hexdigest()[:16]


def validate_and_map(raw_fields: List[str], sku_lookup: dict) -> Optional[SalesRecord]:
    """Two-pass validation: schema compliance then SKU resolution."""
    try:
        # Pass 1: Structural validation
        # raw_fields[1] is assumed to already hold the composite hash; if the export
        # carries WAREHOUSE_CODE and ITEM_ID separately, call resolve_sku_hash first.
        record = SalesRecord(
            transaction_id=raw_fields[0],
            sku_hash=raw_fields[1],
            quantity=int(raw_fields[2]),
            transaction_date=datetime.fromisoformat(raw_fields[3]),
            is_return=bool(int(raw_fields[4])),
        )
    except (IndexError, ValueError, ValidationError) as e:
        logger.debug(f"Schema validation failed: {e}")
        return None
    # Pass 2: SKU resolution
    if record.sku_hash not in sku_lookup:
        logger.warning(f"SKU_NOT_FOUND: {record.sku_hash}")
        return None
    return record
```
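The three rejection codes named above could be produced by a resolver that returns either a WMS SKU or a reason for rejection. The following is a sketch only: the shape of the cross-reference table (legacy key mapped to a list of candidate WMS SKUs), the `deprecated` set, and the names `RejectionCode` and `resolve_alias` are assumptions made for illustration.

```python
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple


class RejectionCode(str, Enum):
    SKU_NOT_FOUND = "SKU_NOT_FOUND"
    DEPRECATED_ITEM = "DEPRECATED_ITEM"
    AMBIGUOUS_ALIAS = "AMBIGUOUS_ALIAS"


def resolve_alias(
    legacy_key: str,
    cross_reference: Dict[str, List[str]],
    deprecated: Set[str],
) -> Tuple[Optional[str], Optional[RejectionCode]]:
    """Return (wms_sku, None) on success or (None, rejection_code) on failure."""
    candidates = cross_reference.get(legacy_key, [])
    if not candidates:
        return None, RejectionCode.SKU_NOT_FOUND
    if len(candidates) > 1:
        # More than one WMS SKU claims this legacy key; never guess.
        return None, RejectionCode.AMBIGUOUS_ALIAS
    wms_sku = candidates[0]
    if wms_sku in deprecated:
        return None, RejectionCode.DEPRECATED_ITEM
    return wms_sku, None


if __name__ == "__main__":
    xref = {"WH1|1001": ["SKU-A"], "WH1|1002": ["SKU-B", "SKU-C"], "WH1|1003": ["SKU-OLD"]}
    for key in ("WH1|1001", "WH1|1002", "WH1|1003", "WH1|9999"):
        print(key, resolve_alias(key, xref, deprecated={"SKU-OLD"}))
```

Returning the code alongside the result, instead of a bare `None`, lets the caller write the reason into the quarantine record without re-deriving it.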
3. Time-Weighted Velocity Aggregation & Return Netting
Raw transaction counts are insufficient for modern slotting engines. Velocity must reflect recent demand patterns, seasonality, and netted returns. Implement a rolling aggregation window (typically 30/60/90 days) with exponential decay weighting to prioritize recent sales. Returns must be subtracted from gross movement at the SKU-location level, not discarded, to prevent phantom demand signals.
Use vectorized operations for batch scoring. Calculate daily units moved, apply a decay factor (e.g., 0.95^(days_ago)), and sum across the window. Normalize against available storage days to produce a velocity_score that directly maps to WMS slotting tiers (A, B, C, D).
```python
from typing import Optional

import numpy as np
import pandas as pd


def calculate_weighted_velocity(
    df: pd.DataFrame,
    window_days: int = 90,
    decay_rate: float = 0.95,
    ref_date: Optional[pd.Timestamp] = None,
) -> pd.DataFrame:
    """Computes time-decayed velocity with return netting."""
    df = df.copy()
    # Returns count as negative movement so they net against gross sales
    df["net_qty"] = np.where(df["is_return"], -df["quantity"], df["quantity"])
    # Calculate days ago from reference date (pass ref_date for reproducible runs)
    ref_date = pd.Timestamp.now() if ref_date is None else ref_date
    df["days_ago"] = (ref_date - df["transaction_date"]).dt.days
    # Filter to window; drop future-dated rows, whose negative age would yield weights above 1
    df = df[df["days_ago"].between(0, window_days)].copy()
    # Apply exponential decay
    df["decay_weight"] = decay_rate ** df["days_ago"]
    df["weighted_qty"] = df["net_qty"] * df["decay_weight"]
    # Aggregate by SKU
    velocity = (
        df.groupby("sku_hash")["weighted_qty"]
        .sum()
        .reset_index(name="velocity_score")
    )
    return velocity
```
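Mapping a `velocity_score` to the A/B/C/D tiers mentioned above depends on cutoffs that the WMS or the slotting team defines. As a hedged illustration, the sketch below ranks SKUs by score and assigns tiers by rank position; the `assign_tiers` name and the 20% / 50% / 80% cutoffs are hypothetical and would need to be replaced with the site's actual tier policy.

```python
from typing import Dict, Sequence, Tuple

# Hypothetical cutoffs: top 20% of SKUs by rank are A, next 30% B, next 30% C, rest D.
DEFAULT_CUTOFFS: Tuple[Tuple[str, float], ...] = (("A", 0.2), ("B", 0.5), ("C", 0.8))


def assign_tiers(
    scores: Dict[str, float],
    cutoffs: Sequence[Tuple[str, float]] = DEFAULT_CUTOFFS,
) -> Dict[str, str]:
    """Assign a slotting tier to each SKU from its rank by velocity score."""
    ranked = sorted(scores, key=lambda sku: scores[sku], reverse=True)
    total = len(ranked)
    tiers: Dict[str, str] = {}
    for position, sku in enumerate(ranked):
        rank_fraction = (position + 1) / total
        # First cutoff that covers this rank wins; anything beyond the last cutoff is D.
        tiers[sku] = next((tier for tier, limit in cutoffs if rank_fraction <= limit), "D")
    return tiers


if __name__ == "__main__":
    print(assign_tiers({"S1": 50.0, "S2": 40.0, "S3": 30.0, "S4": 20.0, "S5": 10.0}))
```

Ranking by position rather than by share of total velocity keeps the assignment well defined even when return netting drives some scores negative.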
4. Async Batch Processing & Pipeline Resilience
Velocity transformations must execute asynchronously to avoid blocking ERP polling cycles or WMS sync windows. Implement chunked processing with explicit backpressure controls. Each batch should carry an idempotency key derived from the source file hash and processing timestamp, preventing duplicate velocity updates during network retries.
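For the key to deduplicate retries, the timestamp has to be captured once when the batch is created and reused for every attempt. A minimal sketch of one way to derive such a key follows; `file_sha256`, `build_idempotency_key`, and the extra `batch_index` component are illustrative assumptions, not names from an existing library or from this pipeline.

```python
import hashlib
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 65536) -> str:
    """Hash the source file in fixed-size chunks to keep memory flat."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_idempotency_key(source_file: Path, batch_index: int, started_at: datetime) -> str:
    """Combine the file hash, batch index, and the run's start time into one key.

    started_at should be timezone-aware and captured once per run, not per retry.
    """
    stamp = started_at.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    raw = f"{file_sha256(source_file)}:{batch_index}:{stamp}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        export = Path(tmp) / "sales_export.psv"
        export.write_bytes(b"T1|SKU9|4|2024-10-01|0\n")
        run_started = datetime.now(timezone.utc)  # captured once, reused on retries
        print(build_idempotency_key(export, batch_index=0, started_at=run_started))
```

Including the batch index is a design choice for files that are split into several chunks, so each chunk gets a distinct key while retries of the same chunk share one.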
Use an async queue with bounded concurrency. If the WMS API returns 429 Too Many Requests or 503 Service Unavailable, apply exponential backoff with jitter. Persist intermediate state to a lightweight SQLite or Redis cache to survive worker restarts without reprocessing validated records.
```python
import asyncio
import logging
import random
from typing import List

import aiohttp

logger = logging.getLogger(__name__)

RETRYABLE_STATUSES = {429, 503}


async def push_velocity_to_wms(
    velocity_batch: List[dict],
    api_url: str,
    idempotency_key: str,
    max_retries: int = 3,
) -> bool:
    """Async batch push with exponential backoff and jitter."""
    # The caller supplies the key so it stays identical across every retry of this batch
    payload = {"records": velocity_batch, "idempotency_key": idempotency_key}
    # One session is reused for all attempts instead of reconnecting each time
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
        for attempt in range(max_retries):
            wait = min(2 ** attempt + random.uniform(0, 1), 30)
            try:
                async with session.post(api_url, json=payload) as resp:
                    if 200 <= resp.status < 300:
                        logger.info(f"Successfully synced {len(velocity_batch)} velocity records")
                        return True
                    if resp.status in RETRYABLE_STATUSES:
                        logger.warning(f"WMS returned {resp.status}. Retrying in {wait:.2f}s")
                    else:
                        resp.raise_for_status()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.error(f"Batch push failed (attempt {attempt + 1}): {e}")
            # Skip the sleep after the final attempt
            if attempt < max_retries - 1:
                await asyncio.sleep(wait)
    return False
```
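Bounded concurrency across many batches can be sketched with an `asyncio.Semaphore`, used here in place of the queue the text mentions because it is the shortest way to cap in-flight requests. The `push_all` helper and its `push` callable are hypothetical; in practice `push` would wrap a coroutine such as `push_velocity_to_wms` with its remaining arguments bound.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence


async def push_all(
    batches: Sequence[List[dict]],
    push: Callable[[List[dict]], Awaitable[bool]],
    max_concurrency: int = 4,
) -> List[bool]:
    """Push every batch while keeping at most max_concurrency requests in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(batch: List[dict]) -> bool:
        async with semaphore:  # waits here when the limit is reached
            return await push(batch)

    # gather preserves input order, so results[i] belongs to batches[i]
    return list(await asyncio.gather(*(guarded(batch) for batch in batches)))


if __name__ == "__main__":
    async def fake_push(batch: List[dict]) -> bool:
        await asyncio.sleep(0.01)  # stand-in for the real HTTP call
        return True

    demo_batches = [[{"sku_hash": f"sku{i}", "velocity_score": float(i)}] for i in range(10)]
    print(asyncio.run(push_all(demo_batches, fake_push, max_concurrency=3)))
```

Note that this caps concurrent requests but still creates one task per batch up front; a bounded `asyncio.Queue` would be the better fit when the number of batches is large enough that task creation itself needs backpressure.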
5. Operational Troubleshooting & Audit Playbook
When velocity metrics drift or WMS slotting recommendations degrade, isolate the failure point using this diagnostic sequence:
- Encoding Drift Check: Verify quarantine logs for UnicodeDecodeError spikes. A sudden increase indicates a new ERP export format or regional character set injection.
- SKU Cache Hit Rate: Monitor lru_cache metrics. A hit rate below 85% suggests stale cross-reference tables or unregistered new product introductions. Force a cache refresh against the master item registry.
- Return Netting Audit: Compare gross vs. net velocity deltas. If returns are not subtracting correctly, verify the is_return boolean mapping and ensure credit memos are timestamped to the original sale window.
- WMS Sync Latency: Track push_velocity_to_wms round-trip times. Consistent >5s latency indicates WMS API thread pool exhaustion. Reduce batch chunk size from 500 to 150 records and increase worker concurrency limits.
- Idempotency Conflicts: Query the WMS for duplicate velocity updates. If detected, verify the idempotency key generation logic and ensure file hash + timestamp concatenation is deterministic across retries.
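The cache hit rate in the checklist can be read from the `cache_info()` method that `functools.lru_cache` attaches to every wrapped function. A short sketch follows; `cache_hit_rate` and `resolve_demo` are illustrative names, with `resolve_demo` standing in for a cached resolver such as `resolve_sku_hash`.

```python
import functools


def cache_hit_rate(cached_func) -> float:
    """Return hits / (hits + misses) for an lru_cache-wrapped function."""
    info = cached_func.cache_info()
    lookups = info.hits + info.misses
    return info.hits / lookups if lookups else 0.0


@functools.lru_cache(maxsize=10_000)
def resolve_demo(item_id: str) -> str:
    """Stand-in for a cached SKU resolver; the real lookup logic is not shown."""
    return item_id.strip().upper()


if __name__ == "__main__":
    for item in ("a1", "a1", "b2", "a1"):
        resolve_demo(item)
    print(f"hit rate: {cache_hit_rate(resolve_demo):.0%}")
    resolve_demo.cache_clear()  # forces fresh lookups after a cross-reference update
```

`cache_clear()` also resets the hit and miss counters, so the rate should be sampled before the cache is refreshed.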
Conclusion
Transforming legacy ERP sales logs into reliable velocity metrics is an engineering discipline, not a simple ETL task. By enforcing strict encoding normalization, deterministic SKU harmonization, time-weighted aggregation, and resilient async delivery, warehouse operations can eliminate the data drift that causes costly pick-face misallocations. Integrate these transformations directly into your ingestion architecture, monitor quarantine rejection rates daily, and maintain tight alignment between ERP export schedules and WMS slotting cycles.