Polarway API Documentation
Complete API documentation for both Rust and Python interfaces.
🐍 Python API
Client Connection
connect(address: str) -> Client
Connect to a Polarway gRPC server.
Parameters:
- address: Server address in format host:port
Returns: Client instance for synchronous operations
AsyncClient(address: str)
Async client for concurrent operations.
async with pd.AsyncClient("localhost:50051") as client:
    df = await client.read_parquet("data.parquet")
    result = await df.collect()
Methods:
- async read_parquet(path): Read Parquet file
- async read_csv(path): Read CSV file
- async read_json(path): Read JSON file
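Because the read methods are coroutines, several reads can be issued concurrently with `asyncio.gather`. The sketch below is illustrative only: `FakeAsyncClient` is a hypothetical stand-in so the code runs without a server, and only the `read_parquet` method name and the `async with` usage are taken from this reference.

```python
import asyncio
from typing import List


class FakeAsyncClient:
    """Hypothetical stand-in for pd.AsyncClient so the sketch runs offline."""

    def __init__(self, address: str) -> None:
        self.address = address

    async def __aenter__(self) -> "FakeAsyncClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        return None

    async def read_parquet(self, path: str) -> str:
        await asyncio.sleep(0.01)  # simulate a network round trip
        return f"handle:{path}"


async def read_all(paths: List[str]) -> List[str]:
    async with FakeAsyncClient("localhost:50051") as client:
        # gather() schedules every read at once and preserves input order.
        return await asyncio.gather(*(client.read_parquet(p) for p in paths))


handles = asyncio.run(read_all(["a.parquet", "b.parquet", "c.parquet"]))
print(handles)
```

With the real client, the same pattern would presumably apply by replacing the stand-in with `pd.AsyncClient`.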
DataFrame API
class DataFrame
Represents a reference (handle) to a DataFrame on the server.
Core Methods
select(columns: List[str | Expr]) -> DataFrame
Select specific columns.
filter(predicate: Expr) -> DataFrame
Filter rows based on condition.
with_column(expr: Expr) -> DataFrame
Add or modify a single column.
with_columns(exprs: List[Expr]) -> DataFrame
Add or modify multiple columns.
df.with_columns([
    pd.col("price").cast(pd.Int64).alias("price_int"),
    (pd.col("a") + pd.col("b")).alias("sum")
])
group_by(columns: str | List[str]) -> GroupBy
Group rows by one or more columns.
df.group_by("symbol").agg({"price": "mean"})
df.group_by(["date", "symbol"]).agg({"price": ["mean", "max"]})
join(other: DataFrame, on: str | List[str], how: str = "inner") -> DataFrame
Join two DataFrames.
Parameters:
- how: "inner", "left", "right", "outer"
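The four `how` values can be compared by looking at which join-key values survive in the output. The sketch below uses plain Python sets and assumes the conventional relational meaning of each join type; `joined_keys` is a hypothetical helper, not part of Polarway.

```python
def joined_keys(left: set, right: set, how: str = "inner") -> list:
    """Return the sorted join-key values that appear in the output for `how`."""
    if how == "inner":
        keys = left & right   # keys present on both sides
    elif how == "left":
        keys = set(left)      # every left key; unmatched right columns would be null
    elif how == "right":
        keys = set(right)     # every right key; unmatched left columns would be null
    elif how == "outer":
        keys = left | right   # keys present on either side
    else:
        raise ValueError(f"unsupported join type: {how!r}")
    return sorted(keys)


left_symbols = {"AAA", "BBB"}
right_symbols = {"BBB", "CCC"}
for how in ("inner", "left", "right", "outer"):
    print(how, joined_keys(left_symbols, right_symbols, how))
```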
sort(by: str | List[str], descending: bool = False) -> DataFrame
Sort DataFrame.
head(n: int = 5) -> DataFrame
Take first n rows.
tail(n: int = 5) -> DataFrame
Take last n rows.
drop(columns: List[str]) -> DataFrame
Remove columns.
rename(mapping: Dict[str, str]) -> DataFrame
Rename columns.
drop_nulls(subset: Optional[List[str]] = None) -> DataFrame
Remove rows with null values.
Materialization
collect() -> Result[pyarrow.Table, Error]
Execute the pending operations on the server and return the result as a pyarrow.Table wrapped in a Result.
lazy() -> LazyFrame
Convert to lazy evaluation.
Information
schema() -> Schema
Get DataFrame schema.
columns() -> List[str]
Get column names.
count() -> int
Get row count.
describe() -> DataFrame
Get summary statistics.
I/O Operations
write_parquet(path: str, compression: str = "snappy", mode: str = "overwrite")
Write to Parquet file.
df.write_parquet("output.parquet")
df.write_parquet("output.parquet", compression="zstd", mode="append")
write_csv(path: str, separator: str = ",", include_header: bool = True)
Write to CSV file.
write_json(path: str, format: str = "json")
Write to JSON file.
GroupBy API
class GroupBy
Represents a grouped DataFrame.
agg(aggregations: Dict[str, str | List[str]]) -> DataFrame
Perform aggregations.
Supported aggregations:
- "mean", "avg": Average value
- "sum": Sum of values
- "min": Minimum value
- "max": Maximum value
- "count": Count of values
- "std": Standard deviation
- "var": Variance
- "first": First value
- "last": Last value
- "median": Median value
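To make the shape of the `agg` argument concrete, here is a pure-Python sketch of what a call such as `agg({"price": ["mean", "max"]})` computes per group. It is not Polarway's implementation. The `group_agg` helper and the `column_aggregation` output names are assumptions, and `"std"` and `"var"` are left out because this reference does not say whether they are sample or population statistics.

```python
from collections import defaultdict
from statistics import mean, median

# Aggregation names from the list above mapped to plain Python callables.
AGGREGATORS = {
    "mean": mean, "avg": mean, "sum": sum, "min": min, "max": max,
    "count": len, "median": median,
    "first": lambda values: values[0],
    "last": lambda values: values[-1],
}


def group_agg(rows, key, aggregations):
    """Group a list of dict rows by `key` and apply the named aggregations."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)

    out = []
    for group_key, members in groups.items():
        record = {key: group_key}
        for column, names in aggregations.items():
            names = [names] if isinstance(names, str) else names
            values = [member[column] for member in members]
            for name in names:
                # Output naming is an assumption made for this sketch.
                record[f"{column}_{name}"] = AGGREGATORS[name](values)
        out.append(record)
    return out


trades = [
    {"symbol": "AAA", "price": 10.0},
    {"symbol": "AAA", "price": 14.0},
    {"symbol": "BBB", "price": 5.0},
]
summary = group_agg(trades, "symbol", {"price": ["mean", "max"]})
print(summary)
```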
Expression API
col(name: str) -> Expr
Reference a column.
lit(value: Any) -> Expr
Create a literal value.
class Expr
Expression builder for operations.
Arithmetic Operations
pd.col("price") + 10
pd.col("price") - pd.col("cost")
pd.col("price") * pd.col("quantity")
pd.col("total") / pd.col("count")
pd.col("base") ** 2 # Power
Comparison Operations
pd.col("price") > 100
pd.col("price") >= 100
pd.col("price") < 50
pd.col("price") <= 50
pd.col("status") == "active"
pd.col("status") != "inactive"
Logical Operations
(pd.col("a") > 5) & (pd.col("b") < 10) # AND
(pd.col("x") == 1) | (pd.col("y") == 2) # OR
~pd.col("flag") # NOT
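The parentheses in the examples above matter: in Python, `&` and `|` bind more tightly than comparison operators. The snippet below uses plain integers to show how the unparenthesized form is parsed. With expression objects, the resulting chained comparison would usually try to convert an expression to `bool`, which commonly fails or gives an unintended result; how Polarway's `Expr` behaves in that case is not stated in this reference.

```python
a, b = 7, 12

# Without parentheses, `5 & b` is evaluated first and the comparisons are
# chained, so this is parsed as: a > (5 & b) < 10
unparenthesized = a > 5 & b < 10

# With parentheses, each comparison is evaluated first, then combined.
parenthesized = (a > 5) & (b < 10)

print(unparenthesized, parenthesized)
```

The two results differ for these inputs, so it is safest to always wrap each comparison in parentheses before combining with `&`, `|`, or `~`.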
Methods
alias(name: str) -> Expr
Set column name.
cast(dtype: DataType) -> Expr
Cast to different type.
is_null() -> Expr
Check if value is null.
is_not_null() -> Expr
Check if value is not null.
fill_null(value: Any) -> Expr
Replace null values.
between(lower: Any, upper: Any) -> Expr
Check if value is in range.
String Operations
str.to_uppercase() -> Expr
Convert to uppercase.
str.to_lowercase() -> Expr
Convert to lowercase.
str.contains(pattern: str) -> Expr
Check if string contains pattern.
str.starts_with(prefix: str) -> Expr
Check if string starts with prefix.
str.ends_with(suffix: str) -> Expr
Check if string ends with suffix.
str.replace(old: str, new: str) -> Expr
Replace substring.
str.strip() -> Expr
Remove leading/trailing whitespace.
str.split(delimiter: str) -> Expr
Split string.
str.slice(start: int, length: int) -> Expr
Extract substring.
Datetime Operations
dt.year() -> Expr
Extract year.
dt.month() -> Expr
Extract month.
dt.day() -> Expr
Extract day.
dt.hour() -> Expr
Extract hour.
dt.minute() -> Expr
Extract minute.
dt.second() -> Expr
Extract second.
dt.strftime(format: str) -> Expr
Format as string.
Aggregation Methods
mean() -> Expr
Calculate mean.
sum() -> Expr
Calculate sum.
min() -> Expr
Find minimum.
max() -> Expr
Find maximum.
count() -> Expr
Count values.
std() -> Expr
Calculate standard deviation.
var() -> Expr
Calculate variance.
Window Functions
rolling(window_size: int | str) -> RollingAgg
Create rolling window.
rank() -> Expr
Rank values.
over(partition_by: str, order_by: str) -> Expr
Evaluate the expression within each partition_by group, ordered by order_by.
Time-Series Methods
lag(periods: int) -> Expr
Shift values backward.
lead(periods: int) -> Expr
Shift values forward.
diff() -> Expr
Calculate the difference between consecutive values.
pct_change() -> Expr
Calculate the percent change between consecutive values.
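The four time-series methods can be illustrated on a plain Python list. This sketch assumes the conventional meanings (`lag` looks at earlier rows, `lead` at later rows, and rows without a neighbour become `None`); this reference does not confirm how Polarway fills those edge rows, and the functions below are stand-ins rather than library code.

```python
def lag(values, periods=1):
    """Each row receives the value from `periods` rows earlier."""
    k = min(periods, len(values))
    return [None] * k + values[:len(values) - k]


def lead(values, periods=1):
    """Each row receives the value from `periods` rows later."""
    k = min(periods, len(values))
    return values[k:] + [None] * k


def diff(values):
    """Difference between each value and the previous one."""
    if not values:
        return []
    return [None] + [cur - prev for prev, cur in zip(values, values[1:])]


def pct_change(values):
    """Relative change from the previous value, as a fraction."""
    if not values:
        return []
    return [None] + [(cur - prev) / prev for prev, cur in zip(values, values[1:])]


prices = [100.0, 110.0, 99.0]
print(lag(prices))         # [None, 100.0, 110.0]
print(lead(prices))        # [110.0, 99.0, None]
print(diff(prices))        # [None, 10.0, -11.0]
print(pct_change(prices))
```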
Conditional Expressions
when(condition: Expr) -> When
Start conditional expression.
class When
then(value: Expr) -> Then
Specify value if condition is true.
class Then
when(condition: Expr) -> When
Add another condition.
otherwise(value: Expr) -> Expr
Specify default value.
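The `When` and `Then` classes alternate to form a chain that ends with `otherwise`. The sketch below mirrors that structure in plain Python so the control flow is visible. It is an illustration under assumptions: conditions are ordinary callables instead of `Expr` objects, `otherwise` returns a function instead of an `Expr`, and the first matching branch is assumed to win.

```python
class When:
    """Holds a pending condition until then() supplies its value."""

    def __init__(self, branches, condition):
        self._branches = branches
        self._condition = condition

    def then(self, value):
        return Then(self._branches + [(self._condition, value)])


class Then:
    """Holds completed (condition, value) branches."""

    def __init__(self, branches):
        self._branches = branches

    def when(self, condition):
        return When(self._branches, condition)

    def otherwise(self, default):
        branches = self._branches

        def evaluate(row):
            # Assumed semantics: the first condition that holds decides the value.
            for condition, value in branches:
                if condition(row):
                    return value
            return default

        return evaluate


def when(condition):
    return When([], condition)


tier = (
    when(lambda row: row["price"] > 100).then("high")
    .when(lambda row: row["price"] > 50).then("mid")
    .otherwise("low")
)
labels = [tier({"price": p}) for p in (150, 75, 10)]
print(labels)  # ['high', 'mid', 'low']
```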
I/O Functions
read_parquet(path: str, columns: Optional[List[str]] = None, predicate: Optional[str] = None) -> DataFrame
Read Parquet file.
df = pd.read_parquet("data.parquet")
df = pd.read_parquet("data.parquet", columns=["col1", "col2"])
df = pd.read_parquet("data.parquet", predicate="price > 100")
read_csv(path: str, separator: str = ",", has_header: bool = True) -> DataFrame
Read CSV file.
read_json(path: str, format: str = "json") -> DataFrame
Read JSON file.
scan_parquet(path: str) -> LazyFrame
Lazily scan Parquet file.
Streaming API
from_websocket(url: str, schema: Dict[str, DataType], format: str = "json") -> Stream
Connect to WebSocket stream.
stream = pd.from_websocket(
    url="wss://stream.example.com",
    schema={"price": pd.Float64, "timestamp": pd.Datetime("ms")},
    format="json"
)
async for batch in stream.batches(size=1000):
    print(batch)
read_rest_api(url: str, pagination: str = "none", page_size: int = 100) -> DataFrame
Read from REST API.
Time-Series API
as_timeseries(timestamp_col: str) -> TimeSeriesFrame
Convert to time-series DataFrame.
class TimeSeriesFrame
resample_ohlcv(frequency: str, price_col: str, volume_col: str) -> TimeSeriesFrame
Resample to OHLCV bars.
ohlcv = ts.resample_ohlcv("1m", price_col="price", volume_col="volume")
ohlcv = ts.resample_ohlcv("5m", price_col="price", volume_col="volume")
Supported frequencies:
- "1s", "5s", "10s", "30s": Seconds
- "1m", "5m", "15m", "30m": Minutes
- "1h", "4h", "12h": Hours
- "1d", "1w", "1M": Days, weeks, months
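As a rough illustration of what resampling to OHLCV bars involves, the sketch below buckets ticks into fixed-width windows in plain Python. It is not Polarway's implementation: `ohlcv_bars` is a hypothetical helper, timestamps are assumed to be integer seconds already sorted in ascending order, and buckets are assumed to be aligned to multiples of the bucket width.

```python
def ohlcv_bars(ticks, bucket_seconds):
    """Aggregate (timestamp_seconds, price, volume) ticks into OHLCV bars."""
    bars = {}
    for ts, price, volume in ticks:
        start = ts - ts % bucket_seconds  # start of the bucket containing ts
        bar = bars.get(start)
        if bar is None:
            # First tick in the bucket sets open and seeds the other fields.
            bars[start] = {
                "start": start, "open": price, "high": price,
                "low": price, "close": price, "volume": volume,
            }
        else:
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price  # last tick seen so far in the bucket
            bar["volume"] += volume
    return list(bars.values())


ticks = [(0, 10.0, 1), (20, 12.0, 2), (59, 11.0, 1), (60, 11.5, 3), (90, 9.0, 1)]
bars = ohlcv_bars(ticks, 60)  # comparable in spirit to a "1m" frequency
for bar in bars:
    print(bar)
```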
Data Types
# Integer types
pd.Int8, pd.Int16, pd.Int32, pd.Int64
pd.UInt8, pd.UInt16, pd.UInt32, pd.UInt64
# Float types
pd.Float32, pd.Float64
# Other types
pd.Boolean
pd.Utf8 # String
pd.Date
pd.Datetime("ms") # Timestamp with millisecond precision
pd.Datetime("us") # Timestamp with microsecond precision
pd.Datetime("ns") # Timestamp with nanosecond precision
pd.Duration
pd.Time
# Complex types
pd.List(pd.Int64)
pd.Struct({"name": pd.Utf8, "age": pd.Int64})
pd.Categorical
Error Handling
class Result[T, E]
Rust-style Result type.
is_ok() -> bool
Check if result is success.
is_err() -> bool
Check if result is error.
unwrap() -> T
Get value (raises if error).
unwrap_err() -> E
Get error (raises if success).
map(fn: Callable[[T], U]) -> Result[U, E]
Transform success value.
map_err(fn: Callable[[E], F]) -> Result[T, F]
Transform error value.
result = df.collect()
if result.is_ok():
    table = result.unwrap()
else:
    error = result.unwrap_err()
    print(f"Error: {error}")
# Or use monadic operations
result.map(lambda t: process(t)).map_err(lambda e: log_error(e))
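For readers unfamiliar with Rust-style results, the following minimal sketch shows how a type with the methods listed above could behave. It is an illustration, not Polarway's class: the `ok` and `err` constructors, the `parse_port` helper, and the choice of `ValueError` for a failed `unwrap` are all assumptions.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
E = TypeVar("E")


class Result(Generic[T, E]):
    """Minimal stand-in holding either a success value or an error."""

    def __init__(self, ok: bool, payload) -> None:
        self._ok = ok
        self._payload = payload

    @classmethod
    def ok(cls, value):  # hypothetical constructor
        return cls(True, value)

    @classmethod
    def err(cls, error):  # hypothetical constructor
        return cls(False, error)

    def is_ok(self) -> bool:
        return self._ok

    def is_err(self) -> bool:
        return not self._ok

    def unwrap(self):
        if not self._ok:
            raise ValueError(f"unwrap() called on an error: {self._payload!r}")
        return self._payload

    def unwrap_err(self):
        if self._ok:
            raise ValueError(f"unwrap_err() called on a success: {self._payload!r}")
        return self._payload

    def map(self, fn: Callable):
        # Transform the value on success; pass an error through untouched.
        return Result.ok(fn(self._payload)) if self._ok else self

    def map_err(self, fn: Callable):
        # Transform the error on failure; pass a success through untouched.
        return self if self._ok else Result.err(fn(self._payload))


def parse_port(text: str) -> Result:
    try:
        return Result.ok(int(text))
    except ValueError as exc:
        return Result.err(str(exc))


good = parse_port("50051").map(lambda port: port + 1)
bad = parse_port("abc").map(lambda port: port + 1).map_err(lambda e: f"bad port: {e}")
print(good.unwrap())      # 50052
print(bad.unwrap_err())
```

The point of chaining `map` and `map_err` is that neither call needs an explicit `if`: each one acts only on the branch it is named for.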
🦀 Rust API
Client Connection
use polarway_grpc::client::PolarwayClient;
let mut client = PolarwayClient::connect("http://localhost:50051").await?;
DataFrame Operations
use polarway::prelude::*;
// Read Parquet
let df = client.read_parquet("data.parquet").await?;
// Select columns
let df = df.select(&["col1", "col2"])?;
// Filter rows
let df = df.filter(col("price").gt(lit(100)))?;
// Collect results
let result: DataFrame = df.collect().await?;
Core Types
struct DataFrame
Methods:
- select(&[&str]) -> Result<DataFrame>
- filter(Expr) -> Result<DataFrame>
- with_column(Expr) -> Result<DataFrame>
- group_by(&[&str]) -> GroupBy
- join(DataFrame, JoinArgs) -> Result<DataFrame>
- sort(&[&str], &[bool]) -> Result<DataFrame>
- collect() -> Result<DataFrame>
struct LazyFrame
Lazy evaluation interface.
Methods:
- filter(Expr) -> LazyFrame
- select(&[Expr]) -> LazyFrame
- group_by(&[&str]) -> LazyGroupBy
- join(LazyFrame, &[&str], JoinType) -> LazyFrame
- collect() -> Result<DataFrame>
enum Expr
Expression builder.
Constructors:
- col(name: &str) -> Expr
- lit<T: Literal>(value: T) -> Expr
Methods:
- alias(name: &str) -> Expr
- cast(dtype: DataType) -> Expr
- gt(rhs: Expr) -> Expr
- lt(rhs: Expr) -> Expr
- eq(rhs: Expr) -> Expr
- and(rhs: Expr) -> Expr
- or(rhs: Expr) -> Expr
Module Reference
polarway::prelude
Common imports.
Exports: DataFrame, LazyFrame, Series, col, lit, DataType
polarway::io
I/O operations.
use polarway::io::{ParquetReader, CsvReader};
let df = ParquetReader::new("data.parquet").finish()?;
polarway::lazy
Lazy evaluation.
See Also:
- Quick Reference: Common operations cheat sheet
- Migration Guide: Moving from Polars
- Architecture Guide: Design deep dive