



1. Kinesis Data Streams 생성
1. Kinesis 콘솔의 탐색창에서 데이터 스트림, 데이터 스트림 생성을 차례로 선택합니다.
데이터 스트림 이름의 경우 'click-stream' 을 사용하고 다른 모든 옵션은 기본값으로 설정된 상태로 둡니다

2. IAM 생성
IAM 역할을 생성하여 Amazon Redshift가 click-stream Kinesis Data Streams에 대한 접근 권한를 부여


{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:*:782638322607:stream/click-stream"
},
{
"Effect": "Allow",
"Action": [
"kinesis:ListStreams",
"kinesis:ListShards"
],
"Resource": "*"
}
]
}


역할 창으로 돌아가서 정책 선택


3. Amazon Redshift 콘솔의 탐색 창에서 Redshift serverless 생성
네임스페이스를 생성할 때 권한 섹션의 드롭다운 메뉴에서 IAM 역할 연결을 선택합니다






4. Kinesis 외부 스키마 생성
Amazon Redshift 콘솔에서 쿼리 편집기 v2를 선택합니다. 리소스 목록에서 새 서버리스 데이터베이스를 선택하여 연결합니다. 이제 SQL을 사용하여 스트리밍 수집을 구성할 수 있습니다. 먼저 스트리밍 서비스에 매핑되는 외부 스키마를 생성합니다.


IAM 붙여넣기

CREATE EXTERNAL SCHEMA clicks
FROM KINESIS
IAM_ROLE 'arn:aws:iam::451456566564:role/redshift';
sensors 스키마 생성

잘 생성되었다
5. 스트리밍 데이터를 읽을 Materialized view 생성
JSON 데이터를 sensor_data 구체화된 뷰의 단일 열에 저장하려면 JSON_PARSE 함수를 사용합니다.
CREATE MATERIALIZED VIEW click_data AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
partition_key,
shard_id,
sequence_number,
refresh_time,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::VARCHAR(20) as event_time,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_type')::VARCHAR(20) as event_type,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'product_id')::VARCHAR(20) as product_id,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'category_id ')::VARCHAR(20) as category_id,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'category_code ')::VARCHAR(20) as category_code,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'price')::VARCHAR(8) as price,
JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'user_id')::VARCHAR(20) as user_id
FROM clicks."click-stream";

6. Kinesis 데이터 스트림으로 데이터 로드
API Gateway 생성


리소스 생성

POST 메서드를 만들기 전에 API Gateway가 Kinesis에 대한 역할을 만들어준다








어떤 타입인지 HTTP 헤더에 쓰기. Amazon JSON형태이다
매핑 템플릿은 컨텐츠에 대한 데이터 형태. JSON이다
Content-Type - 'application/x-amz-json-1.1'



#set ( $enter = "
")
#set($json = "$input.json('$')$enter")
{
"Data": "$util.base64Encode("$json")",
"PartitionKey": "$input.params('X-Amzn-Trace-Id')",
"StreamName": "click-stream"
}



인스턴스 생성 후 MobaXterm 접속
curl -d "{\"event_time\":\"1111\",\"event_type\":\"2222\",\"product_id\":\"3333\",\"category_id \":\"4444\",\"category_code \":\"5555\",\"price\":\"6666\",\"user_id\":\"7777\",\"user_session\":\"8888\"}" -H 'Content-Type: application/json' -X POST https://4a6653py84.execute-api.ap-northeast-2.amazonaws.com/prod/v1

import datetime
import json
import random
import boto3
STREAM_NAME = "click-stream"
def get_random_data():
current_temperature = round(10 + random.random() * 170, 2)
if current_temperature > 160:
status = "ERROR"
elif current_temperature > 140 or random.randrange(1, 100) > 80:
status = random.choice(["WARNING","ERROR"])
else:
status = "OK"
return {
'sensor_id': random.randrange(1, 100),
'current_temperature': current_temperature,
'status': status,
'event_time': datetime.datetime.now().isoformat()
}
def send_data(stream_name, kinesis_client):
while True:
data = get_random_data()
partition_key = str(data["sensor_id"])
print(data)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=partition_key)
if __name__ == '__main__':
kinesis_client = boto3.client('kinesis')
send_data(STREAM_NAME, kinesis_client)
// $ python3 random_data_generator.py

7. Amazon Redshift에서 스트리밍 데이터 쿼리하기
SELECT * FROM click_data LIMIT 10;

8. Redshift와 QuickSight 연결
퍼블릭 액세스를 허용하고 VPC 보안그룹에서 5439 포트번호를 열어준다


QuickSight에서 Dataset 생성



댓글