"Generative AI가 트렌드로 부상하고 있지만, 여전히 데이터에 대한 수요는 적지 않습니다. 접근이 용이한 클라우드의 특성을 활용하여 간단한 데이터 분석 플랫폼을 구성해보면서 자원에 대한 이해도를 높일 수 있습니다.
또한, 데이터 보안에 특화된 서비스인 Macie를 포함하여 아래의 시나리오를 토대로 구성하는 방법을 알아보겠습니다.
시나리오
사용자가 웹 페이지에서 "수집 시작" 버튼을 누르면 API를 통해 현재 코인의 가격을 받아오고, "수집 중단" 버튼을 누르면 수집을 멈추도록 구성합니다.

목표 1: 코인의 거래 정보를 실시간으로 S3에 적재하고, Athena를 통해 간편하게 분석할 수 있는 환경을 구축합니다.
목표 2: Macie를 통해 개인 및 민감 정보를 검출합니다.
아키텍쳐 및 자원 설명
Data Platform을 구성할때 자주 쓰이는 자원으로 설계하였으며, 아래 자원들이 필수로 사용되어야 하는 것은 아닙니다.또한, 테스트 및 구성을 간단하게 하기위해 Public 환경으로 되어 있으며, 실제 고객사에 구성할 경우는 Private 환경으로 구성하는 것을 권장합니다.

- Static Web(S3) : “수집 시작”/”수집 중단” 버튼을 제공하는 Client(Web)
-
API Gateway(Socket) : 실시간 수집에 용이하도록 API Gateway(Socket)으로 구성
-
Lambda(Connect) : 사용자가 “수집 시작”을 클릭하면 API Gateway가 해당 Lambda를 통해 연결을 설정
-
Lambda(Default) : 데이터 수집 및 처리 후 Kinesis Data Stream으로 전달
-
Lambda(DisConnect) : 사용자가 “수집 중단”을 클릭하면 API Gateway가 해당 Lambda를 통해 연결을 해제
-
Kinesis Data Stream : 실시간으로 데이터 스트리밍을 처리하며 Lambda을 통해 데이터를 받아와 보존 기간만큼 저장
-
Kinesis Data Firehose : Kinesis Data Stream에 있는 데이터를 가져와 지정된 시간단위로 S3에 적재
-
TradeData(S3) : 최종적인 Coin 거래 데이터가 저장
-
Athena : 간단하게 데이터 분석을 위해 S3에 쿼리를 하기 위해 구성
- Macie : 가져온 Data 중 민감/개인 정보가 있는지 검사하기 위해 구성
구성하기
이번 글에서는 자원에 필요한 상세한 Role이나 설정보다는 주요 내용만을 다루도록 하겠습니다.
1. S3 Static Web Hosting
아래와 같이 Hosting 하고자 하는 S3에 "정적 웹 사이트 호스팅" 기능을 활성화합니다.

이 시나리오에서 사용한 HTML 코드는 index.html 하나에 다 작성되는 코드이며 일부만 바꿔서 사용하면 됩니다.
여기서는 Web Socket에 해당하는 부분을 바꾸면되며 API Gateway를 만들고 난뒤에 수정하면 됩니다.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket Crypto Data Collection</title>
</head>
<body>
<h1>Crypto Data Collection</h1>
<button id="start-btn">수집 시작</button>
<button id="stop-btn">수집 중단</button>
<div id="status"></div>
<div id="data"></div>
<script>
let websocket = null;
let intervalId = null; // 주기적 요청을 위한 ID
const statusDiv = document.getElementById("status");
const dataDiv = document.getElementById("data");
// 수집 시작 버튼 클릭 시 WebSocket 연결 시작
document.getElementById("start-btn").addEventListener("click", function () {
if (!websocket || websocket.readyState !== WebSocket.OPEN) {
websocket = new WebSocket('[API Gateway URL : wss로 시작되는 것]');
websocket.onopen = function () {
statusDiv.innerText = "WebSocket 연결됨";
websocket.send(JSON.stringify({ action: "startCollection", coinname: "BTCUSDT" }));
// 주기적 요청 시작
intervalId = setInterval(() => {
websocket.send(JSON.stringify({ action: "getLatestPrice" }));
}, 1000); // 1초마다 요청
};
websocket.onmessage = function (event) {
const data = JSON.parse(event.data);
dataDiv.innerText = `코인: ${data.coinname}, 가격: ${data.price}`;
};
websocket.onclose = function () {
clearInterval(intervalId); // 연결 종료 시 요청 중단
statusDiv.innerText = "WebSocket 연결 종료됨";
};
websocket.onerror = function (error) {
statusDiv.innerText = `에러 발생: ${error}`;
};
}
});
// 수집 중단 버튼 클릭 시 WebSocket 연결 종료
document.getElementById("stop-btn").addEventListener("click", function () {
if (websocket && websocket.readyState === WebSocket.OPEN) {
websocket.close();
clearInterval(intervalId); // 요청 중단
statusDiv.innerText = "수집 중단됨";
}
});
</script>
</body>
</html>
2. Lambda 구성하기
Connect, Disconnect, Default에 해당하는 Lambda가 총 3개가 필요하며 아래 코드에서도 마찬가지로 해당되는 부분을 변경하면 됩니다.
Default에 해당하는 Lambda는 코드 수정 이외에도 Binance의 API호출을 위한 패키지가 필요하여 추가 작업이 필요하므로 아래 설명을 참고하면 됩니다.
2-1. Connect(Lambda)
def lambda_handler(event, context):
# 클라이언트가 연결될 때 호출되는 Lambda 함수
return {
'statusCode': 200,
'body': 'Connected'
}
2-2. Disconnect(Lambda)
def lambda_handler(event, context):
# 클라이언트가 연결을 끊을 때 호출되는 Lambda 함수
return {
'statusCode': 200,
'body': 'Disconnected'
}
2-3. Default(Lambda)
import boto3
import json
from binance.client import Client
from datetime import datetime, timedelta, timezone
import random
import string
import re
# 바이낸스 API 키
api_key = "[API KEY]"
api_secret = "[API SECRET]"
client = Client(api_key=api_key, api_secret=api_secret)
# Kinesis Data Stream 이름
kinesis_stream_name = '[Kinesis Data Stream]'
# Kinesis 클라이언트 생성
kinesis_client = boto3.client('kinesis', region_name='ap-northeast-2')
# 임의의 비트코인 트랜잭션 ID 생성
def generate_transaction_id():
return ''.join(random.choices('abcdef0123456789', k=64))
# 임의의 주문 유형 (buy/sell) 선택
def generate_ordertype():
return random.choice(['buy', 'sell'])
# 임의의 IP 주소 생성
def generate_ip():
return '.'.join(str(random.randint(0, 255)) for _ in range(4))
# 임의의 비트코인 지갑 주소 생성 (대략적인 형식으로, 실제로는 더 복잡)
def generate_wallet_address():
# 비트코인 주소는 일반적으로 26~35자 길이의 문자열로, 1 또는 3으로 시작함
return random.choice(['1', '3']) + ''.join(random.choices(string.ascii_letters + string.digits, k=33))
def lambda_handler(event, context):
# API Gateway WebSocket Management API 엔드포인트 URL
api_gateway_url = "[https://로 시작하는 API Gateway WebSocket Management API 엔드포인트 URL]"
# WebSocket 클라이언트로부터 받은 메시지를 파싱
body = json.loads(event['body'])
coinname = body.get('coinname', 'BTCUSDT') # 요청받은 코인 심볼
# 바이낸스 API에서 현재 코인 가격 가져오기
try:
coin_data = client.get_symbol_ticker(symbol=coinname)
price = coin_data['price']
except Exception as e:
price = "Error retrieving price"
print(f"Error retrieving coin data: {e}")
# 현재 시간을 ISO 8601 형식으로 추가
timestamp = (datetime.utcnow() + timedelta(hours=9)).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + '000'
#timestamp = datetime.utcnow().isoformat()
# 임의의 데이터 생성
transaction_id = generate_transaction_id()
ordertype = generate_ordertype()
ip_address = generate_ip()
wallet_address = generate_wallet_address()
# 메시지 생성 (WebSocket과 Kinesis 모두에 전송할 동일한 메시지)
message = {
"coinname": coinname,
"price": price,
"timestamp": timestamp, # 현재 시간을 메시지에 포함
"transaction_id": transaction_id,
"ordertype": ordertype,
"ip_address": ip_address,
"wallet_address": wallet_address
}
### 1. Kinesis로 데이터 전송 ###
try:
kinesis_client.put_record(
StreamName=kinesis_stream_name,
Data=json.dumps(message),
PartitionKey=coinname # 코인 이름으로 파티션 키 설정
)
print(f"Data sent to Kinesis: {message}")
except Exception as e:
print(f"Error sending data to Kinesis: {e}")
### 2. WebSocket으로 데이터 전송 ###
# API Gateway Management API의 endpoint 생성
client_api = boto3.client('apigatewaymanagementapi', endpoint_url=api_gateway_url)
# 메시지를 전송한 클라이언트의 connectionId 가져오기
connection_id = event['requestContext']['connectionId']
try:
client_api.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps(message)
)
print(f"Message sent to WebSocket: {message}")
except Exception as e:
print(f"Error sending message to WebSocket: {e}")
return {
'statusCode': 200,
'body': 'Data sent to Kinesis and WebSocket'
}
Default에 해당하는 Lambda 코드에서 binance와 관련된 패키지를 Import하고 있어서 해당 부분은 추가 작업이 필요합니다.
아래와 같이 정상적으로 적용되면 해당 Lambda에서 확인 가능해집니다.
필요한 패키지 설치에 필요한 명령어는 아래와 같습니다.
pip install binance -t .
홰당 파일을 Zip으로 압축하여 AWS Lambda의 Layer에 등록하고 Default Lambda에 연결하면 됩니다.다만, 아래와 같은 File 구조를 반드시 지켜야하며, Linux 환경에서 설치 및 ZIP으로 압축을 권장합니다.(Binance 기준으로 Windows에서 Download 및 ZIP 압축시 오류발생)
3. Kinesis Data Stream & Data Firehose
Kinesis Data Stream은 별도로 설정할 것은 없으며, 기본설정으로 생성하고 Lambda(Default)에 해당 이름으로 수정하면 됩니다.

Kinesis Data Firehose는 소스 : Kinesis Data Stream / 대상 : S3 로 설정하여 생성합니다.

버퍼 간격을 1초로 설정하여 파일을 작게 쪼개어 S3에 저장될 수 있도록 설정합니다.(필수 사항은 아닙니다.)

그 외 자원인 Athena, Macie는 아래 테스트하기에서 생성하면서 진행하도록 하겠습니다.
테스트 하기
위 구성 및 설정을 정상적으로 완료했다면,
여기서부터는 분석을 위한 SQL 쿼리를 사용해보기 위해 Athena 구성과 민감정보를 검사하기 위한 Macie를 구성하고 테스트해보겠습니다.
Athena에서 아래 화면처럼 테이블을 생성하기 위해 "S3 버킷 데이터"로 생성을 시작합니다.

테이블 이름은 임의로 하면 되며, 데이터베이스는 테스트를 위해 신규로 생성하며 이름은 마찬가지로 임의로 지정합니다.
데이터 세트는 위에서 구성한 S3를 지정하면 되며, 아키텍쳐상에서 TradeData(S3)로 지정하면 됩니다.
데이터 형식에서는 파일 형식을 "JSON"으로 변경하고 그 외의 것들은 기본으로 설정합니다.
열 세부 정보는 아래와 같이 3개의 열을 이름을 지정하고 유형을 지정해줍니다.
Lambda(Default)에서는 API 호출시에 Coinname, Price 2개의 응답에 Timestamp, Transaction ID, Order Type 등 몇개를 더 받아서 TradeData(S3)에 적재하도록 되어있지만,
Athena에서는 굳이 필요하지 않으므로 3개 [Coinname, Price, Timestamp]만 지정하도록 합니다.


아래와 같이 목표 1에 해당하는 Athena에서 Tradedata(S3)에 Json형태로 저장된 데이터들을 SQL 쿼리로 질의할 수 있습니다.
목표 1: 코인의 거래 정보를 실시간으로 S3에 적재하고, Athena를 통해 간편하게 분석할 수 있는 환경을 구축합니다.

다음은 Macie를 통해 저장된 데이터들의 민감정보가 있는지 확인하는 테스트를 진행하겠습니다.
TradeData에 해당하는 S3를 검토하는 것으로 선택하고, 테스트를 위해 아래 화면처럼 일회성 작업을 선택합니다.

관리형 데이터 식별자 옵션에서는 테스트 하고자 하는 것과는 크게 관련이 없으므로 "추천"으로 진행합니다.
테스트 하고자 하는 것은 "사용자 지정 데이터 식별자"를 통해서 Transaction ID, IP가 포함된 데이터가 있는지를 검출하는 것으로
직접 정규식을 작성하여 지정해주어야 합니다.
Transaction ID의 정규식은 아래와 같이 작성합니다.
\b[0-9a-fA-F]{64}\b
IP의 정규식은 아래와 같이 작성합니다.
\b(?:\d{1,3}\.){3}\d{1,3}\b
아래와 같이 설정이 되었다면, 검토 및 생성하여 검사를 시작합니다.
몇 분 후 결과가 완료되면 아래와 같이 상세한 내용을 확인 할 수 있습니다.
아래와 같이 목표 2에 해당하는 민감정보(Transaction ID, IP)가 Macie를 통해 S3에서 검출이 되는 것을 확인할 수 있습니다.
목표 2: Macie를 통해 개인 및 민감 정보를 검출합니다.
결론
- Data Scientist/Engineer 만큼의 지식이 없어도, 간단하게 테스트하기 위한 환경을 구성하는 것은 어렵지 않습니다.
- 관련 자원을 생성하고 설정해보면서 각 자원의 특성을 이해하면 성능, 안정성 등을 고려하여 설계를 하는데 도움이 됩니다.
- Data Platform에 사용되는 자원들은 위에서 사용한 것 이외에도 다르며 각각의 특성을 알고 설계에 사용해야합니다.
- 이번 테스트는 Data Platform은 어떻게 구성되는건지 맛보기 위해 최소한의 설정으로만 진행되었습니다.
- Macie는 민감정보 이외에도 암호화, 퍼블릭 엑세스와 같은 보안 태세에 대해서도 평가가 가능합니다.
- 테스트에서 사용자 지정 및 정규식을 작성한것처럼 Macie가 별도의 설정없이 민감정보를 판단하는데는 한계가 있으므로 적절하게 사용해야 합니다.
