Sample for using stream
Download Sample Data
Download the sample data by following the steps below.
## 1. Clone the sample data from the MACHBASE git repository.
$ git clone https://www.github.com/MACHBASE/TagTutorial.git MyTutorial
## 2. Unzip the data you need.
$ cd MyTutorial/
$ gunzip edu_3_plc_stream/*.gz
## 3. Move to the directory where the sample data exists.
$ cd edu_3_plc_stream/
Create TAG, LOG Table
To use the STREAM feature, first create the TAG and LOG tables. Adjust the following commands to your environment and execute them.
$ pwd
~/MyTutorial/edu_3_plc_stream
## 1-1. Create TAG table
$ machsql --server=127.0.0.1 --port=${MACHBASE_PORT_NO} --user=SYS --password=MANAGER --script=1_create_tag.sql
## 1-2. Load TAG Meta
$ sh 2_load_meta.sh
## 2. Create LOG table
$ machsql --server=127.0.0.1 --port=${MACHBASE_PORT_NO} --user=SYS --password=MANAGER --script=3_create_plc_tag_table.sql
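The machsql commands above repeat the same connection flags, so a small wrapper can reduce typing mistakes. The following is a minimal sketch, not part of the sample repository: the function name `run_script` is hypothetical, and it only reuses the flags already shown above. It stops with a message if `MACHBASE_PORT_NO` is not set.

```shell
# Run one of the tutorial's SQL scripts with the connection flags used above.
# run_script is a hypothetical helper name; the flags come from the commands above.
run_script() {
    # Abort with a message instead of passing an empty port to machsql.
    : "${MACHBASE_PORT_NO:?MACHBASE_PORT_NO must be set}"
    machsql --server=127.0.0.1 --port="${MACHBASE_PORT_NO}" \
            --user=SYS --password=MANAGER --script="$1"
}

# Usage, in the same order as the steps above:
#   run_script 1_create_tag.sql
#   sh 2_load_meta.sh
#   run_script 3_create_plc_tag_table.sql
```

The wrapper changes nothing about what is executed. It only keeps the server, user, and password in one place.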
Create and Run STREAM
Create and start the STREAMs by executing the sample file written for the TAG and LOG tables created above.
$ machsql --server=127.0.0.1 --port=${MACHBASE_PORT_NO} --user=SYS --password=MANAGER --script=4_plc_stream_tag.sql
The sample file contains two types of queries: one creates a STREAM and the other starts it.
## Create STREAM Query Example
EXEC STREAM_CREATE(event_v0, 'insert into tag select ''MTAG_V00'', tm, v0 from plc_tag_table;');
## Run STREAM Query Example
EXEC STREAM_START(event_v0);
If the STREAMs are running normally, each STREAM inserts the corresponding data into the TAG table whenever data is inserted into plc_tag_table.
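Because the create and start statements differ only in the column and tag name, they can be generated rather than typed by hand. The sketch below prints the statement pair for one column. The function name `gen_stream_sql` is hypothetical, and the tag name is passed explicitly because the mapping in 4_plc_stream_tag.sql is the authoritative one.

```shell
# Print the STREAM_CREATE / STREAM_START pair for one source column.
# $1: column name in plc_tag_table (for example v0)
# $2: tag name to write into the TAG table (for example MTAG_V00)
gen_stream_sql() {
    col="$1"
    tag="$2"
    # The doubled single quotes escape the tag name inside the SQL string literal.
    printf "EXEC STREAM_CREATE(event_%s, 'insert into tag select ''%s'', tm, %s from plc_tag_table;');\n" "$col" "$tag" "$col"
    printf "EXEC STREAM_START(event_%s);\n" "$col"
}

# Reproduces the two example statements shown above.
gen_stream_sql v0 MTAG_V00
```

Redirecting the output to a file gives a script that can be passed to machsql with the `--script` option used earlier.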
Check STREAM Status
Through v$streams, a virtual table supported by Machbase, you can check the number of streams being executed, queries used, status, and error messages.
Mach> desc v$streams;
[ COLUMN ]
----------------------------------------------------------------------------------------------------
NAME NULL? TYPE LENGTH
----------------------------------------------------------------------------------------------------
NAME varchar 100
LAST_EX_TIME datetime 31
TABLE_NAME varchar 100
END_RID long 20
STATE varchar 10
QUERY_TXT varchar 2048
ERROR_MSG varchar 2048
FREQUENCY ulong 20
You can check the status of all STREAMs as follows.
Mach> select state, name, table_name, query_txt from v$streams;
STATE NAME TABLE_NAME QUERY_TXT
------------------------------------------------------------------------------------------------
RUNNING EVENT_V0 PLC_TAG_TABLE insert into tag select 'MTAG_V00', tm, v0 from plc_tag_table;
RUNNING EVENT_V1 PLC_TAG_TABLE insert into tag select 'MTAG_V00', tm, v1 from plc_tag_table;
RUNNING EVENT_C0 PLC_TAG_TABLE insert into tag select 'MTAG_C00', tm, c0 from plc_tag_table;
RUNNING EVENT_C1 PLC_TAG_TABLE insert into tag select 'MTAG_C01', tm, c1 from plc_tag_table;
RUNNING EVENT_C2 PLC_TAG_TABLE insert into tag select 'MTAG_C02', tm, c2 from plc_tag_table;
RUNNING EVENT_C3 PLC_TAG_TABLE insert into tag select 'MTAG_C03', tm, c3 from plc_tag_table;
RUNNING EVENT_C4 PLC_TAG_TABLE insert into tag select 'MTAG_C04', tm, c4 from plc_tag_table;
RUNNING EVENT_C5 PLC_TAG_TABLE insert into tag select 'MTAG_C05', tm, c5 from plc_tag_table;
RUNNING EVENT_C6 PLC_TAG_TABLE insert into tag select 'MTAG_C06', tm, c6 from plc_tag_table;
RUNNING EVENT_C7 PLC_TAG_TABLE insert into tag select 'MTAG_C07', tm, c7 from plc_tag_table;
RUNNING EVENT_C8 PLC_TAG_TABLE insert into tag select 'MTAG_C08', tm, c8 from plc_tag_table;
RUNNING EVENT_C9 PLC_TAG_TABLE insert into tag select 'MTAG_C09', tm, c9 from plc_tag_table;
RUNNING EVENT_C10 PLC_TAG_TABLE insert into tag select 'MTAG_C10', tm, c10 from plc_tag_table;
RUNNING EVENT_C11 PLC_TAG_TABLE insert into tag select 'MTAG_C11', tm, c11 from plc_tag_table;
RUNNING EVENT_C12 PLC_TAG_TABLE insert into tag select 'MTAG_C12', tm, c12 from plc_tag_table;
RUNNING EVENT_C13 PLC_TAG_TABLE insert into tag select 'MTAG_C13', tm, c13 from plc_tag_table;
RUNNING EVENT_C14 PLC_TAG_TABLE insert into tag select 'MTAG_C14', tm, c14 from plc_tag_table;
RUNNING EVENT_C15 PLC_TAG_TABLE insert into tag select 'MTAG_C15', tm, c15 from plc_tag_table;
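With 18 streams, scanning the listing by eye is error-prone. The following sketch filters saved query output and prints only the streams whose state is not RUNNING. It assumes the column order shown above (STATE first, NAME second). The function name `not_running` is hypothetical, and other state values are not listed in this document, so the filter simply treats anything other than RUNNING as a problem.

```shell
# Read the output of the status query on stdin and print the name of
# every stream whose STATE column is not RUNNING.
# Assumes STATE is the first column and NAME the second, as in the listing above.
not_running() {
    # Data rows start with an upper-case state word; this skips the prompt,
    # the separator line, and the "row(s) selected" footer. The header is
    # excluded explicitly.
    awk '$1 ~ /^[A-Z_]+$/ && $1 != "STATE" && $1 != "RUNNING" { print $2 }'
}

# Usage: save the query output to a file, then run
#   not_running < status.txt
# An empty result means every listed stream is RUNNING.
```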
Load Data
After confirming that all STREAMs are running, load data with machloader and check the operation. Because a STREAM works regardless of the input method, data is inserted into the TAG table automatically whether it arrives through the CLI, JDBC, or a Collector.
$ cat 5_plc_tag_load.sh
machloader -t plc_tag_table -i -d 5_plc_tag.csv -F "tm YYYY-MM-DD HH24:MI:SS mmm:uuu:nnn"
$ sh 5_plc_tag_load.sh
-----------------------------------------------------------------
Machbase Data Import/Export Utility.
Release Version 6.5.1.official
Copyright 2014, MACHBASE Corporation or its subsidiaries.
All Rights Reserved.
-----------------------------------------------------------------
NLS : US7ASCII EXECUTE MODE : IMPORT
TARGET TABLE : plc_tag_table DATA FILE : 5_plc_tag.csv
IMPORT MODE : APPEND FIELD TERM : ,
ROW TERM : \n ENCLOSURE : "
ESCAPE : \ ARRIVAL_TIME : FALSE
ENCODING : NONE HEADER : FALSE
CREATE TABLE : FALSE
Progress bar Imported records Error records
80000 0
If you check the TAG table data during data loading, you can see that the data is inserted in real time.
Mach> select count(*) from TAG;
count(*)
-----------------------
16775979
[1] row(s) selected.
Mach> select count(*) from TAG;
count(*)
-----------------------
17609187
[1] row(s) selected.
Mach> select count(*) from TAG;
count(*)
-----------------------
18238357
[1] row(s) selected.
Elapsed time: 0.000
Result of STREAM
You can check how far each STREAM has read the source table (plc_tag_table) as follows.
Mach> select name, state, end_rid from v$streams;
name state end_rid
---------------------------------------------------------
EVENT_V0 RUNNING 909912
EVENT_V1 RUNNING 1584671
EVENT_C0 RUNNING 1312416
EVENT_C1 RUNNING 1268520
EVENT_C2 RUNNING 1636800
EVENT_C3 RUNNING 1197840
EVENT_C4 RUNNING 622728
EVENT_C5 RUNNING 972780
EVENT_C6 RUNNING 1021512
EVENT_C7 RUNNING 1287474
EVENT_C8 RUNNING 826956
EVENT_C9 RUNNING 1639032
EVENT_C10 RUNNING 725954
EVENT_C11 RUNNING 1511436
EVENT_C12 RUNNING 531079
EVENT_C13 RUNNING 1004400
EVENT_C14 RUNNING 741768
EVENT_C15 RUNNING 746604
[18] row(s) selected.
When the end_rid value equals the number of records in the source table, there is nothing left to read from the source table:
Mach> select name, state, end_rid from v$streams;
name state end_rid
---------------------------------------------------------
EVENT_V0 RUNNING 2000000
EVENT_V1 RUNNING 2000000
EVENT_C0 RUNNING 2000000
EVENT_C1 RUNNING 2000000
EVENT_C2 RUNNING 2000000
EVENT_C3 RUNNING 2000000
EVENT_C4 RUNNING 2000000
EVENT_C5 RUNNING 2000000
EVENT_C6 RUNNING 2000000
EVENT_C7 RUNNING 2000000
EVENT_C8 RUNNING 2000000
EVENT_C9 RUNNING 2000000
EVENT_C10 RUNNING 2000000
EVENT_C11 RUNNING 2000000
EVENT_C12 RUNNING 2000000
EVENT_C13 RUNNING 2000000
EVENT_C14 RUNNING 2000000
EVENT_C15 RUNNING 2000000
[18] row(s) selected.
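The comparison between end_rid and the source row count can also be done mechanically. The sketch below reads the output of the query above and prints the streams that are still behind a given row count. The function name `lagging_streams` is hypothetical, and it assumes the three-column layout shown above (name, state, end_rid).

```shell
# Print the name of every stream whose end_rid is below the expected total.
# $1: number of records in the source table
# stdin: rows shaped like "EVENT_V0 RUNNING 909912"
lagging_streams() {
    # Only rows whose third field is a plain number are data rows; the header,
    # separator, prompt, and footer lines are skipped.
    awk -v total="$1" '$3 ~ /^[0-9]+$/ && $3 + 0 < total + 0 { print $1 }'
}

# Usage: save the query output to a file, then run
#   lagging_streams 2000000 < end_rid.txt
# An empty result means every stream has reached the given row count.
```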
Since the number of rows in the TAG table equals (the number of rows in the source table) * (the number of STREAMs), you can confirm that every STREAM has read all the data normally.
Mach> select count(*) from TAG;
count(*)
-----------------------
36000000
[1] row(s) selected.
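As a quick arithmetic check of the count above, each of the 18 streams writes one TAG row per source row, and every stream reached an end_rid of 2000000. The variable names below are only illustrative.

```shell
# Worked check of the TAG row count using the figures from the outputs above.
source_rows=2000000   # end_rid reached by every stream
stream_count=18       # number of rows returned by v$streams
expected=$((source_rows * stream_count))
echo "expected TAG rows: $expected"   # prints: expected TAG rows: 36000000
```

The result matches the count(*) returned from the TAG table.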
You can also check the time range of the input data as follows.
Mach> select min(time), max(time) from TAG;
min(time) max(time)
-------------------------------------------------------------------
2009-01-28 07:03:34 000:000:000 2009-01-28 12:36:58 020:000:000
[1] row(s) selected.
Add Data
You can use an INSERT statement to check whether each STREAM actually responds to every new record.
Mach> insert into plc_tag_table values(TO_DATE('2009-01-28 12:37:00 000:000:000'), 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000);
1 row(s) inserted.
As soon as one more record is added to PLC_TAG_TABLE, the end_rid of each stream increases to 2000001, as shown below.
Mach> select name, state, end_rid from v$streams;
name state end_rid
---------------------------------------------------------------
EVENT_V0 RUNNING 2000001
EVENT_V1 RUNNING 2000001
EVENT_C0 RUNNING 2000001
EVENT_C1 RUNNING 2000001
EVENT_C2 RUNNING 2000001
EVENT_C3 RUNNING 2000001
EVENT_C4 RUNNING 2000001
EVENT_C5 RUNNING 2000001
EVENT_C6 RUNNING 2000001
EVENT_C7 RUNNING 2000001
EVENT_C8 RUNNING 2000001
EVENT_C9 RUNNING 2000001
EVENT_C10 RUNNING 2000001
EVENT_C11 RUNNING 2000001
EVENT_C12 RUNNING 2000001
EVENT_C13 RUNNING 2000001
EVENT_C14 RUNNING 2000001
EVENT_C15 RUNNING 2000001
[18] row(s) selected.