# Adding an Integration
Most integrations in Teamster follow a two-layer Library + Config pattern that separates reusable asset logic from per-school configuration.
## Pattern overview

```text
src/teamster/libraries/<integration>/assets.py   ← reusable factory function
src/teamster/code_locations/<school>/
    <integration>/
        config/
            assets-<name>.yaml                   ← per-school asset parameters
        assets.py                                ← calls factory with config
```
## Step 1 — Write the library factory

Create `src/teamster/libraries/<integration>/assets.py` with a
`build_<integration>_asset()` factory function. The factory accepts asset
parameters and returns a Dagster asset or list of assets.
```python
# src/teamster/libraries/myintegration/assets.py
from dagster import asset


def build_myintegration_asset(name: str, endpoint: str, ...):
    @asset(name=name, ...)
    def _asset(context, myintegration_resource):
        ...

    return _asset
```
## Step 2 — Write the YAML config

Create one or more YAML files under
`src/teamster/code_locations/<school>/<integration>/config/`. Each file lists
the asset parameters for that school.
```yaml
# config/assets-reports.yaml
assets:
  - name: my_report
    endpoint: /api/reports/my_report
    ...
```
## Step 3 — Wire up the code location

Create `src/teamster/code_locations/<school>/<integration>/assets.py` and call
the factory for each YAML config:
```python
# src/teamster/code_locations/kipptaf/myintegration/assets.py
from dagster import config_from_files

from teamster.libraries.myintegration.assets import build_myintegration_asset

assets = [
    build_myintegration_asset(**config)
    for config in config_from_files(
        ["src/teamster/code_locations/kipptaf/myintegration/config/assets-reports.yaml"]
    )["assets"]
]
```
## Step 4 — Add to definitions

Import the assets into the code location's `definitions.py` and add them to the
`Definitions` object along with any required resources.
```python
# src/teamster/code_locations/kipptaf/definitions.py
from dagster import Definitions

from teamster.code_locations.kipptaf.myintegration.assets import (
    assets as myintegration_assets,
)

defs = Definitions(
    assets=[..., *myintegration_assets],
    resources={..., "myintegration_resource": MYINTEGRATION_RESOURCE},
)
```
## Asset key convention

Asset keys follow the pattern `[code_location, integration, table_name]`, e.g.
`kippnewark/powerschool/students`. The `CustomDagsterDbtTranslator` in
`libraries/dbt/dagster_dbt_translator.py` automatically prefixes dbt asset keys
with the code location name.
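The convention can be sketched with plain lists (an illustration only, not the real `CustomDagsterDbtTranslator` implementation — Dagster asset keys are sequences of path components):

```python
# Sketch of the asset-key convention; function names here are hypothetical.
def build_asset_key(code_location: str, integration: str, table_name: str) -> list[str]:
    # An asset key is a list of path components.
    return [code_location, integration, table_name]


def prefix_dbt_asset_key(code_location: str, dbt_key: list[str]) -> list[str]:
    # dbt asset keys get the code location prepended automatically.
    return [code_location, *dbt_key]


key = build_asset_key("kippnewark", "powerschool", "students")
print("/".join(key))  # kippnewark/powerschool/students
```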
## SFTP assets

SFTP-based ingestion uses three factory functions from
`teamster.libraries.sftp.assets`, all of which write Avro records to GCS via
`io_manager_gcs_avro`:
| Factory | Use case |
|---|---|
| `build_sftp_file_asset()` | Matches a single file; raises if multiple files match |
| `build_sftp_archive_asset()` | Downloads a zip archive and extracts one file |
| `build_sftp_folder_asset()` | Collects all matching files in a folder and concatenates their rows |
All three share the same core parameters: `asset_key`, `remote_dir_regex`,
`remote_file_regex`, `ssh_resource_key`, `avro_schema`, and optionally
`partitions_def`.
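A call to one of these factories might look like the following sketch. The parameter values (asset key parts, regexes, resource key, and the `STUDENTS_AVRO_SCHEMA` constant) are hypothetical, and the exact signatures live in `teamster.libraries.sftp.assets`:

```python
# Hypothetical values throughout — adjust to the school and vendor.
from teamster.libraries.sftp.assets import build_sftp_file_asset

students_raw = build_sftp_file_asset(
    asset_key=["kippnewark", "myintegration", "students"],
    remote_dir_regex=r"/exports",
    remote_file_regex=r"students\.csv",
    ssh_resource_key="ssh_myintegration",
    avro_schema=STUDENTS_AVRO_SCHEMA,  # hypothetical schema constant
)
```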
### Partition key substitution

Named regex groups in `remote_dir_regex` and `remote_file_regex` are replaced
with partition key dimension values at runtime. For multi-partition assets, each
dimension name maps to a named group in the regex.
For example, an asset partitioned by `fiscal_year` with
`remote_dir_regex = r"/reports/(?P<fiscal_year>\d{4})/"` resolves the directory
to `/reports/2026/` when materializing the `2026` partition.
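The substitution can be sketched with the standard `re` module — a simplified stand-in for Teamster's internal logic, not its actual implementation:

```python
import re

# Matches a named group like (?P<fiscal_year>\d{4}) and captures its name.
# Naive: assumes the group body contains no ")" character.
NAMED_GROUP = re.compile(r"\(\?P<(?P<name>\w+)>[^)]*\)")


def resolve_regex(pattern: str, partition_keys: dict[str, str]) -> str:
    # Swap each named group for the corresponding partition key value.
    return NAMED_GROUP.sub(lambda m: partition_keys[m.group("name")], pattern)


remote_dir_regex = r"/reports/(?P<fiscal_year>\d{4})/"
print(resolve_regex(remote_dir_regex, {"fiscal_year": "2026"}))  # /reports/2026/
```

For a multi-partition asset, each dimension supplies one key in the dict, e.g. `resolve_regex(r"(?P<fiscal_year>\d{4})/(?P<school>\w+)", {"fiscal_year": "2026", "school": "kippnewark"})`.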
## Asset checks (Avro schema validation)

Every SFTP and API asset should declare an Avro schema validity check using the
two helpers in `teamster.core.asset_checks`:
```python
from teamster.core.asset_checks import (
    build_check_spec_avro_schema_valid,
    check_avro_schema_valid,
)

# In the factory — declare the check spec alongside the asset
check_specs = [build_check_spec_avro_schema_valid(asset_key)]

# In the asset body — run the check after yielding output
yield check_avro_schema_valid(asset_key, records, avro_schema)
```
The check warns (does not fail) when records contain fields not present in the Avro schema. This surfaces schema drift in the Dagster UI without blocking downstream assets.
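The drift-detection idea can be approximated in a few lines — a simplified sketch, not the actual `check_avro_schema_valid` implementation:

```python
def find_extra_fields(records: list[dict], schema_field_names: set[str]) -> set[str]:
    # Collect every field name seen across the records...
    seen: set[str] = set()
    for record in records:
        seen.update(record)
    # ...and report the ones the Avro schema does not declare.
    # A non-empty result would surface as a WARN-severity check, not a failure.
    return seen - schema_field_names


records = [{"id": 1, "name": "a", "new_column": "x"}]
print(find_extra_fields(records, {"id", "name"}))  # {'new_column'}
```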