Anomaly Detection System

เทคโนโลยี Anomaly Detection เป็นระบบปัญญาประดิษฐ์ขั้นสูงที่ใช้ในการตรวจจับความผิดปกติหรือค่าที่ไม่ปกติ ในข้อมูลจากมิเตอร์และเซนเซอร์ต่างๆ ด้วยอัลกอริธึม Machine Learning ที่สามารถเรียนรู้รูปแบบปกติของระบบ และเตือนภัยทันทีเมื่อพบความผิดปกติ ช่วยป้องกันอุบัติเหตุและลดความเสียหายจากการทำงานที่ผิดปกติ

99.8%
Detection Rate
AI-powered Accuracy
<0.05%
False Positive
Minimal False Alarms
24/7
Real-time Monitor
Continuous Analysis
2s
Response Time
Instant Alert

Anomaly Detection คืออะไร?

Anomaly Detection เป็นเทคโนโลยีปัญญาประดิษฐ์ที่ใช้ในการตรวจจับค่าหรือพฤติกรรมที่ผิดปกติ หรือแตกต่างจากรูปแบบปกติที่ระบบได้เรียนรู้ไว้ โดยใช้อัลกอริธึม Machine Learning ขั้นสูง เช่น Isolation Forest, One-Class SVM และ Autoencoders ในการวิเคราะห์ข้อมูล

ในระบบ GaugeSnap เทคโนโลยี Anomaly Detection ทำหน้าที่เฝ้าระวังและตรวจสอบข้อมูลจากมิเตอร์ต่างๆ ตลอด 24 ชั่วโมง เพื่อเตือนภัยทันทีเมื่อพบค่าที่ผิดปกติ ไม่ว่าจะเป็นการรั่วไหล การเปลี่ยนแปลงความดันกะทันหัน หรือการทำงานผิดปกติของอุปกรณ์

ประเภทความผิดปกติที่ตรวจจับได้

  • Point Anomalies: ค่าเดี่ยวที่ผิดปกติ เช่น ค่าความดันสูงผิดปกติ
  • Contextual Anomalies: ความผิดปกติตามบริบท เช่น การใช้ไฟฟ้าสูงในเวลากลางคืน
  • Collective Anomalies: รูปแบบผิดปกติ เช่น การเปลี่ยนแปลงอย่างต่อเนื่อง
    Point Anomaly: ค่าที่ผิดปกติ ณ จุดเวลาหนึ่ง
  • Contextual Anomaly: ค่าปกติแต่อยู่ในบริบทที่ผิด
  • Collective Anomaly: กลุ่มของข้อมูลที่ผิดปกติ

ประโยชน์หลัก

  • ป้องกันอุบัติเหตุและความเสียหาย
  • ลดเวลาหยุดทำงานของเครื่องจักร
  • เพิ่มความปลอดภัยในการทำงาน

Detection Process

Data Collection
Continuous Gauge Monitoring
Pattern Learning
Normal Behavior Analysis
Real-time Analysis
Statistical & ML Detection
Anomaly Scoring
Confidence & Severity Rating
Alert System
Automated Notifications

วิธีการตรวจจับความผิดปกติ

เทคนิคและอัลกอริธึมที่ใช้ในการตรวจจับ

Statistical Methods

ใช้สถิติเพื่อหาค่าที่อยู่นอกเหนือขอบเขตปกติ

Z-Score Analysis

ตรวจจับค่าที่อยู่นอก 3 standard deviations

IQR Method

หาค่าที่อยู่นอก interquartile range

Control Charts

Statistical process control สำหรับ manufacturing

Machine Learning

อัลกอริธึม ML ที่เรียนรู้รูปแบบปกติ

Isolation Forest

แยกข้อมูลผิดปกติออกจากข้อมูลปกติ

One-Class SVM

เรียนรู้ boundary ของข้อมูลปกติ

Local Outlier Factor

ตรวจจับ outliers ท้องถิ่น

Deep Learning

Neural networks สำหรับรูปแบบซับซ้อน

Autoencoders

Reconstruction error เป็นตัวบ่งชี้ความผิดปกติ

LSTM Networks

ตรวจจับ sequential anomalies

GAN-based

ใช้ generator และ discriminator

การใช้งานในอุตสาหกรรม

ตัวอย่างการประยุกต์ใช้ในสถานการณ์จริง

ระบบความดัน

ความผิดปกติที่พบ

  • ความดันสูงกว่าปกติ (Overpressure)
  • ความดันต่ำกว่าปกติ (Underpressure)
  • ความดันผันผวน (Pressure Fluctuation)
  • การรั่วไหล (Leak Detection)

Detection Algorithm

Method: LSTM + Statistical Control
Window Size: 60 seconds
Threshold: 3σ from moving average
Alert Delay: < 5 seconds

Success Story

ระบบตรวจจับการรั่วไหลในโรงงานปิโตรเคมี สามารถตรวจพบการรั่วไหล ก่อนที่จะเกิดอันตราย ลดความเสียหายได้ 90%

การติดตามอุณหภูมิ

รูปแบบความผิดปกติ

  • อุณหภูมิสูงเกินไป (Overheating)
  • การเปลี่ยนแปลงอย่างรวดเร็ว (Rapid Change)
  • รูปแบบการขึ้นลงผิดปกติ (Abnormal Pattern)
  • ระบบทำความเย็นล้มเหลว

Detection Features

Rate of Change: °C/minute monitoring
Seasonal Adjustment: Time-based normalization
Multi-sensor Fusion: Cross-validation
Predictive Window: 10-15 minutes ahead

Performance Metrics

Detection Rate: 98.7%
False Positive: 0.3%
Early Warning: 12 min avg
Cost Savings: $200K/year

ระบบ Anomaly Detection

สถาปัตยกรรมและส่วนประกอบของระบบ

สถาปัตยกรรมระบบ

Data Layer

  • • Real-time gauge readings
  • • Historical data storage
  • • Time-series database
  • • Data quality validation

Processing Layer

  • • Feature engineering
  • • Anomaly detection models
  • • Ensemble methods
  • • Real-time inference

Alert Layer

  • • Severity classification
  • • Multi-channel notifications
  • • Escalation procedures
  • • Alert suppression

Dashboard Layer

  • • Real-time monitoring
  • • Historical analysis
  • • Performance metrics
  • • System health status

การจัดการ Alert

ระดับความรุนแรง

Critical
ต้องหยุดระบบทันที
High
ต้องตรวจสอบภายใน 15 นาที
Medium
ต้องตรวจสอบภายใน 1 ชั่วโมง
Low
ติดตามต่อไป

ช่องทางแจ้งเตือน

Email notifications
SMS alerts
Slack/Teams integration
Dashboard alarms

Auto-Response Actions

  • • Automatic equipment shutdown
  • • Backup system activation
  • • Safety protocol execution
  • • Maintenance ticket creation

ตัวอย่างการ Implement

โค้ดสำหรับสร้างระบบ Anomaly Detection

# Industrial Anomaly Detection System
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM
import warnings
warnings.filterwarnings('ignore')

class AnomalyDetector:
    def __init__(self, method='ensemble'):
        self.method = method
        self.models = {}
        self.scalers = {}
        self.thresholds = {}
        self.is_trained = False
        
    def prepare_features(self, data):
        """Extract features for anomaly detection"""
        features = []
        
        # Statistical features
        features.extend([
            data.mean(),
            data.std(),
            data.min(),
            data.max(),
            np.percentile(data, 25),
            np.percentile(data, 75)
        ])
        
        # Time-based features
        if len(data) > 1:
            # Rate of change
            features.append(np.mean(np.diff(data)))
            features.append(np.std(np.diff(data)))
            
            # Trend analysis
            x = np.arange(len(data))
            slope = np.polyfit(x, data, 1)[0]
            features.append(slope)
        else:
            features.extend([0, 0, 0])
        
        # Frequency domain features (simplified)
        if len(data) > 4:
            fft_data = np.fft.fft(data)
            features.extend([
                np.mean(np.abs(fft_data)),
                np.std(np.abs(fft_data))
            ])
        else:
            features.extend([0, 0])
        
        return np.array(features)
    
    def create_autoencoder(self, input_dim):
        """Create autoencoder for anomaly detection"""
        # Encoder
        input_layer = Input(shape=(input_dim,))
        encoded = Dense(32, activation='relu')(input_layer)
        encoded = Dense(16, activation='relu')(encoded)
        encoded = Dense(8, activation='relu')(encoded)
        
        # Decoder
        decoded = Dense(16, activation='relu')(encoded)
        decoded = Dense(32, activation='relu')(decoded)
        decoded = Dense(input_dim, activation='linear')(decoded)
        
        # Autoencoder model
        autoencoder = Model(input_layer, decoded)
        autoencoder.compile(optimizer='adam', loss='mse')
        
        return autoencoder
    
    def create_lstm_model(self, sequence_length, n_features):
        """Create LSTM model for time series anomaly detection"""
        model = tf.keras.Sequential([
            LSTM(50, return_sequences=True, input_shape=(sequence_length, n_features)),
            LSTM(50, return_sequences=False),
            Dense(25),
            Dense(n_features)
        ])
        
        model.compile(optimizer='adam', loss='mse')
        return model
    
    def train(self, data, gauge_id):
        """Train anomaly detection models"""
        print(f"Training anomaly detector for gauge {gauge_id}")
        
        # Prepare training data
        features_list = []
        sequences = []
        
        # Create sliding windows for feature extraction
        window_size = 60  # 1 minute windows (assuming 1 reading per second)
        
        for i in range(window_size, len(data)):
            window_data = data[i-window_size:i]
            features = self.prepare_features(window_data)
            features_list.append(features)
            sequences.append(window_data.values.reshape(-1, 1))
        
        if len(features_list) == 0:
            raise ValueError("Not enough data for training")
        
        features_array = np.array(features_list)
        sequences_array = np.array(sequences)
        
        # Initialize models for this gauge
        self.models[gauge_id] = {}
        self.scalers[gauge_id] = {}
        self.thresholds[gauge_id] = {}
        
        # 1. Statistical Model (Isolation Forest)
        self.scalers[gauge_id]['statistical'] = StandardScaler()
        features_scaled = self.scalers[gauge_id]['statistical'].fit_transform(features_array)
        
        iso_forest = IsolationForest(contamination=0.1, random_state=42)
        iso_forest.fit(features_scaled)
        self.models[gauge_id]['isolation_forest'] = iso_forest
        
        # Calculate threshold
        scores = iso_forest.decision_function(features_scaled)
        self.thresholds[gauge_id]['isolation_forest'] = np.percentile(scores, 10)
        
        # 2. Autoencoder Model
        self.scalers[gauge_id]['autoencoder'] = StandardScaler()
        features_ae_scaled = self.scalers[gauge_id]['autoencoder'].fit_transform(features_array)
        
        autoencoder = self.create_autoencoder(features_ae_scaled.shape[1])
        autoencoder.fit(features_ae_scaled, features_ae_scaled, 
                       epochs=100, batch_size=32, verbose=0)
        self.models[gauge_id]['autoencoder'] = autoencoder
        
        # Calculate reconstruction error threshold
        reconstructed = autoencoder.predict(features_ae_scaled)
        reconstruction_errors = np.mean(np.square(features_ae_scaled - reconstructed), axis=1)
        self.thresholds[gauge_id]['autoencoder'] = np.percentile(reconstruction_errors, 95)
        
        # 3. LSTM Model for time series
        self.scalers[gauge_id]['lstm'] = StandardScaler()
        sequences_scaled = self.scalers[gauge_id]['lstm'].fit_transform(
            sequences_array.reshape(-1, 1)
        ).reshape(sequences_array.shape)
        
        lstm_model = self.create_lstm_model(window_size, 1)
        
        # Prepare LSTM training data
        X_train = sequences_scaled[:-1]
        y_train = sequences_scaled[1:, -1, :]
        
        lstm_model.fit(X_train, y_train, epochs=50, batch_size=32, verbose=0)
        self.models[gauge_id]['lstm'] = lstm_model
        
        # Calculate prediction error threshold
        predictions = lstm_model.predict(X_train)
        prediction_errors = np.mean(np.square(y_train - predictions), axis=1)
        self.thresholds[gauge_id]['lstm'] = np.percentile(prediction_errors, 95)
        
        print(f"Training completed for gauge {gauge_id}")
        self.is_trained = True
    
    def detect_anomalies(self, data, gauge_id):
        """Detect anomalies in real-time data"""
        if not self.is_trained or gauge_id not in self.models:
            raise ValueError(f"Model not trained for gauge {gauge_id}")
        
        # Prepare features from recent data
        if len(data) < 60:
            return {
                'is_anomaly': False,
                'confidence': 0.0,
                'method': 'insufficient_data',
                'details': 'Need at least 60 data points'
            }
        
        window_data = data[-60:]  # Last 60 readings
        features = self.prepare_features(window_data)
        
        anomaly_scores = {}
        
        # 1. Isolation Forest Detection
        try:
            features_scaled = self.scalers[gauge_id]['statistical'].transform([features])
            iso_score = self.models[gauge_id]['isolation_forest'].decision_function(features_scaled)[0]
            iso_anomaly = iso_score < self.thresholds[gauge_id]['isolation_forest']
            anomaly_scores['isolation_forest'] = {
                'score': iso_score,
                'threshold': self.thresholds[gauge_id]['isolation_forest'],
                'is_anomaly': iso_anomaly
            }
        except Exception as e:
            anomaly_scores['isolation_forest'] = {'error': str(e)}
        
        # 2. Autoencoder Detection
        try:
            features_ae_scaled = self.scalers[gauge_id]['autoencoder'].transform([features])
            reconstructed = self.models[gauge_id]['autoencoder'].predict(features_ae_scaled)
            reconstruction_error = np.mean(np.square(features_ae_scaled - reconstructed))
            ae_anomaly = reconstruction_error > self.thresholds[gauge_id]['autoencoder']
            anomaly_scores['autoencoder'] = {
                'score': reconstruction_error,
                'threshold': self.thresholds[gauge_id]['autoencoder'],
                'is_anomaly': ae_anomaly
            }
        except Exception as e:
            anomaly_scores['autoencoder'] = {'error': str(e)}
        
        # 3. LSTM Detection
        try:
            sequence = window_data.values.reshape(1, -1, 1)
            sequence_scaled = self.scalers[gauge_id]['lstm'].transform(
                sequence.reshape(-1, 1)
            ).reshape(sequence.shape)
            
            prediction = self.models[gauge_id]['lstm'].predict(sequence_scaled)
            current_value_scaled = sequence_scaled[0, -1, 0]
            prediction_error = np.square(current_value_scaled - prediction[0, 0])
            lstm_anomaly = prediction_error > self.thresholds[gauge_id]['lstm']
            anomaly_scores['lstm'] = {
                'score': prediction_error,
                'threshold': self.thresholds[gauge_id]['lstm'],
                'is_anomaly': lstm_anomaly
            }
        except Exception as e:
            anomaly_scores['lstm'] = {'error': str(e)}
        
        # Ensemble decision
        valid_scores = [score for score in anomaly_scores.values() 
                       if 'is_anomaly' in score]
        
        if not valid_scores:
            return {
                'is_anomaly': False,
                'confidence': 0.0,
                'method': 'ensemble',
                'details': 'No valid model predictions'
            }
        
        # Majority voting with confidence scoring
        anomaly_votes = sum(1 for score in valid_scores if score['is_anomaly'])
        total_votes = len(valid_scores)
        confidence = anomaly_votes / total_votes
        
        # Decision threshold (at least 2 out of 3 models agree)
        is_anomaly = confidence >= 0.67
        
        return {
            'is_anomaly': is_anomaly,
            'confidence': confidence,
            'method': 'ensemble',
            'individual_scores': anomaly_scores,
            'voting_result': f"{anomaly_votes}/{total_votes}",
            'timestamp': pd.Timestamp.now().isoformat()
        }
    
    def get_anomaly_explanation(self, result, current_value):
        """Generate human-readable explanation of anomaly"""
        if not result['is_anomaly']:
            return "No anomaly detected. All systems operating normally."
        
        explanations = []
        
        if 'individual_scores' in result:
            scores = result['individual_scores']
            
            if ('isolation_forest' in scores and 
                scores['isolation_forest'].get('is_anomaly', False)):
                explanations.append("Statistical pattern deviation detected")
            
            if ('autoencoder' in scores and 
                scores['autoencoder'].get('is_anomaly', False)):
                explanations.append("Feature reconstruction anomaly detected")
            
            if ('lstm' in scores and 
                scores['lstm'].get('is_anomaly', False)):
                explanations.append("Time series prediction anomaly detected")
        
        severity = "HIGH" if result['confidence'] > 0.8 else "MEDIUM"
        
        explanation = f"ANOMALY DETECTED (Severity: {severity})\n"
        explanation += f"Current Value: {current_value}\n"
        explanation += f"Confidence: {result['confidence']:.2%}\n"
        explanation += f"Detection Methods: {', '.join(explanations)}\n"
        explanation += f"Recommendation: Immediate investigation required"
        
        return explanation

# Usage example
def main():
    # Generate sample data (in real application, this would come from gauges)
    np.random.seed(42)
    
    # Normal operation data for training
    normal_data = pd.Series(
        np.random.normal(100, 5, 1000) + 
        10 * np.sin(np.linspace(0, 20*np.pi, 1000))  # Add seasonal pattern
    )
    
    # Initialize and train detector
    detector = AnomalyDetector()
    detector.train(normal_data, gauge_id="PRESSURE_01")
    
    # Test with anomalous data
    test_data = normal_data.copy()
    # Inject anomalies
    test_data.iloc[-10:] = 150  # Sudden spike
    
    # Detect anomalies
    result = detector.detect_anomalies(test_data, "PRESSURE_01")
    
    print("Anomaly Detection Result:")
    print(f"Is Anomaly: {result['is_anomaly']}")
    print(f"Confidence: {result['confidence']:.2%}")
    print(f"Method: {result['method']}")
    
    if result['is_anomaly']:
        explanation = detector.get_anomaly_explanation(result, test_data.iloc[-1])
        print("\nDetailed Explanation:")
        print(explanation)

if __name__ == "__main__":
    main()

เทคโนโลยีที่เกี่ยวข้องและการทำงานร่วมกัน

ระบบ Anomaly Detection ไม่ได้ทำงานแบบอิสระ แต่ต้องอาศัยการบูรณาการกับเทคโนโลยีอื่นๆ เพื่อให้เกิดประสิทธิภาพสูงสุดในการตรวจสอบและป้องกันความผิดปกติในระบบอุตสาหกรรม

การทำงานร่วมกันของเทคโนโลยี

🔍 Data Input Layer

Computer Vision + OCR: รับข้อมูลจากมิเตอร์และเซนเซอร์ต่างๆ แปลงภาพเป็นข้อมูลดิจิทัลสำหรับการวิเคราะห์ความผิดปกติ

🧠 Processing Layer

Machine Learning + Deep Learning: ประมวลผลข้อมูลด้วยอัลกอริธึม Isolation Forest, Autoencoder และ LSTM เพื่อตรวจจับรูปแบบผิดปกติ

⚡ Edge Computing

Edge AI: ประมวลผลแบบ Real-time บนอุปกรณ์ในพื้นที่ ให้การตอบสนองที่เร็วกว่า 10 เท่าและไม่ต้องพึ่งพาเครือข่าย

🔮 Prediction Layer

Predictive Analytics: คาดการณ์แนวโน้มความผิดปกติในอนาคต ช่วยให้สามารถป้องกันปัญหาก่อนเกิดขึ้นจริงได้ถึง 30-45 วันล่วงหน้า

📚 Learning Layer

Self-Learning System: ปรับปรุงความแม่นยำอัตโนมัติ เรียนรู้จากข้อมูลใหม่และ feedback จากผู้ใช้งานอย่างต่อเนื่อง

🔗 Integration Layer

System Integration: เชื่อมต่อกับ SCADA, MES, ERP ส่งข้อมูลและการแจ้งเตือนไปยังระบบจัดการองค์กร

เทคโนโลยีสนับสนุนขั้นสูง

Time Series Analysis

วิเคราะห์ข้อมูลตามเวลา ARIMA, LSTM

Signal Processing

กรองสัญญาณรบกวน FFT, Wavelet

Data Fusion

รวมข้อมูลหลายแหล่ง Kalman Filter

Statistical Control

ควบคุมคุณภาพ SPC, Control Charts