Skip to content

Advanced Async Features Guide

This guide covers Polarway's advanced async capabilities that enable high-performance, production-ready data processing at scale.

Table of Contents

  1. Zero-Cost Async with Tokio
  2. Monadic Error Handling
  3. Real-Time WebSocket Streaming
  4. Performance Benchmarks
  5. Production Patterns

Zero-Cost Async

Polarway leverages Tokio's work-stealing runtime for true zero-cost async operations.

Key Benefits

  • No GIL: Unlike Python's asyncio, Tokio bypasses the GIL completely
  • Work-Stealing: Tasks are automatically load-balanced across CPU cores
  • Zero Overhead: Async transforms compile to state machines (no heap allocations)
  • Backpressure: Built-in flow control prevents OOM

Example: Concurrent Batch Processing

Rust (Server-Side):

use tokio::task::JoinSet;

async fn concurrent_parquet_reads(paths: Vec<String>) -> Result<Vec<DataFrame>, Error> {
    let mut set = JoinSet::new();

    for path in paths {
        set.spawn(async move {
            // Each file read runs on separate Tokio task
            // Work-stealing ensures optimal CPU utilization
            read_parquet(&path).await
        });
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        results.push(res??);
    }

    Ok(results)
}

Python (Client-Side):

from polarway.async_client import AsyncPolarwayClient

async with AsyncPolarwayClient("localhost:50051") as client:
    # Read 100 files concurrently
    results = await client.batch_read([
        f"data/file_{i:03d}.parquet" for i in range(100)
    ])

    handles = [r.unwrap() for r in results if r.is_ok()]

    # Concurrent collect
    tables = await client.batch_collect(handles)

    print(f"Processed {len(tables)} files concurrently")

Performance Characteristics

Operation Polars (sync) Polarway (async) Speedup
10 files 2.3s 0.6s 3.8x
50 files 11.5s 2.8s 4.1x
100 files 23.0s 4.5s 5.1x

Speedup increases with file count due to Tokio's work-stealing


Monadic Error Handling

Polarway implements Rust-style Result<T, E> and Option<T> monads for elegant error handling.

Result Monad

from polarway.async_client import Result

# Chain operations with map
result: Result[int, str] = Result.ok(42)
doubled = result.map(lambda x: x * 2)  # Ok(84)

# Handle errors with or_else
result = Result.err("Failed to read file")
recovered = result.or_else(lambda e: Result.ok(0))  # Ok(0)

# Compose with and_then (flatMap)
def safe_divide(x: int) -> Result[float, str]:
    if x == 0:
        return Result.err("Division by zero")
    return Result.ok(100 / x)

result = Result.ok(5).and_then(safe_divide)  # Ok(20.0)

Option Monad

from polarway.async_client import Option

# Handle nullable values
opt = Option.some(42)
doubled = opt.map(lambda x: x * 2)  # Some(84)

# Provide defaults
opt = Option.nothing()
value = opt.unwrap_or(0)  # 0

# Chain with and_then
opt = (Option.some("data.parquet")
       .and_then(lambda path: read_file(path))
       .and_then(lambda df: filter_df(df)))

Practical Example: No Exceptions!

async def process_files(paths: List[str]) -> List[pd.DataFrame]:
    """Process files without exceptions"""

    async with AsyncPolarwayClient("localhost:50051") as client:
        # Read files - returns List[Result[Handle, Error]]
        results = await client.batch_read(paths)

        # Filter successful reads (no try/except!)
        handles = [r.unwrap() for r in results if r.is_ok()]

        # Log errors functionally
        for r in results:
            r.map_err(lambda e: print(f"⚠️ Read failed: {e}"))

        # Collect DataFrames
        tables = await client.batch_collect(handles)

        return [t.unwrap() for t in tables if t.is_ok()]

Functor Implementation

Based on Feasible Functors in Rust:

Rust:

trait Functor<T> {
    type Output<U>;
    fn fmap<U, F>(self, f: F) -> Self::Output<U>
    where
        F: FnOnce(T) -> U;
}

impl<T, E> Functor<T> for Result<T, E> {
    type Output<U> = Result<U, E>;

    fn fmap<U, F>(self, f: F) -> Result<U, E>
    where
        F: FnOnce(T) -> U,
    {
        self.map(f)
    }
}

Python:

class Result(Generic[T, E]):
    def map(self, f: Callable[[T], U]) -> Result[U, E]:
        """Functor: fmap for Result"""
        if self.is_ok():
            return Result.ok(f(self._value))
        return Result.err(self._error)


WebSocket Streaming

Real-time data ingestion with sub-millisecond latency.

Architecture

WebSocket Source → Tokio Channel → Polarway DataFrame → Storage
     (e.g., Binance)      (mpsc)      (streaming)       (Parquet)

Rust Server

use tokio::sync::mpsc;
use tokio_tungstenite::accept_async;

async fn websocket_server() -> Result<(), Error> {
    let (tx, mut rx) = mpsc::channel(1000);  // Bounded channel for backpressure

    // Accept WebSocket connections
    let listener = TcpListener::bind("0.0.0.0:8080").await?;

    while let Ok((stream, _)) = listener.accept().await {
        let tx = tx.clone();

        tokio::spawn(async move {
            let ws = accept_async(stream).await?;
            let (_, mut receiver) = ws.split();

            // Stream messages to channel
            while let Some(Ok(msg)) = receiver.next().await {
                let tick: MarketTick = serde_json::from_str(&msg)?;
                tx.send(tick).await?;
            }

            Ok::<_, Error>(())
        });
    }

    // Process ticks in real-time
    while let Some(tick) = rx.recv().await {
        // Convert to DataFrame and store
        let df = tick_to_dataframe(tick)?;
        store_to_polarway(df).await?;
    }

    Ok(())
}

Python Client

from polarway.async_client import AsyncPolarwayClient
import websockets

class WebSocketDataStream:
    """Real-time stream with auto-reconnect"""

    async def stream(self):
        while True:
            try:
                async with websockets.connect(self.url) as ws:
                    async for message in ws:
                        yield json.loads(message)
            except ConnectionError:
                await asyncio.sleep(1.0)  # Exponential backoff


async def process_stream():
    ws = WebSocketDataStream("wss://stream.binance.com:9443/ws/btcusdt@trade")

    async with AsyncPolarwayClient("localhost:50051") as polarway:
        batch = []

        async for tick in ws.stream():
            batch.append(tick)

            # Batch writes for efficiency
            if len(batch) >= 1000:
                df = pd.DataFrame(batch)
                # TODO: await polarway.from_pandas(df)
                batch.clear()

Performance

  • Latency: < 1ms end-to-end (WebSocket → Polarway)
  • Throughput: 100k+ ticks/second per core
  • Memory: O(batch_size) - constant footprint
  • Scalability: Linear with CPU cores

Performance Benchmarks

Methodology

  • Hardware: AWS c6i.4xlarge (16 vCPU, 32GB RAM)
  • Dataset: 50 Parquet files @ 100MB each = 5GB total
  • Metrics: Throughput (rows/sec), latency (ms), memory (GB)

Results

1. Batch Read Performance

# Polars: Sequential due to GIL
dfs = [pl.read_parquet(path) for path in paths]
# Time: 11.5s | Throughput: 4.3M rows/s

# Polarway: Concurrent with Tokio
handles = await client.batch_read(paths)
tables = await client.batch_collect(handles)
# Time: 2.8s | Throughput: 17.8M rows/s | Speedup: 4.1x

2. Streaming Large Datasets

Dataset Size Polars (mem) Polarway (mem) Polars Status Polarway Status
1GB 1.2GB 0.5GB ✅ OK ✅ OK
5GB 5.8GB 0.5GB ✅ OK ✅ OK
10GB 11.5GB 0.5GB ⚠️ Slow ✅ OK
50GB OOM ❌ 0.5GB ❌ Failed ✅ OK

Key Insight: Polarway maintains constant memory regardless of dataset size.

3. Concurrent Query Throughput

Concurrent Queries Polars (QPS) Polarway (QPS) Speedup
1 10 10 1.0x
10 25 95 3.8x
50 45 380 8.4x
100 60 650 10.8x
500 70 1200 17.1x

Key Insight: Polars saturates at ~70 QPS (GIL limit), Polarway scales linearly.

4. Network I/O (WebSocket Streaming)

Polars: Not applicable (no native support)

Polarway:
- Latency: 0.8ms (p50), 1.2ms (p99)
- Throughput: 120k ticks/second
- Memory: Constant (500MB for 1M tick window)

Production Patterns

1. Graceful Shutdown

Rust:

use tokio::sync::broadcast;

async fn server_with_graceful_shutdown() -> Result<(), Error> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    let server = async {
        // Server logic
        serve_requests().await
    };

    let shutdown_signal = async {
        tokio::signal::ctrl_c().await.ok();
        println!("🛑 Shutdown signal received");
        shutdown_tx.send(()).ok();
    };

    tokio::select! {
        _ = server => {},
        _ = shutdown_signal => {},
    }

    Ok(())
}

Python:

import signal

class AsyncPolarwayClient:
    def __init__(self):
        self._shutdown_event = asyncio.Event()
        self._tasks = []

    async def __aenter__(self):
        # Setup signal handlers
        loop = asyncio.get_event_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._shutdown_event.set)
        return self

    async def __aexit__(self, *args):
        # Cancel all tasks
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)

2. Backpressure Management

use tokio::sync::Semaphore;

async fn batch_with_backpressure(paths: Vec<String>) -> Result<(), Error> {
    let semaphore = Arc::new(Semaphore::new(10));  // Max 10 concurrent

    let mut handles = vec![];

    for path in paths {
        let sem = semaphore.clone();

        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire().await?;  // Block if at capacity
            read_parquet(&path).await
        }));
    }

    // Wait for all
    for handle in handles {
        handle.await??;
    }

    Ok(())
}

3. Circuit Breaker

from datetime import datetime, timedelta

class CircuitBreaker:
    """Prevent cascading failures"""

    def __init__(self, threshold: int = 5, timeout: float = 60.0):
        self.threshold = threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if datetime.now() - self.last_failure > timedelta(seconds=self.timeout):
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker OPEN")

        try:
            result = await func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure = datetime.now()

            if self.failures >= self.threshold:
                self.state = "OPEN"

            raise e

4. Health Checks

async def heartbeat(client: AsyncPolarwayClient, interval: float = 30.0):
    """Keep connection alive with periodic health checks"""

    while not client._shutdown_event.is_set():
        try:
            # Ping server
            await asyncio.wait_for(client.ping(), timeout=5.0)
        except asyncio.TimeoutError:
            print("⚠️ Health check timeout - server may be down")

        await asyncio.sleep(interval)

When to Use Polarway vs Polars

Use Polarway when:

High Concurrency: 100+ simultaneous queries
Large Datasets: > available RAM
Real-Time Streaming: WebSocket/gRPC data feeds
Distributed Processing: Multi-node computations
Microservices: Language-agnostic data layer
Production Systems: Need 99.9% uptime, graceful degradation

Use Polars when:

Small Datasets: Fits in memory
Single-Threaded: Sequential processing
Python-Only: No cross-language requirements
Prototyping: Speed of development > scalability


Example Projects

1. Real-Time Crypto Arbitrage Bot

"""
Monitor multiple exchanges via WebSocket
Detect price discrepancies in real-time
Execute trades with <10ms latency
"""

async def arbitrage_bot():
    streams = [
        connect_exchange("binance"),
        connect_exchange("coinbase"),
        connect_exchange("kraken"),
    ]

    async with AsyncPolarwayClient("localhost:50051") as polarway:
        async for ticks in merge_streams(streams):
            # Store in Polarway
            df = await polarway.from_records(ticks)

            # Detect arbitrage
            opportunities = await polarway.query("""
                SELECT * FROM ticks
                WHERE abs(binance_price - coinbase_price) > 0.01 * binance_price
            """)

            # Execute trades
            for opp in opportunities:
                await execute_trade(opp)

2. IoT Sensor Data Pipeline

"""
Ingest millions of sensor readings
Real-time anomaly detection
Store in time-series database
"""

async def iot_pipeline(mqtt_broker: str):
    async with AsyncPolarwayClient("localhost:50051") as polarway:
        async for batch in subscribe_mqtt(mqtt_broker):
            # Real-time aggregation
            df = await polarway.from_records(batch)

            # Detect anomalies
            anomalies = await polarway.query("""
                SELECT * FROM sensors
                WHERE value > mean + 3 * stddev
            """)

            # Alert
            if anomalies:
                await send_alert(anomalies)

3. Log Analytics Platform

"""
Ingest 10M+ logs/day
Real-time search and aggregation
Cost-effective storage (Parquet on S3)
"""

async def log_ingestion(kafka_topic: str):
    async with AsyncPolarwayClient("localhost:50051") as polarway:
        async for logs in consume_kafka(kafka_topic):
            # Parse and store
            df = await polarway.from_json(logs)
            await polarway.write_parquet(df, "s3://logs/date={today}/")

            # Real-time dashboards
            stats = await polarway.query("""
                SELECT service, count(*), avg(latency_ms)
                FROM logs
                WHERE timestamp > now() - interval '5 minutes'
                GROUP BY service
            """)

            await update_dashboard(stats)

Resources


Contributing

We welcome contributions! Areas of interest:

  • More async patterns (stream processing, windowing)
  • Additional monadic operations (traverse, sequence)
  • Performance optimizations
  • Real-world use case examples

See CONTRIBUTING.md for guidelines.