
Streams

Streams are the fundamental data structure in Timeplus Proton for working with real-time, continuously flowing data. Unlike traditional tables that store static snapshots, streams represent unbounded sequences of events.

Streams vs Tables

-- Query returns continuous results as new data arrives
SELECT device, count(*), avg(temperature) 
FROM sensor_data 
GROUP BY device;
Behavior: The query never terminates; it continuously emits updated results as events flow in.
Key differences:
| Aspect | Stream | Table |
| --- | --- | --- |
| Data model | Unbounded, append-only | Bounded snapshot |
| Query mode | Continuous, never terminates | Batch, terminates |
| Results | Incremental updates | Final result |
| Use case | Real-time monitoring, alerts | Historical analysis |
Every stream in Proton has both streaming and historical modes. Use table(stream_name) to query historical data.
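
The difference between the two query modes can be sketched outside SQL. The Python below is only a conceptual model, not Proton's implementation: `batch_avg` and `streaming_avg` are hypothetical names, and the streaming version emits one update per event for simplicity, whereas Proton's actual emit cadence may differ.

```python
from collections import defaultdict

def batch_avg(events):
    """Table-style query: scan a bounded snapshot once and return one final result."""
    sums, counts = defaultdict(float), defaultdict(int)
    for device, temperature in events:
        sums[device] += temperature
        counts[device] += 1
    return {d: (counts[d], sums[d] / counts[d]) for d in counts}

def streaming_avg(events):
    """Stream-style query: keep running state and emit an updated row per event."""
    sums, counts = defaultdict(float), defaultdict(int)
    for device, temperature in events:  # over a real stream this loop never ends
        sums[device] += temperature
        counts[device] += 1
        yield device, counts[device], sums[device] / counts[device]

events = [("device-01", 25.0), ("device-02", 26.0), ("device-01", 27.0)]
updates = list(streaming_avg(events))  # one incremental result per event
final = batch_avg(events)              # a single final result
```

Both functions hold the same aggregation state; the only difference is whether results are returned once at the end or emitted as the state changes.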

Stream Types

Proton supports multiple stream types for different use cases:

Regular Stream

Standard append-only stream with persistent storage.
CREATE STREAM user_events (
  user_id uint64,
  event_type string,
  timestamp datetime64(3) DEFAULT now64(),
  properties string
) 
SETTINGS 
  event_time_column='timestamp',
  logstore_retention_bytes=8589934592;  -- 8 GiB retention
Properties:
  • Append-only semantics
  • Backed by MergeTree storage
  • Supports indexes and TTL
  • Distributed via Kafka WAL

Random Stream

Generates synthetic data for testing and development.
CREATE RANDOM STREAM devices (
  device string DEFAULT 'device'||to_string(rand()%100),
  temperature float DEFAULT rand()%1000/10,
  humidity float DEFAULT rand()%100
)
SETTINGS eps=10;  -- 10 events per second
Use cases:
  • Load testing streaming pipelines
  • Development without external dependencies
  • Demos and examples
Random streams exist only in memory and generate data on the fly while a query is running.

Changelog Stream (CDC)

Special stream for Change Data Capture with versioning.
CREATE STREAM customers (
  id int,
  first_name string,
  last_name string,
  email string
) 
PRIMARY KEY id
SETTINGS 
  mode='changelog_kv',
  version_column='_tp_time';
Features:
  • Maintains current state of keys
  • Supports INSERT, UPDATE, DELETE semantics
  • Automatic versioning with _tp_time or custom column
  • Perfect for database replication
Example CDC workflow from the Proton source code:
-- External stream from Debezium CDC topic
CREATE EXTERNAL STREAM customers_cdc(raw string)
SETTINGS type='kafka',
         brokers='localhost:9092',
         topic='dbserver1.inventory.customers';

-- Changelog stream to maintain current state
CREATE STREAM customers(
  id int, 
  first_name string, 
  last_name string, 
  email string
) 
PRIMARY KEY id 
SETTINGS mode='changelog_kv', version_column='_tp_time';

-- Materialized view to process CDC events
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS 
SELECT 
  to_time(raw:payload.ts_ms) AS _tp_time,
  raw:payload.after.id::int AS id,
  raw:payload.after.first_name AS first_name,
  raw:payload.after.last_name AS last_name,
  raw:payload.after.email AS email,
  1::int8 as _tp_delta  -- +1 for insert
FROM customers_cdc 
WHERE raw:payload.op='c' 
SETTINGS seek_to='earliest';
The _tp_delta column indicates the change type:
  • 1: Insert or update (add row)
  • -1: Delete (remove row)
For updates, emit both -1 (old version) and +1 (new version).
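
To make the `_tp_delta` rules concrete, here is a minimal Python sketch of how a consumer could fold changelog rows into current state. It is an illustration of the semantics described above, not Proton's internal logic; `apply_changelog` and the sample rows are hypothetical.

```python
def apply_changelog(state, row, delta):
    """Apply one changelog row to `state`, a dict keyed by the primary key `id`."""
    key = row["id"]
    if delta == 1:
        state[key] = row      # insert, or the new half of an update
    elif delta == -1:
        state.pop(key, None)  # delete, or the old half of an update
    else:
        raise ValueError(f"unexpected _tp_delta: {delta}")
    return state

state = {}
apply_changelog(state, {"id": 1, "email": "a@example.com"}, 1)   # insert
# An update is a retraction of the old row followed by the new row.
apply_changelog(state, {"id": 1, "email": "a@example.com"}, -1)
apply_changelog(state, {"id": 1, "email": "b@example.com"}, 1)
apply_changelog(state, {"id": 2, "email": "c@example.com"}, 1)   # insert
apply_changelog(state, {"id": 2, "email": "c@example.com"}, -1)  # delete
```

After these five rows, only key 1 remains, holding its updated email.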

Versioned Stream

Maintains full history of changes with automatic versioning.
CREATE STREAM product_prices (
  product_id uint64,
  price decimal(10,2),
  effective_time datetime64(3) DEFAULT now64()
)
PRIMARY KEY product_id
SETTINGS 
  mode='versioned_kv',
  version_column='effective_time';
Query historical state:
-- Current state
SELECT * FROM table(product_prices);

-- State at specific time
SELECT * FROM table(product_prices)
WHERE effective_time <= '2024-01-01 00:00:00'
QUALIFY row_number() OVER (PARTITION BY product_id ORDER BY effective_time DESC) = 1;
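
The "state at specific time" query is an as-of lookup: for each key, keep the newest row whose version is not later than the cutoff. The following Python sketch shows that logic on in-memory rows, assuming every version is still retained; `state_as_of` and the sample prices are made up for illustration.

```python
from datetime import datetime

def state_as_of(rows, cutoff):
    """Return, per product_id, the latest row with effective_time <= cutoff."""
    latest = {}
    for row in rows:
        if row["effective_time"] > cutoff:
            continue  # this version did not exist yet at the cutoff
        current = latest.get(row["product_id"])
        if current is None or row["effective_time"] > current["effective_time"]:
            latest[row["product_id"]] = row
    return latest

rows = [
    {"product_id": 1, "price": "9.99", "effective_time": datetime(2023, 6, 1)},
    {"product_id": 1, "price": "12.50", "effective_time": datetime(2023, 12, 15)},
    {"product_id": 1, "price": "14.00", "effective_time": datetime(2024, 2, 1)},
    {"product_id": 2, "price": "5.00", "effective_time": datetime(2024, 1, 10)},
]
snapshot = state_as_of(rows, datetime(2024, 1, 1))
```

Here product 1 resolves to its December 2023 price, and product 2 is absent because its first version is later than the cutoff.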

Creating Streams

Basic Syntax

CREATE STREAM [IF NOT EXISTS] stream_name (
  column1 type1 [DEFAULT expr1] [CODEC(codec1)],
  column2 type2,
  ...
)
[PRIMARY KEY expr]
[PARTITION BY expr]  
[ORDER BY expr]
[SETTINGS 
  key1=value1,
  key2=value2
];

Important Settings

SETTINGS
  storage_type='default',  -- 'default' or 'memory'
  logstore_retention_bytes=8589934592,  -- 8 GiB
  logstore_retention_ms=604800000;  -- 7 days
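
The retention values are raw byte and millisecond counts, which are easy to mistype. A small Python helper such as the one below (the names `retention_bytes` and `retention_ms` are hypothetical) can derive them from human-readable units; it reproduces the numbers used on this page.

```python
GIB = 1024 ** 3                   # bytes in one gibibyte
MS_PER_DAY = 24 * 60 * 60 * 1000  # milliseconds in one day

def retention_bytes(gib):
    """Size limit in bytes for logstore_retention_bytes."""
    return gib * GIB

def retention_ms(days):
    """Age limit in milliseconds for logstore_retention_ms."""
    return days * MS_PER_DAY

eight_gib = retention_bytes(8)   # 8589934592
seven_days = retention_ms(7)     # 604800000
thirty_days = retention_ms(30)   # 2592000000
```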

Inserting Data

Direct INSERT

-- Single row
INSERT INTO sensor_data (device, temperature, timestamp)
VALUES ('device-01', 25.3, now64());

-- Batch insert (recommended)
INSERT INTO sensor_data (device, temperature, timestamp)
VALUES 
  ('device-01', 25.3, now64()),
  ('device-02', 26.1, now64()),
  ('device-03', 24.8, now64());
Avoid high-frequency single-row inserts. Batch inserts (hundreds to thousands of rows) provide much better performance.
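
On the client side, batching usually means buffering rows and sending them in fixed-size groups. The sketch below shows only the grouping step in Python; `batches` is a hypothetical helper, and how each batch is actually sent to Proton (driver, REST call, and so on) is left out on purpose.

```python
from itertools import islice

def batches(rows, size):
    """Yield successive lists of up to `size` rows from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# 2,500 hypothetical readings become three INSERTs instead of 2,500.
readings = [(f"device-{i % 3 + 1:02d}", 20.0 + i % 10) for i in range(2500)]
batch_sizes = [len(chunk) for chunk in batches(readings, 1000)]
```

Each yielded chunk would map to one multi-row `INSERT ... VALUES` statement like the batch example above.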

Streaming INSERT

-- Continuous insert from another stream
INSERT INTO processed_events
SELECT 
  user_id,
  event_type,
  parse_json(data) as parsed_data,
  now64() as processed_time
FROM raw_events
WHERE event_type IN ('click', 'purchase');

From External Streams

See External Streams for Kafka, Pulsar, and HTTP sources.

Querying Streams

Streaming Query

-- Live aggregation
SELECT 
  device,
  count(*) as event_count,
  avg(temperature) as avg_temp,
  max(temperature) as max_temp
FROM sensor_data
GROUP BY device;
Output behavior (each block is one successive emit; later emits carry updated values):
┌─device────┬─event_count─┬─avg_temp─┬─max_temp─┐
│ device-01 │        1245 │     25.3 │     28.1 │
│ device-02 │        1198 │     26.1 │     29.3 │
└───────────┴─────────────┴──────────┴──────────┘

┌─device────┬─event_count─┬─avg_temp─┬─max_temp─┐  
│ device-01 │        1342 │     25.4 │     28.1 │  -- Updated
│ device-02 │        1287 │     26.0 │     29.3 │
│ device-03 │          45 │     24.8 │     27.2 │  -- New
└───────────┴─────────────┴──────────┴──────────┘

Historical Query

-- Query stored data
SELECT 
  device,
  count(*) as total_events,
  min(timestamp) as first_seen,
  max(timestamp) as last_seen
FROM table(sensor_data)
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY device;

Filtering

-- Filter by columns
SELECT * FROM sensor_data
WHERE temperature > 30 AND device LIKE 'production-%';

-- Filter by time
SELECT * FROM sensor_data
WHERE _tp_time >= now64() - INTERVAL 5 MINUTE;

Virtual Columns

Proton automatically adds virtual columns to streams:
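| Column | Type | Description |
| --- | --- | --- |
| _tp_time | datetime64(3) | Event time (defaults to ingestion time when no event_time_column is set) |
| _tp_sn | int64 | Sequence number in shard |
| _tp_shard | uint64 | Shard ID |
| _tp_append_time | datetime64(3) | Write time to storage |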
-- Query with virtual columns
SELECT 
  _tp_time,
  _tp_shard, 
  device,
  temperature
FROM sensor_data
WHERE _tp_time >= now64() - INTERVAL 1 HOUR;
Virtual columns are computed automatically and don’t consume storage space.

Stream Operations

Alter Stream

-- Add column
ALTER STREAM sensor_data ADD COLUMN location string;

-- Modify column type
ALTER STREAM sensor_data MODIFY COLUMN temperature decimal(5,2);

-- Drop column
ALTER STREAM sensor_data DROP COLUMN humidity;

Truncate Stream

-- Remove all data (irreversible!)
TRUNCATE STREAM sensor_data;

Drop Stream

-- Delete stream and all data
DROP STREAM sensor_data;

-- Drop if exists
DROP STREAM IF EXISTS sensor_data;

Performance Tips

1. Use appropriate storage mode

  • memory for high-speed temporary data
  • default for persistent historical queries
2. Batch inserts

-- Good: Batch of 1000 rows
INSERT INTO events SELECT ... LIMIT 1000;

-- Bad: Single row in loop
INSERT INTO events VALUES (...);
3. Choose good sharding keys

-- Good: High cardinality, evenly distributed
SHARDING KEY user_id

-- Bad: Low cardinality, skewed distribution  
SHARDING KEY status  -- Only 3 values
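
The effect of key cardinality on shard balance can be checked with a quick simulation. The Python below assumes rows are routed by a hash of the key modulo the shard count and uses CRC32 purely as a stand-in; Proton's real hash function is not shown in this page, and `shard_for` and `shard_counts` are hypothetical names.

```python
import zlib
from collections import Counter

def shard_for(key, shards):
    """Map a key to a shard using a stable hash (CRC32 as a stand-in)."""
    return zlib.crc32(str(key).encode("utf-8")) % shards

def shard_counts(keys, shards):
    """Count how many rows land on each shard."""
    counts = Counter(shard_for(k, shards) for k in keys)
    return [counts.get(s, 0) for s in range(shards)]

SHARDS = 8
statuses = ["active", "idle", "error"]

# 10,000 rows keyed by a distinct user_id versus one of three status values.
by_user = shard_counts(range(10_000), SHARDS)
by_status = shard_counts((statuses[i % 3] for i in range(10_000)), SHARDS)

print("user_id:", by_user)
print("status: ", by_status)
```

With only three distinct values, the `status` key can never use more than three of the eight shards, no matter which hash is applied, while the `user_id` key spreads rows across all of them.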
4. Set appropriate retention

SETTINGS
  logstore_retention_bytes=8589934592,  -- 8 GiB
  logstore_retention_ms=604800000;  -- 7 days

Examples

IoT Sensor Monitoring

CREATE STREAM iot_sensors (
  sensor_id string,
  location string,
  temperature float,
  humidity float,
  pressure float,
  timestamp datetime64(3) DEFAULT now64()
)
SETTINGS 
  event_time_column='timestamp',
  logstore_retention_ms=2592000000;  -- 30 days

-- Real-time monitoring
SELECT 
  location,
  avg(temperature) as avg_temp,
  max(temperature) as max_temp,
  count(*) as reading_count
FROM iot_sensors
GROUP BY location;

Click Stream Analytics

CREATE STREAM clickstream (
  user_id uint64,
  session_id string,
  event_type string,
  page_url string,
  timestamp datetime64(3) DEFAULT now64()
)
SHARDING KEY user_id
SHARDS 8
SETTINGS
  storage_type='default',
  event_time_column='timestamp';

-- Live user activity
SELECT 
  event_type,
  count() as events,
  count(DISTINCT user_id) as unique_users
FROM clickstream
GROUP BY event_type;

Next Steps

  • External Streams: Connect to Kafka, Pulsar, and other sources
  • Materialized Views: Build continuous ETL pipelines
  • Windows: Time-based aggregations
  • Query Syntax: Learn streaming SQL syntax