Create a Task

This page walks you through how to create, execute, and complete a task with your AI Engineer.

Overview

Define what you want your AI Engineer to achieve. Describe the goal, share relevant data locations, and give any context needed. The engineer will determine the best approach to complete the task, updating or generating code as necessary across multiple assets.

Step 1: Create a Task

Go to Select AI Data Engineer > Select Your AI Data Engineer Name > Select Start a new Task

When starting a task, you’ll define the goal and give your AI Engineer the context it needs to succeed:

  • Describe the goal: Clearly state what you want the engineer to achieve.

  • Provide context: Share relevant data locations, table references, and any dependencies.

  • Leave the how flexible: The AI Engineer will determine how to update or generate code as needed across multiple assets.

At this stage, you are telling the system what to do, not how to do it.

Step 2: Review #Task

  • This is a default descriptive action for the task. It is generic in nature and does not require handling, but can be modified if needed.

Step 3: Add ## Source Data

Add the location of the source data for the task.

  • Base path (Volumes): /Volumes/main/commerce/raw/orders/

  • Expected files (example and not limited to): *.json, *.csv

  • Layout (typical):

    • /Volumes/main/commerce/raw/orders/YYYY/MM/DD/*.json

    • /Volumes/main/commerce/raw/orders/YYYY/MM/DD/*.csv

Step 4: Add ## Destination Information

Add the location of the destination table for the task, or let the task create your table automatically.

  • Primary destination table (Delta): main.commerce.fact_orders

  • Storage: Managed Delta (Unity Catalog). Do not change table properties; respect existing. delta.columnMapping.mode.

Step 5: Add ## Task Information

Add instructions in natural language on how to transform the data.

Examples of task-level functions that can be executed

  • Ingest & normalize

    • Discover candidate files under /Volumes/main/commerce/raw/orders/ using glob patterns for *.json and *.csv.

    • Read files defensively; trim strings; coerce numeric strings to DECIMAL(18,2); parse order_ts to order_timestamp (support epoch millis and ISO‑8601). Assume UTC unless offset present.

    • For JSON with items, explode and aggregate as needed to validate totals.

    • For CSV inputs, ensure mandatory columns are present; backfill optional columns with nulls/defaults.

  • Deduplicate

    • Use windowing by order_id; keep the record from the file with the latest last‑modified timestamp (ties broken by file path) to accommodate re‑drops/updates.

  • Transformations

    • Compute net_amount from available fields (e.g., total_amount - coalesce(discount_amount, 0) if tax is already included; otherwise total_amount - discount_amount - tax_amount per business rule).

    • Derive order_date = date(order_timestamp).

    • Populate lineage fields: ingestion_file_path, ingestion_file_modified_ts, ingestion_job_name, ingestion_job_start_time.

  • Data Quality (DQ) validation

    • Enforce non‑null order_id, customer_id, and order_timestamp.

    • Validate currency as three uppercase letters (basic ISO‑4217 check); optionally join to a reference table when available.

    • Ensure monetary fields are non‑negative unless status represents a reversal/refund.

    • Log row counts, null ratios, and total vs. derived total mismatches (tolerance configurable).

  • Write (draft‑first)

    • Append transformed rows to main.commerce.fact_orders_osmos_ai_draft with job_run_id and write_timestamp. Never delete/overwrite existing draft data.

    • Verify: count of written rows equals expected; schema parity with destination.

  • Promote to destination (guarded)

    • If and only if DRAFT_ONLY_MODE == False, insert only the rows with the current job_run_id from draft into main.commerce.fact_orders, then append processed files to main.default.osmos_engineer_job_state.

    • If DRAFT_ONLY_MODE == True, do not write to destination and do not update the state table; instead, log a summary of what would have been written.

  • Scheduling & state

    • Use main.default.osmos_engineer_job_state(job_name, job_start_time, file_path, file_last_modified) to pick files: include those not present for job_name or whose file_last_modified has changed.

    • Handle the "no files" case gracefully (warning only).

Step 6: Optional: Modify the ## Control flow

There may be certain circumstances where you wish to modify the code requirements.

Examples include, but not limited to:

  • Default DRAFT_ONLY_MODE = True for all development and test runs.

  • All writes use option("mergeSchema","false") and are append‑only.

  • If SparkUpgradeException for datetime rebase occurs, set option("datetimeRebaseMode","CORRECTED") on the problematic write; use LEGACY only when reading ancient files.

  • Promotion queries must filter by the current job_run_id to prevent accidental replay.

Step 7: Select Start Task

This will kick off the task's job.

Note: Today, there is no method to stop the task once it is started.

Last updated

Was this helpful?