[
{
"kind": 1,
"language": "markdown",
"value": "# Advanced Python - Complex Data Processing\n\n[⬅️ Previous: Simple Python](./simple-python.lotnb) | [🏠 Back to Index](../index.lotnb)\n\n---\n\n<details>\n<summary><strong>Navigation Menu</strong></summary>\n\n| Section | Tutorials |\n|---------|----------|\n| **Setup** | [Environment Setup](../00-setup/verify-environment.lotnb) |\n| **Basics** | [Timed Actions](../01-basics/timed-actions.lotnb) \\| [Topic Actions](../01-basics/topic-actions.lotnb) |\n| **Logic** | [Conditional Logic](../02-logic/conditional-logic.lotnb) |\n| **Models** | [Basic Models](../03-models/basic-models.lotnb) \\| [Action Models](../03-models/action-models.lotnb) \\| [Inheritance](../03-models/model-inheritance.lotnb) |\n| **Routes** | [MQTT Bridge](../04-routes/mqtt-bridge.lotnb) \\| [Database](../04-routes/database-routes.lotnb) |\n| **Python** | [Simple](./simple-python.lotnb) \\| [Advanced](./advanced-python.lotnb) |\n\n</details>\n\n---\n\n> **CAUTION - Free Tier Resource Limits**\n>\n> The free version of Coreflux has resource limits:\n> - **Routes**: 2 maximum\n> - **Actions**: 12 maximum\n> - **Models**: 40 maximum\n>\n> Delete unused resources before creating new ones during exercises.\n\n---\n\n## Learning Objectives\n\nIn this tutorial, you will learn:\n- Complex data processing with external libraries\n- Image processing and computer vision integration\n- Machine learning model integration\n- Advanced statistical analysis\n- File operations and data persistence\n- External API integration with error handling\n- Performance optimization techniques\n\n---\n\n## 📖 Introduction\n\n**Advanced Python Integration** unlocks the full power of the Python ecosystem within LoT:\n- Machine learning and AI capabilities\n- Image and signal processing\n- Advanced statistical analysis\n- External service integration\n- Complex data transformations\n\n### Real-World Applications\n- **Predictive Maintenance**: ML models predicting equipment failures\n- **Computer Vision**: Quality 
inspection using camera feeds\n- **Advanced Analytics**: Statistical process control and optimization\n- **External Integrations**: Complex API workflows and data synchronization\n\n---\n\n## 🧠 Core Concepts\n\n### External Libraries\n\nPython scripts can use external libraries:\n```python\nimport requests # HTTP requests\nimport numpy as np # Numerical computing\nimport pandas as pd # Data analysis\nimport cv2 # Computer vision\nfrom sklearn.linear_model import LinearRegression # Machine learning\n```\n\n### Error Handling Patterns\n\n```python\ntry:\n    # Complex processing\n    result = complex_operation()\n    return {\"success\": True, \"data\": result}\nexcept Exception as e:\n    return {\"success\": False, \"error\": str(e)}\n```\n\n### Performance Considerations\n\n- **Caching**: Store expensive computation results\n- **Lazy Loading**: Load models/data only when needed\n- **Batch Processing**: Process multiple items together\n- **Memory Management**: Clean up resources properly\n\n---\n\n## 🛠️ Hands-On Examples\n\n### Example 1: Statistical Process Control\n\nLet's create advanced statistical analysis for process control:"
},
{
"kind": 2,
"language": "python",
"value": "# Script Name: StatisticalAnalyzer\nimport json\nimport statistics\nimport math\nfrom datetime import datetime, timedelta\n\n# Global cache for historical data\n_data_cache = {}\n\ndef analyze_process_control(sensor_data_json, sensor_id, control_limits_json):\n \"\"\"\n Advanced statistical process control analysis\n \"\"\"\n try:\n # Parse input data\n current_reading = json.loads(sensor_data_json) if isinstance(sensor_data_json, str) else sensor_data_json\n limits = json.loads(control_limits_json) if isinstance(control_limits_json, str) else control_limits_json\n \n # Get or initialize historical data for this sensor\n if sensor_id not in _data_cache:\n _data_cache[sensor_id] = []\n \n # Add current reading to history\n reading_value = current_reading.get('value', 0)\n timestamp = current_reading.get('timestamp', datetime.now().isoformat())\n \n _data_cache[sensor_id].append({\n 'value': reading_value,\n 'timestamp': timestamp\n })\n \n # Keep only last 100 readings for performance\n if len(_data_cache[sensor_id]) > 100:\n _data_cache[sensor_id] = _data_cache[sensor_id][-100:]\n \n # Get recent values for analysis\n recent_values = [item['value'] for item in _data_cache[sensor_id][-20:]] # Last 20 readings\n \n if len(recent_values) < 5:\n return json.dumps({\"status\": \"insufficient_data\", \"message\": \"Need at least 5 readings\"})\n \n # Statistical calculations\n mean_value = statistics.mean(recent_values)\n std_dev = statistics.stdev(recent_values) if len(recent_values) > 1 else 0\n \n # Control limits\n ucl = limits.get('upper_control_limit', mean_value + 3 * std_dev)\n lcl = limits.get('lower_control_limit', mean_value - 3 * std_dev)\n usl = limits.get('upper_spec_limit', ucl)\n lsl = limits.get('lower_spec_limit', lcl)\n \n # Process capability analysis\n cp = (usl - lsl) / (6 * std_dev) if std_dev > 0 else float('inf')\n cpk = min((usl - mean_value) / (3 * std_dev), (mean_value - lsl) / (3 * std_dev)) if std_dev > 0 else float('inf')\n \n # 
Trend analysis (last 10 vs previous 10)\n if len(recent_values) >= 20:\n recent_10 = recent_values[-10:]\n previous_10 = recent_values[-20:-10]\n recent_avg = statistics.mean(recent_10)\n previous_avg = statistics.mean(previous_10)\n trend = \"increasing\" if recent_avg > previous_avg * 1.05 else \"decreasing\" if recent_avg < previous_avg * 0.95 else \"stable\"\n else:\n trend = \"unknown\"\n \n # Alarm conditions\n alarms = []\n if reading_value > ucl:\n alarms.append(\"Above upper control limit\")\n if reading_value < lcl:\n alarms.append(\"Below lower control limit\")\n if cp < 1.33:\n alarms.append(\"Process capability insufficient\")\n if cpk < 1.0:\n alarms.append(\"Process not centered\")\n \n result = {\n \"sensor_id\": sensor_id,\n \"current_reading\": reading_value,\n \"statistical_summary\": {\n \"mean\": round(mean_value, 3),\n \"std_dev\": round(std_dev, 3),\n \"sample_size\": len(recent_values)\n },\n \"control_limits\": {\n \"ucl\": round(ucl, 3),\n \"lcl\": round(lcl, 3),\n \"usl\": round(usl, 3),\n \"lsl\": round(lsl, 3)\n },\n \"process_capability\": {\n \"cp\": round(cp, 3),\n \"cpk\": round(cpk, 3),\n \"capability_rating\": \"excellent\" if cp > 2.0 else \"adequate\" if cp > 1.33 else \"poor\"\n },\n \"trend_analysis\": {\n \"trend\": trend,\n \"recent_average\": round(statistics.mean(recent_values[-10:]), 3) if len(recent_values) >= 10 else None\n },\n \"alarms\": alarms,\n \"in_control\": len(alarms) == 0,\n \"analysis_timestamp\": datetime.now().isoformat()\n }\n \n return json.dumps(result)\n \n except Exception as e:\n return json.dumps({\n \"status\": \"error\",\n \"error_message\": str(e),\n \"sensor_id\": sensor_id\n })"
},
{
"kind": 2,
"language": "lot",
"value": "DEFINE ACTION ProcessControlAnalyzer\nON TOPIC \"sensors/+/spc_analysis\" DO\n SET \"sensor_id\" WITH TOPIC POSITION 1\n \n // Get control limits configuration\n SET \"control_limits\" WITH {\n \"upper_control_limit\": GET TOPIC \"spc/\" + {sensor_id} + \"/ucl\",\n \"lower_control_limit\": GET TOPIC \"spc/\" + {sensor_id} + \"/lcl\",\n \"upper_spec_limit\": GET TOPIC \"spc/\" + {sensor_id} + \"/usl\",\n \"lower_spec_limit\": GET TOPIC \"spc/\" + {sensor_id} + \"/lsl\"\n }\n \n // Perform advanced statistical analysis\n CALL PYTHON \"StatisticalAnalyzer.analyze_process_control\"\n WITH (PAYLOAD, {sensor_id}, {control_limits})\n RETURN AS {analysis_result}\n \n // Route results based on analysis\n SET \"in_control\" WITH (GET JSON \"in_control\" IN {analysis_result} AS BOOL)\n \n IF {in_control} EQUALS TRUE THEN\n PUBLISH TOPIC \"spc/\" + {sensor_id} + \"/status\" WITH \"IN_CONTROL\"\n PUBLISH TOPIC \"spc/\" + {sensor_id} + \"/analysis\" WITH {analysis_result}\n ELSE\n PUBLISH TOPIC \"spc/\" + {sensor_id} + \"/status\" WITH \"OUT_OF_CONTROL\"\n PUBLISH TOPIC \"alarms/spc/\" + {sensor_id} WITH {analysis_result}\n PUBLISH TOPIC \"spc/\" + {sensor_id} + \"/analysis\" WITH {analysis_result}\n \n // Extract key metrics for dashboard\n PUBLISH TOPIC \"spc/\" + {sensor_id} + \"/cp\" WITH (GET JSON \"process_capability.cp\" IN {analysis_result} AS FLOAT)\n PUBLISH TOPIC \"spc/\" + {sensor_id} + \"/cpk\" WITH (GET JSON \"process_capability.cpk\" IN {analysis_result} AS FLOAT)\n PUBLISH TOPIC \"spc/\" + {sensor_id} + \"/trend\" WITH (GET JSON \"trend_analysis.trend\" IN {analysis_result} AS STRING)"
},
{
"kind": 1,
"language": "markdown",
"value": "**What this does:**\n- Implements advanced Statistical Process Control (SPC) analysis\n- Maintains historical data cache for trend analysis\n- Calculates process capability indices (Cp, Cpk)\n- Performs trend analysis and alarm detection\n- Routes data based on control status\n- Demonstrates industrial-grade statistical analysis\n\n**SPC Features:**\n- **Control Charts**: Upper/lower control limits monitoring\n- **Process Capability**: Cp and Cpk calculations\n- **Trend Analysis**: Detecting process shifts\n- **Alarm Generation**: Out-of-control condition detection\n- **Historical Analysis**: 20-point rolling analysis\n\n---\n\n### Example 2: Computer Vision Integration\n\nLet's integrate computer vision for quality inspection:"
},
{
"kind": 2,
"language": "python",
"value": "# Script Name: VisionProcessor\nimport json\nimport base64\nimport io\nfrom PIL import Image\nimport numpy as np\n\n# Note: This example shows the structure - actual CV libraries would be imported\n# import cv2 # OpenCV for computer vision\n# from ultralytics import YOLO # YOLO for object detection\n\ndef analyze_quality_image(base64_image, inspection_parameters_json):\n \"\"\"\n Analyze product quality from camera image\n \"\"\"\n try:\n # Parse parameters\n params = json.loads(inspection_parameters_json) if isinstance(inspection_parameters_json, str) else inspection_parameters_json\n \n # Decode base64 image\n image_data = base64.b64decode(base64_image)\n image = Image.open(io.BytesIO(image_data))\n \n # Convert to numpy array for processing\n img_array = np.array(image)\n \n # Simulate quality inspection (in real implementation, use CV algorithms)\n # This would include:\n # - Defect detection\n # - Dimension measurement\n # - Color analysis\n # - Surface quality assessment\n \n # Simulated results\n quality_score = 85.5 # Would be calculated from actual analysis\n defects_found = [] # Would contain actual defect locations\n measurements = { # Would contain actual measurements\n \"width\": 25.4,\n \"height\": 12.7,\n \"area\": 323.58\n }\n \n # Determine pass/fail based on parameters\n min_quality_score = params.get('min_quality_score', 80)\n pass_fail = \"PASS\" if quality_score >= min_quality_score else \"FAIL\"\n \n result = {\n \"inspection_id\": f\"INS_{datetime.now().strftime('%Y%m%d_%H%M%S')}\",\n \"quality_score\": quality_score,\n \"pass_fail\": pass_fail,\n \"defects_found\": defects_found,\n \"measurements\": measurements,\n \"image_dimensions\": {\n \"width\": image.width,\n \"height\": image.height\n },\n \"inspection_parameters\": params,\n \"processing_time_ms\": 150, # Simulated processing time\n \"analysis_timestamp\": datetime.now().isoformat()\n }\n \n return json.dumps(result)\n \n except Exception as e:\n return json.dumps({\n 
\"status\": \"error\",\n \"error_message\": str(e),\n \"analysis_timestamp\": datetime.now().isoformat()\n })\n\ndef detect_anomalies(sensor_readings_json, sensitivity=2.0):\n \"\"\"\n Detect anomalies in sensor data using statistical methods\n \"\"\"\n try:\n readings = json.loads(sensor_readings_json) if isinstance(sensor_readings_json, str) else sensor_readings_json\n \n if not isinstance(readings, list) or len(readings) < 10:\n return json.dumps({\"error\": \"Need at least 10 readings for anomaly detection\"})\n \n # Convert to numpy array for analysis\n values = np.array([r.get('value', 0) for r in readings])\n \n # Calculate statistical measures\n mean = np.mean(values)\n std = np.std(values)\n \n # Detect anomalies using z-score method\n z_scores = np.abs((values - mean) / std) if std > 0 else np.zeros_like(values)\n anomaly_threshold = sensitivity # Standard deviations\n \n anomalies = []\n for i, (reading, z_score) in enumerate(zip(readings, z_scores)):\n if z_score > anomaly_threshold:\n anomalies.append({\n \"index\": i,\n \"value\": reading.get('value', 0),\n \"timestamp\": reading.get('timestamp', ''),\n \"z_score\": round(float(z_score), 3),\n \"deviation_from_mean\": round(float(values[i] - mean), 3)\n })\n \n result = {\n \"total_readings\": len(readings),\n \"anomalies_detected\": len(anomalies),\n \"anomaly_rate_percent\": round(len(anomalies) / len(readings) * 100, 2),\n \"statistical_summary\": {\n \"mean\": round(float(mean), 3),\n \"std_deviation\": round(float(std), 3),\n \"min_value\": round(float(np.min(values)), 3),\n \"max_value\": round(float(np.max(values)), 3)\n },\n \"anomalies\": anomalies,\n \"sensitivity_used\": sensitivity,\n \"analysis_timestamp\": datetime.now().isoformat()\n }\n \n return json.dumps(result)\n \n except Exception as e:\n return json.dumps({\"error\": str(e)})"
},
{
"kind": 2,
"language": "lot",
"value": "DEFINE ACTION AdvancedAnalyticsProcessor\nON TOPIC \"analytics/+/+/request\" DO\n SET \"analysis_type\" WITH TOPIC POSITION 1\n SET \"sensor_id\" WITH TOPIC POSITION 2\n \n IF {analysis_type} EQUALS \"spc\" THEN\n // Statistical Process Control Analysis\n SET \"control_limits\" WITH {\n \"upper_control_limit\": GET TOPIC \"analytics/spc/\" + {sensor_id} + \"/ucl\",\n \"lower_control_limit\": GET TOPIC \"analytics/spc/\" + {sensor_id} + \"/lcl\",\n \"target_value\": GET TOPIC \"analytics/spc/\" + {sensor_id} + \"/target\"\n }\n \n CALL PYTHON \"StatisticalAnalyzer.analyze_process_control\"\n WITH (PAYLOAD, {sensor_id}, {control_limits})\n RETURN AS {spc_analysis}\n \n // Route based on control status\n SET \"anomaly_rate\" WITH (GET JSON \"anomaly_rate_percent\" IN {spc_analysis} AS FLOAT)\n \n IF {anomaly_rate} > 10 THEN\n PUBLISH TOPIC \"alarms/analytics/\" + {sensor_id} WITH {spc_analysis}\n PUBLISH TOPIC \"analytics/spc/\" + {sensor_id} + \"/status\" WITH \"OUT_OF_CONTROL\"\n ELSE\n PUBLISH TOPIC \"analytics/spc/\" + {sensor_id} + \"/status\" WITH \"IN_CONTROL\"\n \n PUBLISH TOPIC \"analytics/spc/\" + {sensor_id} + \"/results\" WITH {spc_analysis}\n \n ELSE IF {analysis_type} EQUALS \"anomaly\" THEN\n // Anomaly Detection Analysis\n SET \"sensitivity\" WITH GET TOPIC \"analytics/anomaly/\" + {sensor_id} + \"/sensitivity\"\n \n CALL PYTHON \"StatisticalAnalyzer.detect_anomalies\"\n WITH (PAYLOAD, {sensitivity})\n RETURN AS {anomaly_analysis}\n \n SET \"anomalies_found\" WITH (GET JSON \"anomalies_detected\" IN {anomaly_analysis} AS INT)\n \n IF {anomalies_found} > 0 THEN\n PUBLISH TOPIC \"alarms/anomaly/\" + {sensor_id} WITH {anomaly_analysis}\n \n PUBLISH TOPIC \"analytics/anomaly/\" + {sensor_id} + \"/results\" WITH {anomaly_analysis}"
},
{
"kind": 1,
"language": "markdown",
"value": "**What this does:**\n- Implements advanced Statistical Process Control (SPC) analysis\n- Performs anomaly detection using z-score methods\n- Maintains historical data cache for trend analysis\n- Calculates process capability indices (Cp, Cpk)\n- Generates alarms based on statistical thresholds\n- Demonstrates industrial-grade analytics integration\n\n**Advanced Features:**\n- **Data Caching**: Maintains sensor history in memory\n- **Statistical Analysis**: Mean, standard deviation, z-scores\n- **Process Capability**: Industry-standard Cp/Cpk calculations\n- **Trend Detection**: Identifies process shifts over time\n- **Configurable Sensitivity**: Adjustable anomaly detection thresholds\n\n---\n\n### Example 3: External API Integration with Complex Logic\n\nLet's create a sophisticated external API integration:"
},
{
"kind": 2,
"language": "python",
"value": "# Script Name: ExternalAPIManager\nimport json\nimport requests\nimport time\nfrom datetime import datetime, timedelta\n\n# Cache for API responses to avoid excessive calls\n_api_cache = {}\n_cache_timeout = 300 # 5 minutes\n\ndef get_weather_impact_analysis(location, production_data_json):\n \"\"\"\n Analyze weather impact on production performance\n \"\"\"\n try:\n # Parse production data\n production = json.loads(production_data_json) if isinstance(production_data_json, str) else production_data_json\n \n # Check cache first\n cache_key = f\"weather_{location}\"\n current_time = time.time()\n \n if cache_key in _api_cache and (current_time - _api_cache[cache_key]['timestamp']) < _cache_timeout:\n weather_data = _api_cache[cache_key]['data']\n else:\n # Make API call (simulated - replace with actual weather API)\n # weather_response = requests.get(f\"https://api.weather.com/v1/current?location={location}&key=API_KEY\")\n # weather_data = weather_response.json()\n \n # Simulated weather data\n weather_data = {\n \"temperature\": 22.5,\n \"humidity\": 65,\n \"pressure\": 1013.25,\n \"wind_speed\": 5.2,\n \"conditions\": \"partly_cloudy\"\n }\n \n # Cache the result\n _api_cache[cache_key] = {\n \"data\": weather_data,\n \"timestamp\": current_time\n }\n \n # Analyze weather impact on production\n temperature = weather_data.get('temperature', 20)\n humidity = weather_data.get('humidity', 50)\n \n # Calculate impact factors\n temp_impact = 1.0\n if temperature > 30:\n temp_impact = 0.95 # High temperature reduces efficiency\n elif temperature < 10:\n temp_impact = 0.90 # Low temperature reduces efficiency\n \n humidity_impact = 1.0\n if humidity > 80:\n humidity_impact = 0.92 # High humidity affects production\n \n overall_impact = temp_impact * humidity_impact\n \n # Adjust production targets based on weather\n original_target = production.get('target_production', 100)\n weather_adjusted_target = int(original_target * overall_impact)\n \n result = {\n 
\"location\": location,\n \"weather_conditions\": weather_data,\n \"impact_analysis\": {\n \"temperature_impact_factor\": temp_impact,\n \"humidity_impact_factor\": humidity_impact,\n \"overall_impact_factor\": round(overall_impact, 3)\n },\n \"production_adjustments\": {\n \"original_target\": original_target,\n \"weather_adjusted_target\": weather_adjusted_target,\n \"adjustment_percent\": round((overall_impact - 1) * 100, 1)\n },\n \"recommendations\": [],\n \"analysis_timestamp\": datetime.now().isoformat(),\n \"data_source\": \"cached\" if cache_key in _api_cache else \"api_call\"\n }\n \n # Add recommendations based on conditions\n if temperature > 30:\n result[\"recommendations\"].append(\"Consider additional cooling for equipment\")\n if humidity > 80:\n result[\"recommendations\"].append(\"Monitor for moisture-related issues\")\n if overall_impact < 0.95:\n result[\"recommendations\"].append(\"Adjust production schedule for weather conditions\")\n \n return json.dumps(result)\n \n except Exception as e:\n return json.dumps({\n \"status\": \"error\",\n \"error_message\": str(e),\n \"location\": location\n })"
},
{
"kind": 2,
"language": "lot",
"value": "DEFINE ACTION WeatherProductionAnalyzer\nON TOPIC \"production/+/weather_analysis\" DO\n SET \"location\" WITH TOPIC POSITION 1\n \n // Get current production data\n SET \"production_data\" WITH {\n \"target_production\": GET TOPIC \"production/\" + {location} + \"/target\",\n \"current_production\": GET TOPIC \"production/\" + {location} + \"/current\",\n \"efficiency\": GET TOPIC \"production/\" + {location} + \"/efficiency\"\n }\n \n // Analyze weather impact\n CALL PYTHON \"ExternalAPIManager.get_weather_impact_analysis\"\n WITH ({location}, {production_data})\n RETURN AS {weather_analysis}\n \n // Extract key insights\n SET \"impact_factor\" WITH (GET JSON \"impact_analysis.overall_impact_factor\" IN {weather_analysis} AS DOUBLE)\n SET \"adjusted_target\" WITH (GET JSON \"production_adjustments.weather_adjusted_target\" IN {weather_analysis} AS INT)\n \n // Update production targets if significant impact\n IF {impact_factor} < 0.95 THEN\n PUBLISH TOPIC \"production/\" + {location} + \"/adjusted_target\" WITH {adjusted_target}\n PUBLISH TOPIC \"production/\" + {location} + \"/weather_impact\" WITH \"SIGNIFICANT\"\n PUBLISH TOPIC \"alarms/production/\" + {location} WITH \"Weather conditions affecting production targets\"\n ELSE\n PUBLISH TOPIC \"production/\" + {location} + \"/weather_impact\" WITH \"MINIMAL\"\n \n // Publish complete analysis\n PUBLISH TOPIC \"analytics/weather/\" + {location} + \"/analysis\" WITH {weather_analysis}\n PUBLISH TOPIC \"analytics/weather/\" + {location} + \"/last_update\" WITH TIMESTAMP \"UTC\""
},
{
"kind": 1,
"language": "markdown",
"value": "**What this does:**\n- Integrates with external weather APIs\n- Implements intelligent caching to reduce API calls\n- Analyzes weather impact on production performance\n- Automatically adjusts production targets based on conditions\n- Provides actionable recommendations\n- Demonstrates complex external integration patterns\n\n**Advanced Features:**\n- **API Caching**: Reduces external API calls with intelligent caching\n- **Impact Analysis**: Calculates weather effects on production\n- **Automatic Adjustments**: Updates targets based on conditions\n- **Recommendation Engine**: Provides actionable insights\n- **Error Recovery**: Handles API failures gracefully\n\n---\n\n### Example 4: Machine Learning Integration\n\nLet's integrate a simple machine learning model:"
},
{
"kind": 2,
"language": "python",
"value": "# Script Name: MLPredictor\nimport json\nimport math\nfrom datetime import datetime, timedelta\n\n# Simple ML model cache (in production, use joblib or pickle)\n_model_cache = {}\n\ndef predict_equipment_failure(sensor_data_json, equipment_id):\n \"\"\"\n Predict equipment failure probability using sensor data\n \"\"\"\n try:\n # Parse sensor data\n sensor_data = json.loads(sensor_data_json) if isinstance(sensor_data_json, str) else sensor_data_json\n \n # Extract features for prediction\n temperature = sensor_data.get('temperature', 0)\n vibration = sensor_data.get('vibration', 0)\n pressure = sensor_data.get('pressure', 0)\n runtime_hours = sensor_data.get('runtime_hours', 0)\n \n # Simple rule-based model (in production, use trained ML models)\n # This would be replaced with actual ML model inference\n \n # Calculate risk factors\n temp_risk = 0\n if temperature > 80:\n temp_risk = min((temperature - 80) / 20, 1.0) # Max risk at 100°C\n \n vibration_risk = 0\n if vibration > 5:\n vibration_risk = min((vibration - 5) / 10, 1.0) # Max risk at 15 mm/s\n \n pressure_risk = 0\n if pressure > 100:\n pressure_risk = min((pressure - 100) / 50, 1.0) # Max risk at 150 PSI\n \n runtime_risk = min(runtime_hours / 8760, 1.0) # Risk increases with age (8760 hours = 1 year)\n \n # Combine risk factors (weighted average)\n failure_probability = (\n temp_risk * 0.3 +\n vibration_risk * 0.4 +\n pressure_risk * 0.2 +\n runtime_risk * 0.1\n )\n \n # Determine risk level\n if failure_probability > 0.8:\n risk_level = \"CRITICAL\"\n recommendation = \"Immediate maintenance required\"\n elif failure_probability > 0.6:\n risk_level = \"HIGH\"\n recommendation = \"Schedule maintenance within 24 hours\"\n elif failure_probability > 0.4:\n risk_level = \"MEDIUM\"\n recommendation = \"Schedule maintenance within 1 week\"\n elif failure_probability > 0.2:\n risk_level = \"LOW\"\n recommendation = \"Monitor closely, maintenance within 1 month\"\n else:\n risk_level = \"MINIMAL\"\n 
recommendation = \"Continue normal operation\"\n \n # Calculate estimated time to failure (simplified)\n if failure_probability > 0.1:\n estimated_days = int((1 - failure_probability) * 365)\n else:\n estimated_days = 365\n \n result = {\n \"equipment_id\": equipment_id,\n \"prediction_results\": {\n \"failure_probability\": round(failure_probability, 3),\n \"risk_level\": risk_level,\n \"estimated_days_to_failure\": estimated_days\n },\n \"risk_factors\": {\n \"temperature_risk\": round(temp_risk, 3),\n \"vibration_risk\": round(vibration_risk, 3),\n \"pressure_risk\": round(pressure_risk, 3),\n \"runtime_risk\": round(runtime_risk, 3)\n },\n \"sensor_inputs\": {\n \"temperature\": temperature,\n \"vibration\": vibration,\n \"pressure\": pressure,\n \"runtime_hours\": runtime_hours\n },\n \"recommendation\": recommendation,\n \"confidence_score\": 0.85, # Model confidence\n \"model_version\": \"v1.2\",\n \"prediction_timestamp\": datetime.now().isoformat()\n }\n \n return json.dumps(result)\n \n except Exception as e:\n return json.dumps({\n \"status\": \"error\",\n \"error_message\": str(e),\n \"equipment_id\": equipment_id\n })"
},
{
"kind": 2,
"language": "lot",
"value": "DEFINE ACTION PredictiveMaintenanceProcessor\nON TOPIC \"maintenance/+/predict\" DO\n SET \"equipment_id\" WITH TOPIC POSITION 1\n \n // Gather sensor data for prediction\n SET \"sensor_data\" WITH {\n \"temperature\": GET TOPIC \"sensors/\" + {equipment_id} + \"/temperature\",\n \"vibration\": GET TOPIC \"sensors/\" + {equipment_id} + \"/vibration\",\n \"pressure\": GET TOPIC \"sensors/\" + {equipment_id} + \"/pressure\",\n \"runtime_hours\": GET TOPIC \"equipment/\" + {equipment_id} + \"/runtime_hours\"\n }\n \n // Run ML prediction\n CALL PYTHON \"MLPredictor.predict_equipment_failure\"\n WITH ({sensor_data}, {equipment_id})\n RETURN AS {prediction}\n \n // Extract prediction results\n SET \"failure_probability\" WITH (GET JSON \"prediction_results.failure_probability\" IN {prediction} AS DOUBLE)\n SET \"risk_level\" WITH (GET JSON \"prediction_results.risk_level\" IN {prediction} AS STRING)\n SET \"days_to_failure\" WITH (GET JSON \"prediction_results.estimated_days_to_failure\" IN {prediction} AS INT)\n \n // Route based on risk level\n IF {risk_level} EQUALS \"CRITICAL\" THEN\n PUBLISH TOPIC \"alarms/maintenance/critical/\" + {equipment_id} WITH {prediction}\n PUBLISH TOPIC \"maintenance/\" + {equipment_id} + \"/action_required\" WITH \"IMMEDIATE\"\n ELSE IF {risk_level} EQUALS \"HIGH\" THEN\n PUBLISH TOPIC \"alarms/maintenance/high/\" + {equipment_id} WITH {prediction}\n PUBLISH TOPIC \"maintenance/\" + {equipment_id} + \"/action_required\" WITH \"URGENT\"\n ELSE IF {risk_level} EQUALS \"MEDIUM\" THEN\n PUBLISH TOPIC \"maintenance/\" + {equipment_id} + \"/action_required\" WITH \"SCHEDULED\"\n \n // Always publish prediction results\n PUBLISH TOPIC \"maintenance/\" + {equipment_id} + \"/prediction\" WITH {prediction}\n PUBLISH TOPIC \"maintenance/\" + {equipment_id} + \"/risk_level\" WITH {risk_level}\n PUBLISH TOPIC \"maintenance/\" + {equipment_id} + \"/failure_probability\" WITH {failure_probability}\n PUBLISH TOPIC \"maintenance/\" + 
{equipment_id} + \"/estimated_days\" WITH {days_to_failure}"
},
{
"kind": 1,
"language": "markdown",
"value": "**What this does:**\n- Implements predictive maintenance using machine learning concepts\n- Combines multiple sensor inputs for comprehensive analysis\n- Calculates failure probability and risk levels\n- Provides maintenance recommendations with time estimates\n- Routes alarms based on criticality levels\n- Demonstrates ML integration patterns for industrial IoT\n\n**ML Features:**\n- **Multi-Sensor Fusion**: Combines temperature, vibration, pressure data\n- **Risk Assessment**: Calculates weighted risk factors\n- **Predictive Analytics**: Estimates time to failure\n- **Actionable Insights**: Provides specific maintenance recommendations\n- **Confidence Scoring**: Includes model confidence levels\n\n**Test it:**\n```\nSet sensor data:\n sensors/PUMP001/temperature: 85\n sensors/PUMP001/vibration: 8.5\n sensors/PUMP001/pressure: 110\n equipment/PUMP001/runtime_hours: 5000\n\nPublish to: maintenance/PUMP001/predict\nResult: Comprehensive failure prediction with risk assessment\n```\n\n---\n\n## 🏋️ Exercises\n\n### Exercise 1: Advanced Calculator\n**Task**: Create Python functions for:\n- Statistical calculations (mean, median, standard deviation)\n- Trigonometric functions (sin, cos, tan)\n- Logarithmic and exponential functions\n- Create LoT action to process `calc/advanced/+` requests"
},
{
"kind": 2,
"language": "python",
"value": "# Exercise 1: Write your Python functions here\n"
},
{
"kind": 2,
"language": "lot",
"value": "// Exercise 1: Write your LoT action here\n"
},
{
"kind": 1,
"language": "markdown",
"value": "### Exercise 2: Data Quality Analyzer\n**Task**: Create a comprehensive data quality system:\n- Python function to analyze data completeness, accuracy, consistency\n- Calculate data quality scores and identify issues\n- Generate improvement recommendations\n- LoT action to process quality analysis requests"
},
{
"kind": 2,
"language": "python",
"value": "# Exercise 2: Write your Python functions here\n"
},
{
"kind": 2,
"language": "lot",
"value": "// Exercise 2: Write your LoT action here\n"
},
{
"kind": 1,
"language": "markdown",
"value": "### Exercise 3: Predictive Analytics Engine\n**Task**: Create a simple predictive system:\n- Python function to analyze trends in time series data\n- Predict future values using linear regression\n- Identify seasonal patterns and anomalies\n- Generate forecasting reports with confidence intervals"
},
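{
"kind": 1,
"language": "markdown",
"value": "Hint (optional): a least-squares trend line needs only the standard library. This sketch is one possible starting point; the function names `linear_trend` and `forecast_next` are placeholders, not part of the course API.

```python
import json
import statistics

def linear_trend(values):
    # Fit y = a + b*x by ordinary least squares over the index positions.
    n = len(values)
    xs = range(n)
    mean_x = statistics.mean(xs)
    mean_y = statistics.mean(values)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, values))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx if sxx else 0.0
    return slope, mean_y - slope * mean_x  # (slope, intercept)

def forecast_next(values_json, steps=1):
    # Extrapolate the fitted line to predict the next reading(s).
    values = json.loads(values_json) if isinstance(values_json, str) else values_json
    slope, intercept = linear_trend(values)
    n = len(values)
    predictions = [round(intercept + slope * (n + k), 3) for k in range(steps)]
    return json.dumps({'slope': round(slope, 3), 'predictions': predictions})
```

The sign of the slope gives the trend direction; confidence intervals and seasonality detection are left for the exercise."
},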
{
"kind": 2,
"language": "python",
"value": "# Exercise 3: Write your Python functions here\n"
},
{
"kind": 2,
"language": "lot",
"value": "// Exercise 3: Write your LoT action here\n"
},
{
"kind": 1,
"language": "markdown",
"value": "### Exercise 4: External Service Integration\n**Task**: Create a multi-service integration system:\n- Integrate with weather API for environmental data\n- Integrate with logistics API for supply chain data\n- Combine data sources for comprehensive analysis\n- Include caching, error handling, and fallback strategies\n\n**Challenge**: Implement retry logic and circuit breaker patterns for API reliability."
},
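{
"kind": 1,
"language": "markdown",
"value": "Hint for the challenge: retry with exponential backoff and a circuit breaker can be sketched as below. This is an illustrative pattern only; the names and thresholds are assumptions for the example, not a Coreflux API.

```python
import time

class CircuitBreaker:
    # Stop calling a failing service: open after max_failures consecutive
    # errors, then allow a single trial call again after reset_seconds.
    def __init__(self, max_failures=3, reset_seconds=60):
        self.max_failures = max_failures
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_seconds:
                raise RuntimeError('circuit open - call skipped')
            self.opened_at = None  # half-open: permit one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.time()
            raise
        self.failures = 0  # success closes the circuit again
        return result

def call_with_retry(func, attempts=3, base_delay=0.1):
    # Retry with exponential backoff: base_delay, 2x, 4x, ...
    for attempt in range(attempts):
        try:
            return func()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

A full integration would combine both: wrap each API call in `call_with_retry` and route it through a per-service `CircuitBreaker`."
},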
{
"kind": 2,
"language": "python",
"value": "# Exercise 4: Write your Python functions here\n"
},
{
"kind": 2,
"language": "lot",
"value": "// Exercise 4: Write your LoT action here\n"
},
{
"kind": 1,
"language": "markdown",
"value": "\n\n---\n\n## 🎯 Checkpoint Questions\n\n1. **How do you handle external library dependencies in LoT Python scripts?**\n - Answer: Import libraries at the top of the script and handle ImportError exceptions\n\n2. **What's the best way to cache expensive computations in Python scripts?**\n - Answer: Use module-level variables (like _cache) with timestamp-based expiration\n\n3. **How do you return complex data structures from Python to LoT?**\n - Answer: Return JSON-serializable dictionaries and use GET JSON in LoT to extract values\n\n4. **When should you use advanced Python vs simpler LoT logic?**\n - Answer: Use Python for complex algorithms, external APIs, ML models, or when you need full programming capabilities\n\n---\n\n## 📝 Summary\n\n### Key Concepts Learned\n✅ **Advanced Python Integration** - Complex algorithms and external libraries \n✅ **Statistical Analysis** - SPC, anomaly detection, process capability \n✅ **External API Integration** - Weather APIs, caching, error handling \n✅ **Machine Learning** - Predictive maintenance and risk assessment \n✅ **Performance Optimization** - Caching, batch processing, memory management \n✅ **Error Handling** - Robust error management and graceful degradation \n\n### Advanced Integration Patterns\n- **Analytics Engines**: Statistical process control and quality analysis\n- **Predictive Systems**: ML-based forecasting and maintenance prediction\n- **External Services**: API integration with caching and reliability\n- **Computer Vision**: Image processing for quality inspection\n- **Data Science**: Advanced data analysis and visualization\n\n### Best Practices\n- **Error Handling**: Always include comprehensive exception handling\n- **Performance**: Use caching for expensive operations\n- **Resource Management**: Clean up resources and manage memory\n- **Modularity**: Create reusable functions with clear interfaces\n- **Documentation**: Include docstrings and usage examples\n\n### Next Steps\n- Practice 
integrating external libraries\n- Experiment with machine learning models\n- Try computer vision applications\n- Revisit earlier tutorials to reinforce concepts\n\n---\n\n## 🚀 Further Exploration\n\n### Advanced Applications\n- Deep learning model integration (TensorFlow, PyTorch)\n- Real-time image processing with OpenCV\n- Time series forecasting with advanced algorithms\n- Natural language processing for maintenance logs\n\n### Performance Optimization\n- Asynchronous processing with asyncio\n- Parallel processing with multiprocessing\n- GPU acceleration for ML workloads\n- Memory-efficient data processing\n\n### Enterprise Integration\n- Microservices architecture with Python services\n- Container deployment strategies\n- Monitoring and observability\n- Security and authentication patterns\n\n---\n\n**Congratulations! You've completed the LoT training course!**\n\n[⬅️ Previous: Simple Python](./simple-python.lotnb) | [🏠 Back to Index](../index.lotnb)"
}
]