📊 สร้าง Real-time Dashboard ด้วย Kinesis + Lambda + DynamoDB + WebSocket API

Sharing is caring!

บทนำ

ในโลกของข้อมูลยุคปัจจุบัน ความเร็วคือทุกสิ่ง — ระบบ Real-time Dashboard ช่วยให้เราติดตามข้อมูลได้ทันที เช่น ยอดขาย, การจราจร, IoT Sensors หรือการใช้งานระบบ และบน AWS คุณสามารถสร้าง Dashboard แบบ Real-time ได้ง่าย โดยใช้บริการ Kinesis + Lambda + DynamoDB + WebSocket API ร่วมกัน

1. สถาปัตยกรรมโดยรวมของระบบ

ระบบ Real-time Dashboard จะประกอบด้วย 4 ส่วนหลักดังนี้:

  1. Kinesis Data Stream – รับข้อมูลที่เข้ามาอย่างต่อเนื่อง
  2. Lambda Function – ประมวลผลข้อมูลจาก Stream
  3. DynamoDB – จัดเก็บข้อมูลสถานะล่าสุด
  4. WebSocket API (API Gateway) – ส่งข้อมูลอัปเดตไปยัง Dashboard แบบเรียลไทม์

2. การสร้าง Kinesis Data Stream

เริ่มต้นด้วยการสร้าง Stream สำหรับรับข้อมูลจากแหล่งข้อมูลภายนอก เช่น Application, IoT Device หรือ API

aws kinesis create-stream \
  --stream-name RealtimeDataStream \
  --shard-count 1
    

เมื่อสร้างสำเร็จ จะสามารถส่งข้อมูลเข้า Stream ได้ เช่น ตัวอย่าง JSON:

{
  "deviceId": "sensor-01",
  "temperature": 27.5,
  "humidity": 66,
  "timestamp": "2025-10-08T09:30:00Z"
}
    

3. เขียน Lambda เพื่อประมวลผลข้อมูลจาก Kinesis

Lambda จะทำหน้าที่อ่านข้อมูลจาก Kinesis และบันทึกข้อมูลล่าสุดลงใน DynamoDB รวมถึงส่งข้อมูลอัปเดตไปยัง WebSocket API เพื่อแจ้ง Dashboard แบบเรียลไทม์

// index.js
const AWS = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();
const apiGateway = new AWS.ApiGatewayManagementApi({
  endpoint: 'your-websocket-api-id.execute-api.ap-southeast-1.amazonaws.com/prod'
});

exports.handler = async (event) => {
  for (const record of event.Records) {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    const data = JSON.parse(payload);

    await dynamo.put({
      TableName: 'SensorData',
      Item: data
    }).promise();

    await apiGateway.postToConnection({
      ConnectionId: 'your-connection-id',
      Data: JSON.stringify(data)
    }).promise();
  }

  return { statusCode: 200 };
};
    

4. การสร้าง DynamoDB Table

ใช้ DynamoDB สำหรับเก็บข้อมูลสถานะล่าสุดของแต่ละอุปกรณ์หรือ Record:

aws dynamodb create-table \
  --table-name SensorData \
  --attribute-definitions AttributeName=deviceId,AttributeType=S \
  --key-schema AttributeName=deviceId,KeyType=HASH \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
    

5. การสร้าง WebSocket API บน API Gateway

WebSocket API ใช้สำหรับเชื่อมต่อกับ Dashboard ในฝั่ง Frontend และรับข้อมูลอัปเดตจาก Lambda เมื่อมีข้อมูลใหม่เข้ามา

aws apigatewayv2 create-api \
  --name "RealtimeDashboardAPI" \
  --protocol-type WEBSOCKET \
  --route-selection-expression "$request.body.action"
    

6. สร้าง Frontend Dashboard ด้วย WebSocket

ตัวอย่างโค้ด React สำหรับแสดงข้อมูลแบบเรียลไทม์จาก WebSocket API:

// RealtimeDashboard.js
import React, { useEffect, useState } from "react";

function RealtimeDashboard() {
  const [data, setData] = useState([]);

  useEffect(() => {
    const ws = new WebSocket("wss://your-websocket-api-id.execute-api.ap-southeast-1.amazonaws.com/prod");

    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      setData((prev) => [message, ...prev]);
    };

    return () => ws.close();
  }, []);

  return (
    Realtime Sensor Dashboard
        {data.map((d, i) => (
          
            {d.deviceId} - Temp: {d.temperature}°C - Humidity: {d.humidity}%
          
        ))}
      
  );
}

export default RealtimeDashboard;
    

7. การจำลองการส่งข้อมูลเข้า Stream

คุณสามารถส่งข้อมูลจำลองเข้า Kinesis ได้ด้วยคำสั่ง CLI หรือ Script:

aws kinesis put-record \
  --stream-name RealtimeDataStream \
  --partition-key "sensor-01" \
  --data '{"deviceId":"sensor-01","temperature":28,"humidity":64}'
    

เมื่อข้อมูลถูกส่งเข้าไป ระบบจะอัปเดต Dashboard แบบเรียลไทม์ทันที

8. เพิ่ม Observability ด้วย CloudWatch

คุณสามารถดู Logs ของ Lambda และ Metrics ของ Kinesis ได้ผ่าน Amazon CloudWatch เพื่อวิเคราะห์การทำงานแบบ Real-time และตรวจสอบความเร็วในการส่งข้อมูล

9. สรุปภาพรวมการทำงาน

เมื่อเชื่อมต่อทุกส่วนเข้าด้วยกัน จะได้ระบบ Dashboard ที่สามารถประมวลผลข้อมูล Real-time โดยไม่มี Server ให้ดูแล และสามารถขยายระบบได้อัตโนมัติ

  • ข้อมูลเข้ามาผ่าน Kinesis → Lambda
  • บันทึกสถานะล่าสุดใน DynamoDB
  • ส่งข้อมูลไปยังผู้ใช้ผ่าน WebSocket API

📘 บทความโดย King Pool

ภาพประกอบ: Kinesis, Lambda, DynamoDB, WebSocket, CloudWatch

อ่านต่อ: การทำ Observability ด้วย CloudWatch, X-Ray และ Tracing

Leave a Reply

อีเมลของคุณจะไม่แสดงให้คนอื่นเห็น ช่องข้อมูลจำเป็นถูกทำเครื่องหมาย *