How it Works
Pricing
About
Customers
Careers
Resources
Get a saving estimate
Log in
Book a Call

Sign up for our
Databricks optimizer

Welcome!

Espresso AI makes real-time decisions powered by ML models to optimize your Databricks SQL workloads.

Please run the following commands in Databricks SQL. This will set up a user account we can use to access the metadata we need for our models as well as  optimize your Databricks SQL account's operations.

Note that we never access, log, or store any data from Databricks SQL. We only look at metadata.

Prerequisites: Make sure you are a Databricks workspace admin, account admin, and metastore admin.

Steps
‍

1. Open your Databricks workspace, create a Python notebook, and attach it to Serverless compute.
‍
2. In the first cell, install/upgrade the SDK:  
%pip install databricks-sdk --upgrade
‍
3. When finished, create a second cell and restart the Python kernel:  
%restart_python
‍
4. In the next cell, copy-paste the code from below and run it.
‍
5. Copy the JSON credentials printed between the ===== lines and share it securely with Espresso AI.

You can securely upload the output here.

What the Script Does:
- Create or reuse a service principal named "espresso-ai-optimizer" for Espresso AI.
- Grant that service principle the ability to manage SQL Warehouses.
- Grant SELECT access on Databricks system tables used for usage and cost analysis (e.g. query history, warehouse events, node timeline)
- Print a JSON blob with credentials and your workspace URL for you to share securely with Espresso AI.

import json
from datetime import datetime

import pandas as pd
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog
from databricks.sdk.service.iam import (
    AccessControlRequest,
    ComplexValue,
    Patch,
    PatchOp,
    PatchSchema,
    PermissionLevel,
    ServicePrincipal,
)
from databricks.sdk.service.oauth2 import CreateServicePrincipalSecretResponse
from pydantic import BaseModel, field_validator
from pyspark.sql import SparkSession


class DatabricksOAuthToken(BaseModel):
    id: str
    workspace_id: str
    oauth_secret: str
    created_at: datetime
    expires_at: datetime
    client_id: str

    @field_validator("oauth_secret", "client_id", mode="before")
    @classmethod
    def strip_whitespace(cls, v: str) -> str:
        """Strip whitespace from string fields."""
        return v.strip() if isinstance(v, str) else v


class DatabricksCredentials(BaseModel):
    oauth_token: DatabricksOAuthToken
    workspace_url: str
    service_principal_name: str = "espresso-ai-optimizer"
    service_principal_id: str
    warehouse_id: str
    warehouse_name: str

    @field_validator("workspace_url")
    @classmethod
    def validate_databricks_url(cls, v: str) -> str:
        if not v.startswith(("https://", "http://")):
            raise ValueError(
                "workspace_url must be a valid URL starting with https:// or http://"
            )
        if "databricks" not in v.lower():
            raise ValueError("workspace_url should be a Databricks workspace URL")

        return v

    def to_json(self) -> str:
        return json.dumps(self.model_dump(mode="json"))


def get_spark() -> SparkSession:
    if session := SparkSession.getActiveSession():
        return session
    return SparkSession.builder.getOrCreate()


def get_workspace_url() -> str:
    if url := get_spark().conf.get("spark.databricks.workspaceUrl"):
        return "https://" + url
    raise RuntimeError("Unable to determine Databricks workspace URL")


def get_account_id(client: WorkspaceClient) -> str:
    return client.config.account_id


def is_user_workspace_admin(client: WorkspaceClient) -> bool:
    current_user = client.current_user.me()
    return current_user.groups is not None and any(
        group.display == "admins" for group in current_user.groups
    )


def find_workspace_admin_group(client: WorkspaceClient) -> str:
    if admin_groups := client.groups.list(filter="displayName eq 'admins'"):
        if group_id := next(admin_groups).id:
            return group_id

    raise RuntimeError("Unable to find 'admins' group in the workspace")


def get_workspace_admins(client: WorkspaceClient) -> pd.DataFrame:
    try:
        admin_group_id = find_workspace_admin_group(client)
        admin_group = client.groups.get(admin_group_id)
        if not admin_group.members:
            raise RuntimeError("There are no members in the 'admins' group")
        admin_users = []
        for member in admin_group.members:
            # Skip non-user members (e.g., service principals)
            if not member.value:
                continue

            user = client.users.get(member.value)
            primary_email = None
            for email_obj in user.emails or []:
                if email_obj.primary or email_obj.value:
                    primary_email = email_obj.value
                    break

            admin_users.append(
                {
                    "Display Name": user.display_name or "N/A",
                    "User Name": user.user_name or "N/A",
                    "Email": primary_email or "N/A",
                    "User ID": user.id or "N/A",
                }
            )

        df = pd.DataFrame(admin_users)

        if not df.empty:
            df = df.sort_values(
                by=["Display Name"], ascending=[True], na_position="last"
            ).reset_index(drop=True)

        return df

    except Exception as e:
        raise RuntimeError(f"Failed to retrieve workspace admins: {e}")


def get_or_create_service_principal(client: WorkspaceClient) -> ServicePrincipal:
    DISPLAY_NAME = "espresso-ai-optimizer"
    if sps := list(client.service_principals.list(filter=f"displayName eq '{DISPLAY_NAME}'")):
        sp = sps[0]
        client.service_principals.patch(
            id=sp.id,  # type: ignore
            operations=[
                Patch(
                    op=PatchOp.ADD,
                    path="entitlements",
                    value=[
                        {"value": "databricks-sql-access"},
                        {"value": "allow-cluster-create"},
                    ],
                ),
            ],
            schemas=[PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP],
        )
        return sp

    # No existing service principal was found, so create one
    sp = client.service_principals.create(
        display_name=DISPLAY_NAME,
        active=True,
        entitlements=[
            ComplexValue(value="allow-cluster-create"),
            ComplexValue(value="databricks-sql-access"),
        ],
    )

    if sp.id is None:
        raise RuntimeError("Failed to get ID for service principal")
    return sp


def create_oauth_token(
    client: WorkspaceClient, service_principal: ServicePrincipal
) -> DatabricksOAuthToken:
    token: CreateServicePrincipalSecretResponse = (
        client.service_principal_secrets_proxy.create(
            service_principal_id=service_principal.id,  # type: ignore
            lifetime=f"{2 * 365 * 24 * 60 * 60}s",  # 2 years
        )
    )
    return DatabricksOAuthToken(
        id=token.id,
        workspace_id=str(client.get_workspace_id()),
        oauth_secret=token.secret,
        created_at=token.create_time,
        expires_at=token.expire_time,
        client_id=service_principal.application_id,  # type: ignore
    )


def allow_service_principal_to_manage_warehouses(
    client: WorkspaceClient, service_principal: ServicePrincipal
) -> None:
    successes = []

    for warehouse in list(client.warehouses.list()):
        try:
            client.permissions.update(
                request_object_type="warehouses",
                request_object_id=warehouse.id,  # type: ignore
                access_control_list=[
                    AccessControlRequest(
                        service_principal_name=service_principal.application_id,
                        permission_level=PermissionLevel.CAN_MANAGE,
                    )
                ],
            )
            successes.append(warehouse.name)

        except Exception as e:
            print(
                f"✗ Failed to allow {service_principal.application_id} to manage '{warehouse.name}': {e}"
            )


def make_service_principal_workspace_admin(
    client: WorkspaceClient, service_principal: ServicePrincipal
) -> None:
    admin_group_id = find_workspace_admin_group(client)

    try:
        client.groups.patch(
            id=admin_group_id,
            operations=[
                Patch(
                    op=PatchOp.ADD,
                    value={
                        "members": [
                            {
                                "value": service_principal.id,
                            }
                        ]
                    },
                )
            ],
            schemas=[PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP],
        )
    except Exception as e:
        error_msg = str(e)
        if "already exists" in error_msg.lower() or "duplicate" in error_msg.lower():
            print(f"✅ {service_principal.display_name} is already a workspace admin")
        else:
            print(f"✗ Failed to add {service_principal.display_name} to admin group: {e}")
            raise RuntimeError(f"Failed to grant workspace admin permissions: {e}")


def get_or_create_warehouse(
    client: WorkspaceClient, name: str = "ESPRESSO_AI_WAREHOUSE"
) -> str:
    for warehouse in client.warehouses.list():
        if warehouse.name == name:
            return warehouse.id  # type: ignore

    created_warehouse = client.warehouses.create_and_wait(
        name=name,
        cluster_size="X-Small",
        auto_stop_mins=1,
        enable_serverless_compute=True,
        min_num_clusters=1,
        max_num_clusters=1,
    )
    return created_warehouse.id  # type: ignore


def allow_service_principal_to_read_system_logs(
    client: WorkspaceClient, service_principal: ServicePrincipal
) -> None:
    """Grant service principal read access to all tables in the system catalog.

    This leverages Unity Catalog's privilege inheritance: granting SELECT at the schema level
    automatically grants SELECT on all current and future tables within that schema.
    """

    has_metastore_error = False
    has_account_admin_error = False

    def grant(
        asset_name: str, asset_type: catalog.SecurableType, privilege: catalog.Privilege
    ) -> None:
        nonlocal has_metastore_error, has_account_admin_error
        try:
            client.grants.update(
                full_name=asset_name,
                securable_type=asset_type.value,
                changes=[
                    catalog.PermissionsChange(
                        add=[privilege],
                        principal=service_principal.application_id,
                    )
                ],
            )
        except Exception as e:
            error_msg = str(e)
            print(f"✗ Permission denied for {asset_name}: {e}")

            has_account_admin_error = (
                has_account_admin_error or "not an account admin" in error_msg.lower()
            )
            has_metastore_error = (
                has_metastore_error
                or "does not have MANAGE on Catalog" in error_msg
                or "metastore" in error_msg.lower()
            )

    grant("system", catalog.SecurableType.CATALOG, catalog.Privilege.USE_CATALOG)

    try:
        for schema in list(client.schemas.list(catalog_name="system")):
            schema_full_name = f"system.{schema.name}"
            grant(schema_full_name, catalog.SecurableType.SCHEMA, catalog.Privilege.USE_SCHEMA)
            # Grant SELECT on the schema - this will propagate to all tables in the schema
            grant(schema_full_name, catalog.SecurableType.SCHEMA, catalog.Privilege.SELECT)

    except Exception as e:
        print(f"✗ Failed to list schemas in system catalog: {e}")
        has_metastore_error = True

    # If we encountered permission errors, provide specific guidance
    if has_account_admin_error:
        print("\n" + "=" * 80)
        print("❌ ACCOUNT ADMIN PERMISSIONS REQUIRED")
        print("=" * 80)
        print("\nYou need to be an account admin to grant certain system table permissions.")
        print("\nTo become an account admin:")
        print("1. An existing account admin must grant you this permission")
        print("2. Visit https://accounts.cloud.databricks.com/ to verify access")
        print("\nAlternatively, ask an existing account admin to run this script.")
        print("=" * 80 + "\n")

    if has_metastore_error:
        print("\n" + "=" * 80)
        print("❌ METASTORE ADMIN PERMISSIONS REQUIRED")
        print("=" * 80)
        print("\nYou need to be a metastore admin to grant access to system catalog.")
        print("\nTo become a metastore admin:")
        print("1. Visit https://accounts.cloud.databricks.com/data")
        print(
            "2. Click on the name of the metastore listed (this opens the Configuration page)"
        )
        print("3. Add yourself as a 'Metastore Admin'")
        print("\nAlternatively, ask an existing metastore admin to run this script.")
        print("=" * 80 + "\n")


if __name__ == "__main__":
    client = WorkspaceClient()
    if not is_user_workspace_admin(client):
        print(
            "❌ You are not a workspace admin. Contact one of the following admins to request workspace admin access:\n"
        )
        print(get_workspace_admins(client).to_string(index=False))
        raise RuntimeError("You need workspace admin permissions to run this setup.")
    service_principal = get_or_create_service_principal(client)
    oauth_token = create_oauth_token(client, service_principal)
    ESPRESSO_AI_WAREHOUSE_NAME = "ESPRESSO_AI_WAREHOUSE"
    warehouse_id = get_or_create_warehouse(client, name=ESPRESSO_AI_WAREHOUSE_NAME)
    allow_service_principal_to_manage_warehouses(client, service_principal)
    make_service_principal_workspace_admin(client, service_principal)
    allow_service_principal_to_read_system_logs(client, service_principal)
    credentials = DatabricksCredentials(
        oauth_token=oauth_token,
        workspace_url=get_workspace_url(),
        service_principal_name=service_principal.display_name,
        service_principal_id=service_principal.id,
        warehouse_id=warehouse_id,
        warehouse_name=ESPRESSO_AI_WAREHOUSE_NAME,
    )

    print("\n🎉 Setup complete! Here are the Databricks credentials to send to Espresso AI:")
    print("=" * 50)
    print(credentials.to_json())
    print("=" * 50)

Troubleshooting

The script provides error messages for permission issues:
1. "You are not a workspace admin" - Contact one of the listed admins for access
2. "ACCOUNT ADMIN PERMISSIONS REQUIRED" - You need account admin to create service principals
3. "METASTORE ADMIN PERMISSIONS REQUIRED" - You need metastore admin to grant system table access

Quick permission checks:
- Verify that you started a serverless notebook, not a notebook on a specific cluster
- Verify you're a workspace admin:
   - Click on the circle with your initials
   - Click on "Settings"
    - You should see "Workspace Admin" section in addition to a "User" section.
- Verify you're an account admin:
   - You should be able to login to https://accounts.cloud.databricks.com/ and see the console to manage your account.
- Verify you're a metastore admin:
   - Visit https://accounts.cloud.databricks.com/data
   - Click on the name of the metastore listed, which will open up a "Configuration" page.
   - Set yourself as the "Metastore Admin".

Please rerun the script once you have the required permissions.

Questions?

Book a Call
Sign up for news
Sign up
Success!
Oops! Something went wrong while submitting the form.
OnboardingSaving Estimate
SecurityTerms of ServiceSupport
© 2025 Espresso AI. All rights reserved.
Website Visitor Tracking