Streams are the fundamental data structures in Timeplus Proton for storing and processing event data.
Basic CREATE STREAM Syntax
The basic syntax for creating a stream:
CREATE STREAM [IF NOT EXISTS] stream_name (
column1 type1 [DEFAULT expr1],
column2 type2 [DEFAULT expr2],
...
)
[ENGINE = engine_name]
[SETTINGS key1=value1, key2=value2];
Simple Stream Example
CREATE STREAM events (
event_id string,
user_id int32,
event_type string,
event_time datetime64(3)
);
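Once the stream exists, rows can be written with a standard INSERT. This is a minimal sketch, and the values are made-up sample data:

```sql
-- Insert one event; event_time is given as a millisecond-precision literal
INSERT INTO events (event_id, user_id, event_type, event_time)
VALUES ('evt-001', 1001, 'page_view', '2024-05-01 12:00:00.123');
```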
Column Types and Defaults
Supported Data Types
Proton supports the standard SQL data types, written with lowercase type names:
CREATE STREAM sensor_data (
device_id string, -- String type
temperature float32, -- 32-bit float
humidity float64, -- 64-bit float
reading_count int64, -- 64-bit integer
is_active bool, -- Boolean
location string, -- String
measured_at datetime64(3), -- Timestamp with millisecond precision
metadata string -- JSON data as string
);
Default Values
Specify default values for columns:
CREATE STREAM orders (
order_id string,
customer_id int32,
amount float64,
status string DEFAULT 'pending',
order_time datetime64(3) DEFAULT now64(3),
random_id uint32 DEFAULT rand() -- rand() returns a uint32
);
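Columns with a DEFAULT can be left out of an INSERT, and their default expressions fill them in. A sketch with made-up values, assuming the orders stream above:

```sql
-- status, order_time and random_id are omitted, so their DEFAULT expressions apply
INSERT INTO orders (order_id, customer_id, amount)
VALUES ('ord-1001', 42, 99.95);

-- table() reads the rows already stored instead of waiting for new ones
SELECT order_id, status, order_time, random_id
FROM table(orders);
```

The inserted row should show status = 'pending' and an order_time close to the moment of the insert.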
The reserved _tp_time column is automatically added to every stream. It holds each row's event time and, by default, is set to the time the row is ingested.
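_tp_time does not appear in the column definitions, but it can be selected like any other column. A sketch, assuming the events stream from the simple example above already contains data:

```sql
-- _tp_time defaults to ingestion time; event_time is supplied by the writer
SELECT _tp_time, event_id, event_time
FROM table(events)
ORDER BY _tp_time DESC
LIMIT 5;
```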
CREATE RANDOM STREAM
Random streams generate synthetic data for testing and development:
CREATE RANDOM STREAM metrics (
device_id int DEFAULT rand() % 10,
cpu_usage float32 DEFAULT rand() % 100,
memory_usage float32 DEFAULT rand() % 100
)
SETTINGS eps = 5; -- Generate 5 events per second
Random Stream Settings
eps (events per second): controls the data generation rate.
Random streams are useful for testing queries without external data sources, and they support all standard column types.
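A random stream is queried like any other stream. This minimal sketch averages the generated CPU values per device over 5-second tumbling windows; it assumes the metrics random stream defined above and that its rows carry the usual _tp_time column:

```sql
-- Streaming query: emits per-device averages as each 5-second window closes
SELECT
  window_start,
  device_id,
  avg(cpu_usage) AS avg_cpu
FROM tumble(metrics, _tp_time, INTERVAL 5 SECOND)
GROUP BY device_id, window_start, window_end;
```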
Random Stream Example with Functions
CREATE RANDOM STREAM test_data (
id int DEFAULT rand() % 100,
category string DEFAULT ['A', 'B', 'C', 'D'][rand() % 4 + 1],
value float64 DEFAULT rand() % 1000,
timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 10;
Storage Engines
Default Stream Engine
By default, streams use the Stream engine:
CREATE STREAM events (
id string,
data string
) ENGINE = Stream;
Engine Settings
Configure engine-specific settings, such as the size and time limits for data kept in the stream's log store:
CREATE STREAM events (
id string,
timestamp datetime64(3)
)
SETTINGS
logstore_retention_bytes = 1073741824, -- 1GB retention
logstore_retention_ms = 86400000; -- 24 hours
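The retention values are plain byte and millisecond counts. A constant SELECT is a quick way to derive them instead of typing long numbers by hand; this is only an arithmetic helper, not a retention-specific feature:

```sql
-- Compute retention values used in this page
SELECT
  1024 * 1024 * 1024      AS bytes_1gib,   -- 1073741824
  24 * 60 * 60 * 1000     AS ms_24_hours,  -- 86400000
  7 * 24 * 60 * 60 * 1000 AS ms_7_days;    -- 604800000
```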
Partitioning and Ordering
Partition By
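Partition streams so that queries filtering on the partition key can skip unrelated partitions. This example creates one partition per month and sorts rows within each partition by user and time: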
CREATE STREAM user_events (
user_id int32,
event_type string,
event_time datetime64(3)
)
PARTITION BY to_YYYYMM(event_time)
ORDER BY (user_id, event_time);
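A query that filters on event_time gives the engine a chance to read only the matching monthly partitions. A sketch over historical data; the date range is an arbitrary example:

```sql
-- Only January 2024 is requested, so other monthly partitions can be skipped
SELECT user_id, count() AS events
FROM table(user_events)
WHERE event_time >= '2024-01-01 00:00:00' AND event_time < '2024-02-01 00:00:00'
GROUP BY user_id;
```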
Primary Key
Define a primary key for efficient lookups. When ORDER BY is also given, the primary key must be a prefix of the sorting key, as it is here:
CREATE STREAM metrics (
device_id string,
metric_name string,
value float64,
timestamp datetime64(3)
)
PRIMARY KEY (device_id, metric_name)
ORDER BY (device_id, metric_name, timestamp);
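Lookups that filter on the leading primary-key columns can use the index instead of scanning the whole stream. A sketch against the metrics stream from this Primary Key example (not the random stream of the same name earlier), with hypothetical key values 'device-42' and 'cpu_usage':

```sql
-- Filter on the primary-key columns (device_id, metric_name), newest readings first
SELECT timestamp, value
FROM table(metrics)
WHERE device_id = 'device-42' AND metric_name = 'cpu_usage'
ORDER BY timestamp DESC
LIMIT 10;
```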
Advanced Stream Types
External Streams
External streams connect to external data sources:
CREATE EXTERNAL STREAM kafka_events (
event_id string,
payload string
)
SETTINGS
type = 'kafka',
brokers = 'localhost:9092',
topic = 'events',
data_format = 'JSONEachRow';
External streams let you query data in Kafka, Redpanda, and other message brokers in place, without copying it into Proton's own storage.
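Querying an external stream looks the same as querying a regular stream. A sketch, assuming the kafka_events definition above and a Proton version that accepts the seek_to query setting for Kafka external streams, which replays the topic from the earliest retained offset instead of reading only new messages:

```sql
-- Streaming read from the Kafka topic, starting from the earliest retained message
SELECT event_id, payload
FROM kafka_events
SETTINGS seek_to = 'earliest';
```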
Materialized Views as Streams
Create a target stream, then a materialized view that continuously writes its query results into that stream:
-- Target stream
CREATE STREAM aggregated_metrics (
device_id string,
avg_value float64,
window_start datetime64(3),
window_end datetime64(3)
);
-- Materialized view that populates it
CREATE MATERIALIZED VIEW metrics_mv INTO aggregated_metrics AS
SELECT
device_id,
avg(value) as avg_value,
window_start,
window_end
FROM tumble(raw_metrics, timestamp, INTERVAL 1 MINUTE)
GROUP BY device_id, window_start, window_end;
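The view reads from raw_metrics, which this example does not define. Below is a minimal, hypothetical definition containing only the columns the view uses (a real source stream would likely have more); create it before the materialized view. The final query reads the results the view writes into the target stream:

```sql
-- Hypothetical source stream for metrics_mv; create it before the materialized view
CREATE STREAM IF NOT EXISTS raw_metrics (
  device_id string,
  value float64,
  timestamp datetime64(3)
);

-- Read the per-minute averages that metrics_mv writes into aggregated_metrics
SELECT device_id, avg_value, window_start, window_end
FROM aggregated_metrics;
```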
IF NOT EXISTS
Use IF NOT EXISTS to skip creation, rather than raise an error, when a stream with the same name already exists:
CREATE STREAM IF NOT EXISTS events (
id string,
data string
);
Complete Examples
IoT Sensor Stream
CREATE STREAM iot_sensors (
sensor_id string,
location string,
temperature float32,
humidity float32,
pressure float32,
battery_level int32 DEFAULT 100,
reading_time datetime64(3) DEFAULT now64(3)
)
PARTITION BY to_YYYYMMDD(reading_time)
ORDER BY (sensor_id, reading_time)
SETTINGS
logstore_retention_ms = 604800000; -- 7 days retention
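A typical query on this stream aggregates readings per sensor over fixed windows keyed on reading_time. A sketch; the 5-minute window size is an arbitrary choice:

```sql
-- Per-sensor averages and lowest battery level for each 5-minute window
SELECT
  window_start,
  sensor_id,
  avg(temperature) AS avg_temperature,
  avg(humidity) AS avg_humidity,
  min(battery_level) AS min_battery
FROM tumble(iot_sensors, reading_time, INTERVAL 5 MINUTE)
GROUP BY sensor_id, window_start, window_end;
```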
E-commerce Transactions
CREATE STREAM transactions (
transaction_id string,
user_id int64,
product_id string,
amount float64,
currency string DEFAULT 'USD',
status string DEFAULT 'pending',
created_at datetime64(3) DEFAULT now64(3),
metadata string -- JSON metadata
)
PRIMARY KEY transaction_id
ORDER BY (transaction_id, created_at); -- the primary key must be a prefix of the ORDER BY key
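A running per-user total can be computed with a streaming aggregation over this stream. A sketch; the 'completed' status value is hypothetical, since the example only defines the 'pending' default:

```sql
-- Unbounded streaming aggregation: totals are updated as new transactions arrive
SELECT
  user_id,
  currency,
  sum(amount) AS total_amount,
  count() AS transaction_count
FROM transactions
WHERE status = 'completed'
GROUP BY user_id, currency;
```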
Random Test Data Stream
CREATE RANDOM STREAM test_events (
user_id int DEFAULT rand() % 1000,
action string DEFAULT ['click', 'view', 'purchase', 'signup'][rand() % 4 + 1],
value float64 DEFAULT rand() % 100,
timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 100; -- 100 events/second for load testing
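To check that the generator produces the expected mix of actions, count events per action in short windows. A sketch; at 100 events per second, each 10-second window should hold roughly 1,000 events spread across the four actions:

```sql
-- Event count per action for each 10-second window of generated data
SELECT
  window_start,
  action,
  count() AS events
FROM tumble(test_events, timestamp, INTERVAL 10 SECOND)
GROUP BY action, window_start, window_end;
```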
Viewing Stream Schema
Inspect stream definitions:
-- Show create statement
SHOW CREATE STREAM events;
-- Describe stream columns
DESCRIBE events;
-- List all streams
SHOW STREAMS;
Dropping Streams
Remove streams when no longer needed:
DROP STREAM IF EXISTS events;
Dropping a stream permanently deletes all data. This operation cannot be undone.
Best Practices
Choose appropriate data types
Use the smallest data type that fits your values:
- int32 instead of int64 when integer values fit in 32 bits
- float32 instead of float64 when single precision is sufficient
- datetime64(3) for timestamps with millisecond precision
Configure retention based on your use case:
SETTINGS
logstore_retention_bytes = 10737418240, -- 10GB
logstore_retention_ms = 2592000000; -- 30 days
Use partitioning for time-series data
Partition by date for better query performance:
PARTITION BY to_YYYYMMDD(_tp_time)
Use DEFAULT expressions to auto-populate columns:
created_at datetime64(3) DEFAULT now64(3),
id string DEFAULT generate_uuid_v4()
Next Steps
Reading Data
Learn how to query streams
Writing Data
Insert data into streams