# Adding an SFTP Integration

This guide walks through adding a new SFTP file drop to the data platform using
`scripts/init_sftp_integration.py`. The script handles inspection, codegen, and
full pipeline scaffolding.
## Prerequisites

- SFTP credentials loaded in your environment (injected from 1Password at devcontainer start)
- The credential env vars follow the naming convention:
  `<RESOURCE>_SFTP_HOST`, `<RESOURCE>_SFTP_USERNAME_<CODE_LOCATION>`, `<RESOURCE>_SFTP_PASSWORD_<CODE_LOCATION>`
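As a minimal sketch of how that naming convention resolves to concrete values, the lookup can be pictured like this (the `sftp_credentials` helper is hypothetical, not part of the script; the real lookup logic may differ):

```python
import os


def sftp_credentials(resource: str, code_location: str) -> dict[str, str]:
    """Resolve SFTP credentials from the environment.

    Illustrative only: uppercases the resource and code location and plugs
    them into the <RESOURCE>_SFTP_* naming convention described above.
    """
    resource = resource.upper()
    code_location = code_location.upper()
    return {
        "host": os.environ[f"{resource}_SFTP_HOST"],
        "username": os.environ[f"{resource}_SFTP_USERNAME_{code_location}"],
        "password": os.environ[f"{resource}_SFTP_PASSWORD_{code_location}"],
    }


# Example: ("amplify", "kipptaf") reads AMPLIFY_SFTP_HOST,
# AMPLIFY_SFTP_USERNAME_KIPPTAF, and AMPLIFY_SFTP_PASSWORD_KIPPTAF.
```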
## Step 1 — Inspect the SFTP server

List files in the remote directory to confirm the new file is being dropped:

```shell
uv run scripts/init_sftp_integration.py list <resource> <code_location> <path> --pattern "<filter>"
```

Example:

```shell
uv run scripts/init_sftp_integration.py list amplify kipptaf /PM --pattern "PM_CUSTOM"
```

The script connects using `<RESOURCE>_SFTP_HOST` and per-location credentials,
lists files matching the pattern, and shows file sizes. If no files match, it
prints a warning.
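The pattern step is essentially a filter over remote file names. A sketch, assuming regex-search semantics (the `filter_files` helper is hypothetical; the actual CLI may use substring or glob matching instead):

```python
import re


def filter_files(file_names: list[str], pattern: str) -> list[str]:
    """Keep only names whose text matches the pattern anywhere."""
    compiled = re.compile(pattern)
    return [name for name in file_names if compiled.search(name)]


# filter_files(["PM_CUSTOM_2024-25.csv", "benchmark.csv"], "PM_CUSTOM")
# keeps only the PM_CUSTOM file.
```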
## Step 2 — Download a sample file

```shell
uv run scripts/init_sftp_integration.py download <resource> <code_location> <path> --pattern "<filter>" --output /tmp/sample.csv
```

The script downloads the most recent file (by modification time) matching the pattern.
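"Most recent by modification time" reduces to a `max` over `(name, mtime)` pairs. A sketch with a hypothetical `most_recent` helper (the real script's selection logic may differ):

```python
def most_recent(files: list[tuple[str, float]]) -> str:
    """Return the file name with the greatest modification time.

    `files` is a list of (name, mtime) pairs, e.g. built from the
    st_mtime of each remote directory entry.
    """
    return max(files, key=lambda entry: entry[1])[0]
```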
## Step 3 — Preview the Pydantic class

Generate a Pydantic schema class from the CSV headers:

```shell
uv run scripts/init_sftp_integration.py codegen --local /tmp/sample.csv --class-name <ClassName>
```

Review the output — field names are normalized using python-slugify
(lowercased, special characters replaced with underscores). All fields are
`str | None = None`, matching the project convention of deferring type
correction to dbt.
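A sketch of that normalization and rendering, using a stdlib `re` approximation of python-slugify's behavior (both helpers are illustrative, not the script's actual implementation):

```python
import re


def normalize_field(header: str) -> str:
    """Approximate the slugify step: lowercase, then collapse runs of
    non-alphanumeric characters into a single underscore."""
    slug = re.sub(r"[^a-z0-9]+", "_", header.lower())
    return slug.strip("_")


def render_class(class_name: str, headers: list[str]) -> str:
    """Render a Pydantic class body with every field as `str | None = None`,
    deferring type correction to dbt per the project convention."""
    lines = [f"class {class_name}(BaseModel):"]
    lines += [f"    {normalize_field(h)}: str | None = None" for h in headers]
    return "\n".join(lines)


# normalize_field("Student ID#") -> "student_id"
```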
## Step 4 — Scaffold the pipeline

Generate all pipeline boilerplate in one command:

```shell
uv run scripts/init_sftp_integration.py scaffold <resource> \
  --local /tmp/sample.csv \
  --class-name <ClassName> \
  --asset-name <snake_case_name> \
  --source-subpath <subpath> \
  --code-locations <loc1> <loc2>
```

Example:

```shell
uv run scripts/init_sftp_integration.py scaffold amplify \
  --local /tmp/sample.csv \
  --class-name PMStudentSummaryAimline \
  --asset-name pm_student_summary_aimline \
  --source-subpath mclass/sftp \
  --code-locations kippnewark kipppaterson
```
### What scaffold generates

| File | Description |
|---|---|
| `src/teamster/libraries/<resource>/<subpath>/schema.py` | Pydantic class appended |
| `src/teamster/code_locations/<loc>/<resource>/<subpath>/schema.py` | Avro schema constant added |
| `src/teamster/code_locations/<loc>/<resource>/<subpath>/assets.py` | Asset definition added (regex placeholder) |
| `tests/assets/test_assets_<resource>_sftp.py` | Integration test functions added |
| `src/dbt/<resource>/models/sources.yml` | External table source entry added |
| `src/dbt/<resource>/models/<subpath>/staging/stg_*.sql` | Passthrough `SELECT *` stub |
| `src/dbt/<resource>/models/<subpath>/staging/properties/stg_*.yml` | Contract disabled, placeholder for columns |
| `src/dbt/kipptaf/models/<resource>/.../sources-kipp*.yml` | Source entries for union |
| `src/dbt/kipptaf/models/<resource>/.../staging/stg_*.sql` | Union relations view |
The scaffold is idempotent — running it again skips files that already exist.
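For new files, that idempotency can be as simple as skip-if-exists. A hypothetical sketch (the real scaffold also appends to some existing files, such as `schema.py`, so its logic is more involved):

```python
from pathlib import Path


def write_if_absent(path: Path, content: str) -> bool:
    """Write the file only if it doesn't already exist.

    Returns True if the file was written, False if it was skipped,
    so re-running the scaffold leaves existing work untouched.
    """
    if path.exists():
        return False
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content)
    return True
```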
### Sensor

No sensor changes are needed. The existing SFTP sensor for the resource scans
recursively and matches against all assets in its selection. Adding the new
asset to the `assets` list is sufficient.
## Step 5 — Fill in placeholders

The scaffold prints a numbered TODO checklist with file paths and exact commands. You must fill in:

- `remote_dir_regex` and `remote_file_regex` in each code location's `assets.py` — replace the `...` placeholders with the correct regex patterns. Use named groups for partition keys (e.g., `(?P<school_year>[\d-]+)`).
- Type casts and derived columns in the dbt staging SQL — the scaffold generates a `SELECT *` passthrough. Add `* replace (...)` for type casts and additional computed columns as needed.
- Column definitions in the dbt properties YAML — add all columns with `data_type`, `description`, and a uniqueness test. Then set `contract.enforced: true`.
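To see how a named group yields a partition key, here is a sketch with a hypothetical file-name pattern (the real pattern depends on what the vendor actually drops):

```python
import re

# Hypothetical pattern: a PM_CUSTOM CSV whose name embeds the school year.
REMOTE_FILE_REGEX = r"PM_CUSTOM_(?P<school_year>[\d-]+)\.csv"

match = re.match(REMOTE_FILE_REGEX, "PM_CUSTOM_2024-25.csv")
assert match is not None
# The named group becomes the partition key for the asset.
print(match.group("school_year"))  # → 2024-25
```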
## Step 6 — Materialize test data

Run the integration test to write data to the `teamster-test` GCS bucket:

```shell
uv run pytest tests/assets/test_assets_<resource>_sftp.py -k <asset_name> -v
```

This uses `get_io_manager_gcs_avro(code_location="test", test=True)`, which
writes to `teamster-test` with a `test/` path prefix.
## Step 7 — Stage and build in your dev schema

Stage the external table and build the staging model in your personal dev schema
(`zz_<user>_*`). This is for local iteration — verifying type casts, column
definitions, and test results.

`dbt-sxs.py` has two independent flags:

- `--test` — points the external table at the `teamster-test` GCS bucket (use when prod data doesn't exist yet)
- `--target` — controls the BigQuery schema: `dev` (default, personal `zz_<user>_*`) or `staging` (shared `z_dev_*`, used by CI)

```shell
uv run scripts/dbt-sxs.py <district_project> --test --select <source_name>.<asset_name>
uv run dbt build -s <model_name> --project-dir src/dbt/<district_project>
```

Review the output — check for contract violations, test failures, and data quality warnings. Run this for each district project.
## Step 8 — Stage and build for CI

The kipptaf union model sources from district staging tables. In CI (dbt Cloud),
these resolve to the shared `z_dev_*` schema. The district staging model must
exist there before kipptaf CI can build.

Re-run the stage and build with `--target staging`:

```shell
uv run scripts/dbt-sxs.py <district_project> --test --target staging --select <source_name>.<asset_name>
uv run dbt build -s <model_name> --project-dir src/dbt/<district_project> --target staging
```

!!! note
    Once the asset is materialized in production, drop `--test` so the
    external table points at the production GCS bucket instead of `teamster-test`.
## Summary

| Phase | What happens | Who |
|---|---|---|
| Inspect (Steps 1-3) | Explore SFTP, download sample, preview schema | Developer |
| Scaffold (Step 4) | Generate all pipeline boilerplate | Script |
| Customize (Step 5) | Fill in regex, type casts, column definitions | Developer |
| Validate (Steps 6-7) | Materialize, stage, build in personal dev | Developer |
| CI Setup (Step 8) | Stage and build in shared `z_dev_*` schema | Developer |