Commit 183e9eb0 authored by harshavardhan.c

feat: Converted app to fastapi app.

parent 7845d0b6
__version__ = "v1.0.0"
@@ -20,7 +20,7 @@ broker = KafkaBroker(
 async def consume_stream_for_processing_dependencies(message: dict):
     try:
         await ModelCreatorAgent.model_creator_agent(
-            message=ModelCreatorSchema(meta=message)
+            message=ModelCreatorSchema(**message)
         )
         return True
     except Exception as e:
...
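The only functional change in this hunk is how the Kafka payload is fed into the Pydantic schema. A minimal sketch of the difference follows; the field names here are assumptions for illustration only, not the real ModelCreatorSchema.

from pydantic import BaseModel

class MetaSketch(BaseModel):
    project_id: str

class ModelCreatorSketch(BaseModel):
    meta: MetaSketch
    databricks_host: str

payload = {"meta": {"project_id": "p1"}, "databricks_host": "adb-1.azuredatabricks.net"}

# Old shape: the whole payload was pushed into the single `meta` field, so
# sibling keys such as databricks_host were lost or failed validation.
# ModelCreatorSketch(meta=payload)
# New shape: each top-level key of the message maps onto its own schema field.
ModelCreatorSketch(**payload)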
-# app.py
-import asyncio
-import logging as logger
-import sys
-
-from dotenv import load_dotenv
-
-load_dotenv()
-
-from faststream import FastStream
-from ut_dev_utils import configure_logger
-
-from agent_subscribers import broker
-
-configure_logger()
-
-# Create FastStream app
-app = FastStream(broker)
-
-
-async def run_app():
-    try:
-        logger.info("Starting FastStream application...")
-        await app.run()
-    except KeyboardInterrupt:
-        logger.info("Application interrupted by user")
-    except Exception as e:
-        logger.error(f"Application error: {e}")
-        raise
-    finally:
-        logger.info("Application shutdown complete")
-
-
-# Main execution
-if __name__ == "__main__":
-    try:
-        # For better performance on Linux/Mac, use uvloop if available
-        if sys.platform != "win32":
-            try:
-                import uvloop
-
-                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
-                logger.info("Using uvloop for better performance")
-            except ImportError:
-                logger.info("uvloop not available, using default event loop")
-        # Run the application
-        asyncio.run(run_app())
-    except KeyboardInterrupt:
-        print("\nApplication stopped by user")
-    except Exception as e:
-        logger.error(f"Failed to start application: {e}")
-        sys.exit(1)
+import gc
+
+gc.collect()
+import argparse
+
+ap = argparse.ArgumentParser()
+if __name__ == "__main__":
+    from dotenv import load_dotenv
+
+    load_dotenv()
+
+    from ut_dev_utils import configure_logger
+
+    configure_logger()
+    import asyncio
+    import logging as logger
+    import sys
+
+    from scripts.config import Services
+
+    ap.add_argument(
+        "--port",
+        "-p",
+        required=False,
+        default=Services.PORT,
+        help="Port to start the application.",
+    )
+    ap.add_argument(
+        "--bind",
+        "-b",
+        required=False,
+        default=Services.HOST,
+        help="IP to start the application.",
+    )
+    arguments = vars(ap.parse_args())
+    logger.info(f"App Starting at {arguments['bind']}:{arguments['port']}")
+    if sys.platform == "win32":
+        import uvicorn
+
+        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+        uvicorn.run(
+            "main:app",
+            host=arguments["bind"],
+            port=int(arguments["port"]),
+            root_path="",
+        )
+    else:
+        from granian import Granian
+        from granian.constants import Interfaces
+
+        Granian(
+            "main:app",
+            address=arguments["bind"],
+            port=int(arguments["port"]),
+            interface=Interfaces.ASGI,
+            log_access=True,
+            log_enabled=True,
+            respawn_failed_workers=True,
+            threads=10,
+            threading_mode="runtime",
+        ).serve()
import sys
from ut_dev_utils import FastAPIConfig, generate_fastapi_app
from ut_dev_utils.errors.exception_handlers import ExceptionHandlers
from __version__ import __version__
from scripts.config import PROJECT_NAME
from scripts.core.services import router
description = """
Databricks Platform Automation microservice for FTDMPC.
"""
tags_metadata = []
app_config = FastAPIConfig(
title="Databricks Platform Automation APP",
description=description,
version=__version__,
root_path="" if sys.platform == "win32" else "/dbx_mgmt",
tags_metadata=tags_metadata,
exception_handlers={
Exception: ExceptionHandlers.generic_exception_handler,
},
)
app = generate_fastapi_app(
app_config,
routers=[router],
project_name=PROJECT_NAME,
enable_default_openapi=True,
)
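A quick way to smoke-test the generated application object, assuming generate_fastapi_app returns a standard FastAPI instance; this check is only a sketch and not part of the commit.

from fastapi.testclient import TestClient

client = TestClient(app)
# With enable_default_openapi=True the schema should be reachable; at runtime
# the service is mounted under root_path "/dbx_mgmt" on non-Windows hosts.
response = client.get("/openapi.json")
assert response.status_code == 200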
@@ -61,27 +61,18 @@ class _KafkaConfig(BaseSettings):
 class _DatabricksConfig(BaseSettings):
-    DATABRICKS_HOST: str
-    DATABRICKS_PORT: int = Field(default=443)
-    DATABRICKS_URI: str
-    DATABRICKS_HTTP_PATH: str
-    DATABRICKS_ACCESS_TOKEN: str
+    DATABRICKS_DEFAULT_PORT: int = Field(default=443)
     DATABRICKS_CATALOG_NAME: str = Field(default="unified_model")
     DATABRICKS_PUBLIC_SCHEMA_NAME: str = Field(default="public")
     DATABRICKS_ANALYTICAL_SCHEMA_NAME: str = Field(default="analytical")
     DATABRICKS_STORAGE_FORMAT: str = Field(default="PARQUET")
-    DATABRICKS_STORAGE_PATH: str = Field(
-        default="abfss://unity-catalog-storage@dbstoragenzxfhpgsipt5a.dfs.core.windows.net/416418955412087"
-    )
-
-    @model_validator(mode="before")
-    def prepare_databricks_uri(cls, values):
-        values["DATABRICKS_URI"] = (
-            f"databricks://token:{values['DATABRICKS_ACCESS_TOKEN']}@{values['DATABRICKS_HOST']}:{values['DATABRICKS_PORT']}"
-            f"?http_path={values['DATABRICKS_HTTP_PATH']}"
-        )
-        return values
+    DATABRICKS_CLUSTER_NAME: str = Field(default="UT-Steaming-Cluster")
+    DATABRICKS_CLUSTER_DISK_SIZE: int = Field(default=150)
+    DATABRICKS_CLUSTER_MIN_WORKERS: int = Field(default=1)
+    DATABRICKS_CLUSTER_SPARK_VERSION: str = Field(default="15.4.x-scala2.12")
+    DATABRICKS_CLUSTER_RUNTIME_VERSION: str = Field(default="9.1")
+    DATABRICKS_CLUSTER_NODE_TYPE_ID: str = Field(default="Standard_DS3_v2")
+    DATABRICKS_CLUSTER_DRIVER_NODE_TYPE_ID: str = Field(default="Standard_DS3_v2")

 Services = _Services()
@@ -98,4 +89,5 @@ __all__ = [
     "PathToStorage",
     "KafkaConfig",
     "DatabricksConfig",
+    "PROJECT_NAME",
 ]
 class DatabricksConstants:
     METADATA_INGESTION_JOB_NAME = "metadata_ingestion_job"
     METADATA_DELETION_JOB_NAME = "metadata_deletion_job"
+    TIMESERIES_INGESTION_JOB_NAME = "timeseries_ingestion_job"
     METADATA_INGESTION_NOTEBOOK_NAME = "metadata_ingestion_notebook"
     METADATA_DELETION_NOTEBOOK_NAME = "metadata_deletion_notebook"
     TIMESERIES_INGESTION_NOTEBOOK_NAME = "timeseries_ingestion_notebook"
+    VOLUME_NAME = "unity_catalog_storage"

 class NotebookConstants:
...
@@ -8,27 +8,56 @@ spark = SparkSession.builder.appName("StreamingTimeseriesPipeline").getOrCreate(
 spark.sparkContext.setLogLevel("WARN")

 # COMMAND ----------

-# Input Parameters
-event_hub_connection_string = {{event_hub_connection_string}}
-timeseries_table_path = {{timeseries_table_path}}
-project_levels = {{project_levels}}
+print("🚀 Applying Spark optimizations for high-volume streaming...")
+
+# Adaptive Query Execution
+spark.conf.set("spark.sql.adaptive.enabled", "true")
+spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
+spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
+
+# Streaming Backpressure
+spark.conf.set("spark.sql.streaming.backpressure.enabled", "true")
+spark.conf.set("spark.sql.streaming.backpressure.pid.minRate", "5000")
+
+# Delta Lake Optimizations
+spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
+spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
+spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")
+
+# Streaming State Management
+spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "300s")
+spark.conf.set("spark.sql.streaming.ui.retainedBatches", "200")
+
+print("✅ Spark optimizations applied")

 # COMMAND ----------

-event_hub_conf = {
-    'eventhubs.connectionString': spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(event_hub_connection_string),
-    'eventhubs.consumerGroup': '$Default'
-}
+# Parameters - will be set when job runs
+dbutils.widgets.text("eventhub_connection_string", "", "Event Hub Connection String")
+dbutils.widgets.text("output_table", "catalog.schema.sensor_data", "Output Table")
+dbutils.widgets.text("consumer_group", "$Default", "Consumer Group")
+dbutils.widgets.text("checkpoint_location", "", "Checkpoint Location")
+dbutils.widgets.text("batch_interval", "10 seconds", "Batch Processing Interval")
+dbutils.widgets.text("project_levels", "4", "Project Template Levels")
+
+# COMMAND ----------
+
+# Get parameters
+eventhub_conn_str = dbutils.widgets.get("eventhub_connection_string")
+output_table = dbutils.widgets.get("output_table")
+consumer_group = dbutils.widgets.get("consumer_group")
+checkpoint_location = dbutils.widgets.get("checkpoint_location")
+batch_interval = dbutils.widgets.get("batch_interval")
+project_levels = int(dbutils.widgets.get("project_levels"))

 # COMMAND ----------

 message_schema = StructType([
-    StructField("data", StructType([
-        StructField("tag", StringType(), False),
+    StructField("data", MapType(StringType(), StructType([
         StructField("dq", IntegerType(), True),
         StructField("ta", StringType(), True),
-        StructField("val", DoubleType(), False)
-    ]), True),
+        StructField("val", StringType(), True)
+    ])), True),
     StructField("a_id", StringType(), True),
     StructField("d_id", StringType(), True),
     StructField("gw_id", StringType(), True),
@@ -41,66 +70,107 @@ message_schema = StructType([
     StructField("ver", DoubleType(), True)
 ])

 # COMMAND ----------

 def safe_get_item(array_col, index):
     return when(size(array_col) > index, array_col.getItem(index)).otherwise(lit(None))

-def transform_timeseries_data_fully_dynamic(df, max_tag_parts=4):
-    print(f"Transforming to target schema with up to {max_tag_parts} tag parts...")
-
-    df_with_split = df.withColumn("tag_parts", split(col("data.tag"), "\\$"))
-    df_with_split = df_with_split.withColumn("tag_parts_count", size(col("tag_parts")))
-    df_with_split = df_with_split.withColumn("hierarchy_levels", slice(col("tag_parts"), 1, size(col("tag_parts")) - 1))
-    df_with_split = df_with_split.withColumn("levels_without_ast", expr("filter(hierarchy_levels, x -> NOT x LIKE '%ast%')"))
-    df_with_split = df_with_split.withColumn("ast", expr("filter(hierarchy_levels, x -> x LIKE '%ast%')[0]"))
+def transform_timeseries_data_fully_dynamic(df, project_levels=4):
+    """
+    Fully dynamic version where you can specify max number of project_levels
+    """
+    from pyspark.sql.functions import col, lit, from_unixtime, to_date, hour, split, size, when, isnan, isnull, filter as spark_filter
+    from pyspark.sql.types import FloatType
+
+    print(f"Transforming to target schema with up to {project_levels} project levels...")
+
+    # First, let's create a column to split the tag and get the size
+    df_with_split = df.withColumn("tag_parts", split(col("tag"), "\\$"))
+    df_with_split = df_with_split.withColumn("tag_parts_count", size(col("tag_parts")))
+
+    # Remove last index
+    df_with_split = df_with_split.withColumn(
+        "hierarchy_levels", slice(col("tag_parts"), 1, size(col("tag_parts")) - 1))
+
+    # Remove parts containing "ast" from hierarchy for l1,l2,l3 columns
+    df_with_split = df_with_split.withColumn(
+        "levels_without_ast", spark_filter(col("hierarchy_levels"), lambda x: ~x.contains("ast")))
+
+    # Find the part containing "ast" for ast column
+    df_with_split = df_with_split.withColumn(
+        "ast",
+        expr("filter(hierarchy_levels, x -> x like '%ast%')[0]")
+    )

+    # Determine value_type based on data.val content
     value_type_logic = when(
-        col("data.val").cast("float").isNotNull() & ~isnan(col("data.val").cast("float")),
+        col("value.val").cast(FloatType()).isNotNull() &
+        ~isnan(col("value.val").cast(FloatType())),
         lit("float")
     ).otherwise(lit("string"))

-    select_columns = [
+    # Build the select columns list dynamically
+    select_columns = []
+
+    # Fixed columns first
+    fixed_columns = [
         col("timestamp").alias("timestamp"),
         from_unixtime(col("timestamp") / 1000).cast("timestamp").alias("dt_timestamp"),
         to_date(from_unixtime(col("timestamp") / 1000)).alias("dt_date"),
         hour(from_unixtime(col("timestamp") / 1000)).alias("dt_hour"),
-        col("data.val").cast("string").alias("value"),
+        col("value.val").cast("string").alias("value"),
         value_type_logic.alias("value_type"),
-        col("data.tag").alias("c3"),
+        col("tag").alias("c3"),
         safe_get_item(col("tag_parts"), 0).alias("c1"),
-        when(col("tag_parts_count") > 0, col("tag_parts").getItem(col("tag_parts_count") - 1)).otherwise(lit(None)).alias("c5"),
-        col("data.dq").cast("string").alias("Q"),
-        col("data.ta").alias("T"),
+        when(col("tag_parts_count") > 0,
+             col("tag_parts").getItem(col("tag_parts_count") - 1)
+        ).otherwise(lit(None)).alias("c5"),
+        col("value.dq").cast("string").alias("Q"),
+        col("value.ta").alias("T"),
         col("d_id").alias("D"),
         col("p_id").alias("P"),
         col("a_id").alias("A"),
         lit(None).cast("string").alias("B")
     ]

-    select_columns += [
-        safe_get_item(col("levels_without_ast"), i).alias(f"l{i+1}")
-        for i in range(max_tag_parts)
-    ] + [col("ast").alias("ast")]
-
-    return df_with_split.select(*select_columns)
+    # Add fixed columns
+    select_columns.extend(fixed_columns)
+
+    # Dynamically create l1, l2, l3, ... ln columns
+    tag_part_columns = [
+        safe_get_item(col("levels_without_ast"), i).alias(f"l{i+1}")
+        for i in range(project_levels)
+    ] + [col("ast").alias("ast")]
+    select_columns.extend(tag_part_columns)
+
+    # Apply the transformation
+    transformed_df = df_with_split.select(*select_columns)
+    return transformed_df

 # COMMAND ----------

+# Event Hub configuration
+eventhub_config = {
+    "eventhubs.connectionString": spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eventhub_conn_str),
+    "eventhubs.consumerGroup": consumer_group,
+    "eventhubs.maxEventsPerTrigger": "10000"  # Processing records based on the batch Size
+}
+
+# COMMAND ----------
+
-raw_stream_df = spark.readStream \
-    .format("eventhubs") \
-    .options(**event_hub_conf) \
-    .load()
+print("📡 Connecting to Event Hub...")
+try:
+    raw_stream_df = spark.readStream.format("eventhubs").options(**eventhub_config).load()
+    print("Successfully connected to Event Hub stream")
+except Exception as e:
+    print(f"Failed to connect to Event Hub: {e}")
+    dbutils.notebook.exit(f"FAILED: Event Hub connection error - {e}")
+
+# COMMAND ----------

 # Binary -> String
 json_df = raw_stream_df.withColumn("json_string", col("body").cast("string"))

 # JSON -> Struct
 parsed_stream_df = json_df.select(
     from_json(col("json_string"), message_schema).alias("parsed_data")
 ).select("parsed_data.*")

 # Explode
@@ -117,32 +187,50 @@ df_exploded = parsed_stream_df.select(
     col("timestamp"),
     col("ver")
 )
-# display(df_exploded)

 # COMMAND ----------

-transformed_df = transform_timeseries_data_fully_dynamic(df_exploded, max_tag_parts=projects_levels)
+transformed_df = transform_timeseries_data_fully_dynamic(df_exploded, project_levels=project_levels)

 # COMMAND ----------

-# Option A: Write to Delta
-transformed_df.writeStream \
-    .format("delta") \
-    .outputMode("append") \
-    .partitionBy("dt_date", "dt_hour", "c3") \
-    .option("checkpointLocation", "/mnt/checkpoints/timeseries_data") \
-    .start(timeseries_data_path)
-
-# COMMAND ----------
-
-# # Option B: Write to Parquet (same as your batch)
-# transformed_df.writeStream \
-#     .format("parquet") \
-#     .outputMode("append") \
-#     .partitionBy("dt_date", "dt_hour", "c3") \
-#     .option("checkpointLocation", "/mnt/checkpoints/timeseries_data") \
-#     .start(timeseries_table_path)
+# CRITICAL: Start the CONTINUOUS streaming query
+print("STARTING CONTINUOUS STREAMING QUERY...")
+print("This will run INDEFINITELY until manually stopped!")
+
+try:
+    streaming_query = transformed_df.writeStream \
+        .format("parquet") \
+        .outputMode("append") \
+        .option("checkpointLocation", checkpoint_location) \
+        .option("mergeSchema", "true") \
+        .trigger(processingTime=batch_interval) \
+        .table(output_table)
+    print("STREAMING QUERY STARTED SUCCESSFULLY!")
+    print(f"Processing Event Hub → {output_table}")
+    print(f"Batch interval: {batch_interval}")
+    print(f"Checkpoint: {checkpoint_location}")
+except Exception as e:
+    print(f"Failed to start streaming: {e}")
+    dbutils.notebook.exit(f"FAILED: Streaming start error - {e}")

 # COMMAND ----------

+# Monitor the streaming query continuously
+print("📊 Streaming pipeline is now running continuously...")
+print("🔄 Processing Event Hub messages in real-time...")
+print("⏹️ To stop: Cancel this notebook or stop the job")
+
+try:
+    # This will run indefinitely until the notebook is cancelled
+    streaming_query.awaitTermination()
+except Exception as e:
+    print(f"❌ Streaming pipeline error: {e}")
+    if streaming_query.isActive:
+        streaming_query.stop()
+    raise e
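For orientation, the tag handling the notebook performs with Spark column expressions boils down to the following plain-Python steps; the sample tag is invented.

tag = "enterprise$site_a$ast_pump_07$temperature"
tag_parts = tag.split("$")                       # -> ["enterprise", "site_a", "ast_pump_07", "temperature"]
hierarchy_levels = tag_parts[:-1]                # drop the last index (the measurement name becomes c5)
ast = next((p for p in hierarchy_levels if "ast" in p), None)          # -> "ast_pump_07"
levels_without_ast = [p for p in hierarchy_levels if "ast" not in p]   # -> feeds l1, l2, ... ln
c1, c5 = tag_parts[0], tag_parts[-1]
print(c1, c5, ast, levels_without_ast)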
@@ -2,13 +2,16 @@ import logging
 from sqlalchemy import MetaData
 from sqlalchemy.orm import declarative_base
+from ut_security_util import MetaInfoSchema
 from ut_sql_utils.asyncio.declarative_utils import DeclarativeUtils

 from scripts.config import DatabricksConfig
 from scripts.constants import DatabricksConstants, NotebookConstants
-from scripts.db.databricks import DataBricksSQLLayer
+from scripts.db.databricks.cluster_manager import DatabricksClusterManager
+from scripts.db.databricks.external_table_manager import DataBricksSQLLayer
 from scripts.db.databricks.job_manager import DatabricksJobManager
-from scripts.db.databricks.notebook_manager import NotebookManager
+from scripts.db.databricks.library_manager import DatabricksLibraryManager
+from scripts.db.databricks.notebook_manager import DatabricksNotebookManager
 from scripts.db.redis.databricks_details import databricks_details_db
 from scripts.db.redis.project_details import fetch_level_details, project_template_keys
 from scripts.schemas import ModelCreatorSchema

@@ -17,27 +20,37 @@ from scripts.utils.model_convertor_utils import ModelConverter

 class ModelCreatorHandler:
     def __init__(
-        self, message: ModelCreatorSchema, declarative_utils: DeclarativeUtils
+        self,
+        message: ModelCreatorSchema,
+        declarative_utils: DeclarativeUtils,
+        meta: MetaInfoSchema,
     ):
         self.declarative_utils = declarative_utils
-        self.meta = message.meta
+        self.meta = meta
+        self.message = message
         self.model_convertor = ModelConverter()
-        self.job_manager = DatabricksJobManager(
-            databricks_host=message.databricks_host,
-            access_token=message.databricks_access_token,
-        )
-        self.notebook_manager = NotebookManager(
-            databricks_host=message.databricks_host,
-            access_token=message.databricks_access_token,
-        )
         self.databricks_sql_obj = DataBricksSQLLayer(
             catalog_name=DatabricksConfig.DATABRICKS_CATALOG_NAME,
-            project_id=self.meta.project_id,
+            project_id=meta.project_id,
             schema=message.schema,
         )
-        self.message = message
         self.external_location = self.message.databricks_storage_path
+        self.job_manager = DatabricksJobManager(
+            databricks_host=self.message.databricks_host,
+            access_token=self.message.databricks_access_token,
+        )
+        self.notebook_manager = DatabricksNotebookManager(
+            databricks_host=self.message.databricks_host,
+            access_token=self.message.databricks_access_token,
+        )
+        self.cluster_manager = DatabricksClusterManager(
+            databricks_host=self.message.databricks_host,
+            access_token=self.message.databricks_access_token,
+        )
+        self.library_manager = DatabricksLibraryManager(
+            databricks_host=self.message.databricks_host,
+            access_token=self.message.databricks_access_token,
+        )

     @staticmethod
     def create_schema_base(schema_name: str):
@@ -45,10 +58,20 @@ class ModelCreatorHandler:
         metadata = MetaData(schema=schema_name)
         return declarative_base(metadata=metadata)

-    async def create_models_in_unity_catalog(self):
+    async def create_models_in_unity_catalog(self, analytical: bool = False):
+        cluster_id = self.setup_cluster()
+        if not cluster_id:
+            logging.error("Failed to create cluster")
+        self.message.databricks_http_path = (
+            self.cluster_manager.get_http_path_details_by_cluster_id(
+                cluster_id=cluster_id, workspace_url=self.message.databricks_host
+            )
+        )
         overall_tables = self.get_overall_tables()
         project_levels = project_template_keys(self.meta.project_id, levels=True)
+        # self.setup_notepads_and_jobs(project_levels=project_levels, cluster_id=cluster_id)
+        # return True
         base = self.create_schema_base(
             schema_name=f"{self.databricks_sql_obj.catalog_name}.{self.message.schema}"
         )
@@ -74,11 +97,11 @@ class ModelCreatorHandler:
                 table_properties=table_properties,
             )

-            ts_external_table = self.databricks_sql_obj.create_timeseries_table(
+            self.databricks_sql_obj.create_timeseries_table(
                 columns=project_levels, external_location=self.external_location
             )
             self.setup_notepads_and_jobs(
-                timeseries_table_path=ts_external_table, project_levels=project_levels
+                project_levels=project_levels, cluster_id=cluster_id
             )
             return True
         except Exception as e:
@@ -104,10 +127,13 @@ class ModelCreatorHandler:
         logging.info(
             f"Setting up catalog '{DatabricksConfig.DATABRICKS_CATALOG_NAME}' for project '{self.meta.project_id}'"
         )
-        self.databricks_sql_obj.connect_to_databricks()
+        self.databricks_sql_obj.connect_to_databricks(self.message.databricks_uri)
+        external_location = (
+            f"{self.external_location}/{self.databricks_sql_obj.catalog_name}"
+        )
         # Create catalog
         catalog_success = self.databricks_sql_obj.create_catalog(
-            managed_location=f"{self.external_location}/{self.databricks_sql_obj.catalog_name}",
+            managed_location=external_location,
         )
         if not catalog_success:
             return False
@@ -123,65 +149,116 @@ class ModelCreatorHandler:
         )
         if not schema_success:
             return False
+        self.databricks_sql_obj.create_volume(
+            volume_name=f"{self.databricks_sql_obj.catalog_name}.{self.message.schema}.{DatabricksConstants.VOLUME_NAME}",
+            location_name=f"{external_location}/{self.message.schema}",
+        )
         return True

-    def setup_notepads_and_jobs(self, timeseries_table_path: str, project_levels: dict):
+    @staticmethod
+    def get_metadata_notebook_parameters(job_name: str):
+        tags = {
+            "purpose": (
+                "metadata_ingestion" if "ingestion" in job_name else "metadata_deletion"
+            ),
+            "compute_type": "serverless",
+        }
+        metadata_job_parameters = {"input_message": "default_value"}
+        return metadata_job_parameters, tags
+
+    def get_timeseries_notebook_parameters(self, project_levels: dict):
+        tags = {
+            "purpose": "timeseries_ingestion",
+            "compute_type": "server",
+        }
+        output_table = f"{self.databricks_sql_obj.catalog_name}.{self.message.schema}.timeseries_data"
+        timeseries_job_parameters = {
+            "project_levels": len(project_levels) - 1,
+            "batch_interval": "10 seconds",
+            "consumer_group": "$Default",
+            "output_table": output_table,
+            "eventhub_connection_string": self.message.eventhub_connection_string,
+            "checkpoint_location": f"/Volumes/{self.databricks_sql_obj.catalog_name}/{self.message.schema}/"
+            f"{DatabricksConstants.VOLUME_NAME}/checkpoints/timeseries/stream",
+        }
+        return timeseries_job_parameters, tags
+
+    def notebook_setup_for_metadata_ingestion(self):
+        logging.info("Setting up notebook for metadata ingestion")
+        notebook_path = f"/Users/{self.message.databricks_user_email}/{self.meta.project_id}_{DatabricksConstants.METADATA_INGESTION_NOTEBOOK_NAME}"
+        metadata_job_parameters, tags = self.get_metadata_notebook_parameters(
+            job_name=DatabricksConstants.METADATA_INGESTION_JOB_NAME
+        )
+        job_config = self.job_manager.create_job_config_for_serverless(
+            job_name=f"{self.meta.project_id}_{DatabricksConstants.METADATA_INGESTION_JOB_NAME}",
+            notebook_path=notebook_path,
+            job_parameters=metadata_job_parameters,
+            tags=tags,
+        )
+        self.setup_notebooks_and_jobs(
+            databricks_notebook_path=notebook_path,
+            job_name=DatabricksConstants.METADATA_INGESTION_JOB_NAME,
+            notebook_path=NotebookConstants.METADATA_INGESTION_NOTEBOOK_PATH,
+            job_config=job_config,
+        )
+
+    def notebook_setup_for_metadata_deletion(self):
+        logging.info("Setting up notebook for metadata deletion")
+        notebook_path = f"/Users/{self.message.databricks_user_email}/{self.meta.project_id}_{DatabricksConstants.METADATA_DELETION_NOTEBOOK_NAME}"
+        metadata_job_parameters, tags = self.get_metadata_notebook_parameters(
+            job_name=DatabricksConstants.METADATA_DELETION_JOB_NAME
+        )
+        job_config = self.job_manager.create_job_config_for_serverless(
+            job_name=f"{self.meta.project_id}_{DatabricksConstants.METADATA_DELETION_JOB_NAME}",
+            notebook_path=notebook_path,
+            job_parameters=metadata_job_parameters,
+            tags=tags,
+        )
+        self.setup_notebooks_and_jobs(
+            databricks_notebook_path=notebook_path,
+            job_name=DatabricksConstants.METADATA_DELETION_JOB_NAME,
+            notebook_path=NotebookConstants.METADATA_DELETION_NOTEBOOK_PATH,
+            job_config=job_config,
+        )
+
+    def notebook_setup_for_timeseries_ingestion(
+        self, project_levels: dict, cluster_id: str
+    ):
+        logging.info("Setting up notebook for timeseries ingestion")
+        notebook_path = f"/Users/{self.message.databricks_user_email}/{self.meta.project_id}_{DatabricksConstants.TIMESERIES_INGESTION_NOTEBOOK_NAME}"
+        timeseries_job_parameters, tags = self.get_timeseries_notebook_parameters(
+            project_levels=project_levels
+        )
+        job_config = self.job_manager.create_job_config_for_server(
+            job_name=f"{self.meta.project_id}_{DatabricksConstants.TIMESERIES_INGESTION_JOB_NAME}",
+            notebook_path=notebook_path,
+            job_parameters=timeseries_job_parameters,
+            tags=tags,
+            cluster_config={"existing_cluster_id": cluster_id},
+        )
+        job_id = self.setup_notebooks_and_jobs(
+            databricks_notebook_path=notebook_path,
+            job_name=DatabricksConstants.TIMESERIES_INGESTION_JOB_NAME,
+            notebook_path=NotebookConstants.TIMESERIES_INGESTION_NOTEBOOK_PATH,
+            job_config=job_config,
+        )
+        if job_id:
+            logging.info("Running timeseries ingestion job")
+            if not self.job_manager.is_job_running(job_id=job_id)["is_running"]:
+                logging.info("Job is not running, running it now")
+                self.job_manager.run_job(job_id=job_id)
+
+    def setup_notepads_and_jobs(self, project_levels: dict, cluster_id: str):
         """
         Args:
-            timeseries_table_path: Path for the timeseries table
             project_levels: List of project levels
+            cluster_id: Cluster id
         """
         logging.info("Setting up notepads and jobs")
-        meta_ingestion_notebook_path = f"/Users/{self.message.databricks_user_email}/{self.meta.project_id}_{DatabricksConstants.METADATA_INGESTION_NOTEBOOK_NAME}"
-        meta_deletion_notebook_path = f"/Users/{self.message.databricks_user_email}/{self.meta.project_id}_{DatabricksConstants.METADATA_DELETION_NOTEBOOK_NAME}"
-        timeseries_notebook_path = f"/Users/{self.message.databricks_user_email}/{self.meta.project_id}_{DatabricksConstants.TIMESERIES_INGESTION_NOTEBOOK_NAME}"
-
-        # Setting up of Metadata Ingestion Notebook
-        existing_job_id = databricks_details_db.hget(
-            self.meta.project_id, DatabricksConstants.METADATA_INGESTION_JOB_NAME
-        )
-        if not existing_job_id:
-            self.create_notebook(
-                notebook_path=meta_ingestion_notebook_path,
-                source_notebook_path=NotebookConstants.METADATA_INGESTION_NOTEBOOK_PATH,
-            )
-            ingestion_job_id = self.create_job(
-                job_name=f"{self.meta.project_id}_{DatabricksConstants.METADATA_INGESTION_JOB_NAME}",
-                notebook_path=meta_ingestion_notebook_path,
-            )
-            databricks_details_db.hset(
-                self.meta.project_id,
-                DatabricksConstants.METADATA_INGESTION_JOB_NAME,
-                ingestion_job_id,
-            )
-
-        existing_job_id = databricks_details_db.hget(
-            self.meta.project_id, DatabricksConstants.METADATA_DELETION_JOB_NAME
-        )
-        if not existing_job_id:
-            # Setting up of Metadata Deletion Notebook
-            self.create_notebook(
-                notebook_path=meta_deletion_notebook_path,
-                source_notebook_path=NotebookConstants.METADATA_DELETION_NOTEBOOK_PATH,
-            )
-            deletion_job_id = self.create_job(
-                job_name=f"{self.meta.project_id}_{DatabricksConstants.METADATA_DELETION_JOB_NAME}",
-                notebook_path=meta_deletion_notebook_path,
-            )
-            databricks_details_db.hset(
-                self.meta.project_id,
-                DatabricksConstants.METADATA_DELETION_JOB_NAME,
-                deletion_job_id,
-            )
-
-        # Setting up of Timeseries Ingestion Notebook
-        replace_mapping = {
-            "{{timeseries_table_path}}": f'"{timeseries_table_path}"',
-            "{{project_levels}}": str(len(project_levels) - 1),
-            "{{event_hub_connection_string}}": f'"{self.meta.project_id}"',
-        }
-        self.create_notebook(
-            notebook_path=timeseries_notebook_path,
-            source_notebook_path=NotebookConstants.TIMESERIES_INGESTION_NOTEBOOK_PATH,
-            replace_mapping=replace_mapping,
-        )
+        self.notebook_setup_for_metadata_ingestion()
+        self.notebook_setup_for_metadata_deletion()
+        self.notebook_setup_for_timeseries_ingestion(
+            project_levels=project_levels, cluster_id=cluster_id
+        )
     @staticmethod
@@ -212,7 +289,7 @@ class ModelCreatorHandler:

     @staticmethod
     def read_data_from_file(note_path: str):
-        with open(note_path) as f:
+        with open(note_path, encoding="utf-8") as f:
             notebook_code = f.read()
         return notebook_code

@@ -232,12 +309,70 @@ class ModelCreatorHandler:
         )
         return True

-    def create_job(self, job_name: str, notebook_path: str):
-        logging.info(f"Creating job {job_name}")
-        job_id = self.job_manager.create_job(
-            job_config=self.job_manager.create_job_config_for_serverless(
-                job_name=job_name,
-                notebook_path=notebook_path,
-            )
-        )
+    def create_job(self, job_config: dict):
+        logging.info(f"Creating job {job_config['name']}")
+        job_id = self.job_manager.create_job(job_config=job_config)
+        return job_id
+
+    def setup_notebooks_and_jobs(
+        self,
+        databricks_notebook_path: str,
+        job_name: str,
+        notebook_path: str,
+        job_config: dict,
+    ) -> str:
+        """
+        Notebook and job for metadata
+        notebook_path (str): Path to notebook
+        source_notebook_path (str): Path to source notebook
+        job_name (str): Name of the job
+        job_config (dict): Config to pass to the job
+        """
+        logging.info(
+            f"Setting up metadata notebook at path '{databricks_notebook_path}'"
+        )
+        job_id = databricks_details_db.hget(self.meta.project_id, job_name)
+        if not job_id:
+            self.create_notebook(
+                notebook_path=databricks_notebook_path,
+                source_notebook_path=notebook_path,
+            )
+            job_id = self.create_job(job_config=job_config)
+            databricks_details_db.hset(
+                self.meta.project_id,
+                job_name,
+                job_id,
+            )
         return job_id
+
+    def setup_cluster(self):
+        logging.info("Setting up cluster")
+        existing_cluster = self.cluster_manager.get_existing_cluster_by_name(
+            cluster_name=DatabricksConfig.DATABRICKS_CLUSTER_NAME
+        )
+        if existing_cluster:
+            logging.info("Cluster already exists")
+            cluster_state = existing_cluster.get("state", "UNKNOWN")
+            logging.info(
+                f"Cluster '{DatabricksConfig.DATABRICKS_CLUSTER_NAME}' already exists: {existing_cluster['cluster_id']}"
+            )
+            logging.debug(f"Current state: {cluster_state}")
+            # Optionally start the cluster if it's terminated
+            if cluster_state in ["TERMINATED", "TERMINATING"]:
+                logging.info("🚀 Starting existing cluster...")
+                self.cluster_manager.start_cluster(
+                    cluster_id=existing_cluster["cluster_id"]
+                )
+            return existing_cluster["cluster_id"]
+        cluster_id = self.cluster_manager.create_cluster(
+            cluster_config=self.cluster_manager.get_streaming_cluster_config(
+                cluster_name=DatabricksConfig.DATABRICKS_CLUSTER_NAME
+            )
+        )
+        if not cluster_id:
+            return None
+        self.library_manager.install_libraries(
+            libraries=self.library_manager.default_libraries(), cluster_id=cluster_id
+        )
+        return cluster_id
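Put together, the handler is intended to be driven roughly as follows; this is only a hedged sketch, and the handler construction (message, declarative_utils, meta) is assumed to happen in the service layer shown further down.

import asyncio

async def provision_project(handler: "ModelCreatorHandler"):
    # 1) reuse or create the streaming cluster and derive its SQL http_path
    # 2) create the catalog, schema and volume, then the external and timeseries tables
    # 3) upload the three notebooks and register/run their jobs
    return await handler.create_models_in_unity_catalog(analytical=False)

# asyncio.run(provision_project(handler))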
from fastapi import APIRouter
router = APIRouter()
from .v1 import v1_router
router.include_router(v1_router)
from fastapi import APIRouter
v1_router = APIRouter(prefix="/api/v1")
__all__ = ["v1_router"]
from .model_creator_services import model_creator_router
v1_router.include_router(model_creator_router)
import logging
from typing import Annotated
from fastapi import BackgroundTasks
from fastapi.params import Depends, Query
from faststream.confluent.fastapi import KafkaRouter
from ut_dev_utils.responses import DefaultResponseSchema
from ut_security_util import MetaInfoSchema
from ut_sql_utils.asyncio.declarative_utils import DeclarativeUtils
from scripts.config import KafkaConfig
from scripts.core.handlers.model_creator_handler import ModelCreatorHandler
from scripts.db.psql import get_declarative_utils
from scripts.decorators.databricks_validator import get_databricks_config
from scripts.schemas import ModelCreatorSchema
model_creator_router = KafkaRouter(KafkaConfig.KAFKA_URI)
@model_creator_router.get("/model_creator")
async def add_to_stream(
meta: MetaInfoSchema,
bg_task: BackgroundTasks,
payload: Annotated[ModelCreatorSchema, Depends(get_databricks_config)],
declarative_utils: DeclarativeUtils = Depends(get_declarative_utils),
analytical: bool = Query(default=False),
):
model_cal_obj = ModelCreatorHandler(
declarative_utils=declarative_utils, meta=meta, message=payload
)
logging.info("Adding background task for model creation...")
bg_task.add_task(
model_cal_obj.create_models_in_unity_catalog, analytical=analytical
)
return DefaultResponseSchema(message="Model creation task added to stream")
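An illustrative call against the new endpoint once the service is running; host, port and the project id are placeholders, and any auth headers required by ut_security_util's MetaInfoSchema resolution are omitted here.

import httpx

response = httpx.get(
    "http://localhost:8000/api/v1/model_creator",
    params={"analytical": "false"},
    headers={"projectId": "demo_project"},  # consumed by get_project_id_advanced
)
print(response.status_code, response.json())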
-from typing import Dict, List
-
-from sqlalchemy import (
-    BigInteger,
-    Column,
-    Date,
-    DateTime,
-    Integer,
-    MetaData,
-    String,
-    Table,
-)
-
-from scripts.utils.databricks_utils import DatabricksSQLUtility
-from scripts.utils.model_convertor_utils import TypeMapper
-
-
-class DataBricksSQLLayer(DatabricksSQLUtility):
-    def __init__(self, catalog_name: str, project_id: str, schema: str):
-        super().__init__(catalog_name, project_id)
-        self.schema = schema
-
-    def create_external_table_from_structure(
-        self,
-        table: Table,
-        external_location: str,
-        file_format: str = "PARQUET",
-        table_properties: Dict[str, str] = None,
-        partition_columns: list = None,
-    ) -> str:
-        """
-        Create an external table from a model class.
-        Args:
-            table: The model class to create the external table from.
-            external_location: The external location path.
-            file_format: The file format of the data files.
-            table_properties: Additional table properties.
-            partition_columns: List of columns to partition the table by.
-        Returns:
-            External Location - Returns the external location
-        """
-        schema_table = f"{table.schema}.{table.name}" if table.schema else table.name
-        columns_sql = TypeMapper().extract_columns_without_constraints(table)
-        external_location = (
-            f"{external_location}/{self.catalog_name}/{file_format}/{schema_table}"
-        )
-        sql_parts = [
-            f"CREATE TABLE IF NOT EXISTS {schema_table}",
-            f"({columns_sql})",
-            f"USING {file_format}",
-            f"LOCATION '{external_location}'",
-        ]
-        if partition_columns:
-            partition_clause = ", ".join(partition_columns)
-            sql_parts.append(f"PARTITIONED BY ({partition_clause})")
-        if table_properties:
-            props = [f"'{k}' = '{v}'" for k, v in table_properties.items()]
-            props_sql = ",\n ".join(props)
-            sql_parts.append(f"TBLPROPERTIES (\n {props_sql}\n)")
-        create_sql = "\n".join(sql_parts)
-        self.execute_sql_statement(create_sql)
-        return external_location
-
-    def create_timeseries_table(self, columns: List[str], external_location: str):
-        """
-        Create a timeseries table model and all columns will be of type String
-        Args:
-            columns: List of columns in the table
-            external_location: The external location path
-        Example:
-            columns = [l1,l2,enterprise]
-        Returns:
-            Timeseries Table model
-        """
-        table_columns = [
-            Column("timestamp", BigInteger, nullable=False),
-            Column("dt_timestamp", DateTime, nullable=False),
-            Column("dt_date", Date, nullable=False),
-            Column("dt_hour", Integer, nullable=False),
-            Column("value", String, nullable=False),
-            Column("value_type", String, nullable=False, default="float"),
-            Column("c3", String, nullable=False),
-        ]
-        default_columns = ["c1", "c5", "Q", "T", "D", "P", "A", "B", *columns]
-        table_columns.extend(
-            [Column(col_name, String, nullable=True) for col_name in default_columns]
-        )
-        partition_columns = ["dt_date", "dt_hour", "c3"]
-        table_properties = {
-            "parquet.compression": "snappy",  # Fast decompression for frequent queries
-            "parquet.page.size": "524288",  # 512KB - better time-range filtering
-            "parquet.block.size": "268435456",  # 256MB - efficient sequential reads
-            "serialization.format": "1",  # Support for arrays/complex types
-        }
-        table_obj = Table(
-            "timeseries_data", MetaData(), *table_columns, schema=self.schema
-        )
-        self.create_external_table_from_structure(
-            table=table_obj,
-            external_location=external_location,
-            partition_columns=partition_columns,
-            table_properties=table_properties,
-        )
-        return external_location
+class DatabricksManager:
+    def __init__(self, databricks_host: str, access_token: str):
+        """
+        Initialize Databricks Manager
+        Args:
+            databricks_host: Your Databricks workspace URL
+            access_token: Personal access token or service principal token
+        """
+        self.host = (
+            databricks_host
+            if "https://" in databricks_host
+            else f"https://{databricks_host}"
+        )
+        self.headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json",
+        }
import logging
import time
from typing import Union
from scripts.config import DatabricksConfig
from scripts.db.databricks import DatabricksManager
from scripts.utils.httpx_util import HTTPXRequestUtil
class DatabricksClusterManager(DatabricksManager):
def __init__(self, databricks_host: str, access_token: str):
"""
Initialize Databricks cluster manager
databricks_host: Your Databricks workspace URL
access_token: Personal access token or service principal token
"""
super().__init__(databricks_host, access_token)
self.base_url = f"{self.host}/api/2.1/clusters"
def create_cluster(self, cluster_config: dict):
"""
Create a new cluster in Databricks
Args:
cluster_config: Dictionary containing cluster configuration
Returns:
str: Cluster ID if successful, None if failed
"""
url = f"{self.base_url}/create"
response = HTTPXRequestUtil(url).post(headers=self.headers, json=cluster_config)
if response.status_code != 200:
logging.error(f"Failed to create cluster: {response.text}")
return None
cluster_id = response.json().get("cluster_id")
if not cluster_id:
logging.error("No cluster_id returned from create request")
return None
logging.info(f"Cluster created with ID: {cluster_id}")
# Wait for cluster to be ready
if self.wait_for_cluster_ready(cluster_id):
logging.info(f"Cluster {cluster_id} is ready for use!")
else:
logging.error(f"Cluster {cluster_id} failed to start within timeout")
return cluster_id
def fetch_cluster_stats(self, cluster_id) -> dict:
"""
Fetch the status of a cluster
Args:
cluster_id: The ID of the cluster
"""
url = f"{self.base_url}/get"
params = {"cluster_id": cluster_id}
response = HTTPXRequestUtil(url).get(headers=self.headers, params=params)
if response.status_code == 200:
return response.json()
else:
logging.error(f"Error checking cluster: {response.text}")
return {}
def start_cluster(self, cluster_id: str) -> bool:
"""
Start a terminated cluster
Args:
cluster_id: ID of the cluster to start
Returns:
bool: True if start request successful, False otherwise
"""
url = f"{self.base_url}/start"
payload = {"cluster_id": cluster_id}
response = HTTPXRequestUtil(url).post(headers=self.headers, json=payload)
        if response.status_code != 200:
            logging.error(f"Failed to start cluster: {response.text}")
            return False
        logging.info(f"Start requested for cluster {cluster_id}")
        # Wait for cluster to be ready (the start endpoint returns an empty body,
        # so the cluster_id passed in is reused here)
        if self.wait_for_cluster_ready(cluster_id):
            logging.info(f"Cluster {cluster_id} is ready for use!")
        else:
            logging.error(f"Cluster {cluster_id} failed to start within timeout")
        return True
def get_existing_cluster_by_name(self, cluster_name: str) -> Union[None, dict]:
"""
Check if a cluster with the given name already exists
Args:
cluster_name: Name of the cluster to search for
Returns:
dict: Cluster info if found, None if not found
"""
url = f"{self.base_url}/list"
response = HTTPXRequestUtil(url).get(headers=self.headers)
if response.status_code == 200:
clusters = response.json().get("clusters", [])
for cluster in clusters:
if cluster.get("cluster_name") == cluster_name:
return cluster
else:
logging.warning(f"Warning: Could not list clusters: {response.text}")
return None
def get_streaming_cluster_config(
self, cluster_name: str = "UT-Steaming-Cluster"
) -> dict:
"""
Get configuration for a continuous streaming cluster optimized for Event Hub processing
Args:
cluster_name: Name for the cluster (default: "UT-Steaming-Cluster")
Returns:
dict: Complete cluster configuration
"""
return {
"cluster_name": cluster_name,
"spark_version": DatabricksConfig.DATABRICKS_CLUSTER_SPARK_VERSION,
"node_type_id": DatabricksConfig.DATABRICKS_CLUSTER_NODE_TYPE_ID, # 8 cores, 16GB RAM
"driver_node_type_id": DatabricksConfig.DATABRICKS_CLUSTER_DRIVER_NODE_TYPE_ID,
# CRITICAL: Never auto-terminate
"auto_termination_minutes": 0, # 0 = NEVER terminate
# Auto-scaling for variable loads
"autoscale": {
"min_workers": DatabricksConfig.DATABRICKS_CLUSTER_MIN_WORKERS, # Minimum cost
"max_workers": 8, # Scale up for high Event Hub volume
},
# "is_single_node": True,
# Streaming optimizations
"spark_conf": self.get_spark_config(),
# Reliability settings
"azure_attributes": {
"availability": "ON_DEMAND_AZURE", # Most reliable
"first_on_demand": 1,
},
# Storage for checkpoints and logs
"enable_elastic_disk": True,
"disk_spec": {
"disk_type": {"azure_disk_volume_type": "PREMIUM_LRS"},
"disk_size": DatabricksConfig.DATABRICKS_CLUSTER_DISK_SIZE,
},
# Monitoring tags
"custom_tags": {
"purpose": "continuous_streaming",
"workload": "eventhub_processing",
"criticality": "high",
"auto_terminate": "never",
},
# Unity Catalog
"data_security_mode": "SINGLE_USER",
}
@staticmethod
def get_spark_config() -> dict:
return {
"spark.executor.memory": "6g",
"spark.driver.memory": "5g",
"spark.executor.cores": "3", # Reduced from 4 to 3 (leave 1 core for OS)
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.executor.instances": "4",
"spark.sql.shuffle.partitions": "32",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200",
}
def wait_for_cluster_ready(
self, cluster_id: str, timeout_minutes: int = 10
) -> bool:
"""
Wait for cluster with exponential backoff for more efficient polling
"""
timeout_seconds = timeout_minutes * 60
start_time = time.time()
check_interval = 10 # Start with 10 seconds
max_interval = 90 # Max 90 seconds between checks
while time.time() - start_time < timeout_seconds:
cluster_stats = self.fetch_cluster_stats(cluster_id)
if cluster_stats:
state = cluster_stats.get("state", "UNKNOWN")
if state == "RUNNING":
return True
elif state in ["TERMINATED", "TERMINATING", "ERROR"]:
return False
elif state in ["PENDING", "RESTARTING", "RESIZING"]:
# These are transitional states - keep waiting
logging.info(
f"Cluster {cluster_id} is starting... Current state: {state}"
)
else:
logging.warning(f"Unknown cluster state: {state}")
# Exponential backoff
logging.info(
f"Cluster {cluster_id} not ready yet. Waiting {check_interval} seconds..."
)
time.sleep(check_interval)
check_interval = min(check_interval * 1.5, max_interval)
return False
def get_http_path_details_by_cluster_id(self, cluster_id: str, workspace_url: str):
return f"/sql/protocolv1/o/{self.extract_org_id(workspace_url)}/{cluster_id}"
@staticmethod
def extract_org_id(workspace_url: str):
"""Extract organization ID from Azure Databricks URL"""
# From URL like: https://adb-416418955412087.7.azuredatabricks.net
# Extract: 416418955412087
import re
match = re.search(r"adb-(\d+)", workspace_url.replace("https://", ""))
return match.group(1) if match else None
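A worked example of the two helpers above, using the adb-<org-id> URL pattern the regex expects; the cluster id and token below are made up.

manager = DatabricksClusterManager(
    databricks_host="adb-416418955412087.7.azuredatabricks.net",
    access_token="dapi-placeholder-token",
)
print(manager.extract_org_id("https://adb-416418955412087.7.azuredatabricks.net"))
# -> "416418955412087"
print(manager.get_http_path_details_by_cluster_id("0101-123456-abcd123", manager.host))
# -> "/sql/protocolv1/o/416418955412087/0101-123456-abcd123"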
from typing import Dict, List
from sqlalchemy import (
BigInteger,
Column,
Date,
DateTime,
Integer,
MetaData,
String,
Table,
)
from scripts.utils.databricks_utils import DatabricksSQLUtility
from scripts.utils.model_convertor_utils import TypeMapper
class DataBricksSQLLayer(DatabricksSQLUtility):
def __init__(self, catalog_name: str, project_id: str, schema: str):
super().__init__(catalog_name=catalog_name, project_id=project_id)
self.schema = schema
def create_external_table_from_structure(
self,
table: Table,
external_location: str,
file_format: str = "PARQUET",
table_properties: Dict[str, str] = None,
partition_columns: list = None,
) -> str:
"""
Create an external table from a model class.
Args:
table: The model class to create the external table from.
external_location: The external location path.
file_format: The file format of the data files.
table_properties: Additional table properties.
partition_columns: List of columns to partition the table by.
Returns:
External Location - Returns the external location
"""
schema_table = f"{table.schema}.{table.name}" if table.schema else table.name
columns_sql = TypeMapper().extract_columns_without_constraints(table)
external_location = (
f"{external_location}/{self.catalog_name}/{file_format}/{schema_table}"
)
sql_parts = [
f"CREATE TABLE IF NOT EXISTS {schema_table}",
f"({columns_sql})",
f"USING {file_format}",
f"LOCATION '{external_location}'",
]
if partition_columns:
partition_clause = ", ".join(partition_columns)
sql_parts.append(f"PARTITIONED BY ({partition_clause})")
if table_properties:
props = [f"'{k}' = '{v}'" for k, v in table_properties.items()]
props_sql = ",\n ".join(props)
sql_parts.append(f"TBLPROPERTIES (\n {props_sql}\n)")
create_sql = "\n".join(sql_parts)
self.execute_sql_statement(create_sql)
return external_location
def create_timeseries_table(self, columns: List[str], external_location: str):
"""
Create a timeseries table model and all columns will be of type String
Args:
columns: List of columns in the table
external_location: The external location path
Example:
columns = [l1,l2,enterprise]
Returns:
Timeseries Table model
"""
table_columns = [
Column("timestamp", BigInteger, nullable=False),
Column("dt_timestamp", DateTime, nullable=False),
Column("dt_date", Date, nullable=False),
Column("dt_hour", Integer, nullable=False),
Column("value", String, nullable=False),
Column("value_type", String, nullable=False, default="float"),
Column("c3", String, nullable=False),
]
default_columns = ["c1", "c5", "Q", "T", "D", "P", "A", "B", *columns]
table_columns.extend(
[Column(col_name, String, nullable=True) for col_name in default_columns]
)
partition_columns = ["dt_date", "dt_hour", "c3"]
table_properties = {
"parquet.compression": "snappy", # Fast decompression for frequent queries
"parquet.page.size": "524288", # 512KB - better time-range filtering
"parquet.block.size": "268435456", # 256MB - efficient sequential reads
"serialization.format": "1", # Support for arrays/complex types
}
table_obj = Table(
"timeseries_data", MetaData(), *table_columns, schema=self.schema
)
self.create_external_table_from_structure(
table=table_obj,
external_location=external_location,
partition_columns=partition_columns,
table_properties=table_properties,
)
return external_location
 import logging
+from typing import Dict, List

-from ut_security_util.security_tools.auth_util import HTTPXRequestHandler
+from scripts.db.databricks import DatabricksManager
 from scripts.utils.httpx_util import HTTPXRequestUtil


-class DatabricksJobManager:
+class DatabricksJobManager(DatabricksManager):
     def __init__(self, databricks_host: str, access_token: str):
         """
         Initialize Databricks job manager
@@ -14,15 +14,8 @@ class DatabricksJobManager:
             databricks_host: Your Databricks workspace URL
             access_token: Personal access token or service principal token
         """
-        self.host = (
-            databricks_host
-            if "https://" in databricks_host
-            else f"https://{databricks_host}"
-        )
-        self.headers = {
-            "Authorization": f"Bearer {access_token}",
-            "Content-Type": "application/json",
-        }
+        super().__init__(databricks_host, access_token)
+        self.base_url = f"{self.host}/api/2.1/jobs"

     def create_job(self, job_config: dict):
         """
@@ -31,7 +24,7 @@ class DatabricksJobManager:
         Args:
             job_config: Dictionary containing job configuration
         """
-        url = f"{self.host}/api/2.1/jobs/create"
+        url = f"{self.base_url}/create"

         response = HTTPXRequestUtil(url).post(headers=self.headers, json=job_config)

@@ -53,7 +46,7 @@ class DatabricksJobManager:
             job_id: The ID of the job to run
             parameters: Dictionary of parameters to pass to the job
         """
-        url = f"{self.host}/api/2.1/jobs/run-now"
+        url = f"{self.base_url}/run-now"

         payload = {"job_id": job_id}

@@ -78,12 +71,10 @@ class DatabricksJobManager:
         Args:
             run_id: The ID of the job run
         """
-        url = f"{self.host}/api/2.1/jobs/runs/get"
+        url = f"{self.base_url}/runs/get"
         params = {"run_id": run_id}

-        response = HTTPXRequestHandler(url).get(
-            url, headers=self.headers, params=params
-        )
+        response = HTTPXRequestUtil(url).get(url, headers=self.headers, params=params)

         if response.status_code == 200:
             return response.json()
@@ -93,14 +84,57 @@ class DatabricksJobManager:
         )
         return None

+    def get_job_runs(
+        self, job_id: int, active_only: bool = False, limit: int = 20
+    ) -> List[Dict]:
+        url = f"{self.base_url}/runs/list"
+        params = {
+            "job_id": job_id,
+            "limit": limit,
+            "active_only": "true" if active_only else "false",
+        }
+        response = HTTPXRequestUtil(url).get(headers=self.headers, params=params)
+        response.raise_for_status()
+        return response.json().get("runs", [])
+
+    def is_job_running(self, job_id) -> Dict:
+        """
+        Check if a job has any active runs
+        Returns:
+            Dict with 'is_running' boolean and 'active_runs' list
+        """
+        try:
+            active_runs = self.get_job_runs(job_id, active_only=True)
+            running_states = ["PENDING", "RUNNING", "TERMINATING"]
+            active_running_runs = [
+                run
+                for run in active_runs
+                if run.get("state", {}).get("life_cycle_state") in running_states
+            ]
+            return {
+                "is_running": len(active_running_runs) > 0,
+                "active_runs": active_running_runs,
+                "total_active_runs": len(active_running_runs),
+            }
+        except Exception as e:
+            logging.error(f"Error checking job status: {e}")
+            return {"is_running": False, "active_runs": [], "total_active_runs": 0}
+
     @staticmethod
-    def create_job_config_for_serverless(notebook_path: str, job_name: str):
+    def create_job_config_for_serverless(
+        notebook_path: str, job_name: str, job_parameters: dict, tags: dict
+    ):
         """
         Create job configuration for a parameterized notebook
         Args:
             notebook_path: Path to the notebook in Databricks workspace
             job_name: Name of the job
+            job_parameters: Dictionary of parameters to pass to the notebook
+            tags: Dictionary of tags to apply to the job
         """
         return {
@@ -110,18 +144,44 @@ class DatabricksJobManager:
                     "task_key": "table_update_task",
                     "notebook_task": {
                         "notebook_path": notebook_path,
-                        "base_parameters": {"input_message": "default_value"},
+                        "base_parameters": job_parameters,
                     },
                     "timeout_seconds": 3600,
                 }
             ],
             "max_concurrent_runs": 10,
-            "tags": {
-                "purpose": (
-                    "metadata_ingestion"
-                    if "ingestion" in job_name
-                    else "metadata_deletion"
-                ),
-                "compute_type": "serverless",
-            },
+            "tags": tags,
         }
+
+    @staticmethod
+    def create_job_config_for_server(
+        notebook_path: str,
+        job_name: str,
+        job_parameters: dict,
+        tags: dict,
+        cluster_config: dict,
+    ):
+        """
+        Create job configuration for a parameterized notebook
+        Args:
+            notebook_path: Path to the notebook in Databricks workspace
+            job_name: Name of the job
+            job_parameters: Dictionary of parameters to pass to the notebook
+            tags: Dictionary of tags to apply to the job
+            cluster_config: Dictionary of cluster configuration ({"existing_cluster_id": cluster_id})
+        """
+        return {
+            "name": job_name,
+            **cluster_config,
+            "notebook_task": {
+                "notebook_path": notebook_path,
+                "base_parameters": job_parameters,
+            },
+            "timeout_seconds": 0,  # No timeout - run indefinitely
+            "max_concurrent_runs": 1,
+            "max_retries": -1,  # Infinite retries
+            "retry_on_timeout": True,
+            "tags": tags,
+        }
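For comparison, this is roughly the payload create_job_config_for_server produces for the streaming job when an existing cluster is supplied; all values below are placeholders.

config = DatabricksJobManager.create_job_config_for_server(
    notebook_path="/Users/user@example.com/p1_timeseries_ingestion_notebook",
    job_name="p1_timeseries_ingestion_job",
    job_parameters={"project_levels": 3, "batch_interval": "10 seconds"},
    tags={"purpose": "timeseries_ingestion", "compute_type": "server"},
    cluster_config={"existing_cluster_id": "0101-123456-abcd123"},
)
# config["timeout_seconds"] == 0 and config["max_retries"] == -1, i.e. the job
# is meant to run, and be retried, indefinitely on the existing cluster.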
import logging
from typing import List, Union
from scripts.db.databricks import DatabricksManager
from scripts.utils.httpx_util import HTTPXRequestUtil
class DatabricksLibraryManager(DatabricksManager):
def __init__(self, databricks_host: str, access_token: str):
"""
Initialize Databricks library manager
databricks_host: Your Databricks workspace URL
access_token: Personal access token or service principal token
"""
super().__init__(databricks_host, access_token)
self.base_url = f"{self.host}/api/2.0/libraries"
def install_libraries(self, libraries: Union[str, list], cluster_id: str):
"""
Install libraries in Databricks cluster
libraries: List of library names or single library name
cluster_id: ID of the cluster to install libraries in
"""
url = f"{self.base_url}/install"
payload = {"cluster_id": cluster_id, "libraries": libraries}
response = HTTPXRequestUtil(url).post(headers=self.headers, json=payload)
if response.status_code == 200:
logging.info("Libraries installed successfully")
else:
logging.error(f"Failed to install libraries: {response.text}")
@staticmethod
def default_libraries() -> List[dict]:
return [
{"pypi": {"package": "azure-eventhub"}},
{
"maven": {
"coordinates": "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22"
}
},
]
import base64 import base64
import logging import logging
from scripts.db.databricks import DatabricksManager
from scripts.utils.httpx_util import HTTPXRequestUtil from scripts.utils.httpx_util import HTTPXRequestUtil
class NotebookManager: class DatabricksNotebookManager(DatabricksManager):
def __init__(self, databricks_host, access_token): def __init__(self, databricks_host, access_token):
""" """
Initialize Databricks connection Initialize Databricks connection
...@@ -13,15 +14,7 @@ class NotebookManager: ...@@ -13,15 +14,7 @@ class NotebookManager:
databricks_host: Your Databricks workspace URL (e.g., 'https://your-workspace.cloud.databricks.com') databricks_host: Your Databricks workspace URL (e.g., 'https://your-workspace.cloud.databricks.com')
access_token: Personal access token or service principal token access_token: Personal access token or service principal token
""" """
self.host = ( super().__init__(databricks_host, access_token)
databricks_host
if "https://" in databricks_host
else f"https://{databricks_host}"
)
self.headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
def create_notebook( def create_notebook(
self, notebook_path, notebook_code: str, language="PYTHON", overwrite=True self, notebook_path, notebook_code: str, language="PYTHON", overwrite=True
......
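For illustration, a sketch of pushing a generated notebook with the renamed manager; create_notebook's body is truncated in this diff, so the call below only assumes the signature shown, and all values are placeholders.
# Hypothetical usage sketch (not part of this commit).
notebook_manager = DatabricksNotebookManager(
    databricks_host="https://your-workspace.cloud.databricks.com",  # placeholder
    access_token="dapiXXXX",                                        # placeholder
)
notebook_manager.create_notebook(
    notebook_path="/Workspace/Users/example/table_update",  # placeholder
    notebook_code="print('generated notebook')",
    language="PYTHON",
    overwrite=True,
)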
from ut_sql_utils.asyncio import SQLSessionManager from ut_sql_utils.asyncio import DeclarativeBaseClassFactory, SQLSessionManager
from ut_sql_utils.asyncio.declarative_utils import DeclarativeUtilsFactory
from scripts.db.redis.project_details import project_details_db from scripts.db.redis.project_details import project_details_db
sql_database = "unified_model"
Base = DeclarativeBaseClassFactory(sql_database)
session_manager = SQLSessionManager(project_details_db) session_manager = SQLSessionManager(project_details_db)
get_db = session_manager.get_db_factory(database=sql_database)
get_declarative_utils = DeclarativeUtilsFactory.get_declarative_utils_factory(
sql_database, session_manager
)
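A hedged sketch of how the get_db factory could be consumed as a FastAPI dependency; the route and the exact session type are assumptions about ut_sql_utils, not part of this commit.
# Hypothetical usage sketch (not part of this commit): inject a request-scoped
# session for the "unified_model" database.
from fastapi import APIRouter, Depends

demo_router = APIRouter()

@demo_router.get("/unified-model/health")
async def unified_model_health(db=Depends(get_db)):
    # db is expected to be an async SQLAlchemy session bound to sql_database
    return {"database": sql_database, "connected": db is not None}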
import logging
from typing import Annotated, Optional
from fastapi import Cookie, Depends, Header, HTTPException, Request
from ut_dev_utils import ILensErrors
from scripts.config import DatabricksConfig
from scripts.db.redis.project_details import fetch_level_details
from scripts.schemas import ModelCreatorSchema
async def get_project_id_advanced(
request: Request,
# Cookie parameter
project_id_cookie: Annotated[Optional[str], Cookie(alias="projectId")] = None,
# Header parameter
project_id_header: Annotated[Optional[str], Header(alias="projectId")] = None,
) -> str:
"""Extract project_id with priority: Cookie > Header > Body > Query"""
project_id = (
project_id_cookie
or project_id_header
or request.query_params.get("project_id")
or request.query_params.get("projectId")
)
# Try to get from request body if not found
if not project_id and request.method in ["POST", "PUT", "PATCH"]:
try:
body = await request.json()
project_id = body.get("project_id")
except Exception as e:
logging.exception(f"Error getting project_id from request body: {e}")
if not project_id:
raise HTTPException(
status_code=400,
detail={
"error": "project_id not found",
"sources_checked": [
"cookies",
"headers",
"query_params",
"request_body",
],
"example": "Add project_id in cookie, header, query param, or request body",
},
)
return project_id
async def get_databricks_config(project_id: str = Depends(get_project_id_advanced)):
"""Get Databricks configuration using project_id"""
try:
return get_databricks_details_from_redis(project_id)
except (ValueError, ILensErrors) as e:
raise ILensErrors(message=f"Configuration Error: {str(e)}")
def get_databricks_details_from_redis(project_id: str) -> ModelCreatorSchema:
project_details = fetch_level_details(project_id, raw=True)
if not project_details or "databricks_details" not in project_details:
raise ILensErrors(message=f"No Databricks config for project {project_id}")
db_config = project_details["databricks_details"]
required_keys = [
"databricks_host",
"databricks_access_token",
"databricks_storage_path",
"eventhub_connection_string",
]
if missing := [k for k in required_keys if not db_config.get(k)]:
raise ILensErrors(
message=f'Missing: {", ".join(missing)} for project {project_id}'
)
return ModelCreatorSchema(
**{k: db_config[k] for k in required_keys},
databricks_port=db_config.get(
"databricks_port", DatabricksConfig.DATABRICKS_DEFAULT_PORT
),
)
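A minimal sketch of wiring these dependencies into an endpoint; the router, path, and handler name are illustrative and not part of this commit.
# Hypothetical usage sketch (not part of this commit): the dependency chain
# resolves project_id (cookie > header > query > body) and then loads the
# Databricks config from Redis.
from fastapi import APIRouter, Depends

example_router = APIRouter()

@example_router.get("/databricks/config-check")
async def databricks_config_check(
    config: ModelCreatorSchema = Depends(get_databricks_config),
):
    return {"databricks_host": config.databricks_host}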
...@@ -13,7 +13,7 @@ class ModelCreatorAgent: ...@@ -13,7 +13,7 @@ class ModelCreatorAgent:
async def model_creator_agent(message: ModelCreatorSchema): async def model_creator_agent(message: ModelCreatorSchema):
declarative_utils = await DeclarativeUtilsFactory.get_declarative_utils( declarative_utils = await DeclarativeUtilsFactory.get_declarative_utils(
raw_database="unified_model", raw_database="unified_model",
project_id=message.meta.project_id, project_id=message.project_id,
session_manager=session_manager, session_manager=session_manager,
schema=message.schema, schema=message.schema,
) )
......
from ut_dev_utils.errors import ILensErrors
class ExternalServiceError(ILensErrors):
"""Raised when external service calls fail"""
def __init__(self, message: str, status_code: int = 200):
super().__init__(message=message, status_code=status_code)
class ResourceNotFoundError(ILensErrors):
"""Raised when a requested resource is not found"""
def __init__(self, message: str, status_code: int = 200):
super().__init__(message=message, status_code=status_code)
class GenericErrors(ILensErrors):
"""Raised when external service calls fail"""
def __init__(self, message: str, status_code: int = 200):
super().__init__(message=message, status_code=status_code)
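For illustration, how one of these error classes might be raised so the app's registered generic exception handler shapes the response; the helper below is hypothetical.
# Hypothetical usage sketch (not part of this commit).
def get_cluster_or_raise(cluster_id: str, clusters: dict) -> dict:
    cluster = clusters.get(cluster_id)
    if cluster is None:
        # surfaced to the client via the generic exception handler
        raise ResourceNotFoundError(message=f"Cluster {cluster_id} not found")
    return cluster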
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field, model_validator from pydantic import BaseModel, Field, computed_field, model_validator
from ut_security_util import MetaInfoSchema
from scripts.config import DatabricksConfig from scripts.config import DatabricksConfig
class ModelCreatorSchema(BaseModel): class ModelCreatorSchema(BaseModel):
meta: MetaInfoSchema
schema: Optional[str] = DatabricksConfig.DATABRICKS_PUBLIC_SCHEMA_NAME schema: Optional[str] = DatabricksConfig.DATABRICKS_PUBLIC_SCHEMA_NAME
databricks_host: str = DatabricksConfig.DATABRICKS_HOST databricks_host: str
databricks_port: int = DatabricksConfig.DATABRICKS_PORT databricks_port: int
databricks_access_token: str = DatabricksConfig.DATABRICKS_ACCESS_TOKEN databricks_access_token: str
databricks_http_path: str = DatabricksConfig.DATABRICKS_HTTP_PATH
databricks_user_email: str = "aniket.dhale@ilenscloud.onmicrosoft.com" databricks_user_email: str = "aniket.dhale@ilenscloud.onmicrosoft.com"
databricks_storage_path: str = DatabricksConfig.DATABRICKS_STORAGE_PATH databricks_storage_path: str
databricks_http_path: Optional[str] = None
eventhub_connection_string: str
@computed_field
@property
def databricks_uri(self) -> Optional[str]:
"""Automatically computed databricks URI that updates when databricks_http_path changes"""
if self.databricks_http_path:
return (
f"databricks://token:{self.databricks_access_token}@{self.databricks_host}:{self.databricks_port}"
f"?http_path={self.databricks_http_path}"
)
return None
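For illustration, constructing the schema and reading the computed URI; every value below is a placeholder, not taken from this commit.
# Hypothetical usage sketch (not part of this commit): databricks_uri is derived
# from the other fields whenever databricks_http_path is provided.
creator = ModelCreatorSchema(
    databricks_host="adb-1234567890.0.azuredatabricks.net",        # placeholder
    databricks_port=443,
    databricks_access_token="dapiXXXX",                            # placeholder
    databricks_storage_path="/Volumes/example/storage",            # placeholder
    databricks_http_path="/sql/1.0/warehouses/abc123",             # placeholder
    eventhub_connection_string="Endpoint=sb://example-namespace/", # placeholder
)
print(creator.databricks_uri)
# databricks://token:dapiXXXX@adb-1234567890.0.azuredatabricks.net:443?http_path=/sql/1.0/warehouses/abc123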
class ModelInstanceSchema(BaseModel): class ModelInstanceSchema(BaseModel):
...@@ -25,15 +35,23 @@ class ModelInstanceSchema(BaseModel): ...@@ -25,15 +35,23 @@ class ModelInstanceSchema(BaseModel):
sql_schema: Optional[str] = Field( sql_schema: Optional[str] = Field(
default=DatabricksConfig.DATABRICKS_PUBLIC_SCHEMA_NAME, alias="schema" default=DatabricksConfig.DATABRICKS_PUBLIC_SCHEMA_NAME, alias="schema"
) )
databricks_host: str = DatabricksConfig.DATABRICKS_HOST databricks_host: str
databricks_port: int = DatabricksConfig.DATABRICKS_PORT databricks_port: int
databricks_access_token: str = DatabricksConfig.DATABRICKS_ACCESS_TOKEN databricks_access_token: str
databricks_http_path: str = DatabricksConfig.DATABRICKS_HTTP_PATH databricks_http_path: str
databricks_user_email: str = "aniket.dhale@ilenscloud.onmicrosoft.com" databricks_user_email: str
databricks_storage_path: str = DatabricksConfig.DATABRICKS_STORAGE_PATH databricks_storage_path: str
@model_validator(mode="before") @model_validator(mode="before")
def validate_data(cls, values: Dict[str, Any]) -> Dict[str, Any]: def validate_data(cls, values: Dict[str, Any]) -> Dict[str, Any]:
if "data" in values and isinstance(values["data"], dict): if "data" in values and isinstance(values["data"], dict):
values["data"] = [values["data"]] values["data"] = [values["data"]]
return values return values
    @model_validator(mode="before")
    def prepare_databricks_uri(cls, values):
        # Guard lookups so a missing field surfaces as a validation error, not a KeyError
        required = ("databricks_access_token", "databricks_host", "databricks_port", "databricks_http_path")
        if isinstance(values, dict) and all(values.get(k) for k in required):
            values["databricks_uri"] = (
                f"databricks://token:{values['databricks_access_token']}@{values['databricks_host']}:{values['databricks_port']}"
                f"?http_path={values['databricks_http_path']}"
            )
        return values
...@@ -4,8 +4,6 @@ from typing import Optional ...@@ -4,8 +4,6 @@ from typing import Optional
from sqlalchemy import create_engine, text from sqlalchemy import create_engine, text
from ut_dev_utils import get_db_name from ut_dev_utils import get_db_name
from scripts.config import DatabricksConfig
class DatabricksSQLUtility: class DatabricksSQLUtility:
def __init__(self, catalog_name: str, project_id: str): def __init__(self, catalog_name: str, project_id: str):
...@@ -18,7 +16,7 @@ class DatabricksSQLUtility: ...@@ -18,7 +16,7 @@ class DatabricksSQLUtility:
self.catalog_name = get_db_name(project_id=project_id, database=catalog_name) self.catalog_name = get_db_name(project_id=project_id, database=catalog_name)
self.engine = None self.engine = None
def connect_to_databricks(self): def connect_to_databricks(self, databricks_uri: str):
""" """
Connect to Databricks using sqlalchemy-databricks Connect to Databricks using sqlalchemy-databricks
""" """
...@@ -26,7 +24,7 @@ class DatabricksSQLUtility: ...@@ -26,7 +24,7 @@ class DatabricksSQLUtility:
# Build connection string for sqlalchemy-databricks # Build connection string for sqlalchemy-databricks
self.engine = create_engine( self.engine = create_engine(
DatabricksConfig.DATABRICKS_URI, databricks_uri,
pool_pre_ping=True, pool_pre_ping=True,
pool_recycle=3600, pool_recycle=3600,
echo=False, echo=False,
...@@ -160,6 +158,25 @@ class DatabricksSQLUtility: ...@@ -160,6 +158,25 @@ class DatabricksSQLUtility:
) )
raise raise
def create_volume(self, volume_name: str, location_name: Optional[str] = None) -> str:
"""
Create a volume in Unity Catalog
volume_name: Name for the volume(<catalog>.<schema>.<external-volume-name>)
location_name: Name of the external location
"""
if location_name:
ddl = f"CREATE EXTERNAL VOLUME IF NOT EXISTS {volume_name}"
ddl += f"\nLOCATION '{location_name}'"
else:
ddl = f"CREATE VOLUME IF NOT EXISTS `{volume_name}`"
try:
self.execute_sql_statement(ddl)
logger.info(f"Volume '{volume_name}' created successfully")
return volume_name
except Exception as e:
logger.error(f"Failed to create volume '{volume_name}': {str(e)}")
raise
def execute_sql_statement(self, query: str): def execute_sql_statement(self, query: str):
try: try:
with self.engine.connect() as conn: with self.engine.connect() as conn:
......
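A hedged sketch tying the pieces together: connect the SQL utility with a per-project URI and create a volume; all identifiers below are placeholders, not values from this commit.
# Hypothetical usage sketch (not part of this commit).
sql_util = DatabricksSQLUtility(catalog_name="unified_model", project_id="project_001")
sql_util.connect_to_databricks(
    databricks_uri=(
        "databricks://token:dapiXXXX@adb-1234567890.0.azuredatabricks.net:443"
        "?http_path=/sql/1.0/warehouses/abc123"  # placeholder URI
    )
)
sql_util.create_volume(volume_name="unified_model.public.staging_volume")  # placeholder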