CREATE EXTERNAL STREAM creates a connection to external data sources such as Kafka, Pulsar, ClickHouse, and others. External streams allow you to read from and write to external systems without storing data in Timeplus.

Syntax

CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name
(
    column_name1 type1,
    column_name2 type2,
    ...
)
SETTINGS
    type='kafka'|'pulsar'|'nats'|'timeplus'|...,
    [setting1=value1,]
    [setting2=value2,]
    ...;

Common Parameters

stream_name
identifier
required
The name of the external stream to create.
IF NOT EXISTS
boolean
If specified, the statement will not raise an error if the stream already exists.
type
string
required
The type of external stream. Options: kafka, pulsar, nats, timeplus, http, iceberg
data_format
string
The message format for serialization/deserialization. Examples: JSONEachRow, CSV, Avro, Protobuf, RawBLOB

Kafka External Stream

Required Settings

brokers
string
required
Comma-separated list of Kafka broker addresses. Example: 'localhost:9092' or 'broker1:9092,broker2:9092'
topic
string
required
The Kafka topic name to read from or write to.

Authentication Settings

security_protocol
string
The security protocol for connecting to Kafka. Options:
  • PLAINTEXT (default): No encryption or authentication
  • SASL_SSL: SASL authentication over SSL/TLS
  • SASL_PLAINTEXT: SASL authentication without encryption
  • SSL: SSL/TLS encryption without SASL
sasl_mechanism
string
SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM
username
string
Username for SASL authentication.
password
string
Password for SASL authentication.

SSL/TLS Settings

ssl_ca_cert_file
string
Path to the CA certificate file for SSL/TLS verification.
ssl_ca_pem
string
CA certificate string in PEM format for verifying the server’s certificate.
skip_ssl_cert_check
boolean
If true, the server’s certificate won’t be verified. Default: false.
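
The TLS settings above can be combined with SASL authentication. The following is a minimal sketch, not a verified configuration: the stream name, broker address, credentials, and certificate path are all hypothetical placeholders, and only setting names listed in this document are used.

```sql
-- Sketch: Kafka over SASL_SSL with a custom CA certificate.
-- All names, credentials, and paths below are placeholders.
CREATE EXTERNAL STREAM secure_events (
    raw string
)
SETTINGS
    type='kafka',
    brokers='broker:9093',
    topic='events',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    username='myuser',
    password='mypassword',
    ssl_ca_cert_file='/path/to/ca.pem';  -- hypothetical path to the CA certificate
```

If the certificate is available as a string rather than a file, ssl_ca_pem would presumably take the place of ssl_ca_cert_file. Setting skip_ssl_cert_check to true disables verification and is best limited to test environments.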

AWS MSK IAM Authentication

region
string
AWS region for MSK IAM authentication.
access_key_id
string
AWS access key ID.
secret_access_key
string
AWS secret access key.
session_token
string
AWS session token for temporary credentials.
use_environment_credentials
boolean
Use credentials from environment variables. Default: false.
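
As an illustration of the settings in this section, the sketch below sets the region explicitly and takes credentials from environment variables instead of embedding keys in the statement. The stream name is hypothetical, and the broker address and topic are placeholders.

```sql
-- Sketch: MSK IAM authentication using environment credentials.
-- Assumes AWS credentials are available in the server's environment.
CREATE EXTERNAL STREAM msk_env_creds (
    device string,
    temperature float
)
SETTINGS
    type='kafka',
    brokers='prefix.kafka.us-west-2.amazonaws.com:9098',
    topic='iot-data',
    security_protocol='SASL_SSL',
    sasl_mechanism='AWS_MSK_IAM',
    region='us-west-2',
    use_environment_credentials=true;
```

When static keys are preferred, access_key_id and secret_access_key (plus session_token for temporary credentials) could be supplied in place of use_environment_credentials.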

Advanced Settings

properties
string
Semicolon-separated key-value pairs for additional Kafka client properties. Example: 'client.id=my-client;group.id=my-group'
poll_waittime_ms
integer
How long each poll waits for new messages, in milliseconds. Default: 500.
one_message_per_row
boolean
If true, produces one Kafka message per row when using row-based formats like JSONEachRow. Default: false.
sharding_expr
string
Expression that computes the target partition ID for each row, used to distribute data across partitions.
schema_subject_name
string
Avro/Protobuf schema subject name in the schema registry.
consumer_stall_timeout_ms
integer
Time in milliseconds after which a stalled consumer is recreated. Default: 60000. Set to 0 to disable.
connection_timeout_ms
integer
Timeout in milliseconds for establishing a broker connection. Default: 10000.
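
To show how some of these settings fit together when writing to Kafka, here is a hedged sketch. The stream name, topic, and column names are hypothetical; the properties value reuses the format described above.

```sql
-- Sketch: an external stream used for writing, one Kafka message per row.
-- alerts_out, the topic, and the columns are illustrative names only.
CREATE EXTERNAL STREAM IF NOT EXISTS alerts_out (
    device string,
    severity string
)
SETTINGS
    type='kafka',
    brokers='localhost:9092',
    topic='alerts',
    data_format='JSONEachRow',
    one_message_per_row=true,
    properties='client.id=my-client';

-- With one_message_per_row=true, each inserted row is sent as its own message.
INSERT INTO alerts_out (device, severity) VALUES ('sensor-1', 'high');
```

Leaving one_message_per_row at its default of false may batch several rows into a single Kafka message, which downstream consumers expecting one JSON document per message might not handle.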

Kafka Examples

Basic Kafka External Stream

CREATE EXTERNAL STREAM frontend_events (
    raw string
)
SETTINGS
    type='kafka',
    brokers='redpanda:9092',
    topic='owlshop-frontend-events';

Kafka with SASL Authentication

CREATE EXTERNAL STREAM user_events (
    user_id int64,
    event_type string,
    timestamp datetime64(3)
)
SETTINGS
    type='kafka',
    brokers='broker:9093',
    topic='events',
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    username='myuser',
    password='mypassword';

AWS MSK with IAM Authentication

CREATE EXTERNAL STREAM aws_msk_stream (
    device string,
    temperature float
)
SETTINGS
    type='kafka',
    brokers='prefix.kafka.us-west-2.amazonaws.com:9098',
    topic='iot-data',
    security_protocol='SASL_SSL',
    sasl_mechanism='AWS_MSK_IAM';

Kafka with Avro Schema Registry

CREATE EXTERNAL STREAM orders (
    order_id int,
    customer_id int,
    amount float
)
SETTINGS
    type='kafka',
    brokers='localhost:9092',
    topic='orders',
    data_format='Avro',
    schema_subject_name='orders-value';

Debezium CDC from Kafka

CREATE EXTERNAL STREAM customers_cdc (
    raw string
)
SETTINGS
    type='kafka',
    brokers='redpanda:9092',
    topic='dbserver1.inventory.customers';

Pulsar External Stream

Pulsar Settings

service_url
string
required
The Pulsar service URL. Example: 'pulsar://localhost:6650' or 'pulsar+ssl://localhost:6651'
topic
string
required
The Pulsar topic name.
skip_server_cert_check
boolean
Accept untrusted TLS certificates from brokers. Default: false.
validate_hostname
boolean
Validate hostname when connecting over TLS. Default: false.
ca_cert
string
CA certificate in PEM format for server verification.
client_cert
string
Client certificate in PEM format for mTLS authentication.
client_key
string
Client private key in PEM format for mTLS authentication.
jwt
string
JSON Web Token for JWT authentication.
connections_per_broker
integer
Maximum connections per broker. Default: 1.
memory_limit
integer
Memory limit in bytes. 0 means unlimited. Default: 0.
io_threads
integer
Number of I/O threads. Default: 1.

Pulsar Example

CREATE EXTERNAL STREAM pulsar_events (
    event_id string,
    data string
)
SETTINGS
    type='pulsar',
    service_url='pulsar://pulsar-broker:6650',
    topic='persistent://public/default/events';
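
A variant of the example above for a TLS-enabled cluster with JWT authentication might look like the following. This is a sketch under assumptions: the stream name is hypothetical, the token is a placeholder, and the port follows the pulsar+ssl URL shown in the settings list.

```sql
-- Sketch: Pulsar over TLS with JWT authentication.
-- Replace the jwt placeholder with a real token before use.
CREATE EXTERNAL STREAM pulsar_secure_events (
    event_id string,
    data string
)
SETTINGS
    type='pulsar',
    service_url='pulsar+ssl://pulsar-broker:6651',
    topic='persistent://public/default/events',
    jwt='<your-jwt-token>',
    validate_hostname=true;
```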

NATS JetStream External Stream

NATS Settings

service_url
string
required
NATS server URL. Example: 'nats://localhost:4222'
subject
string
required
Subject filter for subscribing. Supports wildcards: orders.> or orders.*
stream_name
string
required
JetStream stream name (must already exist).
consumer_name
string
Durable consumer name. Auto-generated if empty.
durable
boolean
Create a durable consumer. Default: true.
ack_policy
string
Acknowledgment policy: none, all, explicit. Default: explicit.
deliver_policy
string
Delivery policy: all, last, new, by_start_sequence, by_start_time. Default: all.
batch_size
integer
Messages to fetch per pull request. Default: 256.
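
The NATS settings above could be put together as in the sketch below. The external stream name, the JetStream stream name ORDERS, and the single raw column are assumptions made for illustration; per the settings list, the JetStream stream must already exist.

```sql
-- Sketch: reading from a NATS JetStream subject.
-- nats_orders and ORDERS are hypothetical names.
CREATE EXTERNAL STREAM nats_orders (
    raw string
)
SETTINGS
    type='nats',
    service_url='nats://localhost:4222',
    subject='orders.>',      -- wildcard: all subjects under orders.
    stream_name='ORDERS',    -- existing JetStream stream (assumed)
    deliver_policy='new',    -- only messages published after the consumer starts
    batch_size=256;
```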

HTTP External Stream

HTTP Settings

url
string
HTTP endpoint for both read and write.
read_url
string
HTTP endpoint for read operations.
write_url
string
HTTP endpoint for write operations.
read_method
string
HTTP method for reading. Default: GET.
write_method
string
HTTP method for writing. Default: POST.
compression_method
string
HTTP body compression: none, gzip, br, deflate. Default: none.
use_chunked_encoding
boolean
Use chunked transfer encoding. Default: true.
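
As a rough illustration of the HTTP settings, the sketch below defines a write-only external stream. The endpoint URL, stream name, and columns are hypothetical, and pairing data_format with an HTTP stream is an assumption based on the common parameters listed earlier.

```sql
-- Sketch: posting rows as gzip-compressed JSON to an HTTP endpoint.
-- The URL and all names are placeholders.
CREATE EXTERNAL STREAM http_sink (
    device string,
    temperature float
)
SETTINGS
    type='http',
    write_url='https://example.com/ingest',
    write_method='POST',
    data_format='JSONEachRow',
    compression_method='gzip';
```

If the same endpoint serves both reads and writes, the single url setting could be used instead of separate read_url and write_url values.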

Special Columns

External streams support special columns for metadata:
  • _tp_time - Event timestamp
  • _tp_message_key - Message key (for Kafka/Pulsar)
  • _tp_partition - Partition ID
  • _tp_offset - Message offset
  • _tp_sn - Sequence number
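
The special columns can be selected alongside regular columns. The query below is a sketch that assumes the frontend_events stream from the basic Kafka example exists; whether a given special column must first be declared in the CREATE statement may depend on the stream type and version.

```sql
-- Sketch: reading message metadata together with the payload.
SELECT
    _tp_time,         -- event timestamp
    _tp_message_key,  -- Kafka message key
    raw               -- message payload as declared in the stream
FROM frontend_events
LIMIT 10;
```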
