본문 바로가기
카테고리 없음

Amazon Redshift 신규 기능 – Kinesis Data Streams 및 Kafka용 관리형 스트리밍 수집 정식 출시

by 16비트 2023. 6. 24.

 

 

1. Kinesis Data Streams 생성

1. Kinesis 콘솔의 탐색창에서 데이터 스트림, 데이터 스트림 생성을 차례로 선택합니다. 

데이터 스트림 이름의 경우 'click-stream' 을 사용하고 다른 모든 옵션은 기본값으로 설정된 상태로 둡니다

 

 

2. IAM 생성

 IAM 역할을 생성하여 Amazon Redshift가 click-stream Kinesis Data Streams에 대한 접근 권한를 부여

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:782638322607:stream/click-stream"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}

역할 창으로 돌아가서 정책 선택

 

3. Amazon Redshift 콘솔의 탐색 창에서 Redshift serverless 생성

네임스페이스를 생성할 때 권한 섹션의 드롭다운 메뉴에서 IAM 역할 연결을 선택합니다

 

4. Kinesis 외부 스키마 생성

Amazon Redshift 콘솔에서 쿼리 편집기 v2를 선택합니다. 리소스 목록에서 새 서버리스 데이터베이스를 선택하여 연결합니다. 이제 SQL을 사용하여 스트리밍 수집을 구성할 수 있습니다. 먼저 스트리밍 서비스에 매핑되는 외부 스키마를 생성합니다.

IAM 붙여넣기

CREATE EXTERNAL SCHEMA clicks
FROM KINESIS
IAM_ROLE 'arn:aws:iam::451456566564:role/redshift';

sensors 스키마 생성

잘 생성되었다

 

5. 스트리밍 데이터를 읽을 Materialized view 생성

JSON 데이터를 sensor_data 구체화된 뷰의 단일 열에 저장하려면 JSON_PARSE 함수를 사용합니다.

CREATE MATERIALIZED VIEW click_data AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::VARCHAR(20) as event_time,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_type')::VARCHAR(20) as event_type,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'product_id')::VARCHAR(20) as product_id,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'category_id ')::VARCHAR(20) as category_id,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'category_code ')::VARCHAR(20) as category_code,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'price')::VARCHAR(8) as price,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'user_id')::VARCHAR(20) as user_id
      FROM clicks."click-stream";

 

6. Kinesis 데이터 스트림으로 데이터 로드

API Gateway 생성

리소스 생성

 

POST 메서드를 만들기 전에 API Gateway가 Kinesis에 대한 역할을 만들어준다

 

어떤 타입인지 HTTP 헤더에 쓰기. Amazon JSON형태이다

매핑 템플릿은 컨텐츠에 대한 데이터 형태. JSON이다

Content-Type - 'application/x-amz-json-1.1'

#set ( $enter = "
")
#set($json = "$input.json('$')$enter")
{
    "Data": "$util.base64Encode("$json")",
    "PartitionKey": "$input.params('X-Amzn-Trace-Id')",
    "StreamName": "click-stream"
}

 

인스턴스 생성 후 MobaXterm 접속

curl -d "{\"event_time\":\"1111\",\"event_type\":\"2222\",\"product_id\":\"3333\",\"category_id \":\"4444\",\"category_code \":\"5555\",\"price\":\"6666\",\"user_id\":\"7777\",\"user_session\":\"8888\"}" -H 'Content-Type: application/json' -X POST https://4a6653py84.execute-api.ap-northeast-2.amazonaws.com/prod/v1

 

 

import datetime
import json
import random
import boto3

STREAM_NAME = "click-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

 

// $ python3 random_data_generator.py

 

7. Amazon Redshift에서 스트리밍 데이터 쿼리하기

SELECT * FROM click_data LIMIT 10;

 

8. Redshift와 QuickSight 연결

퍼블릭 액세스를 허용하고 VPC 보안그룹에서 5439 포트번호를 열어준다

 

QuickSight에서 Dataset 생성

댓글