External Streams
External streams allow Timeplus Proton to read from and write to external systems like Kafka, Pulsar, Redpanda, and HTTP endpoints without data duplication. They act as virtual streams that proxy to external sources.
Overview
Key characteristics:
- Zero-copy integration with external systems
- No data stored in Proton (data stays in source system)
- Bi-directional: read and write support
- Schema on read: define columns to extract
- Multiple formats: JSON, Avro, Protobuf, CSV, etc.
External streams are perfect for building real-time ETL pipelines without data movement.
Kafka / Redpanda
The most common external stream type for production workloads.
Basic Connection
-- Minimal Kafka external stream: the whole message is exposed as one raw string column.
CREATE EXTERNAL STREAM kafka_events
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'events';
Structured Schema
-- Kafka external stream with typed columns, decoded from JSONEachRow messages.
CREATE EXTERNAL STREAM user_events
(
    user_id    uint64,
    event_type string,
    timestamp  datetime64(3),
    properties string
)
SETTINGS
    type = 'kafka',
    brokers = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    topic = 'user-events',
    data_format = 'JSONEachRow';
Reading from Kafka
-- Streaming scan of incoming events
SELECT *
FROM kafka_events;

-- Keep only purchases and render the timestamp as a string
SELECT
    user_id,
    event_type,
    to_string(timestamp) AS event_time
FROM user_events
WHERE event_type = 'purchase';

-- Live aggregation: event count and distinct users per event type
SELECT
    event_type,
    count() AS total,
    count(DISTINCT user_id) AS unique_users
FROM user_events
GROUP BY event_type;
- Seek Position
- Consumer Group
-- Read from the earliest available messages
SELECT *
FROM kafka_events
SETTINGS seek_to = 'earliest';

-- Read only new messages (the default)
SELECT *
FROM kafka_events
SETTINGS seek_to = 'latest';

-- Read from a specific point; the value is given as a Unix timestamp in ms
-- NOTE(review): confirm a bare integer is interpreted as epoch milliseconds
-- and not as a partition offset.
SELECT *
FROM kafka_events
SETTINGS seek_to = '1640000000000';

-- Pin a custom consumer group through the Kafka client properties
CREATE EXTERNAL STREAM kafka_events
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'events',
    properties = 'group.id=my-consumer-group';
Writing to Kafka
-- Output external stream: rows written here are serialized as JSONEachRow
-- to the processed-events topic.
CREATE EXTERNAL STREAM kafka_output
(
    user_id        uint64,
    event_type     string,
    processed_time datetime64(3)
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'processed-events',
    data_format = 'JSONEachRow';
-- Direct insert. Columns are listed explicitly so the statement does not
-- depend on the declared column order of kafka_output.
INSERT INTO kafka_output (user_id, event_type, processed_time)
VALUES (12345, 'click', now64());
-- Continuous ETL via materialized view.
-- Reads from user_events (the structured stream that declares user_id and
-- event_type); kafka_events only exposes a single raw string column, so these
-- columns cannot be selected from it directly.
CREATE MATERIALIZED VIEW mv_process INTO kafka_output AS
SELECT
    user_id,
    event_type,
    now64() AS processed_time
FROM user_events
WHERE event_type IN ('click', 'purchase');
Authentication
SASL/PLAIN
SASL/PLAIN
-- Kafka over SASL_SSL using the PLAIN mechanism with username/password.
CREATE EXTERNAL STREAM secure_kafka
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'kafka.example.com:9093',
    topic = 'secure-topic',
    security_protocol = 'SASL_SSL',
    sasl_mechanism = 'PLAIN',
    username = 'my-user',
    password = 'my-password';
SASL/SCRAM-SHA-256
SASL/SCRAM-SHA-256
-- Kafka over SASL_SSL using the SCRAM-SHA-256 mechanism with username/password.
CREATE EXTERNAL STREAM secure_kafka
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'kafka.example.com:9093',
    topic = 'secure-topic',
    security_protocol = 'SASL_SSL',
    sasl_mechanism = 'SCRAM-SHA-256',
    username = 'my-user',
    password = 'my-password';
AWS MSK IAM
AWS MSK IAM
-- AWS MSK broker over SASL_SSL with the AWS_MSK_IAM mechanism
-- (no username/password settings are supplied in this example).
CREATE EXTERNAL STREAM msk_stream
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'b-1.msk.us-west-2.amazonaws.com:9098',
    topic = 'my-topic',
    security_protocol = 'SASL_SSL',
    sasl_mechanism = 'AWS_MSK_IAM';
SSL/TLS
SSL/TLS
-- Kafka over SSL, with the CA certificate referenced by file path.
CREATE EXTERNAL STREAM ssl_kafka
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'kafka.example.com:9093',
    topic = 'secure-topic',
    security_protocol = 'SSL',
    ssl_ca_cert_file = '/path/to/ca-cert.pem';
-- Alternative: pass the CA certificate inline as PEM text
CREATE EXTERNAL STREAM ssl_kafka2
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'kafka.example.com:9093',
    topic = 'secure-topic',
    security_protocol = 'SSL',
    ssl_ca_pem = '-----BEGIN CERTIFICATE-----\n...';
-- Disable certificate verification (not recommended for production)
CREATE EXTERNAL STREAM ssl_kafka3
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'kafka.example.com:9093',
    topic = 'secure-topic',
    security_protocol = 'SSL',
    skip_ssl_cert_check = true;
Data Formats
Proton supports multiple serialization formats:
- JSON
- Avro
- Protobuf
- Raw String
-- JSONEachRow: one JSON object per line
CREATE EXTERNAL STREAM json_stream
(
    id        uint64,
    name      string,
    timestamp datetime64(3)
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'json-topic',
    data_format = 'JSONEachRow';
-- Avro messages, with schemas resolved through a Confluent Schema Registry
CREATE EXTERNAL STREAM avro_stream
(
    id        uint64,
    name      string,
    timestamp datetime64(3)
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'avro-topic',
    data_format = 'Avro',
    kafka_schema_registry_url = 'http://localhost:8081';
-- Protobuf messages, using a schema registry and an explicit schema subject
CREATE EXTERNAL STREAM proto_stream
(
    id        uint64,
    name      string,
    timestamp datetime64(3)
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'proto-topic',
    data_format = 'Protobuf',
    kafka_schema_registry_url = 'http://localhost:8081',
    schema_subject_name = 'my-proto-schema';
-- Raw mode: no data_format is set, the message is exposed as a single string column
CREATE EXTERNAL STREAM raw_stream
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'raw-topic';

-- Extract JSON fields at query time and cast them to typed columns
SELECT
    raw:user_id::uint64 AS user_id,
    raw:event_type AS event_type,
    raw:timestamp::datetime64(3) AS timestamp
FROM raw_stream;
Advanced Configuration
-- Kafka external stream combining client properties, consumer tuning and write sharding.
CREATE EXTERNAL STREAM advanced_kafka
(
    user_id uint64,
    data    string
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'events',
    data_format = 'JSONEachRow',
    -- Kafka client properties, semicolon-separated key=value pairs
    properties = 'max.poll.interval.ms=300000;session.timeout.ms=45000;fetch.min.bytes=1024',
    -- Consumer behavior
    -- Poll timeout
    poll_waittime_ms = 500,
    -- Recreate the consumer if it stalls
    consumer_stall_timeout_ms = 60000,
    -- Write path
    -- Partition by user_id
    sharding_expr = 'user_id',
    -- One Kafka message per row
    one_message_per_row = true;
// Key configuration options
// Each M(...) entry lists, in order: value type, setting name, default value,
// and a human-readable description. The trailing 0 is presumably a flags
// field — TODO confirm against the macro definition, which is not shown here.
M(String, brokers, "", "Comma-separated list of brokers", 0)
M(String, topic, "", "Kafka topic name", 0)
M(String, security_protocol, "plaintext", "Connection protocol", 0)
M(String, sasl_mechanism, "", "PLAIN, SCRAM-SHA-256, SCRAM-SHA-512", 0)
M(UInt64, poll_waittime_ms, 500, "Poll wait time in ms", 0)
M(Milliseconds, consumer_stall_timeout_ms, 60000, "Consumer stall timeout", 0)
Apache Pulsar
Basic Connection
-- Basic Pulsar external stream reading JSONEachRow messages from a persistent topic.
CREATE EXTERNAL STREAM pulsar_stream
(
    user_id    uint64,
    event_type string,
    data       string
)
SETTINGS
    type = 'pulsar',
    service_url = 'pulsar://localhost:6650',
    topic = 'persistent://public/default/events',
    data_format = 'JSONEachRow';
Pulsar Authentication
- JWT
- mTLS
-- Pulsar over TLS (pulsar+ssl), authenticating with a JWT supplied in the jwt setting.
CREATE EXTERNAL STREAM pulsar_jwt
(
    raw string
)
SETTINGS
    type = 'pulsar',
    service_url = 'pulsar+ssl://pulsar.example.com:6651',
    topic = 'persistent://tenant/namespace/topic',
    jwt = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...';
-- Pulsar over TLS (pulsar+ssl) with mutual TLS: CA certificate plus client certificate and key.
CREATE EXTERNAL STREAM pulsar_mtls
(
    raw string
)
SETTINGS
    type = 'pulsar',
    service_url = 'pulsar+ssl://pulsar.example.com:6651',
    topic = 'persistent://tenant/namespace/topic',
    ca_cert = '/path/to/ca-cert.pem',
    client_cert = '/path/to/client-cert.pem',
    client_key = '/path/to/client-key.pem';
Pulsar Configuration
-- Pulsar external stream with client-level tuning knobs.
CREATE EXTERNAL STREAM pulsar_advanced
(
    payload string
)
SETTINGS
    type = 'pulsar',
    service_url = 'pulsar://localhost:6650',
    topic = 'persistent://public/default/events',
    -- Increase for better throughput
    connections_per_broker = 5,
    -- Parallel IO threads
    io_threads = 4,
    -- 1GB memory limit (1073741824 bytes)
    memory_limit = 1073741824;
HTTP Streams
Read from HTTP Endpoint
-- HTTP external stream used as a source: issues GET requests and parses JSONEachRow.
CREATE EXTERNAL STREAM http_source
(
    id    uint64,
    value string
)
SETTINGS
    type = 'http',
    url = 'https://api.example.com/events',
    read_method = 'GET',
    data_format = 'JSONEachRow';
Write to HTTP Endpoint
-- HTTP external stream used as a sink: rows are POSTed as JSONEachRow.
CREATE EXTERNAL STREAM http_sink
(
    event_id   uint64,
    event_data string,
    timestamp  datetime64(3)
)
SETTINGS
    type = 'http',
    url = 'https://webhook.example.com/ingest',
    write_method = 'POST',
    data_format = 'JSONEachRow',
    -- Compress payload
    compression_method = 'gzip',
    -- Chunked transfer
    use_chunked_encoding = true;
HTTP Authentication
-- Header-based authentication: this example sends a Bearer token in the
-- Authorization header together with an X-API-Key header (it is not HTTP
-- Basic auth). Headers are supplied as a single comma-separated string.
CREATE EXTERNAL STREAM http_auth
(
    data string
)
SETTINGS
    type = 'http',
    url = 'https://api.example.com/stream',
    http_headers = 'Authorization: Bearer token123,X-API-Key: key456';
NATS JetStream
-- NATS JetStream external stream with a durable, explicitly-acknowledging consumer.
CREATE EXTERNAL STREAM nats_stream
(
    subject   string,
    data      string,
    timestamp datetime64(3)
)
SETTINGS
    type = 'nats',
    brokers = 'nats://localhost:4222',
    -- JetStream stream
    stream_name = 'EVENTS',
    -- Subject filter with wildcard
    subject = 'events.>',
    consumer_name = 'proton-consumer',
    durable = true,
    ack_policy = 'explicit',
    deliver_policy = 'all',
    batch_size = 256,
    fetch_timeout_ms = 5000;
Timeplus Cloud / Enterprise
Connect to another Timeplus instance:
-- External stream that proxies the events stream in the analytics database
-- of a remote Timeplus deployment.
CREATE EXTERNAL STREAM remote_timeplus
(
    user_id    uint64,
    event_type string,
    timestamp  datetime64(3)
)
SETTINGS
    type = 'timeplus',
    hosts = 'timeplus.example.com:8463',
    db = 'analytics',
    stream = 'events',
    user = 'readonly',
    password = 'secret',
    secure = true;
Real-World Examples
Kafka ETL Pipeline
From examples/ecommerce/etl.sql:
-- Source: raw frontend events from the owlshop-frontend-events topic
CREATE EXTERNAL STREAM frontend_events
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'redpanda:9092',
    topic = 'owlshop-frontend-events';
-- Sink: processed events are written to the masked-fe-event topic as JSONEachRow
CREATE EXTERNAL STREAM target
(
    _tp_time datetime64(3),
    url      string,
    method   string,
    ip       string
)
SETTINGS
    type = 'kafka',
    brokers = 'redpanda:9092',
    topic = 'masked-fe-event',
    data_format = 'JSONEachRow';
-- ETL pipeline with data masking: extracts fields from the raw JSON and
-- replaces the IP address with its lowercase hex MD5 digest.
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT
    now64() AS _tp_time,
    raw:requestedUrl AS url,
    raw:method AS method,
    -- Hash IP address
    lower(hex(md5(raw:ipAddress))) AS ip
FROM frontend_events;
CDC Replication
From examples/cdc/cdc.sql:
-- Raw Debezium CDC messages for the inventory.customers table
CREATE EXTERNAL STREAM customers_cdc
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'redpanda:9092',
    topic = 'dbserver1.inventory.customers';
-- Changelog stream keyed by id; holds the current state of each customer
CREATE STREAM customers
(
    id         int,
    first_name string,
    last_name  string,
    email      string
)
PRIMARY KEY id
SETTINGS
    mode = 'changelog_kv',
    version_column = '_tp_time';
-- INSERT events (op = 'c'): emit the "after" image with _tp_delta = 1
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
SELECT
    to_time(raw:payload.ts_ms) AS _tp_time,
    raw:payload.after.id::int AS id,
    raw:payload.after.first_name AS first_name,
    raw:payload.after.last_name AS last_name,
    raw:payload.after.email AS email,
    1::int8 AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op='c'
SETTINGS seek_to = 'earliest';
-- UPDATE events (op = 'u'): each change is expanded into two rows, the
-- "before" image with _tp_delta = -1 and the "after" image with _tp_delta = 1.
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS
WITH cdc_changes AS (
    -- Pair each image with its delta so array_join can fan them out below
    SELECT
        to_time(raw:payload.ts_ms) AS ts_ms,
        [(raw:payload.before, -1::int8), (raw:payload.after, 1::int8)] AS changes
    FROM customers_cdc
    WHERE raw:payload.op = 'u'
    SETTINGS seek_to = 'earliest'
)
SELECT
    ts_ms AS _tp_time,
    array_join(changes).1:id::int AS id,
    array_join(changes).1:first_name AS first_name,
    array_join(changes).1:last_name AS last_name,
    array_join(changes).1:email AS email,
    array_join(changes).2 AS _tp_delta
FROM cdc_changes;
-- DELETE events (op = 'd'): emit the "before" image with _tp_delta = -1
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
SELECT
    to_time(raw:payload.ts_ms) AS _tp_time,
    raw:payload.before.id::int AS id,
    raw:payload.before.first_name AS first_name,
    raw:payload.before.last_name AS last_name,
    raw:payload.before.email AS email,
    -1::int8 AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op='d'
SETTINGS seek_to = 'earliest';
Performance Tuning
Batch size
-- Increase batch size for higher throughput.
-- fetch.min.bytes asks the broker to accumulate at least this many bytes per
-- fetch; fetch.wait.max.ms (the librdkafka spelling of the Java client's
-- fetch.max.wait.ms) caps how long the broker waits to reach that amount.
SETTINGS
    properties = 'fetch.min.bytes=65536;fetch.wait.max.ms=500'
Parallel consumers
-- Consumer tuning for a high-volume topic
-- NOTE(review): the original comment said "Use multiple shards for parallel
-- consumption", but nothing in this statement configures shards; it only sets
-- a client property. Confirm which setting was intended.
-- NOTE(review): max.poll.records appears to be a Java-consumer property name;
-- verify that the Kafka client used by Proton accepts it.
CREATE EXTERNAL STREAM kafka_parallel(raw string)
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='high-volume',
properties='max.poll.records=1000';
Troubleshooting
Consumer not making progress
Consumer not making progress
-- Inspect consumer state to check lag
SELECT *
FROM system.kafka_consumers;

-- Raise the stall timeout to 120 seconds
ALTER STREAM kafka_events
    MODIFY SETTING consumer_stall_timeout_ms = 120000;
Connection timeout
Connection timeout
-- Allow more time to connect to a slow broker
CREATE EXTERNAL STREAM kafka_slow
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'slow-kafka:9092',
    topic = 'events',
    -- 30 seconds
    connection_timeout_ms = 30000;
Schema mismatch
Schema mismatch
-- Read the message as a raw string and parse it manually at query time
CREATE EXTERNAL STREAM kafka_raw
(
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'events';

-- Parse with error handling: the raw message is kept alongside the parsed fields
SELECT
    raw,
    try_cast(raw:id as uint64) AS id,
    raw:name::string AS name
FROM kafka_raw;
Next Steps
Materialized Views
Build streaming ETL pipelines
Data Formats
Supported serialization formats
Kafka Tutorial
End-to-end Kafka integration
CDC Tutorial
Change Data Capture patterns