SELECT queries data from streams and tables. Timeplus Proton supports both streaming queries (unbounded, continuous) and historical queries (bounded, one-time).
Syntax
[WITH cte_name AS (SELECT ...)]
SELECT [ALL | DISTINCT [ON (expr_list)]]
[TOP n [WITH TIES]]
expr_list
[FROM [db.]stream_name | (subquery) | table_function(...)]
[PREWHERE expr]
[WHERE expr]
[PARTITION BY expr_list]
[SHUFFLE BY expr_list]
[GROUP BY [ROLLUP | CUBE | GROUPING SETS] expr_list]
[WITH ROLLUP | CUBE | TOTALS]
[HAVING expr]
[WINDOW window_name AS (window_spec)]
[ORDER BY expr_list [WITH FILL]]
[LIMIT [n[, m]] [BY expr_list] [WITH TIES]]
[OFFSET n [ROW | ROWS]]
[FETCH {FIRST | NEXT} n {ROW | ROWS} {ONLY}]
[EMIT emit_policy]
[SETTINGS setting=value, ...];
Streaming vs Historical Queries
Streaming Query
Queries unbounded data and runs continuously:
-- Continuously query streaming data
SELECT device, count(*)
FROM devices
GROUP BY device;
Historical Query
Queries bounded historical data using the table() function and returns once:
-- Query historical data (one-time)
SELECT device, count(*)
FROM table(devices)
GROUP BY device;
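Because a historical query is bounded, it can be narrowed to a time range and finishes once the matching rows are read. A minimal sketch, assuming the devices stream from the examples above and the _tp_time event-time column used elsewhere on this page; the one-hour range is an arbitrary choice:

```sql
-- One-time count per device over the last hour of stored data.
-- Assumes `devices` carries the _tp_time event-time column.
SELECT device, count(*) AS events
FROM table(devices)
WHERE _tp_time > now() - INTERVAL 1 HOUR
GROUP BY device
ORDER BY events DESC;
```

Dropping table() from the FROM clause would turn the same statement into a continuous streaming query.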
SELECT Clause
Comma-separated list of expressions to select. Can include:
- Column names: device, temperature
- Expressions: temperature * 1.8 + 32
- Aggregations: count(*), avg(temperature)
- Aliases: temperature AS temp
- Wildcards: *, device.*
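The following modifiers change which rows are returned:
- DISTINCT: Remove duplicate rows from results.
- DISTINCT ON (expr_list): Remove duplicates based on the specified expressions.
- TOP n: Return only the first n rows (equivalent to LIMIT n).
- WITH TIES: Include rows that tie with the last row when using TOP or LIMIT with ORDER BY.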
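The DISTINCT, DISTINCT ON, TOP and WITH TIES modifiers from the syntax summary can be sketched as follows. The queries reuse the devices and events streams from this page and wrap them in table() so each one is bounded; exact behavior may vary by Proton version, so treat this as an illustration rather than a guaranteed result:

```sql
-- DISTINCT: one row per unique (device, event_type) pair.
SELECT DISTINCT device, event_type
FROM table(events);

-- DISTINCT ON: keep one row per device; which row is kept depends on read order.
SELECT DISTINCT ON (device) device, temperature
FROM table(devices);

-- TOP n WITH TIES: the 3 hottest readings, plus any rows tied with the third.
SELECT TOP 3 WITH TIES device, temperature
FROM table(devices)
ORDER BY temperature DESC;
```

WITH TIES only makes sense together with ORDER BY, since ties are defined by the sort key.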
FROM Clause
Name of the stream or table to query. The name can be qualified with a database, as in db.stream_name.
FROM Options
-- From a stream (streaming query)
SELECT * FROM devices;
-- From historical data
SELECT * FROM table(devices);
-- From a subquery
SELECT * FROM (SELECT * FROM devices WHERE temperature > 50);
-- From a table function
SELECT * FROM numbers(10);
-- From multiple streams (JOIN)
SELECT * FROM stream1 JOIN stream2 ON stream1.id = stream2.id;
WHERE Clause
Filter rows based on a boolean expression.
-- Simple filter
SELECT * FROM devices WHERE temperature > 80;
-- Multiple conditions
SELECT * FROM events WHERE user_id > 1000 AND event_type = 'click';
-- Using functions
SELECT * FROM logs WHERE to_hour(_tp_time) >= 9 AND to_hour(_tp_time) < 17;
-- JSON path filtering
SELECT * FROM frontend_events WHERE raw:method = 'POST';
PREWHERE Clause
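An optimization of the WHERE clause. The PREWHERE condition is evaluated first, reading only the columns it references; the remaining columns are then read only for rows that pass the filter.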
SELECT *
FROM large_stream
PREWHERE event_date = today()
WHERE user_id > 1000;
GROUP BY Clause
Group rows by specified expressions for aggregation.
-- Simple grouping
SELECT device, count(*)
FROM devices
GROUP BY device;
-- Multiple columns
SELECT device, event_type, count(*)
FROM events
GROUP BY device, event_type;
-- With ROLLUP
SELECT device, event_type, count(*)
FROM events
GROUP BY ROLLUP(device, event_type);
-- With CUBE
SELECT device, event_type, count(*)
FROM events
GROUP BY CUBE(device, event_type);
-- With TOTALS
SELECT device, count(*)
FROM events
GROUP BY device WITH TOTALS;
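The syntax summary also lists GROUPING SETS, which has no example above. A minimal sketch, assuming the same events stream and that the installed Proton version accepts the ClickHouse-style form of this clause:

```sql
-- Per-device counts, per-event-type counts, and a grand total in one query.
-- Each parenthesized list is one grouping; () is the grand total.
SELECT device, event_type, count(*) AS cnt
FROM table(events)
GROUP BY GROUPING SETS ((device), (event_type), ());
```

ROLLUP and CUBE are shorthand for particular collections of grouping sets; GROUPING SETS lets you name exactly the combinations you need.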
PARTITION BY / SHUFFLE BY (Streaming)
- PARTITION BY: Partition streaming data for distributed processing. Cannot be combined with SHUFFLE BY.
- SHUFFLE BY: Shuffle streaming data for distributed processing. Cannot be combined with PARTITION BY.
-- Partition by device for parallel processing
SELECT device, count(*)
FROM devices
PARTITION BY device
GROUP BY device;
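For comparison, the same aggregation written with SHUFFLE BY might look like the sketch below. The clause position follows the syntax summary at the top of this page; whether SHUFFLE BY is available may depend on the edition and version in use, so this is an assumption rather than a tested query:

```sql
-- Redistribute rows by device before aggregating.
-- Assumes SHUFFLE BY is supported by the installed version.
SELECT device, count(*)
FROM devices
SHUFFLE BY device
GROUP BY device;
```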
HAVING Clause
Filter grouped results after aggregation.
SELECT device, count(*) AS cnt
FROM devices
GROUP BY device
HAVING cnt > 100;
WINDOW Clause
Define named windows for window functions.
SELECT
device,
temperature,
avg(temperature) OVER w AS avg_temp
FROM devices
WINDOW w AS (PARTITION BY device ORDER BY _tp_time ROWS BETWEEN 10 PRECEDING AND CURRENT ROW);
ORDER BY Clause
Sort results by specified expressions.
-- Ascending order
SELECT * FROM devices ORDER BY temperature;
-- Descending order
SELECT * FROM devices ORDER BY temperature DESC;
-- Multiple columns
SELECT * FROM devices ORDER BY device ASC, temperature DESC;
-- Nulls handling
SELECT * FROM devices ORDER BY temperature NULLS FIRST;
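The WITH FILL modifier from the syntax summary fills gaps in a sorted numeric or date column. A minimal sketch using the numbers() table function shown earlier, assuming ClickHouse-style WITH FILL semantics (default step of 1) carry over to Proton:

```sql
-- numbers(4) yields 0..3, so n takes the values 0, 3, 6, 9.
-- WITH FILL is intended to insert the missing values between them.
SELECT number * 3 AS n
FROM numbers(4)
ORDER BY n WITH FILL;
```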
LIMIT Clause
Limit the number of rows returned.
-- First 10 rows
SELECT * FROM devices LIMIT 10;
-- Rows 11-20: LIMIT offset, count (skip 10 rows, return the next 10)
SELECT * FROM devices LIMIT 10, 10;
-- Alternative syntax
SELECT * FROM devices LIMIT 10 OFFSET 10;
-- At most 5 rows per device (add ORDER BY to make them the top 5)
SELECT * FROM devices LIMIT 5 BY device;
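The OFFSET ... FETCH form and LIMIT ... WITH TIES appear in the syntax summary but not in the examples above. A short sketch, assuming the devices stream from this page and a bounded query via table(); an ORDER BY is included because paging and ties are only well defined on sorted output:

```sql
-- Rows 11-20 by temperature, using the OFFSET ... FETCH form.
SELECT *
FROM table(devices)
ORDER BY temperature DESC
OFFSET 10 ROWS
FETCH NEXT 10 ROWS ONLY;

-- LIMIT ... WITH TIES: the 5 hottest rows plus any rows tied with the fifth.
SELECT *
FROM table(devices)
ORDER BY temperature DESC
LIMIT 5 WITH TIES;
```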
EMIT Clause (Streaming)
Control when streaming query results are emitted.
-- Emit whenever the aggregation result changes
SELECT count(*) FROM devices EMIT ON UPDATE;
-- Emit periodically
SELECT count(*) FROM devices EMIT PERIODIC 5s;
-- Emit each window's result once the watermark passes the window end
SELECT window_start, count(*) FROM tumble(devices, 5s) GROUP BY window_start EMIT AFTER WATERMARK;
JOIN
Stream-to-Stream JOIN
-- Inner join
SELECT
events.user_id,
events.event_type,
users.username
FROM events
INNER JOIN users ON events.user_id = users.user_id;
-- Left join
SELECT *
FROM stream1
LEFT JOIN stream2 ON stream1.id = stream2.id;
Temporal JOIN with Range
SELECT *
FROM stream1 AS s1
JOIN stream2 AS s2
ON s1.id = s2.id
AND s2._tp_time BETWEEN s1._tp_time - INTERVAL 5 SECOND
AND s1._tp_time + INTERVAL 5 SECOND;
Window Functions
Tumbling Window
SELECT
window_start,
window_end,
device,
count(*) AS event_count,
avg(temperature) AS avg_temp
FROM tumble(devices, 1m)
GROUP BY window_start, window_end, device;
Hopping Window
SELECT
window_start,
window_end,
device,
avg(temperature) AS avg_temp
FROM hop(devices, 30s, 2m) -- hop size 30s, window size 2m
GROUP BY window_start, window_end, device;
Session Window
SELECT
window_start,
window_end,
user_id,
count(*) AS events_in_session
FROM session(user_events, 5m, user_id) -- 5 minute timeout
GROUP BY window_start, window_end, user_id;
WITH Clause (CTE)
Define Common Table Expressions (CTEs) for query readability.
WITH
high_temp AS (
SELECT * FROM devices WHERE temperature > 80
),
device_stats AS (
SELECT device, avg(temperature) AS avg_temp
FROM high_temp
GROUP BY device
)
SELECT * FROM device_stats WHERE avg_temp > 90;
SETTINGS Clause
Query-level settings to control behavior.
-- Seek to earliest for external streams
SELECT * FROM kafka_stream SETTINGS seek_to='earliest';
-- Set query timeout
SELECT * FROM large_stream SETTINGS max_execution_time=60;
-- Enable query profiling
SELECT * FROM devices SETTINGS enable_profiler=1;
Examples
Live Event Count
SELECT count() FROM frontend_events;
Filter by JSON Attributes
SELECT
_tp_time,
raw:ipAddress,
raw:requestedUrl
FROM frontend_events
WHERE raw:method = 'POST';
Live Aggregation with Grouping
SELECT
raw:method AS method,
count() AS cnt
FROM frontend_events
GROUP BY method
ORDER BY cnt DESC;
Windowed Aggregation
SELECT
window_start,
device,
min(temperature) AS min_temp,
max(temperature) AS max_temp,
avg(temperature) AS avg_temp
FROM tumble(devices, 10s)
GROUP BY window_start, device;
Live ASCII Chart
SELECT
raw:method,
count() AS cnt,
bar(cnt, 0, 40, 5) AS bar
FROM frontend_events
GROUP BY raw:method
ORDER BY cnt DESC
LIMIT 5 BY emit_version();