Streaming Processing
The following functions are supported in streaming queries, but not all of them support historical queries. Check the tags on each function, which look like this:
✅ streaming query
🚫 historical query
table
table(stream)
Turns an unbounded data stream into a bounded table, so that you can query its historical data. For example, you may load clickstream data from a Kafka topic into the `clicks` stream in Timeplus. By default, if you run `SELECT .. FROM clicks ..`, it is a streaming query over unbounded data, and it keeps sending you new results whenever they are available. If you only need to analyze past data, you can wrap the stream in the `table` function. Taking a count as an example:
- Running `select count(*) from clicks` shows the latest count every 2 seconds and never ends, until the query is canceled by the user.
- Running `select count(*) from table(clicks)` returns immediately with the row count of the historical data in the stream.
If you want to query the data in table mode more than once, you can create a view, such as `create view historical_view as select * from table(stream_name)`. This works well for static data, such as lookup information (city names and their zip codes).
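As a sketch, assuming a hypothetical `zip_codes` stream holding city/zip lookup rows:

```sql
-- a sketch: expose the lookup data through a view in table (bounded) mode;
-- zip_codes, city and zip are hypothetical names
create view zip_lookup as
select city, zip from table(zip_codes);

-- each query on the view returns immediately with the historical data
select zip from zip_lookup where city = 'Vancouver';
```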
Learn more about Non-streaming queries.
tumble
tumble(stream [,timeCol], windowSize)
Creates a tumble window view for the data stream. For example, `tumble(iot, 5s)` creates a window for every 5 seconds for the data stream `iot`. The SQL must end with `group by`, with either `window_start` or `window_end`, or both.
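For instance, a minimal sketch of a windowed aggregation (the `device_id` column on `iot` is an assumption):

```sql
-- count events per device in non-overlapping 5-second windows;
-- device_id is a hypothetical column on the iot stream
select window_start, device_id, count(*) as events
from tumble(iot, 5s)
group by window_start, device_id
```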
✅ streaming query
✅ historical query
hop
hop(stream [,timeCol], step, windowSize)
Creates a hopping window view for the data stream. For example, `hop(iot, 1s, 5s)` creates a 5-second window for the data stream `iot` and moves the window forward every second. The SQL must end with `group by`, with either `window_start` or `window_end`, or both.
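A minimal sketch: because each 5-second window slides forward every second, each event is counted in up to 5 overlapping windows:

```sql
-- rolling 5-second counts, refreshed every second, over the iot stream
select window_end, count(*) as events
from hop(iot, 1s, 5s)
group by window_end
```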
✅ streaming query
🚫 historical query
session
session(stream [,timeCol], idle, [maxLength,] [startCondition,endCondition] )
Creates dynamic windows based on the activity in the data stream.
Parameters:
- `stream`: a data stream, a view, or a CTE/subquery
- `timeCol`: optional; defaults to `__tp_time` (the event time of the record)
- `idle`: the idle time after which events are automatically split into two session windows
- `maxLength`: optional; the maximum length of the session window. Defaults to 5 times `idle`.
- `[startCondition, endCondition]`: optional. If specified, the session window starts when `startCondition` is met and closes when `endCondition` is met. Use `[expression1, expression2]` to indicate that both the starting and ending events are included in the session, or `(expression1, expression2]` to indicate that the ending event is included but the starting event is not.
For example, suppose a car keeps sending data while it's moving and stops sending data when it's parked or waiting for a traffic light:
- `session(car_live_data, 1m) partition by cid` creates session windows for each car with a 1-minute idle time. If a car does not move for one minute, its window is closed, and a new session window is created for future events. If the car keeps moving for more than 5 minutes, separate windows are created (one every 5 minutes), so that as an analyst you can get near-real-time results without waiting for the car to stop.
- `session(car_live_data, 1m, [speed>50, speed<50)) partition by cid` creates session windows to detect when the car is speeding. The first event with speed over 50 is included, and the last event with speed below 50 is not included in the session window.
- `session(access_log, 5m, [action='login', action='logout']) partition by uid` creates session windows from when the user logs in to when the user logs out. If there is no activity within 5 minutes, the window is closed automatically.
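Putting the first example into a full query, a sketch (the `speed` column, and the group-by shape mirroring tumble/hop, are assumptions):

```sql
-- summarize each car's trip as one row per session window;
-- speed is a hypothetical column on car_live_data
select window_start, window_end, cid, max(speed) as top_speed, count(*) as events
from session(car_live_data, 1m) partition by cid
group by window_start, window_end, cid
```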
✅ streaming query
🚫 historical query
dedup
dedup(stream, column1 [,otherColumns..] [liveInSecond,limit])
Applies deduplication to the given data stream with the specified column(s). Rows with the same column value(s) only show once: the first row is selected and the others are omitted. `liveInSecond` specifies how long the keys are kept in memory/state; by default, forever. If you only want to avoid duplicates within a certain time period, say 2 minutes, you can set it to `120s`, e.g. `dedup(subquery, myId, 120s)`.
The last parameter, `limit`, is optional and defaults to `100000`. It limits the maximum number of unique keys maintained by the query engine. When the limit is reached, the system recycles the earliest keys to stay within it.
You can cascade these table functions, such as `tumble(dedup(table(...`, and so far the wrapping order must be in this sequence: tumble/hop/session -> dedup -> table.
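For example, a sketch following that order, assuming the `clicks` stream carries an `id` column:

```sql
-- deduplicate historical clicks by id, then count unique events per minute;
-- wrapping order: tumble -> dedup -> table
select window_start, count(*) as unique_clicks
from tumble(dedup(table(clicks), id), 1m)
group by window_start
```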
✅ streaming query
✅ historical query
When you use the `dedup` function together with the `table()` function to get the latest status of events with the same ID, consider ordering the data by `_tp_time` in reverse, so that the latest event for each ID is kept, e.g.

```sql
WITH latest_to_earliest AS (SELECT * FROM table(my_stream) ORDER BY _tp_time DESC)
SELECT * FROM dedup(latest_to_earliest, id)
```

Otherwise, if you run queries with `dedup(table(my_stream), id)`, the earliest event with each ID is processed first, and the rest of the updated status is ignored. In many cases, this is not what you expect.
lag
lag(<column_name> [, <offset=1>][, <default_value>])
Accesses the value of a column from a previous row. If you omit the `offset`, the last row is used. E.g. `lag(total)` gets the value of `total` from the last row; `lag(total, 12)` gets the value from 12 rows ago; `lag(total, 12, 0)` uses 0 as the default value if the specified row is not available.
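For example, a sketch computing the row-over-row change (the `my_stream` name and `total` column are illustrative):

```sql
-- compare each row's total with the previous row; fall back to 0 for the first row
select total, lag(total) as prev_total, total - lag(total, 1, 0) as delta
from my_stream
```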
✅ streaming query
🚫 historical query