Streams are the fundamental data structures in Timeplus Proton for storing and processing event data.

Basic CREATE STREAM Syntax

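The basic syntax for creating a stream (clauses in square brackets are optional):
CREATE STREAM [IF NOT EXISTS] stream_name (
  column1 type1 [DEFAULT expr1],
  column2 type2 [DEFAULT expr2],
  ...
)
[ENGINE = engine_name]
[PRIMARY KEY key_expr]
[PARTITION BY partition_expr]
[ORDER BY order_expr]
[SETTINGS key1=value1, key2=value2];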

Simple Stream Example

CREATE STREAM events (
  event_id string,
  user_id int32,
  event_type string,
  event_time datetime64(3)
);
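A stream created this way can be written to and read back with ordinary SQL. The following is a minimal sketch, assuming the events stream above; the sample values are made up for illustration:
-- Append one row; columns are listed explicitly
INSERT INTO events (event_id, user_id, event_type, event_time)
VALUES ('evt-001', 42, 'login', '2024-01-01 10:00:00.000');

-- Streaming query: keeps running and returns rows as they arrive
SELECT * FROM events;

-- Historical query: table() reads the rows already stored and then ends
SELECT * FROM table(events);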

Column Types and Defaults

Supported Data Types

Proton supports the common scalar data types, written in lowercase:
CREATE STREAM sensor_data (
  device_id string,           -- String type
  temperature float32,        -- 32-bit float
  humidity float64,           -- 64-bit float
  reading_count int64,        -- 64-bit integer
  is_active bool,             -- Boolean
  location string,            -- String
  measured_at datetime64(3),  -- Timestamp with millisecond precision
  metadata string             -- JSON data as string
);

Default Values

Specify default values for columns:
CREATE STREAM orders (
  order_id string,
  customer_id int32,
  amount float64,
  status string DEFAULT 'pending',
  order_time datetime64(3) DEFAULT now64(3),
  random_id uint32 DEFAULT rand()  -- rand() returns a uint32
);
The reserved _tp_time column is automatically added to every stream. It holds the event time and defaults to the ingestion time when no value is supplied.
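As a sketch of how an existing column can drive _tp_time instead of the ingestion clock, the example below uses the event_time_column setting. The stream name orders_by_event_time is hypothetical, and the setting should be checked against the Proton version in use:
CREATE STREAM orders_by_event_time (
  order_id string,
  amount float64,
  order_time datetime64(3)
)
SETTINGS event_time_column = 'order_time';  -- _tp_time takes its value from order_time

-- Compare the two columns for rows already stored
SELECT order_id, order_time, _tp_time FROM table(orders_by_event_time);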

CREATE RANDOM STREAM

Random streams generate synthetic data for testing and development:
CREATE RANDOM STREAM metrics (
  device_id int DEFAULT rand() % 10,
  cpu_usage float32 DEFAULT rand() % 100,
  memory_usage float32 DEFAULT rand() % 100
) 
SETTINGS eps = 5;  -- Generate 5 events per second

Random Stream Settings

  • eps (events per second): controls how many rows the stream generates each second
  • Random streams are useful for testing queries without external data sources
  • Columns can use any of the standard column types
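To see a random stream in action, a streaming aggregation can run directly against it. This sketch assumes the metrics random stream defined above:
-- Running count and average CPU usage per simulated device
SELECT device_id, count() AS readings, avg(cpu_usage) AS avg_cpu
FROM metrics
GROUP BY device_id;
The query keeps running and periodically emits updated aggregates until it is cancelled.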

Random Stream Example with Functions

CREATE RANDOM STREAM test_data (
  id int DEFAULT rand() % 100,
  category string DEFAULT ['A', 'B', 'C', 'D'][rand() % 4 + 1],
  value float64 DEFAULT rand() % 1000,
  timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 10;

Storage Engines

Default Stream Engine

By default, streams use the Stream engine:
CREATE STREAM events (
  id string,
  data string
) ENGINE = Stream;

Engine Settings

Configure storage settings, such as retention for the stream's log store, in the SETTINGS clause:
CREATE STREAM events (
  id string,
  timestamp datetime64(3)
)
SETTINGS
  logstore_retention_bytes = 1073741824,  -- keep up to 1 GiB
  logstore_retention_ms = 86400000;       -- keep up to 24 hours
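The retention values are plain byte and millisecond counts, so it can help to derive them rather than type them by hand. A small sketch of the arithmetic:
SELECT
  1024 * 1024 * 1024       AS one_gib_in_bytes,  -- 1073741824
  24 * 60 * 60 * 1000      AS one_day_in_ms,     -- 86400000
  7 * 24 * 60 * 60 * 1000  AS seven_days_in_ms;  -- 604800000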

Partitioning and Ordering

Partition By

Partition a stream's stored data so that queries filtering on the partition expression can skip irrelevant partitions:
CREATE STREAM user_events (
  user_id int32,
  event_type string,
  event_time datetime64(3)
)
PARTITION BY to_YYYYMM(event_time)
ORDER BY (user_id, event_time);
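Partitioning pays off when a query filters on the partition expression. A hedged sketch, assuming the user_events stream above and a historical query through table():
-- Count events per type for January 2024; the filter matches the partition expression
SELECT event_type, count() AS events
FROM table(user_events)
WHERE to_YYYYMM(event_time) = 202401
GROUP BY event_type;
Whether partitions are actually skipped depends on the query shape and on the Proton version in use.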

Primary Key

Define a primary key for efficient lookups:
CREATE STREAM metrics (
  device_id string,
  metric_name string,
  value float64,
  timestamp datetime64(3)
)
PRIMARY KEY (device_id, metric_name)
ORDER BY (device_id, metric_name, timestamp);

Advanced Stream Types

External Streams

External streams connect to external data sources:
CREATE EXTERNAL STREAM kafka_events (
  event_id string,
  payload string
)
SETTINGS 
  type = 'kafka',
  brokers = 'localhost:9092',
  topic = 'events',
  data_format = 'JSONEachRow';
External streams let you query Kafka, Redpanda, and other message brokers in place, without copying the data into Proton.
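Once defined, an external stream is queried like any other stream. The sketch below assumes the kafka_events stream above and that payload holds a JSON object with a user_id field, which is an assumption made for illustration:
-- Read new messages as they arrive in the topic
SELECT event_id, payload FROM kafka_events;

-- Re-read the topic from its first message and pull one field out of the JSON payload
SELECT event_id, json_extract_string(payload, 'user_id') AS user_id
FROM kafka_events
SETTINGS seek_to = 'earliest';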

Materialized Views as Streams

Create streams that are populated by materialized views:
-- Target stream
CREATE STREAM aggregated_metrics (
  device_id string,
  avg_value float64,
  window_start datetime64(3),
  window_end datetime64(3)
);

-- Materialized view that populates it (the source stream raw_metrics must already exist)
CREATE MATERIALIZED VIEW metrics_mv INTO aggregated_metrics AS
SELECT 
  device_id,
  avg(value) AS avg_value,
  window_start,
  window_end
FROM tumble(raw_metrics, timestamp, INTERVAL 1 MINUTE)
GROUP BY device_id, window_start, window_end;
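The view above reads from raw_metrics, which this page does not define. A minimal sketch of a compatible source stream follows; the column list is an assumption inferred from the columns the view references:
-- Hypothetical source stream for metrics_mv; create it before the view
CREATE STREAM raw_metrics (
  device_id string,
  value float64,
  timestamp datetime64(3)
);

-- Read the per-minute averages that the view has written so far
SELECT device_id, avg_value, window_start, window_end
FROM table(aggregated_metrics);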

IF NOT EXISTS

Prevent errors when creating streams that might already exist:
CREATE STREAM IF NOT EXISTS events (
  id string,
  data string
);

Complete Examples

IoT Sensor Stream

CREATE STREAM iot_sensors (
  sensor_id string,
  location string,
  temperature float32,
  humidity float32,
  pressure float32,
  battery_level int32 DEFAULT 100,
  reading_time datetime64(3) DEFAULT now64(3)
)
PARTITION BY to_YYYYMMDD(reading_time)
ORDER BY (sensor_id, reading_time)
SETTINGS 
  logstore_retention_ms = 604800000;  -- 7 days retention

E-commerce Transactions

CREATE STREAM transactions (
  transaction_id string,
  user_id int64,
  product_id string,
  amount float64,
  currency string DEFAULT 'USD',
  status string DEFAULT 'pending',
  created_at datetime64(3) DEFAULT now64(3),
  metadata string  -- JSON metadata
)
PRIMARY KEY (transaction_id)
ORDER BY (transaction_id, created_at);  -- the primary key must be a prefix of the sorting key

Random Test Data Stream

CREATE RANDOM STREAM test_events (
  user_id int DEFAULT rand() % 1000,
  action string DEFAULT ['click', 'view', 'purchase', 'signup'][rand() % 4 + 1],
  value float64 DEFAULT rand() % 100,
  timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 100;  -- 100 events/second for load testing

Viewing Stream Schema

Inspect stream definitions:
-- Show create statement
SHOW CREATE STREAM events;

-- Describe stream columns
DESCRIBE events;

-- List all streams
SHOW STREAMS;

Dropping Streams

Remove streams when no longer needed:
DROP STREAM IF EXISTS events;
Dropping a stream permanently deletes all data. This operation cannot be undone.

Best Practices

Use the smallest data type that fits your needs:
  • int32 vs int64 for integers
  • float32 vs float64 for floating-point values
  • datetime64(3) for millisecond-precision timestamps

Configure retention based on your use case:
SETTINGS
  logstore_retention_bytes = 10737418240,  -- 10 GiB
  logstore_retention_ms = 2592000000;      -- 30 days

Partition by date for better query performance:
PARTITION BY to_YYYYMMDD(_tp_time)

Use DEFAULT expressions to auto-populate columns:
created_at datetime64(3) DEFAULT now64(3),
id string DEFAULT generate_uuid_v4()

Next Steps

Reading Data

Learn how to query streams

Writing Data

Insert data into streams