@@ -69,15 +69,17 @@ Deploying resources...
Updating deployment state...
Deployment complete!

=== Rename b_task to b_task_renamed (4 tasks, 2 with depends_on, 1 without)
=== Rename b_task, replace a_task notebook_path, add synced_task
=== Detect task key rename
Detected changes in 1 resource(s):

Resource: resources.jobs.rename_task_job
tasks[task_key='a_task'].notebook_task.notebook_path: replace
tasks[task_key='b_task']: remove
tasks[task_key='b_task_renamed']: add
tasks[task_key='c_task'].depends_on[0].task_key: replace
tasks[task_key='d_task'].depends_on[0].task_key: replace
tasks[task_key='synced_task']: add



@@ -114,6 +116,22 @@ Resource: resources.jobs.rename_task_job
+ - task_key: b_task_renamed
notebook_task:
notebook_path: /Users/{{workspace_user_name}}/c_task
@@ -67,7 +67,14 @@
- task_key: a_task
notebook_task:
- notebook_path: /Users/{{workspace_user_name}}/a_task
+ notebook_path: ./synced_notebook.py
new_cluster:
spark_version: 13.3.x-snapshot-scala2.12
node_type_id: [NODE_TYPE_ID]
num_workers: 1
+ - new_cluster:
+ node_type_id: [NODE_TYPE_ID]
+ num_workers: 1
+ spark_version: 13.3.x-snapshot-scala2.12
+ notebook_task:
+ notebook_path: ./synced_notebook.py
+ task_key: synced_task

>>> [CLI] bundle destroy --auto-approve
The following resources will be deleted:
22 changes: 20 additions & 2 deletions acceptance/bundle/config-remote-sync/job_multiple_tasks/script
@@ -52,16 +52,34 @@ mv databricks.yml.resolved databricks.yml
# Deploy the updated configuration to sync state
$CLI bundle deploy

title "Rename b_task to b_task_renamed (4 tasks, 2 with depends_on, 1 without)"
title "Rename b_task, replace a_task notebook_path, add synced_task"
rename_job_id="$(read_id.py rename_task_job)"
edit_resource.py jobs $rename_job_id <<'EOF'
edit_resource.py jobs $rename_job_id <<EOF
for task in r["tasks"]:
if task["task_key"] == "b_task":
task["task_key"] = "b_task_renamed"
if "depends_on" in task:
for dep in task["depends_on"]:
if dep["task_key"] == "b_task":
dep["task_key"] = "b_task_renamed"

# Replace a_task's notebook_path with sync-root path (tests Replace operation)
for task in r["tasks"]:
if task["task_key"] == "a_task":
task["notebook_task"]["notebook_path"] = "${PWD}/synced_notebook"

# Add synced_task with path inside sync root
r["tasks"].append({
"task_key": "synced_task",
"notebook_task": {
"notebook_path": "${PWD}/synced_notebook"
},
"new_cluster": {
"spark_version": "${DEFAULT_SPARK_VERSION}",
"node_type_id": "${NODE_TYPE_ID}",
"num_workers": 1
}
})
EOF

title "Detect task key rename"
1 change: 1 addition & 0 deletions acceptance/bundle/config-remote-sync/job_multiple_tasks/synced_notebook.py
@@ -0,0 +1 @@
# Databricks notebook source
@@ -1,7 +1,7 @@
Cloud = true

RecordRequests = false
Ignore = [".databricks", "dummy.whl", "databricks.yml", "databricks.yml.backup"]
Ignore = [".databricks", "dummy.whl", "databricks.yml", "databricks.yml.backup", "synced_notebook.py"]

[Env]
DATABRICKS_BUNDLE_ENABLE_EXPERIMENTAL_YAML_SYNC = "true"
11 changes: 10 additions & 1 deletion acceptance/bundle/config-remote-sync/multiple_files/output.txt
@@ -4,6 +4,7 @@ Updating deployment state...
Deployment complete!

=== Modify job_one: max_concurrent_runs, rename c_task
=== Add synced_task to job_one (notebook in sync root, resource in subdirectory)
=== Modify job_two: max_concurrent_runs, rename first task, add extra_task
=== Add extra_task to local config (not in saved state, triggers entity-level replace)
=== Detect and save changes
@@ -14,6 +15,7 @@ Resource: resources.jobs.job_one
tasks[task_key='a_task'].depends_on[0].task_key: replace
tasks[task_key='c_task']: remove
tasks[task_key='c_task_renamed']: add
tasks[task_key='synced_task']: add

Resource: resources.jobs.job_two
max_concurrent_runs: replace
@@ -50,11 +52,18 @@ Resource: resources.jobs.job_two
+ task_key: c_task_renamed
- task_key: a_task
notebook_task:
@@ -21,3 +21,3 @@
@@ -21,3 +21,10 @@
num_workers: 1
depends_on:
- - task_key: c_task
+ - task_key: c_task_renamed
+ - new_cluster:
+ node_type_id: [NODE_TYPE_ID]
+ num_workers: 1
+ spark_version: 13.3.x-snapshot-scala2.12
+ notebook_task:
+ notebook_path: ../sample_exploration.ipynb
+ task_key: synced_task

=== Changes in job2.yml

63 changes: 63 additions & 0 deletions acceptance/bundle/config-remote-sync/multiple_files/sample_exploration.ipynb
@@ -0,0 +1,63 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "[UUID]",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"### Example Exploratory Notebook\n",
"\n",
"Use this notebook to explore the data generated by the pipeline in your preferred programming language.\n",
"\n",
"**Note**: This notebook is not executed as part of the pipeline."
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "[UUID]",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"# !!! Before performing any data analysis, make sure to run the pipeline to materialize the sample datasets. The tables referenced in this notebook depend on that step.\n",
"\n",
"display(spark.sql(\"SELECT * FROM hive_metastore.[USERNAME].sample_trips_lakeflow_project\"))"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"computePreferences": null,
"dashboards": [],
"environmentMetadata": null,
"inputWidgetPreferences": null,
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 2
},
"notebookName": "sample_exploration",
"widgets": {}
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
15 changes: 15 additions & 0 deletions acceptance/bundle/config-remote-sync/multiple_files/script
@@ -27,6 +27,21 @@ for task in r["tasks"]:
dep["task_key"] = "c_task_renamed"
EOF

title "Add synced_task to job_one (notebook in sync root, resource in subdirectory)"
edit_resource.py jobs $job_one_id <<EOF
r["tasks"].append({
"task_key": "synced_task",
"notebook_task": {
"notebook_path": "${PWD}/sample_exploration"
},
"new_cluster": {
"spark_version": "${DEFAULT_SPARK_VERSION}",
"node_type_id": "${NODE_TYPE_ID}",
"num_workers": 1
}
})
EOF

title "Modify job_two: max_concurrent_runs, rename first task, add extra_task"
edit_resource.py jobs $job_two_id <<'EOF'
r["max_concurrent_runs"] = 10
@@ -1,7 +1,7 @@
Cloud = true

RecordRequests = false
Ignore = [".databricks", "dummy.whl", "databricks.yml", "resources/job1.yml", "resources/job2.yml"]
Ignore = [".databricks", "dummy.whl", "databricks.yml", "resources/job1.yml", "resources/job2.yml", "sample_exploration.ipynb"]

[Env]
DATABRICKS_BUNDLE_ENABLE_EXPERIMENTAL_YAML_SYNC = "true"
@@ -12,6 +12,7 @@ Resource: resources.pipelines.my_pipeline
environment.dependencies: replace
notifications[0].alerts: replace
notifications[0].email_recipients: replace
root_path: add
schema: replace
tags['foo']: add

@@ -28,7 +29,7 @@ Resource: resources.pipelines.my_pipeline
-
resources:
pipelines:
@@ -7,19 +6,24 @@
@@ -7,19 +6,25 @@
name: test-pipeline-[UNIQUE_NAME]
catalog: main
- schema: default
@@ -53,6 +54,7 @@ Resource: resources.pipelines.my_pipeline
-
+ tags:
+ foo: bar
+ root_path: ./pipeline_root
targets:
default:

@@ -17,6 +17,7 @@ r["configuration"]["key2"] = "value2"
r["notifications"][0]["email_recipients"].append("admin@example.com")
r["notifications"][0]["alerts"].append("on-update-failure")
r["schema"] = "prod"
r["root_path"] = "${PWD}/pipeline_root"
if "tags" not in r:
r["tags"] = {}
r["tags"]["foo"] = "bar"
@@ -2,7 +2,7 @@ Cloud = true
RequiresUnityCatalog = true

RecordRequests = false
Ignore = [".databricks", "databricks.yml", "databricks.yml.backup"]
Ignore = [".databricks", "databricks.yml", "databricks.yml.backup", "pipeline_root"]

[Env]
DATABRICKS_BUNDLE_ENABLE_EXPERIMENTAL_YAML_SYNC = "true"
14 changes: 3 additions & 11 deletions bundle/config/mutator/translate_paths.go
@@ -184,29 +184,21 @@ func (t *translateContext) translateNotebookPath(ctx context.Context, literal, l
return "", fmt.Errorf("notebook %s not found", literal)
}

extensions := []string{
notebook.ExtensionPython,
notebook.ExtensionR,
notebook.ExtensionScala,
notebook.ExtensionSql,
notebook.ExtensionJupyter,
}

// Check whether a file with a notebook extension already exists. This
// way we can provide a more targeted error message.
for _, ext := range extensions {
for _, ext := range notebook.Extensions {
literalWithExt := literal + ext
localRelPathWithExt := localRelPath + ext
if _, err := fs.Stat(t.b.SyncRoot, localRelPathWithExt); err == nil {
return "", fmt.Errorf(`notebook %q not found. Did you mean %q?
Local notebook references are expected to contain one of the following
file extensions: [%s]`, literal, literalWithExt, strings.Join(extensions, ", "))
file extensions: [%s]`, literal, literalWithExt, strings.Join(notebook.Extensions, ", "))
}
}

// Return a generic error message if no matching possible file is found.
return "", fmt.Errorf(`notebook %q not found. Local notebook references are expected
to contain one of the following file extensions: [%s]`, literal, strings.Join(extensions, ", "))
to contain one of the following file extensions: [%s]`, literal, strings.Join(notebook.Extensions, ", "))
}
if err != nil {
return "", fmt.Errorf("unable to determine if %s is a notebook: %w", localFullPath, err)
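The five-entry slice removed above is replaced by notebook.Extensions, a shared list exported from libs/notebook whose definition is not part of this diff. A minimal sketch of what that definition presumably looks like, assuming the per-language extension constants already exist in that package:

    package notebook

    // Extensions presumably collects the per-language extension constants
    // that the local slice in translate_paths.go used to duplicate. This is
    // a sketch; the actual definition lives in libs/notebook and is not
    // shown in this diff.
    var Extensions = []string{
    	ExtensionPython,  // ".py"
    	ExtensionR,       // ".r"
    	ExtensionScala,   // ".scala"
    	ExtensionSql,     // ".sql"
    	ExtensionJupyter, // ".ipynb"
    }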
67 changes: 67 additions & 0 deletions bundle/configsync/resolve.go
@@ -3,11 +3,15 @@ package configsync
import (
"context"
"fmt"
"io/fs"
"path/filepath"
"sort"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/notebook"
"github.com/databricks/cli/libs/structs/structpath"
)

@@ -284,6 +288,13 @@ func ResolveChanges(ctx context.Context, b *bundle.Bundle, configChanges Changes
log.Debugf(ctx, "Field %s has no location, using resource location: %s", fullPath, filePath)
}

if (configChange.Operation == OperationAdd || configChange.Operation == OperationReplace) && b.SyncRootPath != "" {
configChange = &ConfigChangeDesc{
Operation: configChange.Operation,
Value: translateWorkspacePaths(configChange.Value, b.SyncRootPath, b.SyncRoot, filepath.Dir(filePath)),
}
}

result = append(result, FieldChange{
FilePath: filePath,
Change: configChange,
@@ -294,3 +305,59 @@

return result, nil
}

// translateWorkspacePaths recursively converts absolute workspace paths to relative
// paths when they fall within the bundle's sync root. Paths are made relative to
// targetDir (the directory of the YAML file being patched). For notebook paths
// where the extension was stripped by translate_paths, it restores the extension
// by checking the local filesystem.
func translateWorkspacePaths(value any, syncRootPath string, syncRoot fs.FS, targetDir string) any {
switch v := value.(type) {
case string:
after, ok := strings.CutPrefix(v, syncRootPath+"/")
if !ok {
return v
}
after = resolveNotebookExtension(syncRoot, after)
fullPath := filepath.Join(syncRootPath, after)
relPath, err := filepath.Rel(targetDir, fullPath)
if err != nil {
return "./" + after
}
relPathSlash := filepath.ToSlash(relPath)
if !strings.HasPrefix(relPathSlash, "..") {
relPathSlash = "./" + relPathSlash
}
return relPathSlash
case map[string]any:
for key, val := range v {
v[key] = translateWorkspacePaths(val, syncRootPath, syncRoot, targetDir)
}
return v
case []any:
for i, val := range v {
v[i] = translateWorkspacePaths(val, syncRootPath, syncRoot, targetDir)
}
return v
default:
return value
}
}

// resolveNotebookExtension checks if a relative path refers to a notebook whose
// extension was stripped by translate_paths. If the file doesn't exist as-is but
// exists with a notebook extension, the extension is appended.
func resolveNotebookExtension(syncRoot fs.FS, relPath string) string {
if syncRoot == nil {
return relPath
}
if _, err := fs.Stat(syncRoot, relPath); err == nil {
return relPath
}
for _, ext := range notebook.Extensions {
if _, err := fs.Stat(syncRoot, relPath+ext); err == nil {
return relPath + ext
}
}
return relPath
}
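
Together, the two helpers above account for the rewritten paths in the expected outputs earlier in this diff: ./synced_notebook.py at the sync root, ../sample_exploration.ipynb as seen from a resource file in a subdirectory, and ./pipeline_root for a directory that needs no extension resolution. A test-style sketch of that behavior (hypothetical, same-package; the workspace paths and file names are invented, fstest.MapFS stands in for the sync root, and the POSIX-style paths assume a Unix filepath):

    package configsync

    import (
    	"testing"
    	"testing/fstest"
    )

    func TestTranslateWorkspacePathsSketch(t *testing.T) {
    	// The notebook exists locally with its extension; the workspace path
    	// produced by translate_paths has the extension stripped.
    	syncRoot := fstest.MapFS{
    		"synced_notebook.py":     &fstest.MapFile{Data: []byte("# Databricks notebook source")},
    		"pipeline_root/.gitkeep": &fstest.MapFile{},
    	}
    	root := "/Workspace/Users/someone@example.com/bundle"

    	// YAML file at the sync root: extension restored, "./" prefix added.
    	if got := translateWorkspacePaths(root+"/synced_notebook", root, syncRoot, root); got != "./synced_notebook.py" {
    		t.Errorf("got %v", got)
    	}

    	// YAML file in a subdirectory: the relative path walks up.
    	if got := translateWorkspacePaths(root+"/synced_notebook", root, syncRoot, root+"/resources"); got != "../synced_notebook.py" {
    		t.Errorf("got %v", got)
    	}

    	// A directory exists as-is, so resolveNotebookExtension leaves it alone.
    	if got := translateWorkspacePaths(root+"/pipeline_root", root, syncRoot, root); got != "./pipeline_root" {
    		t.Errorf("got %v", got)
    	}

    	// Values outside the sync root pass through unchanged.
    	if got := translateWorkspacePaths("/Workspace/elsewhere/nb", root, syncRoot, root); got != "/Workspace/elsewhere/nb" {
    		t.Errorf("got %v", got)
    	}
    }

Maps and slices are translated element by element by the same function, so a whole notebook_task block patched into a YAML file gets its notebook_path rewritten in place.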