Polarway Architecture Guide
Deep dive into Polarway's architecture, design decisions, and implementation details.
High-Level Architecture
System Overview
| Layer | Component | Sub-Components | Description |
|---|---|---|---|
| Client Layer | Python Client, Rust Client, Other gRPC | | Connect via gRPC (HTTP/2) |
| Polarway Server | | | |
| ↳ gRPC Service Layer | Tonic/Tower middleware stack | | Handles client requests |
| ↳ Handle Manager | TTL Cache | Handle 1 (30 min), Handle 2 (30 min), Handle N (30 min) | Manages DataFrame lifecycle |
| ↳ DataFrame Engine | Polars | Lazy Eval, Query Optimizer (Projection/Predicate Pushdown) | Query planning and optimization |
| ↳ Execution Engine | Arrow | Parallel Executor, SIMD Kernels (vectorized) | Parallel compute |
| ↳ Arrow Memory | Columnar | Batch1, Batch2, ..., BatchN | In-memory storage |
| ↳ I/O Layer | Tokio | Parquet/CSV, WebSocket/Kafka, REST API/gRPC | Async file/network operations |
Key Components
- gRPC Service Layer: Handles client requests via HTTP/2
- Handle Manager: Manages DataFrame lifecycle with TTL
- DataFrame Engine: Lazy evaluation and query optimization
- Execution Engine: Parallel execution with Arrow compute kernels
- I/O Layer: Async file/network operations
Core Design Principles
1. Performance First
Zero-Copy Operations
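Polarway transfers data in the Apache Arrow IPC format. The wire layout matches the in-memory columnar layout, so neither side encodes or decodes values one by one:
// Arrow buffers are written as-is; no per-value serialization
let batch: RecordBatch = dataframe.to_arrow_batch()?;
let mut ipc_bytes = Vec::new();
let mut writer = arrow::ipc::writer::StreamWriter::try_new(&mut ipc_bytes, &batch.schema())?;
writer.write(&batch)?;
writer.finish()?;
// Client receives the same memory layout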
Memory Efficiency
- Columnar storage reduces memory fragmentation
- Lazy evaluation defers computation until needed
- Streaming prevents OOM on large datasets
- Handle-based architecture avoids data duplication
SIMD Optimization
Arrow compute kernels use SIMD instructions:
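// Vectorized filter operation (a 256-bit AVX2 register holds 4 f64 values)
fn filter_gt_simd(column: &Float64Array, threshold: f64) -> BooleanArray {
    // Nulls stay null. A tight loop like this can be vectorized
    // (e.g. _mm256_cmp_pd compares 4 f64 values per instruction)
    column.iter().map(|v| v.map(|x| x > threshold)).collect()
}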
2. Type Safety
Rust's Type System
// Compile-time guarantees
fn apply_filter(df: DataFrame, expr: Expr) -> Result<DataFrame, PolarsError> {
    // Type checked at compile time
    // No null pointer exceptions
    // Thread-safe by default
    df.lazy().filter(expr).collect()
}
Result Types
// Explicit error handling: every fallible call returns Ok(T) or Err(PolarsError)
pub type Result<T> = std::result::Result<T, PolarsError>;
// Forces caller to handle errors
let df = read_parquet("file.parquet")?;
3. Composability
Expression System
# Expressions compose naturally
expr = (
    (pd.col("price") * pd.col("quantity"))
    .cast(pd.Float64)
    .alias("notional")
)
# Can be reused across operations
df1.with_column(expr)
df2.with_column(expr)
Lazy Evaluation
# Operations build a query plan
df = pd.read_parquet("data.parquet")
df = df.filter(pd.col("price") > 100) # No execution yet
df = df.select(["symbol", "price"]) # Still no execution
# Optimized as single query
result = df.collect() # Execute once
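To make the deferral concrete, here is a minimal sketch of the idea in plain Python. It is not Polarway's implementation: the `LazyFrame` class below is a hypothetical toy that records operations in a plan and runs them only when `collect()` is called, and it performs no optimization.

```python
class LazyFrame:
    """Toy lazy frame: records operations and runs them only on collect()."""

    def __init__(self, rows, plan=()):
        self._rows = rows          # source data: list of dicts
        self._plan = tuple(plan)   # recorded (op, arg) steps, not yet executed

    def filter(self, predicate):
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def select(self, columns):
        return LazyFrame(self._rows, self._plan + (("select", list(columns)),))

    def collect(self):
        rows = self._rows
        for op, arg in self._plan:
            if op == "filter":
                rows = [r for r in rows if arg(r)]
            else:  # "select"
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows


source = [
    {"symbol": "AAA", "price": 120.0, "volume": 10},
    {"symbol": "BBB", "price": 80.0, "volume": 20},
]
lazy = LazyFrame(source).filter(lambda r: r["price"] > 100).select(["symbol", "price"])
result = lazy.collect()  # the only point where rows are actually processed
```

Because each method returns a new object that only extends the plan, an optimizer has the whole query available before any data is touched.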
Data Model
Columnar Storage
Why Columnar?
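Traditional row-based storage keeps all fields of one record next to each other, so scanning a single column means stepping over every other field of every row.
Columnar storage (Arrow) keeps all values of one column in a contiguous buffer.
Benefits:
- Better compression (similar values together)
- SIMD operations (process 4-16 values at once)
- Cache efficiency (fewer cache misses)
- Selective column loading (skip unused columns)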
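The difference between the two layouts can be sketched with the standard-library `array` module. This is only an illustration with made-up sample rows; Arrow's real buffers carry additional metadata such as validity bitmaps.

```python
from array import array

# Row-based layout: each record keeps all of its fields together.
rows = [("AAA", 100.5, 1000), ("BBB", 101.2, 2000), ("CCC", 99.8, 1500)]

# Columnar layout: one contiguous typed buffer per column.
symbols = [r[0] for r in rows]
prices = array("d", (r[1] for r in rows))   # float64 values
volumes = array("q", (r[2] for r in rows))  # int64 values

# A column aggregate touches only that column's buffer.
total_volume = sum(volumes)
```

Summing `volumes` never reads a symbol or a price, which is the property that selective column loading and cache efficiency rely on.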
Arrow Format
RecordBatch Structure:
| Section | Content |
|---|---|
| Schema | field: \"price\", type: Float64; field: \"volume\", type: Int64 |
| Arrays | Float64Array [100.5, 101.2, ...]; Int64Array [1000, 2000, ...] |
Each Array contains:
- Buffer (validity bitmap)
- Buffer (offsets for var-length)
- Buffer (data values)
Null Handling
// Validity bitmap (1 bit per value)
// 1 = valid, 0 = null
[1, 1, 0, 1] → values: [10, 20, _, 30]
// The null count is stored with the array, so this check needs no scan
fn has_nulls(array: &dyn Array) -> bool {
    array.null_count() > 0 // O(1) operation
}
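As a rough illustration of how a validity bitmap is packed, the sketch below builds one in plain Python using Arrow's least-significant-bit-first numbering. The helper names `pack_validity` and `is_valid` are hypothetical and exist only for this example.

```python
def pack_validity(values):
    """Return (bitmap, null_count); bit i is 1 when values[i] is not None."""
    bitmap = bytearray((len(values) + 7) // 8)
    null_count = 0
    for i, v in enumerate(values):
        if v is None:
            null_count += 1
        else:
            bitmap[i // 8] |= 1 << (i % 8)  # least-significant bit first
    return bytes(bitmap), null_count


def is_valid(bitmap, i):
    """Check bit i of the bitmap."""
    return bool((bitmap[i // 8] >> (i % 8)) & 1)


bitmap, null_count = pack_validity([10, 20, None, 30])
```

Storing `null_count` next to the bitmap is what lets a null check return without scanning the bits.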
Type System
pub enum DataType {
    // Numeric
    Int8, Int16, Int32, Int64,
    UInt8, UInt16, UInt32, UInt64,
    Float32, Float64,
    // Temporal
    Date,
    Datetime(TimeUnit, Option<String>), // timezone
    Duration(TimeUnit),
    Time,
    // Complex
    Utf8,
    Boolean,
    List(Box<DataType>),
    Struct(Vec<Field>),
    Categorical,
}
Query Execution
Lazy Evaluation Pipeline
1. Expression Building
↓
2. Logical Plan Construction
↓
3. Query Optimization
↓
4. Physical Plan Generation
↓
5. Parallel Execution
↓
6. Result Materialization
Query Optimizer
Projection Pushdown
Before optimization:
df = pd.read_parquet("data.parquet") # Read all 100 columns
df = df.select(["col1", "col2"]) # Use only 2 columns
After optimization: the Parquet reader loads only col1 and col2; the other 98 columns are never read from disk.
Predicate Pushdown
Before:
df = pd.read_parquet("data.parquet") # Read 1M rows
df = df.filter(pd.col("date") > "2024-01-01") # Filter to 10K rows
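After: the date predicate is evaluated inside the Parquet reader, so row groups that cannot match are skipped and only the roughly 10K matching rows are materialized.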
Example Optimization:
# User writes this
result = (
    pd.read_parquet("sales.parquet")
    .filter(pd.col("region") == "US")
    .select(["product", "revenue", "date"])
    .filter(pd.col("date") > "2024-01-01")
    .group_by("product")
    .agg({"revenue": "sum"})
)
# Optimizer rewrites to:
# 1. Push both filters to Parquet reader
# 2. Read only [product, revenue, date, region] columns
# 3. Apply filters during read (skip 90% of data)
# 4. Group and aggregate remaining rows
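The rewrite described in those comments can be approximated with a toy planner. The sketch below is an assumption-laden illustration, not the Polars optimizer: the plan is a hypothetical list of tuples, and `push_down` simply gathers every filter and works out which columns the scan must read.

```python
def push_down(plan):
    """Collapse a linear plan into one scan with pushed filters and columns.

    `plan` is a list of ("filter", column, predicate) and ("select", columns)
    steps, applied in order after a full scan.
    """
    filters, filter_columns, projected = [], set(), None
    for step in plan:
        if step[0] == "filter":
            _, column, predicate = step
            filters.append((column, predicate))
            filter_columns.add(column)
        elif step[0] == "select":
            projected = list(step[1])
    if projected is None:
        read_columns = None  # no projection: the scan reads every column
    else:
        # The scan needs the projected columns plus anything a filter tests.
        read_columns = sorted(set(projected) | filter_columns)
    return {"filters": filters, "read_columns": read_columns, "output": projected}


plan = [
    ("filter", "region", lambda v: v == "US"),
    ("select", ["product", "revenue", "date"]),
    ("filter", "date", lambda v: v > "2024-01-01"),
]
scan = push_down(plan)
```

For this plan the scan reads `date`, `product`, `region`, and `revenue`, even though `region` does not appear in the output, because the pushed-down filter still has to test it.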
Execution Engine
Parallel Execution
// Split data into chunks
let chunks = partition_data(data, num_cores);
// Process in parallel
let results: Vec<_> = chunks
    .par_iter() // Rayon parallel iterator
    .map(|chunk| apply_filter(chunk, &predicate))
    .collect();
// Merge results
merge_chunks(results)
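The same split, process, merge shape can be sketched in Python with `concurrent.futures`. The `partition` and `parallel_filter` helpers are hypothetical names for this example. Note that Python threads share the GIL, so this shows the structure of the technique rather than the CPU speedup that Rayon gives in Rust.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, n):
    """Split data into n contiguous chunks of near-equal size."""
    size, extra = divmod(len(data), n)
    chunks, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def parallel_filter(data, predicate, workers=4):
    chunks = partition(data, workers)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves chunk order, so the merged result keeps input order.
        parts = pool.map(lambda chunk: [x for x in chunk if predicate(x)], chunks)
        return [x for part in parts for x in part]


filtered = parallel_filter(list(range(10)), lambda x: x % 2 == 0, workers=3)
```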
Pipeline Parallelism
Read Thread → Parse Thread → Filter Thread → Aggregate Thread
     ↓              ↓              ↓                 ↓
  Chunk 1        Chunk 1        Chunk 1          Result 1
  Chunk 2        Chunk 2        Chunk 2          Result 2
  Chunk 3        Chunk 3        Chunk 3          Result 3
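A staged pipeline like this can be sketched with threads connected by queues. The example below is a simplified assumption of how stages hand chunks to each other: the `stage` helper and the `None` shutdown sentinel are choices made for this illustration, not Polarway internals.

```python
import queue
import threading


def stage(fn, inbox, outbox):
    """Run fn on every item from inbox until the None sentinel arrives."""
    while (item := inbox.get()) is not None:
        outbox.put(fn(item))
    outbox.put(None)  # propagate shutdown downstream


q_parse, q_filter, q_out = queue.Queue(), queue.Queue(), queue.Queue()
threads = [
    threading.Thread(target=stage, args=(lambda c: [int(x) for x in c], q_parse, q_filter)),
    threading.Thread(target=stage, args=(lambda c: [x for x in c if x > 1], q_filter, q_out)),
]
for t in threads:
    t.start()

for chunk in (["1", "2"], ["3", "0"]):  # the "read" stage
    q_parse.put(chunk)
q_parse.put(None)

for t in threads:
    t.join()

results = []
while (item := q_out.get()) is not None:
    results.append(sum(item))  # the "aggregate" stage
```

While the filter thread works on the first chunk, the parse thread can already start on the second, which is the overlap the diagram describes.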
Handle-Based Architecture
Why Handles?
Problem with Embedded Libraries:
# Polars (PyO3) - data lives in Python process
df = pl.read_parquet("10GB.parquet") # Uses 10GB RAM in Python
df2 = df.filter(...) # New frame in the same process (up to 10GB more)
df3 = df2.select(...) # Another frame held by Python
# Worst case: up to 30GB RAM used in the Python process
Solution with Handles:
# Polarway - data lives on server
df = pd.read_parquet("10GB.parquet") # Handle("abc"), server uses 10GB
df2 = df.filter(...) # Handle("def"), server uses 10GB
df3 = df2.select(...) # Handle("ghi"), server still 10GB
# Total: 10GB RAM on server, ~1KB in Python
Handle Lifecycle
pub struct HandleManager {
    // Assumes parking_lot::RwLock, whose read()/write() return guards directly
    handles: Arc<RwLock<HashMap<String, (DataFrame, Instant)>>>,
    ttl: Duration, // 30 minutes default
}
impl HandleManager {
    pub fn create(&self, df: DataFrame) -> String {
        let id = Uuid::new_v4().to_string();
        let expiry = Instant::now() + self.ttl;
        self.handles.write().insert(id.clone(), (df, expiry));
        // Spawn cleanup task; it must re-check the stored expiry,
        // because extend_ttl may have moved it
        tokio::spawn(cleanup_expired_handle(id.clone(), expiry));
        id
    }
    pub fn get(&self, id: &str) -> Option<DataFrame> {
        self.handles.read().get(id).map(|(df, _)| df.clone())
    }
    pub fn extend_ttl(&self, id: &str) {
        // Reset expiry on access
        if let Some((_, expiry)) = self.handles.write().get_mut(id) {
            *expiry = Instant::now() + self.ttl;
        }
    }
}
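The lifecycle can be exercised in isolation with a small Python model. This is a sketch under stated assumptions rather than the server's code: it evicts lazily on access instead of spawning a cleanup task, renews the TTL on every successful `get`, is not thread-safe, and takes an injectable `clock` so that expiry can be tested without waiting.

```python
import time
import uuid


class HandleManager:
    """Toy handle store with a sliding time-to-live per handle."""

    def __init__(self, ttl_seconds=30 * 60, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock    # injectable so expiry is testable
        self._handles = {}     # handle id -> (frame, expiry)

    def create(self, frame):
        handle_id = str(uuid.uuid4())
        self._handles[handle_id] = (frame, self._clock() + self._ttl)
        return handle_id

    def get(self, handle_id):
        entry = self._handles.get(handle_id)
        if entry is None:
            return None
        frame, expiry = entry
        if self._clock() >= expiry:
            del self._handles[handle_id]  # lazily evict on access
            return None
        # Sliding TTL: each successful access renews the handle.
        self._handles[handle_id] = (frame, self._clock() + self._ttl)
        return frame
```

With a 10-second TTL, a handle read at t=5 stays alive until t=15, and a second read at t=14 extends it to t=24.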
Memory Management
Reference Counting:
// DataFrames use Arc<> for cheap cloning
pub struct DataFrame {
    data: Arc<Vec<Series>>, // Shared ownership
}
// Multiple handles can reference same data
let df1 = DataFrame::new(data);
let df2 = df1.clone(); // Just increments Arc counter (O(1))
Distributed Computing
Architecture Overview
Polarway's distributed direction is:
- Time-series storage/metadata/observability anchored in QuestDB (tables, partitions, retention, SQL, system tables).
- Distributed query execution via DataFusion + Ballista (scheduler + executors, shuffle, object store).
- APIs: gRPC remains the primary engine API; a QuestDB-like REST endpoint (/exec) is added for compatibility and easy integrations.
In that model, Polarway is the gateway / control-plane that:
- receives API requests (gRPC/REST)
- resolves handles and metadata
- compiles queries (DataFusion logical plan)
- executes locally (single-node) or dispatches to Ballista (distributed)
- returns results as handles (and via Arrow IPC streams)
Clients
(Python/Rust/REST)
|
v
gRPC/HTTP Load Balancer
|
v
Polarway API Gateways (stateless)
- gRPC Service
- REST /exec (QuestDB-like)
- Handle routing
- Plan compilation
|
| dispatch/execute
v
DataFusion (single-node) OR Ballista (distributed)
|
| reads/writes
v
Object Store (results, shuffles) + QuestDB (time-series, metadata)
Ballista (distributed execution)
Ballista architecture (at a high level):
- Scheduler: receives DataFusion plans and coordinates execution.
- Executors: run partitions of the plan and exchange shuffle data.
- Object store: stores shuffle and result artifacts (recommended for scale).
Polarway integration direction:
- Polarway builds a DataFusion plan and submits it to Ballista when distributed=true.
- Handles reference results stored externally (object store).
- QuestDB remains the authoritative store for time-series tables and operational metadata.
Data Partitioning
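// Illustrative pseudocode: the row closure, `hash`, and `Range` stand in
// for the real partitioning helpers
// Hash partitioning
fn partition_by_hash(df: DataFrame, key: &str, n: usize) -> Vec<DataFrame> {
    df.partition_by(|row| {
        hash(row[key]) % n // Deterministic assignment
    })
}
// Range partitioning
fn partition_by_range(df: &DataFrame, key: &str, ranges: &[Range])
    -> Vec<DataFrame>
{
    ranges.iter().map(|range| {
        // Half-open [start, end): a boundary row lands in exactly one partition
        df.filter(col(key).gt_eq(range.start).and(col(key).lt(range.end)))
    }).collect()
}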
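Both strategies can be tried out on plain Python rows. The sketch below assumes rows are dictionaries and uses `zlib.crc32` as the hash, because Python's built-in `hash()` for strings is salted per process and would not give the same assignment on every node. The function names mirror the Rust ones above but are otherwise hypothetical.

```python
import zlib


def partition_by_hash(rows, key, n):
    """Assign each row to bucket crc32(key) % n; equal keys share a bucket."""
    buckets = [[] for _ in range(n)]
    for row in rows:
        digest = zlib.crc32(str(row[key]).encode("utf-8"))
        buckets[digest % n].append(row)
    return buckets


def partition_by_range(rows, key, bounds):
    """Split rows into half-open ranges [lo, hi) given as (lo, hi) pairs."""
    return [[r for r in rows if lo <= r[key] < hi] for lo, hi in bounds]


rows = [
    {"symbol": "AAA", "price": 5},
    {"symbol": "BBB", "price": 15},
    {"symbol": "AAA", "price": 25},
]
hashed = partition_by_hash(rows, "symbol", 4)
ranged = partition_by_range(rows, "price", [(0, 10), (10, 30)])
```

Hash partitioning keeps all rows for one key together, which suits joins and group-bys; range partitioning keeps neighbouring values together, which suits time-ordered scans.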
Async Operations
Tokio Runtime
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = PolarwayServer::new();
    // Handle connections concurrently
    let listener = TcpListener::bind("0.0.0.0:50051").await?;
    loop {
        let (socket, _) = listener.accept().await?;
        // Spawn task for each connection
        tokio::spawn(async move {
            handle_connection(socket).await
        });
    }
}
Streaming I/O
// Async Parquet reader
// try_stream! (async-stream crate) allows `?` inside the stream body
pub fn read_parquet_stream(path: String)
    -> impl Stream<Item = Result<RecordBatch>>
{
    try_stream! {
        let file = File::open(&path).await?;
        let mut reader = AsyncParquetReader::new(file);
        // Yield batches as they're read
        while let Some(batch) = reader.next_batch().await? {
            yield batch;
        }
    }
}
I/O Subsystem
File Formats
Parquet Reader:
pub struct ParquetReader {
    // Memory-mapped file for zero-copy reads
    mmap: Mmap,
    // Metadata (footer) parsed once
    metadata: ParquetMetadata,
    // Row groups (can read in parallel)
    row_groups: Vec<RowGroupReader>,
}
impl ParquetReader {
    pub async fn read_row_group_parallel(&self, rg: usize)
        -> Result<RecordBatch>
    {
        // Read all columns in parallel
        let tasks = self.row_groups[rg]
            .columns()
            .map(|col| tokio::spawn(read_column(col)));
        // try_join_all fails fast if a task panics or is cancelled;
        // the collect then surfaces the first column read error
        let columns = try_join_all(tasks)
            .await?
            .into_iter()
            .collect::<Result<Vec<_>>>()?;
        RecordBatch::try_new(self.schema(), columns)
    }
}
WebSocket Streaming
// try_stream! (async-stream crate) allows `?` inside the stream body
pub fn websocket_stream(url: String)
    -> impl Stream<Item = Result<RecordBatch>>
{
    try_stream! {
        let (mut ws, _) = connect_async(url.as_str()).await?;
        let mut buffer = Vec::new();
        while let Some(msg) = ws.next().await {
            buffer.push(parse_message(msg)?);
            // Yield batch every 1000 messages
            if buffer.len() >= 1000 {
                yield create_batch(&buffer)?;
                buffer.clear();
            }
        }
        // Flush messages left over when the socket closes
        if !buffer.is_empty() {
            yield create_batch(&buffer)?;
        }
    }
}
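The batching logic itself is independent of WebSockets and can be sketched with an async generator. In the example below, `messages` is a hypothetical stand-in for the socket and `batched` groups its items into fixed-size lists, including a final partial batch so that trailing messages are not lost when the source ends.

```python
import asyncio


async def batched(source, batch_size):
    """Group items from an async iterator into lists of batch_size."""
    buffer = []
    async for item in source:
        buffer.append(item)
        if len(buffer) >= batch_size:
            yield buffer
            buffer = []
    if buffer:  # flush the final partial batch
        yield buffer


async def messages(n):
    for i in range(n):
        await asyncio.sleep(0)  # stand-in for waiting on the network
        yield i


async def main():
    return [batch async for batch in batched(messages(5), 2)]


batches = asyncio.run(main())
```

Without the final flush, the last message in this run would be dropped, which is the case the stream version needs to handle when the socket closes mid-batch.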
Python Integration
PyO3 Bindings
#[pyclass]
pub struct DataFrame {
    handle: String,
    client: Arc<GrpcClient>,
}
#[pymethods]
impl DataFrame {
    fn select(&self, columns: Vec<String>) -> PyResult<Self> {
        // Borrow the handle: a &self method cannot move it out
        let new_handle = self.client.select(&self.handle, columns)?;
        Ok(DataFrame {
            handle: new_handle,
            client: self.client.clone(),
        })
    }
    fn collect(&self) -> PyResult<PyArrowTable> {
        let bytes = self.client.collect(&self.handle)?;
        let table = deserialize_arrow_ipc(bytes)?;
        Ok(PyArrowTable(table))
    }
}
Design Decisions
Why Fork Polars?
Reasons:
- gRPC Architecture: Requires handle-based design incompatible with Polars
- Distributed Computing: Needs network layer and partitioning
- Streaming-First: Different memory management strategy
- Financial Focus: Domain-specific optimizations (OHLCV, tick data)
What We Keep:
- Core DataFrame engine (solid foundation)
- Expression system (powerful and composable)
- Arrow integration (industry standard)
- Query optimizer (excellent performance)
What We Change:
- Replace Polars' in-process PyO3 bindings with a gRPC client (the Python side holds only a handle)
- Add handle management layer
- Implement streaming execution
- Add time-series operations
Why Rust?
Advantages:
- Memory Safety: No null pointers, no data races
- Performance: Zero-cost abstractions, SIMD
- Concurrency: Safe parallelism with Send/Sync
- Type System: Compile-time guarantees
Example:
// This won't compile (caught at compile time!)
let df = Rc::new(DataFrame::new());
let df_clone = Rc::clone(&df);
thread::spawn(move || {
    df_clone.filter(...) // Error: `Rc<DataFrame>` cannot be sent between threads safely
});
// Fix: Use Arc<> for thread-safe shared ownership
let df = Arc::new(DataFrame::new());
let df_clone = Arc::clone(&df);
thread::spawn(move || {
    df_clone.filter(...) // OK!
});
Why Arrow?
Benefits:
- Zero-Copy: Share memory between processes
- Language Agnostic: C++, Rust, Python, Java all use same format
- Columnar: Perfect for analytics
- Ecosystem: Works with Parquet, DataFusion, Spark
Example:
// Create Arrow data in Rust
let array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
let batch = RecordBatch::try_new(schema, vec![array])?;
// Send to Python as an Arrow IPC stream
let ipc_bytes = write_ipc_stream(&batch);
// Python reads the same memory layout; no per-value deserialization needed
Future Architecture
Planned Enhancements
GPU Acceleration:
// Use RAPIDS cuDF for GPU operations
pub trait GpuExecutable {
    fn to_gpu(&self) -> GpuDataFrame;
    fn execute_on_gpu(&self, expr: Expr) -> GpuDataFrame;
}
Advanced Query Planning:
Cost-Based Optimizer:
- Statistics collection (histograms, min/max, null count)
- Cardinality estimation
- Join reordering based on costs
- Adaptive execution (change plan based on runtime stats)
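As a rough sketch of what cardinality estimation from min/max statistics could look like, the function below estimates how many rows pass a `col > threshold` filter. It assumes values are uniformly distributed between the column minimum and maximum, which is a common simplification; `estimate_rows` is a hypothetical name and not part of any planned API.

```python
def estimate_rows(row_count, col_min, col_max, threshold):
    """Estimate rows matching `col > threshold`, assuming uniform values."""
    if threshold >= col_max:
        return 0
    if threshold < col_min:
        return row_count
    selectivity = (col_max - threshold) / (col_max - col_min)
    return round(row_count * selectivity)


# 1000 rows with values spread over [0, 100]: about a quarter exceed 75.
estimate = estimate_rows(1000, 0, 100, 75)
```

A cost-based optimizer would compare estimates like this across candidate plans, for example to join the smaller filtered input first. Histograms would replace the uniform assumption where the data is skewed.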
SQL Interface:
-- Use DataFusion for SQL parsing
SELECT symbol, AVG(price) as avg_price
FROM polarway://data.parquet
WHERE date > '2024-01-01'
GROUP BY symbol
ORDER BY avg_price DESC
LIMIT 10;
See Also:
- Quick Reference - Common operations cheat sheet
- API Documentation - Complete API reference
- Migration Guide - Moving from Polars
- REST Exec API - QuestDB-like /exec endpoint
- Distributed Getting Started - Multi-container/VM setup
- Distributed Implementation Plan - Phased rollout and best practices