# Databricks notebook source
import json

# COMMAND ----------
# Sample Input
#input_message = json.dumps({'data': [{'id': 'l1_100', 'name': 'HM 1', 'description': 'HM 1', 'meta': {'created_by': 'user_099', 'created_on': 1747054186650, 'last_updated_by': 'user_099', 'last_updated_on': 1750163411541}, 'project_id': 'project_787', 'type': 'enterprise', 'latitude': None, 'parameters': [], 'longitude': None, 'is_child': None, 'multi_select_dependent_length': None, 'schema': 'public', 'offline': {'timestamp': 1753769799940}, 'project_type': 'graph_model', 'tz': 'Asia/Kolkata', 'resolution': 'lg', 'language': 'en', 'user_id': 'user_099', 'action_type': 'save'},
#{'id': 'l1_100', 'name': 'HM 2', 'description': 'HM 2', 'meta': {'created_by': 'user_099', 'created_on': 1747054186650, 'last_updated_by': 'user_099', 'last_updated_on': 1750163411541}, 'project_id': 'project_787', 'type': 'enterprise', 'latitude': None, 'parameters': [], 'longitude': None, 'is_child': None, 'multi_select_dependent_length': None, 'schema': 'public', 'offline': {'timestamp': 1753769799940}, 'project_type': 'graph_model', 'tz': 'Asia/Kolkata', 'resolution': 'lg', 'language': 'en', 'user_id': 'user_099', 'action_type': 'save'}], 'project_id': 'project_787',
#'table_properties': {
#        'table_name': 'unified_model.public.enterprise',
#        'table_path': 'abfss://unity-catalog-storage@dbstoragenzxfhpgsipt5a.dfs.core.windows.net/416418955412087/unified_model/public.enterprise'
#    }
#}
#)

# COMMAND ----------

dbutils.widgets.text("input_message", "", "Input Message JSON")
input_message = dbutils.widgets.get("input_message")
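
# Optional guard (an addition, not in the original flow): fail fast on an
# empty widget so json.loads raises a clear error instead of a cryptic
# JSONDecodeError on an empty string.
if not input_message.strip():
    raise ValueError("Widget 'input_message' is empty; supply the message JSON.")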
# COMMAND ----------

def extract_table_info(input_message_str: str):
    """
    Extract the table name, data payload, and table properties from the input message.
    Args:
        input_message_str (str): JSON string containing the message
    Returns:
        dict: Table name, project ID, data payload, table properties, and the raw message
    """
    try:
        message_data = json.loads(input_message_str)
        
        # Extract the fully qualified table name from table_properties
        table_name = message_data['table_properties']['table_name']  # e.g. 'unified_model.public.enterprise'
        project_id = message_data['project_id']    # e.g. 'project_787'
        data_payload = message_data['data']        # List of record dicts
        table_properties = message_data['table_properties']  # Includes table_name and table_path
        
        print("Extracted info:")
        print(f"Table name: {table_name}")
        print(f"Project ID: {project_id}")
        print(f"Table property keys: {list(table_properties.keys())}")
        
        return {
            'table_name': table_name,
            'project_id': project_id,
            'data_payload': data_payload,
            'raw_message': message_data,
            'table_properties': table_properties
        }
        
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in input_message: {str(e)}")
    except KeyError as e:
        raise ValueError(f"Missing required field in input_message: {str(e)}")


# COMMAND ----------

def detect_external_table_schema(table_name):
    """
    Look up the schema of a table registered in the metastore.
    
    Args:
        table_name (str): Fully qualified table name
            (e.g. 'unified_model.public.enterprise')
    Returns:
        pyspark.sql.types.StructType: Schema of the table, or None if the
        table cannot be resolved
    """
    
    try:
        # Try to get the schema from the catalog
        table_df = spark.table(table_name)
        schema = table_df.schema
        print(f"✓ Schema found in metastore for table: {table_name}")
        return schema
    except Exception as e:
        print(f"✗ Failed to get schema from metastore for table {table_name}: {e}")
        return None
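
# COMMAND ----------

# Fallback sketch (an addition, not in the original flow): when the table is
# not registered in the metastore, the schema can often be inferred straight
# from the external location carried in table_properties['table_path'].
# Assumes the path holds Delta or Parquet data readable by this cluster.
def detect_schema_from_path(table_path):
    """Infer a schema by reading the external location directly."""
    for fmt in ("delta", "parquet"):
        try:
            schema = spark.read.format(fmt).load(table_path).schema
            print(f"✓ Schema inferred from path as {fmt}: {table_path}")
            return schema
        except Exception:
            continue
    print(f"✗ Could not infer schema from path: {table_path}")
    return None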

# COMMAND ----------

table_info = extract_table_info(input_message)

# COMMAND ----------

schema = detect_external_table_schema(table_info['table_name'])
if schema is None:
    raise ValueError(f"Schema not found for table: {table_info['table_name']}")
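
# COMMAND ----------

# Sanity-check sketch (an addition, not in the original notebook): warn when
# payload keys and schema fields diverge. createDataFrame with dict rows
# ignores extra keys and fills missing nullable fields with null, so a quick
# diff makes those cases visible before the write.
schema_fields = {f.name for f in schema.fields}
payload_keys = set().union(*(row.keys() for row in table_info['data_payload']))
missing_fields = sorted(schema_fields - payload_keys)
extra_keys = sorted(payload_keys - schema_fields)
if missing_fields:
    print(f"Warning: payload lacks schema fields (filled with null): {missing_fields}")
if extra_keys:
    print(f"Warning: payload keys absent from schema (dropped): {extra_keys}")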

# COMMAND ----------

# Build a DataFrame from the payload; dict keys are matched to schema field names
data_df = spark.createDataFrame(table_info['data_payload'], schema=schema)
# data_df.show()

# COMMAND ----------

# Append the payload to the external location referenced by the message
data_df.write \
    .mode("append") \
    .format("parquet") \
    .save(table_info['table_properties']['table_path'])
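
# COMMAND ----------

# Alternative sketch (an assumption, not the original behavior): the sample
# table_path points at Unity Catalog storage, where the data is typically a
# Delta table. Appending raw Parquet files to a Delta location bypasses the
# transaction log; writing through the Delta writer instead keeps it
# consistent:
#
# data_df.write \
#     .mode("append") \
#     .format("delta") \
#     .save(table_info['table_properties']['table_path'])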