Streams
Streams are the fundamental data structure in Timeplus Proton for working with real-time, continuously flowing data. Unlike traditional tables that store static snapshots, streams represent unbounded sequences of events.
Streams vs Tables
-- Query returns continuous results as new data arrives
SELECT device, count(*), avg(temperature)
FROM sensor_data
GROUP BY device;
Behavior: The query never terminates; it continuously emits updated results as events flow in.
-- Query returns a snapshot of current data and terminates
SELECT device, count(*), avg(temperature)
FROM table(sensor_data)
GROUP BY device;
Behavior: The query executes once on stored data and returns final results.
Key differences:
| Aspect | Stream | Table |
| --- | --- | --- |
| Data model | Unbounded, append-only | Bounded snapshot |
| Query mode | Continuous, never terminates | Batch, terminates |
| Results | Incremental updates | Final result |
| Use case | Real-time monitoring, alerts | Historical analysis |
Every stream in Proton has both streaming and historical modes. Use table(stream_name) to query historical data.
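The two modes can be modeled in a few lines of Python. This is a conceptual sketch, not how Proton executes queries. `streaming_group_by` emits an updated per-device (count, avg) result after every event, as a streaming query does over an unbounded source. `historical_group_by` aggregates a bounded list once, like a query over `table(sensor_data)`. Both function names and the sample event tuples are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

Event = Tuple[str, float]              # (device, temperature)
Result = Dict[str, Tuple[int, float]]  # device -> (count, avg)

def streaming_group_by(events: Iterable[Event]) -> Iterator[Result]:
    """Emit the updated per-device (count, avg) after every incoming event."""
    counts: Dict[str, int] = defaultdict(int)
    sums: Dict[str, float] = defaultdict(float)
    for device, temperature in events:
        counts[device] += 1
        sums[device] += temperature
        yield {d: (counts[d], sums[d] / counts[d]) for d in counts}

def historical_group_by(events: List[Event]) -> Result:
    """Aggregate a bounded snapshot once and return the final result."""
    counts: Dict[str, int] = defaultdict(int)
    sums: Dict[str, float] = defaultdict(float)
    for device, temperature in events:
        counts[device] += 1
        sums[device] += temperature
    return {d: (counts[d], sums[d] / counts[d]) for d in counts}

events = [("device-01", 25.0), ("device-02", 26.0), ("device-01", 27.0)]
updates = list(streaming_group_by(events))  # one result per event
final = historical_group_by(events)         # one result in total
```

Over a finite list, the last streaming emission equals the historical result. Over a live stream there is no last emission, which is why a streaming query never terminates.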
Stream Types
Proton supports multiple stream types for different use cases:
Regular Stream
Standard append-only stream with persistent storage.
CREATE STREAM user_events (
user_id uint64,
event_type string,
timestamp datetime64(3) DEFAULT now64(),
properties string
)
SETTINGS
event_time_column = 'timestamp',
logstore_retention_bytes = 8589934592; -- 8 GiB retention
Properties:
Append-only semantics
Backed by MergeTree storage
Supports indexes and TTL
Distributed via Kafka WAL
Random Stream
Generates synthetic data for testing and development.
CREATE RANDOM STREAM devices (
device string DEFAULT 'device' || to_string(rand() % 100),
temperature float DEFAULT rand() % 1000 / 10,
humidity float DEFAULT rand() % 100
)
SETTINGS eps = 10; -- 10 events per second
Use cases:
Load testing streaming pipelines
Development without external dependencies
Demos and examples
Random streams exist only in memory and generate data on-the-fly during queries.
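To show what rows the `devices` stream produces, the Python sketch below mimics its DEFAULT expressions. It rests on three assumptions. First, SQL `rand()` is treated as a uniform unsigned 32-bit integer, as in ClickHouse, from which Proton derives. Second, `/` is treated as floating-point division. Third, pacing at `eps` rows per second is approximated with `time.sleep`. The `limit` parameter and the function name are hypothetical additions that make the sketch terminate.

```python
import random
import time
from typing import Iterator, Optional

def random_devices(eps: int, limit: int, seed: Optional[int] = None) -> Iterator[dict]:
    """Yield rows shaped like the `devices` random stream, about `eps` rows/second.

    `limit` exists only so the sketch terminates; a real random stream is unbounded.
    """
    rng = random.Random(seed)

    def rand() -> int:
        # Assumed stand-in for SQL rand(): a uniform unsigned 32-bit integer.
        return rng.getrandbits(32)

    for _ in range(limit):
        # Each DEFAULT expression calls rand() independently.
        yield {
            "device": "device" + str(rand() % 100),
            "temperature": rand() % 1000 / 10,  # 0.0 .. 99.9
            "humidity": float(rand() % 100),    # 0 .. 99, stored as float
        }
        time.sleep(1.0 / eps)

rows = list(random_devices(eps=1000, limit=5, seed=42))
```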
Changelog Stream (CDC)
Special stream for Change Data Capture with versioning.
CREATE STREAM customers (
id int,
first_name string,
last_name string,
email string
)
PRIMARY KEY id
SETTINGS
mode = 'changelog_kv',
version_column = '_tp_time';
Features:
Maintains current state of keys
Supports INSERT, UPDATE, DELETE semantics
Automatic versioning with _tp_time or custom column
Well suited to database replication
Example CDC workflow:
-- External stream from Debezium CDC topic
CREATE EXTERNAL STREAM customers_cdc(raw string)
SETTINGS type = 'kafka',
brokers = 'localhost:9092',
topic = 'dbserver1.inventory.customers';
-- Changelog stream to maintain current state
CREATE STREAM customers(
id int,
first_name string,
last_name string,
email string
)
PRIMARY KEY id
SETTINGS mode = 'changelog_kv', version_column = '_tp_time';
-- Materialized view to process CDC events
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
SELECT
to_time(raw:payload.ts_ms) AS _tp_time,
raw:payload.after.id::int AS id,
raw:payload.after.first_name AS first_name,
raw:payload.after.last_name AS last_name,
raw:payload.after.email AS email,
1::int8 AS _tp_delta -- +1 for insert
FROM customers_cdc
WHERE raw:payload.op = 'c'
SETTINGS seek_to = 'earliest';
The _tp_delta column indicates the change type:
1: Insert or update (add row)
-1: Delete (remove row)
For updates, emit both -1 (old version) and +1 (new version).
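The Python sketch below models how `_tp_delta` rows fold into current per-key state. A +1 adds a row version and a -1 retracts it. It is a conceptual model of the semantics described above, not Proton's storage engine. The row layout and the `apply_changelog` name are hypothetical. The running sum of deltas shows why an update needs both rows: the -1/+1 pair leaves a row count unchanged, while a bare +1 would count the customer twice.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

def apply_changelog(changes: List[Row], key: str = "id") -> Dict[Any, Row]:
    """Fold (+1 / -1) changelog rows into the current state per primary key."""
    state: Dict[Any, Row] = {}
    for change in changes:
        delta = change["_tp_delta"]
        row = {k: v for k, v in change.items() if k != "_tp_delta"}
        if delta == 1:
            state[row[key]] = row       # insert, or new version of an update
        elif delta == -1:
            state.pop(row[key], None)   # delete, or retraction of the old version
        else:
            raise ValueError(f"unexpected _tp_delta: {delta}")
    return state

changes = [
    {"id": 1, "email": "a@old.example", "_tp_delta": 1},   # insert
    {"id": 2, "email": "b@example.com", "_tp_delta": 1},   # insert
    {"id": 1, "email": "a@old.example", "_tp_delta": -1},  # update: retract old
    {"id": 1, "email": "a@new.example", "_tp_delta": 1},   # update: add new
    {"id": 2, "email": "b@example.com", "_tp_delta": -1},  # delete
]
current = apply_changelog(changes)
row_count = sum(change["_tp_delta"] for change in changes)  # like sum(_tp_delta)
```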
Versioned Stream
Maintains full history of changes with automatic versioning.
CREATE STREAM product_prices (
product_id uint64,
price decimal(10, 2),
effective_time datetime64(3) DEFAULT now64()
)
PRIMARY KEY product_id
SETTINGS
mode = 'versioned_kv',
version_column = 'effective_time';
Query historical state:
-- Current state
SELECT * FROM table(product_prices);
-- State at specific time
SELECT * FROM table(product_prices)
WHERE effective_time <= '2024-01-01 00:00:00'
QUALIFY row_number() OVER (PARTITION BY product_id ORDER BY effective_time DESC) = 1;
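As a cross-check of the "state at specific time" query, the Python sketch below computes the same result over an in-memory list. For each `product_id`, it keeps the row with the greatest `effective_time` that is not after the cutoff. This mirrors the `row_number() ... = 1` pattern. The sample prices and the `state_as_of` name are made up for illustration.

```python
from datetime import datetime
from decimal import Decimal

def state_as_of(rows, cutoff):
    """Latest row per product_id with effective_time <= cutoff."""
    latest = {}
    for row in rows:
        if row["effective_time"] > cutoff:
            continue  # WHERE effective_time <= cutoff
        current = latest.get(row["product_id"])
        if current is None or row["effective_time"] > current["effective_time"]:
            latest[row["product_id"]] = row  # row_number() = 1 per partition
    return latest

rows = [
    {"product_id": 1, "price": Decimal("9.99"), "effective_time": datetime(2023, 12, 1)},
    {"product_id": 1, "price": Decimal("11.49"), "effective_time": datetime(2023, 12, 20)},
    {"product_id": 1, "price": Decimal("12.99"), "effective_time": datetime(2024, 2, 1)},
    {"product_id": 2, "price": Decimal("5.00"), "effective_time": datetime(2024, 3, 1)},
]
snapshot = state_as_of(rows, datetime(2024, 1, 1))
```

Product 2 has no version before the cutoff, so it is absent from the snapshot, just as the SQL query returns no row for it.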
Creating Streams
Basic Syntax
CREATE STREAM [IF NOT EXISTS] stream_name (
column1 type1 [DEFAULT expr1] [CODEC(codec1)],
column2 type2,
...
)
[PRIMARY KEY expr]
[PARTITION BY expr]
[ORDER BY expr]
[SETTINGS
key1 = value1,
key2 = value2
];
Important Settings
Storage
SETTINGS
storage_type = 'default', -- 'default' or 'memory'
logstore_retention_bytes = 8589934592, -- 8 GiB
logstore_retention_ms = 604800000; -- 7 days
Event Time
SETTINGS
event_time_column = 'timestamp',
emit_version_column = 'window_end';
Sharding
CREATE STREAM events (
user_id uint64,
event_type string,
data string
)
SHARDING KEY user_id -- Co-locate events by user
SHARDS 4; -- 4 parallel shards
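How evenly a sharding key spreads data depends on its cardinality. The Python sketch below assigns rows to shards as hash(key) mod shard count. It uses `zlib.crc32` as a stable stand-in hash; Proton's actual sharding function is not specified here and may differ. A three-valued column such as a status can occupy at most three of eight shards, however many rows arrive. A high-cardinality key like `user_id` reaches all of them.

```python
import zlib
from collections import Counter

def shard_for(key, shards: int) -> int:
    # Stand-in hash (assumption); Proton's real sharding function may differ.
    return zlib.crc32(str(key).encode()) % shards

def distribution(keys, shards: int) -> Counter:
    """Count how many rows land on each shard."""
    return Counter(shard_for(k, shards) for k in keys)

SHARDS = 8
user_ids = range(10_000)                               # high cardinality
statuses = ["active", "inactive", "pending"] * 3_334   # only 3 distinct values
by_user = distribution(user_ids, SHARDS)
by_status = distribution(statuses, SHARDS)
```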
Inserting Data
Direct INSERT
-- Single row
INSERT INTO sensor_data (device, temperature, timestamp)
VALUES ('device-01', 25.3, now64());
-- Batch insert (recommended)
INSERT INTO sensor_data (device, temperature, timestamp)
VALUES
('device-01', 25.3, now64()),
('device-02', 26.1, now64()),
('device-03', 24.8, now64());
Avoid high-frequency single-row inserts. Batch inserts (hundreds to thousands of rows) provide much better performance.
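When an application produces rows one at a time, a small client-side buffer can group them into batches before inserting. The Python sketch below shows the buffering logic only. `InsertBatcher` is a hypothetical class, and `flush_fn` is a hypothetical callback that would run one multi-row INSERT; no particular Proton client library is implied.

```python
from typing import Callable, List, Sequence

class InsertBatcher:
    """Buffer rows and hand them to `flush_fn` in batches of `batch_size`."""

    def __init__(self, flush_fn: Callable[[List[Sequence]], None], batch_size: int = 1000):
        self.flush_fn = flush_fn  # hypothetical: would execute one batched INSERT
        self.batch_size = batch_size
        self.buffer: List[Sequence] = []

    def add(self, row: Sequence) -> None:
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []  # start a fresh list; the flushed one is handed off

batches: List[List[Sequence]] = []
batcher = InsertBatcher(batches.append, batch_size=1000)
for i in range(2500):
    batcher.add((f"device-{i % 3:02d}", 25.0))
batcher.flush()  # send the final partial batch
```

A production version would usually also flush on a timer, so a slow trickle of rows does not sit in the buffer indefinitely.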
Streaming INSERT
-- Continuous insert from another stream
INSERT INTO processed_events
SELECT
user_id,
event_type,
parse_json(data) as parsed_data,
now64() as processed_time
FROM raw_events
WHERE event_type IN ('click', 'purchase');
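Conceptually, the streaming `INSERT ... SELECT` above is a long-running filter-and-transform loop. The Python generator below models that dataflow over an in-memory iterable. In it, `json.loads` stands in for `parse_json` and `datetime.now` stands in for `now64()`. It models the data flow only, not Proton's execution engine; the sample events are made up.

```python
import json
from datetime import datetime, timezone
from typing import Iterable, Iterator

def process_events(raw_events: Iterable[dict]) -> Iterator[dict]:
    """Filter and transform events one at a time, like the streaming INSERT."""
    for event in raw_events:
        if event["event_type"] not in ("click", "purchase"):  # WHERE ... IN (...)
            continue
        yield {
            "user_id": event["user_id"],
            "event_type": event["event_type"],
            "parsed_data": json.loads(event["data"]),       # stand-in for parse_json(data)
            "processed_time": datetime.now(timezone.utc),   # stand-in for now64()
        }

raw = [
    {"user_id": 1, "event_type": "click", "data": '{"page": "/home"}'},
    {"user_id": 2, "event_type": "scroll", "data": "{}"},
    {"user_id": 1, "event_type": "purchase", "data": '{"amount": 42}'},
]
processed = list(process_events(raw))
```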
From External Streams
See External Streams for Kafka, Pulsar, and HTTP sources.
Querying Streams
Streaming Query
-- Live aggregation
SELECT
device,
count(*) as event_count,
avg(temperature) as avg_temp,
max(temperature) as max_temp
FROM sensor_data
GROUP BY device;
Output behavior:
┌─device────┬─event_count─┬─avg_temp─┬─max_temp─┐
│ device-01 │ 1245 │ 25.3 │ 28.1 │
│ device-02 │ 1198 │ 26.1 │ 29.3 │
└───────────┴─────────────┴──────────┴──────────┘
┌─device────┬─event_count─┬─avg_temp─┬─max_temp─┐
│ device-01 │ 1342 │ 25.4 │ 28.1 │ -- Updated
│ device-02 │ 1287 │ 26.0 │ 29.3 │
│ device-03 │ 45 │ 24.8 │ 27.2 │ -- New
└───────────┴─────────────┴──────────┴──────────┘
Historical Query
-- Query stored data
SELECT
device,
count(*) as total_events,
min(timestamp) as first_seen,
max(timestamp) as last_seen
FROM table(sensor_data)
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY device;
Filtering
-- Filter by columns
SELECT * FROM sensor_data
WHERE temperature > 30 AND device LIKE 'production-%';
-- Filter by time
SELECT * FROM sensor_data
WHERE _tp_time >= now64() - INTERVAL 5 MINUTE;
Virtual Columns
Proton automatically adds virtual columns to streams:
| Column | Type | Description |
| --- | --- | --- |
| _tp_time | datetime64(3) | Event time (defaults to ingestion time) |
| _tp_sn | int64 | Sequence number in shard |
| _tp_shard | uint64 | Shard ID |
| _tp_append_time | datetime64(3) | Write time to storage |
-- Query with virtual columns
SELECT
_tp_time,
_tp_shard,
device,
temperature
FROM sensor_data
WHERE _tp_time >= now64() - INTERVAL 1 HOUR;
Virtual columns are computed automatically and don’t consume storage space.
Stream Operations
Alter Stream
-- Add column
ALTER STREAM sensor_data ADD COLUMN location string;
-- Modify column type
ALTER STREAM sensor_data MODIFY COLUMN temperature decimal(5, 2);
-- Drop column
ALTER STREAM sensor_data DROP COLUMN humidity;
Truncate Stream
-- Remove all data (irreversible!)
TRUNCATE STREAM sensor_data;
Drop Stream
-- Delete stream and all data
DROP STREAM sensor_data;
-- Drop if exists
DROP STREAM IF EXISTS sensor_data;
Best Practices
Use appropriate storage mode
memory for high-speed temporary data
default for persistent historical queries
Batch inserts
-- Good: Batch of 1000 rows
INSERT INTO events SELECT ... LIMIT 1000;
-- Bad: Single row in loop
INSERT INTO events VALUES (...);
Choose good sharding keys
-- Good: High cardinality, evenly distributed
SHARDING KEY user_id
-- Bad: Low cardinality, skewed distribution
SHARDING KEY status -- Only 3 values
Set appropriate retention
SETTINGS
logstore_retention_bytes = 8589934592, -- 8 GiB
logstore_retention_ms = 604800000; -- 7 days
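The retention settings take raw bytes and milliseconds, which are easy to get wrong by hand. The Python sketch below derives them from GiB and days. It confirms that 8589934592 bytes is 8 GiB (8 × 1024³), that 604800000 ms is 7 days, and that 2592000000 ms, the value in the IoT example, is 30 days. The helper name `retention_settings` is hypothetical.

```python
GIB = 1024 ** 3                    # bytes per GiB
MS_PER_DAY = 24 * 60 * 60 * 1000   # milliseconds per day

def retention_settings(gib: int, days: int) -> dict:
    """Return logstore retention values for a size cap in GiB and an age cap in days."""
    return {
        "logstore_retention_bytes": gib * GIB,
        "logstore_retention_ms": days * MS_PER_DAY,
    }

weekly = retention_settings(gib=8, days=7)
monthly = retention_settings(gib=8, days=30)
```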
Examples
IoT Sensor Monitoring
CREATE STREAM iot_sensors (
sensor_id string,
location string,
temperature float,
humidity float,
pressure float,
timestamp datetime64(3) DEFAULT now64()
)
SETTINGS
event_time_column = 'timestamp',
logstore_retention_ms = 2592000000; -- 30 days
-- Real-time monitoring
SELECT
location,
avg(temperature) as avg_temp,
max(temperature) as max_temp,
count(*) as reading_count
FROM iot_sensors
GROUP BY location;
Click Stream Analytics
CREATE STREAM clickstream (
user_id uint64,
session_id string,
event_type string,
page_url string,
timestamp datetime64(3) DEFAULT now64()
)
SHARDING KEY user_id
SHARDS 8
SETTINGS
storage_type = 'default',
event_time_column = 'timestamp';
-- Live user activity
SELECT
event_type,
count() as events,
count(DISTINCT user_id) as unique_users
FROM clickstream
GROUP BY event_type;
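An exact `count(DISTINCT user_id)` in a continuously running query has to remember every user it has already seen. The Python sketch below models that state with one set per `event_type` and emits updated (events, unique_users) pairs after each event. It illustrates why exact distinct counts hold state that grows with cardinality; it does not describe Proton's internal aggregation state. The sample clicks and the `live_activity` name are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Set, Tuple

def live_activity(clicks: Iterable[Tuple[int, str]]) -> Iterator[Dict[str, Tuple[int, int]]]:
    """Emit event_type -> (events, unique_users) after every click."""
    events: Dict[str, int] = defaultdict(int)
    users: Dict[str, Set[int]] = defaultdict(set)  # grows with distinct users
    for user_id, event_type in clicks:
        events[event_type] += 1
        users[event_type].add(user_id)
        yield {t: (events[t], len(users[t])) for t in events}

clicks = [(1, "page_view"), (2, "page_view"), (1, "page_view"), (1, "click")]
snapshots = list(live_activity(clicks))
```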
Next Steps
External Streams Connect to Kafka, Pulsar, and other sources
Materialized Views Build continuous ETL pipelines
Windows Time-based aggregations
Query Syntax Learn streaming SQL syntax