Polarway Architecture Guide

Deep dive into Polarway's architecture, design decisions, and implementation details.

🏗️ High-Level Architecture

System Overview

| Layer | Component | Sub-Components | Description |
|---|---|---|---|
| Client Layer | Python Client, Rust Client, other gRPC clients | | Connect via gRPC (HTTP/2) |
| Polarway Server | | | |
| ↳ gRPC Service Layer | Tonic/Tower middleware stack | | Handles client requests |
| ↳ Handle Manager | TTL Cache | Handle 1 (30 min), Handle 2 (30 min), Handle N (30 min) | Manages DataFrame lifecycle |
| ↳ DataFrame Engine | Polars Lazy Eval | Query Optimizer (Projection/Predicate Pushdown) | Query planning and optimization |
| ↳ Execution Engine | Arrow Parallel Executor | SIMD Kernels (vectorized) | Parallel compute |
| ↳ Arrow Memory | Columnar | Batch1, Batch2, ..., BatchN | In-memory storage |
| ↳ I/O Layer | Tokio | Parquet/CSV, WebSocket/Kafka, REST API/gRPC | Async file/network operations |

Key Components

  1. gRPC Service Layer: Handles client requests via HTTP/2
  2. Handle Manager: Manages DataFrame lifecycle with TTL
  3. DataFrame Engine: Lazy evaluation and query optimization
  4. Execution Engine: Parallel execution with Arrow compute kernels
  5. I/O Layer: Async file/network operations

🎯 Core Design Principles

1. Performance First

Zero-Copy Operations

Polarway uses Apache Arrow's zero-copy IPC for data transfer:

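// Arrow IPC writes buffers in their in-memory layout, so no per-value encoding is needed
let batch: RecordBatch = dataframe.to_arrow_batch()?;
let mut ipc_bytes = Vec::new();
let mut writer = arrow::ipc::writer::StreamWriter::try_new(&mut ipc_bytes, &batch.schema())?;
writer.write(&batch)?;
writer.finish()?;
// Client receives the same memory layout and can use the buffers as-is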

Memory Efficiency

  • Columnar storage reduces memory fragmentation
  • Lazy evaluation defers computation until needed
  • Streaming prevents OOM on large datasets
  • Handle-based architecture avoids data duplication

SIMD Optimization

Arrow compute kernels use SIMD instructions:

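// Element-wise comparison (a 256-bit AVX2 register holds 4 f64 values)
fn filter_gt_simd(column: &Float64Array, threshold: f64) -> BooleanArray {
    // Scalar form shown for clarity; a vectorized kernel would compare
    // 4 values per instruction with _mm256_cmp_pd. Nulls stay null in the mask.
    column.iter().map(|v| v.map(|x| x > threshold)).collect()
}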

2. Type Safety

Rust's Type System

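// Compile-time guarantees
fn apply_filter(df: DataFrame, expr: Expr) -> Result<DataFrame, PolarsError> {
    // Argument and return types are checked at compile time
    // No null pointer exceptions: absence is modeled with Option
    // Data races are rejected by the compiler
    df.lazy().filter(expr).collect()
}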

Result Types

// Explicit error handling: Polars aliases the standard Result type
pub type PolarsResult<T> = Result<T, PolarsError>;

// Forces caller to handle errors
let df = read_parquet("file.parquet")?;

3. Composability

Expression System

# Expressions compose naturally
expr = (
    (pd.col("price") * pd.col("quantity"))
    .cast(pd.Float64)
    .alias("notional")
)

# Can be reused across operations
df1.with_column(expr)
df2.with_column(expr)

Lazy Evaluation

# Operations build a query plan
df = pd.read_parquet("data.parquet")
df = df.filter(pd.col("price") > 100)  # No execution yet
df = df.select(["symbol", "price"])     # Still no execution

# Optimized as single query
result = df.collect()  # Execute once
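
To make the deferred-execution idea concrete, here is a minimal sketch in plain Python. The `LazyFrame` class below is hypothetical and is not Polarway's client API; it only shows how `filter` and `select` can record steps in a plan while `collect` runs them in a single pass over the rows.

```python
class LazyFrame:
    """Hypothetical lazy frame: stores rows plus a plan of pending steps."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self.plan = plan

    def filter(self, predicate):
        # Record the step; nothing is evaluated here.
        return LazyFrame(self._rows, self.plan + (("filter", predicate),))

    def select(self, columns):
        return LazyFrame(self._rows, self.plan + (("select", tuple(columns)),))

    def collect(self):
        # Single pass: every recorded step is applied row by row.
        out = []
        for row in self._rows:
            for op, arg in self.plan:
                if op == "filter":
                    if not arg(row):
                        break  # row rejected, skip remaining steps
                else:
                    row = {name: row[name] for name in arg}
            else:
                out.append(row)
        return out
```

Because the plan is just data until `collect` runs, an optimizer has the chance to reorder or merge steps before any row is touched.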

🗄️ Data Model

Columnar Storage

Why Columnar?

Traditional row-based storage:

Row 1: [id=1, name="bob", age=30, city="NYC"]
Row 2: [id=2, name="Bob", age=25, city="LA"]

Columnar storage (Arrow):

id:   [1, 2]
name: ["bob", "Bob"]
age:  [30, 25]
city: ["NYC", "LA"]

Benefits:

  • Better compression (similar values together)
  • SIMD operations (process 4-16 values at once)
  • Cache efficiency (fewer cache misses)
  • Selective column loading (skip unused columns)
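
The difference between the two layouts can be sketched with plain Python lists. The helper name `to_columns` is illustrative only; real Arrow columns are typed, contiguous buffers rather than Python lists.

```python
rows = [
    {"id": 1, "name": "bob", "age": 30, "city": "NYC"},
    {"id": 2, "name": "Bob", "age": 25, "city": "LA"},
]

def to_columns(rows):
    """Pivot a list of row dicts into one list per column."""
    return {key: [row[key] for row in rows] for key in rows[0]}

columns = to_columns(rows)

# Selective column loading: an aggregate over "age" reads one list
# and never touches "name" or "city".
mean_age = sum(columns["age"]) / len(columns["age"])
```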

Arrow Format

RecordBatch Structure:

| Section | Content |
|---|---|
| Schema | field: "price", type: Float64; field: "volume", type: Int64 |
| Arrays | Float64Array [100.5, 101.2, ...]; Int64Array [1000, 2000, ...] |

Each Array contains:

  • Buffer (validity bitmap)
  • Buffer (offsets for var-length)
  • Buffer (data values)
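
As a rough illustration of how the three buffers fit together for a variable-length string column, consider the sketch below. The function names are made up for this example, and validity is kept as a list of booleans for readability, whereas Arrow packs it into bits.

```python
def encode_utf8_column(values):
    """Encode strings (or None) into validity, offsets, and data buffers."""
    validity, offsets, data = [], [0], bytearray()
    for value in values:
        validity.append(value is not None)
        if value is not None:
            data += value.encode("utf-8")
        # One offset per value boundary; a null repeats the previous offset.
        offsets.append(len(data))
    return validity, offsets, bytes(data)

def get_value(buffers, i):
    """Read value i back using only the three buffers."""
    validity, offsets, data = buffers
    if not validity[i]:
        return None
    return data[offsets[i]:offsets[i + 1]].decode("utf-8")
```

Value `i` always lives between `offsets[i]` and `offsets[i + 1]`, so random access needs no scanning of the data buffer.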

Null Handling

// Validity bitmap (1 bit per value)
// 1 = valid, 0 = null
[1, 1, 0, 1] → values: [10, 20, _, 30]

// Null check is O(1): the null count is stored with the array
fn has_nulls(array: &dyn Array) -> bool {
    array.null_count() > 0
}
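
The bit packing itself can be sketched in a few lines. This assumes Arrow's least-significant-bit-first ordering within each byte; the helper names are illustrative.

```python
def pack_validity(values):
    """Pack is-valid flags into bytes, least-significant bit first."""
    bitmap = bytearray((len(values) + 7) // 8)
    for i, value in enumerate(values):
        if value is not None:
            bitmap[i // 8] |= 1 << (i % 8)
    return bytes(bitmap)

def is_valid(bitmap, i):
    """Check bit i of the bitmap."""
    return (bitmap[i // 8] >> (i % 8)) & 1 == 1

def null_count(bitmap, length):
    """Count nulls as length minus the number of set bits."""
    set_bits = sum(bin(byte).count("1") for byte in bitmap)
    return length - set_bits
```

In practice the null count is computed once and cached alongside the array, which is what makes the `has_nulls` check constant-time.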

Type System

pub enum DataType {
    // Numeric
    Int8, Int16, Int32, Int64,
    UInt8, UInt16, UInt32, UInt64,
    Float32, Float64,

    // Temporal
    Date,
    Datetime(TimeUnit, Option<String>),  // timezone
    Duration(TimeUnit),
    Time,

    // String, boolean, and nested
    Utf8,
    Boolean,
    List(Box<DataType>),
    Struct(Vec<Field>),
    Categorical,
}

⚙️ Query Execution

Lazy Evaluation Pipeline

1. Expression Building
   ↓
2. Logical Plan Construction
   ↓
3. Query Optimization
   ↓
4. Physical Plan Generation
   ↓
5. Parallel Execution
   ↓
6. Result Materialization

Query Optimizer

Projection Pushdown

Before optimization:

df = pd.read_parquet("data.parquet")  # Read all 100 columns
df = df.select(["col1", "col2"])       # Use only 2 columns

After optimization:

Only read col1, col2 from Parquet (98 columns skipped)

Predicate Pushdown

Before:

df = pd.read_parquet("data.parquet")  # Read 1M rows
df = df.filter(pd.col("date") > "2024-01-01")  # Filter to 10K rows

After:

Push filter into Parquet reader → Read only 10K rows

Example Optimization:

# User writes this
result = (
    pd.read_parquet("sales.parquet")
    .filter(pd.col("region") == "US")
    .select(["product", "revenue", "date"])
    .filter(pd.col("date") > "2024-01-01")
    .group_by("product")
    .agg({"revenue": "sum"})
)

# Optimizer rewrites to:
# 1. Push both filters to Parquet reader
# 2. Read only [product, revenue, date, region] columns
# 3. Apply filters during read (skip 90% of data)
# 4. Group and aggregate remaining rows
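
The rewrite above can be approximated on a toy plan. This is a sketch under simplifying assumptions, not Polarway's optimizer: the plan is a flat list of tuples, the step names are invented for the example, and only `filter` and `select` steps that appear before the first other step are pushed into the scan.

```python
def push_down(plan):
    """Fold leading filter/select steps into the scan of a linear plan."""
    scan, *steps = plan
    predicates, projection, rest = [], None, []
    for step in steps:
        kind = step[0]
        if rest or kind not in ("filter", "select"):
            rest.append(step)  # past the first barrier: keep as-is
        elif kind == "filter":
            predicates.append(tuple(step[1:]))  # (column, op, value)
        else:
            projection = list(step[1])
    if projection is None:
        read_columns = None  # no select seen: read every column
    else:
        # The scan must also read columns that only the filters need.
        read_columns = list(projection)
        for column, _op, _value in predicates:
            if column not in read_columns:
                read_columns.append(column)
        if read_columns != projection:
            # Drop the filter-only columns again after the scan.
            rest.insert(0, ("select", projection))
    pushed = {"columns": read_columns, "predicates": predicates}
    return [("scan", scan[1], pushed)] + rest
```

Applied to the sales query, the scan ends up reading `product`, `revenue`, `date`, and `region` with both predicates attached, which matches steps 1 and 2 of the rewrite.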

Execution Engine

Parallel Execution

use rayon::prelude::*;  // brings par_iter() into scope

// Split data into chunks
let chunks = partition_data(data, num_cores);

// Process in parallel
let results: Vec<_> = chunks
    .par_iter()  // Rayon parallel iterator
    .map(|chunk| apply_filter(chunk, &predicate))
    .collect();

// Merge results
merge_chunks(results)

Pipeline Parallelism

Read Thread → Parse Thread → Filter Thread → Aggregate Thread
    ↓             ↓              ↓               ↓
 Chunk 1       Chunk 1        Chunk 1         Result 1
 Chunk 2       Chunk 2        Chunk 2         Result 2
 Chunk 3       Chunk 3        Chunk 3         Result 3

🌐 Handle-Based Architecture

Why Handles?

Problem with Embedded Libraries:

# Polars (PyO3) - data lives in Python process
df = pl.read_parquet("10GB.parquet")  # Uses 10GB RAM in Python
df2 = df.filter(...)                  # New buffers for matching rows (up to 10GB more)
df3 = df2.select(...)                 # Shares df2's column buffers
# Worst case: ~20GB RAM held in the Python process

Solution with Handles:

# Polarway - data lives on server
df = pd.read_parquet("10GB.parquet")  # Handle("abc"), server uses 10GB
df2 = df.filter(...)                  # Handle("def"), server uses 10GB
df3 = df2.select(...)                 # Handle("ghi"), server still 10GB
# Total: 10GB RAM on server, ~1KB in Python

Handle Lifecycle

// Assumes parking_lot::RwLock, whose read()/write() return guards directly
pub struct HandleManager {
    handles: Arc<RwLock<HashMap<String, (DataFrame, Instant)>>>,
    ttl: Duration,  // 30 minutes default
}

impl HandleManager {
    pub fn create(&self, df: DataFrame) -> String {
        let id = Uuid::new_v4().to_string();
        let expiry = Instant::now() + self.ttl;

        self.handles.write().insert(id.clone(), (df, expiry));

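        // Spawn cleanup task; it must re-check the stored expiry before
        // removing the entry, because extend_ttl may have pushed it back
        tokio::spawn(cleanup_expired_handle(id.clone(), expiry));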

        id
    }

    pub fn get(&self, id: &str) -> Option<DataFrame> {
        self.handles.read().get(id).map(|(df, _)| df.clone())
    }

    pub fn extend_ttl(&self, id: &str) {
        // Reset expiry on access
        if let Some((_, expiry)) = self.handles.write().get_mut(id) {
            *expiry = Instant::now() + self.ttl;
        }
    }
}
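
The same lifecycle can be sketched in Python with an injectable clock, which makes expiry easy to reason about. This is a single-threaded illustration with hypothetical names, not the server implementation; it assumes a sliding TTL (each successful `get` resets the expiry) and a periodic `sweep` in place of a per-handle cleanup task.

```python
import time
import uuid

class HandleManager:
    """Hypothetical in-memory handle store with sliding TTL expiry."""

    def __init__(self, ttl_seconds=30 * 60, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._handles = {}  # handle id -> (frame, expiry time)

    def create(self, frame):
        handle_id = str(uuid.uuid4())
        self._handles[handle_id] = (frame, self._clock() + self._ttl)
        return handle_id

    def get(self, handle_id):
        entry = self._handles.get(handle_id)
        if entry is None:
            return None
        frame, expiry = entry
        now = self._clock()
        if now >= expiry:
            del self._handles[handle_id]  # expired: drop lazily
            return None
        # Sliding expiration: a successful access resets the TTL.
        self._handles[handle_id] = (frame, now + self._ttl)
        return frame

    def sweep(self):
        """Remove every expired handle and return how many were dropped."""
        now = self._clock()
        expired = [h for h, (_, exp) in self._handles.items() if now >= exp]
        for handle_id in expired:
            del self._handles[handle_id]
        return len(expired)
```

Checking the stored expiry at removal time, rather than the expiry captured at creation, is what keeps an extended handle from being dropped early.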

Memory Management

Reference Counting:

// DataFrames use Arc<> for cheap cloning
pub struct DataFrame {
    data: Arc<Vec<Series>>,  // Shared ownership
}

// Multiple handles can reference same data
let df1 = DataFrame::new(data);
let df2 = df1.clone();  // Just increments Arc counter (O(1))

🚀 Distributed Computing

Architecture Overview

Polarway's distributed direction is:

  • Time-series storage/metadata/observability anchored in QuestDB (tables, partitions, retention, SQL, system tables).
  • Distributed query execution via DataFusion + Ballista (scheduler + executors, shuffle, object store).
  • APIs: gRPC remains the primary engine API; a QuestDB-like REST endpoint (/exec) is added for compatibility and easy integrations.

In that model, Polarway is the gateway / control-plane that:

  • receives API requests (gRPC/REST)
  • resolves handles and metadata
  • compiles queries (DataFusion logical plan)
  • executes locally (single-node) or dispatches to Ballista (distributed)
  • returns results as handles (and via Arrow IPC streams)

Clients
  (Python/Rust/REST)
      |
      v
  gRPC/HTTP Load Balancer
      |
      v
  Polarway API Gateways (stateless)
   - gRPC Service
   - REST /exec (QuestDB-like)
   - Handle routing
   - Plan compilation
      |
      | dispatch/execute
      v
  DataFusion (single-node)  OR  Ballista (distributed)
      |
      | reads/writes
      v
  Object Store (results, shuffles) + QuestDB (time-series, metadata)

Ballista (distributed execution)

Ballista architecture (at a high level):

  • Scheduler: receives DataFusion plans and coordinates execution.
  • Executors: run partitions of the plan and exchange shuffle data.
  • Object store: stores shuffle and result artifacts (recommended for scale).

Polarway integration direction:

  • Polarway builds a DataFusion plan and submits it to Ballista when distributed=true.
  • Handles reference results stored externally (object store).
  • QuestDB remains the authoritative store for time-series tables and operational metadata.

Data Partitioning

// Hash partitioning
fn partition_by_hash(df: DataFrame, key: &str, n: usize) -> Vec<DataFrame> {
    df.partition_by(|row| {
        hash(row[key]) % n  // Deterministic assignment
    })
}

// Range partitioning
fn partition_by_range(df: DataFrame, key: &str, ranges: &[Range]) 
    -> Vec<DataFrame> 
{
    ranges.iter().map(|range| {
        df.filter(col(key).between(range.start, range.end))
    }).collect()
}
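
Both strategies can be tried out on plain row dictionaries. The sketch below makes two assumptions that are not stated in the snippets above: it uses CRC32 as a stable hash (Python's built-in `hash` is salted per process for strings), and it treats ranges as half-open `[start, end)` intervals so that a boundary value lands in exactly one partition.

```python
import zlib

def partition_by_hash(rows, key, n):
    """Assign each row to one of n partitions by a stable hash of its key."""
    parts = [[] for _ in range(n)]
    for row in rows:
        digest = zlib.crc32(str(row[key]).encode("utf-8"))
        parts[digest % n].append(row)  # deterministic across processes
    return parts

def partition_by_range(rows, key, bounds):
    """Split rows by half-open (start, end) intervals on the key."""
    return [
        [row for row in rows if start <= row[key] < end]
        for start, end in bounds
    ]
```

Hash partitioning keeps all rows with the same key together, which suits joins and group-bys; range partitioning keeps neighboring keys together, which suits time-window scans.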

🔄 Async Operations

Tokio Runtime

#[tokio::main]
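// Returning Result lets the `?` operators below propagate I/O errors
async fn main() -> Result<(), Box<dyn std::error::Error>> {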
    let server = PolarwayServer::new();

    // Handle connections concurrently
    let listener = TcpListener::bind("0.0.0.0:50051").await?;

    loop {
        let (socket, _) = listener.accept().await?;

        // Spawn task for each connection
        tokio::spawn(async move {
            handle_connection(socket).await
        });
    }
}

Streaming I/O

// Async Parquet reader
// try_stream! (async-stream crate) allows `?` inside the stream body
pub fn read_parquet_stream(path: String)
    -> impl Stream<Item = Result<RecordBatch>>
{
    try_stream! {
        let file = File::open(&path).await?;
        let mut reader = AsyncParquetReader::new(file);

        // Yield batches as they're read
        while let Some(batch) = reader.next_batch().await? {
            yield batch;
        }
    }
}

📊 I/O Subsystem

File Formats

Parquet Reader:

pub struct ParquetReader {
    // Memory-mapped file for zero-copy reads
    mmap: Mmap,

    // Metadata (footer) parsed once
    metadata: ParquetMetadata,

    // Row groups (can read in parallel)
    row_groups: Vec<RowGroupReader>,
}

impl ParquetReader {
    pub async fn read_row_group_parallel(&self, rg: usize) 
        -> Result<RecordBatch> 
    {
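        // Read all columns in parallel
        let columns = try_join_all(  // futures::future::try_join_all
            self.row_groups[rg]
                .columns()
                .map(|col| tokio::spawn(read_column(col)))
        ).await?  // fails if a task panicked or was cancelled
            .into_iter()
            .collect::<Result<Vec<_>>>()?;  // fails on the first column read error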

        RecordBatch::try_new(self.schema(), columns)
    }
}

WebSocket Streaming

pub fn websocket_stream(url: String)
    -> impl Stream<Item = Result<RecordBatch>>
{
    // try_stream! (async-stream crate) allows `?` inside the stream body
    try_stream! {
        let (mut ws, _) = connect_async(url.as_str()).await?;
        let mut buffer = Vec::new();

        while let Some(msg) = ws.next().await {
            buffer.push(parse_message(msg)?);

            // Yield batch every 1000 messages
            if buffer.len() >= 1000 {
                let batch = create_batch(&buffer)?;
                buffer.clear();
                yield batch;
            }
        }

        // Flush the final partial batch when the socket closes
        if !buffer.is_empty() {
            yield create_batch(&buffer)?;
        }
    }
}
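
The micro-batching logic is independent of WebSockets and can be sketched as a Python generator. The name `micro_batches` is illustrative; the sketch also emits whatever is left in the buffer when the input ends, so that trailing messages are not lost.

```python
def micro_batches(messages, batch_size=1000):
    """Group an iterable of messages into lists of at most batch_size."""
    buffer = []
    for message in messages:
        buffer.append(message)
        if len(buffer) >= batch_size:
            yield buffer
            buffer = []  # start a fresh list; the yielded one is now the consumer's
    if buffer:
        yield buffer  # final partial batch
```

A count-only trigger can hold messages indefinitely on a quiet stream, so a production version would likely add a time-based flush as well.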

🐍 Python Integration

PyO3 Bindings

#[pyclass]
pub struct DataFrame {
    handle: String,
    client: Arc<GrpcClient>,
}

#[pymethods]
impl DataFrame {
    fn select(&self, columns: Vec<String>) -> PyResult<Self> {
        // Clone the handle id: a String cannot be moved out of &self
        let new_handle = self.client.select(self.handle.clone(), columns)?;
        Ok(DataFrame {
            handle: new_handle,
            client: self.client.clone(),
        })
    }

    fn collect(&self) -> PyResult<PyArrowTable> {
        let bytes = self.client.collect(self.handle.clone())?;
        let table = deserialize_arrow_ipc(bytes)?;
        Ok(PyArrowTable(table))
    }
}

🎯 Design Decisions

Why Fork Polars?

Reasons:

  1. gRPC Architecture: Requires handle-based design incompatible with Polars
  2. Distributed Computing: Needs network layer and partitioning
  3. Streaming-First: Different memory management strategy
  4. Financial Focus: Domain-specific optimizations (OHLCV, tick data)

What We Keep:

  • Core DataFrame engine (solid foundation)
  • Expression system (powerful and composable)
  • Arrow integration (industry standard)
  • Query optimizer (excellent performance)

What We Change:

  • Remove Polars' in-process PyO3 bindings (the Python client talks to the engine over gRPC instead)
  • Add handle management layer
  • Implement streaming execution
  • Add time-series operations

Why Rust?

Advantages:

  1. Memory Safety: No null pointers, no data races
  2. Performance: Zero-cost abstractions, SIMD
  3. Concurrency: Safe parallelism with Send/Sync
  4. Type System: Compile-time guarantees

Example:

// This won't compile (caught at compile time!)
let df = DataFrame::new();
thread::spawn(move || {
    df.filter(...)  // df is moved into the thread
});
df.select(...)  // Error: use of moved value `df`

// Fix: Use Arc<> for shared ownership
let df = Arc::new(DataFrame::new());
let df_clone = Arc::clone(&df);
thread::spawn(move || {
    df_clone.filter(...)  // OK!
});
df.select(...)  // OK: this thread still owns its Arc

Why Arrow?

Benefits:

  1. Zero-Copy: Share memory between processes
  2. Language Agnostic: C++, Rust, Python, Java all use same format
  3. Columnar: Perfect for analytics
  4. Ecosystem: Works with Parquet, DataFusion, Spark

Example:

// Create Arrow data in Rust
let array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
let batch = RecordBatch::try_new(schema, vec![array])?;

// Send to Python as an Arrow IPC stream
let ipc_bytes = write_ipc_stream(&batch);
// Python reads the same buffer layout, no per-value deserialization needed

📈 Future Architecture

Planned Enhancements

GPU Acceleration:

// Use RAPIDS cuDF for GPU operations
pub trait GpuExecutable {
    fn to_gpu(&self) -> GpuDataFrame;
    fn execute_on_gpu(&self, expr: Expr) -> GpuDataFrame;
}

Advanced Query Planning:

Cost-Based Optimizer:
- Statistics collection (histograms, min/max, null count)
- Cardinality estimation
- Join reordering based on costs
- Adaptive execution (change plan based on runtime stats)
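
As a rough idea of what cardinality estimation from min/max statistics could look like, the sketch below estimates how many rows pass a `column > threshold` filter. It assumes values are uniformly distributed between the recorded minimum and maximum, which is a common simplification and not a description of any planned Polarway implementation; the function names and the `stats` dictionary layout are hypothetical.

```python
def estimate_selectivity_gt(stats, threshold):
    """Estimated fraction of non-null rows with value > threshold."""
    lo, hi = stats["min"], stats["max"]
    if threshold >= hi:
        return 0.0  # nothing can exceed the maximum
    if threshold < lo:
        return 1.0  # every non-null value qualifies
    # Uniform assumption: share of the [lo, hi] range above the threshold.
    return (hi - threshold) / (hi - lo)

def estimate_rows_gt(stats, threshold):
    """Estimated row count after applying value > threshold."""
    non_null = stats["row_count"] - stats["null_count"]
    return round(non_null * estimate_selectivity_gt(stats, threshold))
```

Histograms refine this by replacing the single uniform range with one range per bucket, which matters for skewed columns such as prices.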

SQL Interface:

-- Use DataFusion for SQL parsing
SELECT symbol, AVG(price) as avg_price
FROM polarway://data.parquet
WHERE date > '2024-01-01'
GROUP BY symbol
ORDER BY avg_price DESC
LIMIT 10;

See Also:

  • Quick Reference - Common operations cheat sheet
  • API Documentation - Complete API reference
  • Migration Guide - Moving from Polars
  • REST Exec API - QuestDB-like /exec endpoint
  • Distributed Getting Started - Multi-container/VM setup
  • Distributed Implementation Plan - Phased rollout and best practices