스트림 활용을 위한 샘플 예제

샘플 데이터 다운로드

아래 가이드를 따라 샘플 데이터를 다운로드한다.

### 1. MACHBASE git repository에서 샘플 데이터를 clone한다.
$ git clone https://www.github.com/MACHBASE/TagTutorial.git MyTutorial
 
### 2. 필요한 데이터를 압축 해제한다.
$ cd MyTutorial/
$ gunzip edu_3_plc_stream/*.gz
 
### 3. 샘플 데이터가 있는 이렉토리로 이동한다.
$ cd edu_3_plc_stream/

TAG, LOG 테이블의 생성

STREAM 기능을 사용하기 위해 아래 명령어들을 환경에 맞게 수정 후 실행해 TAG, LOG 테이블을 생성한다.

$ pwd
~/MyTutorial/edu_3_plc_stream
 
### 1-1. TAG 테이블 생성
$ machsql --server=127.0.0.1 --port=${MACHBASE_PORT_NO} --user=SYS --password=MANAGER --script=1_create_tag.sql
### 1-2. TAG Meta 로드
$ sh 2_load_meta.sh
 
### 2. LOG 테이블 생성
$ machsql --server=127.0.0.1 --port=${MACHBASE_PORT_NO} --user=SYS --password=MANAGER --script=3_create_plc_tag_table.sql

STREAM 생성, 구동

생성된 TAG, LOG 테이블에 맞게 작성된 샘플 파일을 수행해 STREAM을 시작한다.

$ machsql --server=127.0.0.1 --port=${MACHBASE_PORT_NO} --user=SYS --password=MANAGER --script=4_plc_stream_tag.sql

해당 샘플 파일에 포함된 쿼리에는 두 종류가 있는데 각각 STREAM을 생성하고 생성된 STREAM을 구동하는 역할을 한다.

### STREAM 생성 Query 
### event_v0라는 이름의 MTAG_V00를 name으로 가지고 plc_tag_table에 입력되는 데이터  tm과 v0 column 데이터를 time, value로 가지는 row를 tag 테이블에 입력하는 STREAM을 생성
EXEC STREAM_CREATE(event_v0, 'insert into tag select ''MTAG_V00'', tm, v0 from plc_tag_table;');
### STREAM 구동 Query 
EXEC STREAM_START(event_v0);

STREAM을 정상적으로 구동시켰다면 이제부터 plc_tag_table에 데이터가 입력되는 순간 각 STREAM이 동작해 해당되는 데이터를 TAG 테이블에 입력한다.

STREAM 상태 확인

Machbase에서 지원하는 가상 테이블인 v$streams를 통해 수행중인 STREAM의 갯수, 사용 쿼리, 상태, 에러 메시지 등을 확인할 수 있다.

Mach> desc v$streams;
[ COLUMN ]
----------------------------------------------------------------------------------------------------
NAME                                                        NULL?    TYPE                LENGTH
----------------------------------------------------------------------------------------------------
NAME                                                                 varchar             100
LAST_EX_TIME                                                         datetime            31
TABLE_NAME                                                           varchar             100
END_RID                                                              long                20
STATE                                                                varchar             10
QUERY_TXT                                                            varchar             2048
ERROR_MSG                                                            varchar             2048
FREQUENCY                                                            ulong               20

다음과 같이 모든 STREAM의 상태를 확인할 수 있다.

Mach> select state, name, table_name, query_txt from v$streams;
STATE   NAME     TABLE_NAME    QUERY_TXT
------------------------------------------------------------------------------------------------
RUNNING EVENT_V0 PLC_TAG_TABLE insert into tag select 'MTAG_V00', tm, v0 from plc_tag_table;
RUNNING EVENT_V1 PLC_TAG_TABLE insert into tag select 'MTAG_V00', tm, v1 from plc_tag_table;
RUNNING EVENT_C0 PLC_TAG_TABLE insert into tag select 'MTAG_C00', tm, c0 from plc_tag_table;
RUNNING EVENT_C1 PLC_TAG_TABLE insert into tag select 'MTAG_C01', tm, c1 from plc_tag_table;
RUNNING EVENT_C2 PLC_TAG_TABLE insert into tag select 'MTAG_C02', tm, c2 from plc_tag_table;
RUNNING EVENT_C3 PLC_TAG_TABLE insert into tag select 'MTAG_C03', tm, c3 from plc_tag_table;
RUNNING EVENT_C4 PLC_TAG_TABLE insert into tag select 'MTAG_C04', tm, c4 from plc_tag_table;
RUNNING EVENT_C5 PLC_TAG_TABLE insert into tag select 'MTAG_C05', tm, c5 from plc_tag_table;
RUNNING EVENT_C6 PLC_TAG_TABLE insert into tag select 'MTAG_C06', tm, c6 from plc_tag_table;
RUNNING EVENT_C7 PLC_TAG_TABLE insert into tag select 'MTAG_C07', tm, c7 from plc_tag_table;
RUNNING EVENT_C8 PLC_TAG_TABLE insert into tag select 'MTAG_C08', tm, c8 from plc_tag_table;
RUNNING EVENT_C9 PLC_TAG_TABLE insert into tag select 'MTAG_C09', tm, c9 from plc_tag_table;
RUNNING EVENT_C10 PLC_TAG_TABLE insert into tag select 'MTAG_C10', tm, c10 from plc_tag_table;
RUNNING EVENT_C11 PLC_TAG_TABLE insert into tag select 'MTAG_C11', tm, c11 from plc_tag_table;
RUNNING EVENT_C12 PLC_TAG_TABLE insert into tag select 'MTAG_C12', tm, c12 from plc_tag_table;
RUNNING EVENT_C13 PLC_TAG_TABLE insert into tag select 'MTAG_C13', tm, c13 from plc_tag_table;
RUNNING EVENT_C14 PLC_TAG_TABLE insert into tag select 'MTAG_C14', tm, c14 from plc_tag_table;
RUNNING EVENT_C15 PLC_TAG_TABLE insert into tag select 'MTAG_C15', tm, c15 from plc_tag_table;

데이터 로드

STREAM이 모두 구동중임을 확인했으니 Machloader를 사용해 데이터를 입력, 작동을 확인한다.

STREAM은 입력 방식에 관계 없이 작동하므로 CLI, JDBC, Collector 등 어떤 방식의 입력이 발생하더라도 TAG 테이블로 자동 입력된다.

$ cat 5_plc_tag_load.sh
machloader  -t plc_tag_table -i -d 5_plc_tag.csv -F "tm YYYY-MM-DD HH24:MI:SS mmm:uuu:nnn"
 
$ sh 5_plc_tag_load.sh
-----------------------------------------------------------------
     Machbase Data Import/Export Utility.
     Release Version 6.5.1.official
     Copyright 2014, MACHBASE Corporation or its subsidiaries.
     All Rights Reserved.
-----------------------------------------------------------------
NLS            : US7ASCII            EXECUTE MODE   : IMPORT
TARGET TABLE   : plc_tag_table       DATA FILE      : 5_plc_tag.csv
IMPORT MODE    : APPEND              FIELD TERM     : ,
ROW TERM       : \n                  ENCLOSURE      : "
ESCAPE         : \                   ARRIVAL_TIME   : FALSE
ENCODING       : NONE                HEADER         : FALSE
CREATE TABLE   : FALSE
 
 Progress bar                       Imported records        Error records
                                               80000                    0

데이터 로드 중 TAG 테이블 데이터를 확인해보면 실시간으로 데이터가 입력됨을 확인할 수 있다.

Mach> select count(*) from TAG;
count(*)
-----------------------
16775979
[1] row(s) selected.
Mach> select count(*) from TAG;
count(*)
-----------------------
17609187
[1] row(s) selected.
Mach> select count(*) from TAG;
count(*)
-----------------------
18238357
[1] row(s) selected.
Elapsed time: 0.000

STREAM 동작의 결과 확인

아래와 같이 STREAM이 소스 테이블(plc_tag_table)의 데이터를 어디까지 읽었는지를 확인할 수 있다.

Mach> select name, state, end_rid from v$streams;
name      state   end_rid
---------------------------------------------------------
EVENT_V0  RUNNING 909912
EVENT_V1  RUNNING 1584671
EVENT_C0  RUNNING 1312416
EVENT_C1  RUNNING 1268520
EVENT_C2  RUNNING 1636800
EVENT_C3  RUNNING 1197840
EVENT_C4  RUNNING 622728
EVENT_C5  RUNNING 972780
EVENT_C6  RUNNING 1021512
EVENT_C7  RUNNING 1287474
EVENT_C8  RUNNING 826956
EVENT_C9  RUNNING 1639032
EVENT_C10 RUNNING 725954
EVENT_C11 RUNNING 1511436
EVENT_C12 RUNNING 531079
EVENT_C13 RUNNING 1004400
EVENT_C14 RUNNING 741768
EVENT_C15 RUNNING 746604
[18] row(s) selected.

end_rid column의 값이 소스 테이블의 레코드 갯수와 동일하면 소스 테이블에서 더 이상 읽을 것이 없다는 뜻이다.

Mach> select name, state, end_rid from v$streams;
name      state   end_rid
---------------------------------------------------------
EVENT_V0  RUNNING 2000000
EVENT_V1  RUNNING 2000000
EVENT_C0  RUNNING 2000000
EVENT_C1  RUNNING 2000000
EVENT_C2  RUNNING 2000000
EVENT_C3  RUNNING 2000000
EVENT_C4  RUNNING 2000000
EVENT_C5  RUNNING 2000000
EVENT_C6  RUNNING 2000000
EVENT_C7  RUNNING 2000000
EVENT_C8  RUNNING 2000000
EVENT_C9  RUNNING 2000000
EVENT_C10 RUNNING 2000000
EVENT_C11 RUNNING 2000000
EVENT_C12 RUNNING 2000000
EVENT_C13 RUNNING 2000000
EVENT_C14 RUNNING 2000000
EVENT_C15 RUNNING 2000000
[18] row(s) selected.

TAG 테이블의 데이터 갯수가 소스 테이블의 갯수 * STREAM의 갯수와 같으므로 STREAM이 정상적으로 모든 데이터를 읽었음을 확인할 수 있다.

Mach> select count(*) from TAG;
count(*)
-----------------------
36000000
[1] row(s) selected.

입력된 데이터의 시간 범위도 다음과 같이 확인할 수 있다.

Mach> select min(time), max(time) from TAG;
min(time)                       max(time)
-------------------------------------------------------------------
2009-01-28 07:03:34 000:000:000 2009-01-28 12:36:58 020:000:000
[1] row(s) selected.

데이터 추가

STREAM이 실제로 각 데이터 입력마다 반응하는지 확인하기 위해 insert 구문을 통해 확인해볼 수 있다.

Mach> insert into plc_tag_table values(TO_DATE('2009-01-28 12:37:00 000:000:000'), 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000, 50000);
1 row(s) inserted.

PLC_TAG_TABLE에 레코드 하나를 더 추가한 순간 아래와 같이 각 스트림의 end_rid가 1건 늘어 2000001 건이 된 것을 확인할 수 있다.

Mach> select name, state, end_rid from v$streams;
name      state   end_rid
---------------------------------------------------------------
EVENT_V0  RUNNING 2000001
EVENT_V1  RUNNING 2000001
EVENT_C0  RUNNING 2000001
EVENT_C1  RUNNING 2000001
EVENT_C2  RUNNING 2000001
EVENT_C3  RUNNING 2000001
EVENT_C4  RUNNING 2000001
EVENT_C5  RUNNING 2000001
EVENT_C6  RUNNING 2000001
EVENT_C7  RUNNING 2000001
EVENT_C8  RUNNING 2000001
EVENT_C9  RUNNING 2000001
EVENT_C10 RUNNING 2000001
EVENT_C11 RUNNING 2000001
EVENT_C12 RUNNING 2000001
EVENT_C13 RUNNING 2000001
EVENT_C14 RUNNING 2000001
EVENT_C15 RUNNING 2000001
[18] row(s) selected.
최근 업데이트