# Model Examples
DataSurface uses a declarative Python DSL to define your entire data ecosystem. Here are examples of how to model common patterns.
## Defining a Producer (Ingestion)
This snippet defines a Datastore that pulls data from an external SQL database. It uses Environment Maps to switch between Prod and QA databases automatically.
def create_producer_model(eco: Ecosystem):
    """Register a producer Datastore on team1, with environment-mapped DB connections.

    Adds two EnvironmentMaps so the logical container name "customer_db"
    resolves to a QA or Prod PostgresDatabase depending on the active
    environment, then adds a Datastore that snapshots that database on a
    5-minute cron schedule.

    Args:
        eco: The Ecosystem to register the model into. Must already contain
             governance zone "USA" with team "team1" (getZoneOrThrow /
             getTeamOrThrow raise otherwise).
    """
    # Get the Team (assuming it exists)
    gz = eco.getZoneOrThrow("USA")
    team = gz.getTeamOrThrow("team1")

    # 1. Define Environment Maps (Prod vs QA Database Connections).
    # This allows the logical reference "customer_db" to resolve to
    # different physical databases per environment.
    # QA Connection
    team.add(EnvironmentMap(
        keyword="qa",
        dataContainers={
            frozenset(["customer_db"]): PostgresDatabase(
                "CustomerDB_QA",
                hostPort=HostPortPair("pg-data.qa.svc.cluster.local", 5432),
                locations={LocationKey("MyCorp:USA/NY_1")},
                databaseName="customer_db_qa",
                productionStatus=ProductionStatus.NOT_PRODUCTION
            )
        }
    ))
    # Prod Connection
    team.add(EnvironmentMap(
        keyword="prod",
        dataContainers={
            frozenset(["customer_db"]): PostgresDatabase(
                "CustomerDB",
                hostPort=HostPortPair("pg-data.prod.svc.cluster.local", 5432),
                locations={LocationKey("MyCorp:USA/NY_1")},
                databaseName="customer_db",
                productionStatus=ProductionStatus.PRODUCTION
            )
        }
    ))

    # 2. Add the Datastore (Producer)
    team.add(
        Datastore(
            "Store1",
            documentation=PlainTextDocumentation("Customer Data Store"),
            # Define how to ingest this data
            capture_metadata=SQLSnapshotIngestion(
                EnvRefDataContainer("customer_db"),  # Resolves to Prod or QA DB based on environment
                CronTrigger("Every 5 minute", "*/5 * * * *"),
                IngestionConsistencyType.MULTI_DATASET,
                Credential("postgres", CredentialType.USER_PASSWORD)
            ),
            datasets=[
                Dataset(
                    "customers",
                    schema=DDLTable(
                        columns=[
                            DDLColumn("id", VarChar(20), nullable=NullableStatus.NOT_NULLABLE, primary_key=PrimaryKeyStatus.PK),
                            DDLColumn("firstname", VarChar(100), nullable=NullableStatus.NOT_NULLABLE),
                            DDLColumn("email", VarChar(100))
                        ]
                    )
                )
            ]
        )
    )
## Defining a Consumer Workspace
Workspaces define consumer contracts. They group required datasets and define service levels (History, Latency).
def create_consumer_model(team: Team):
    """Register a consumer Workspace with two dataset groups on the given team.

    The Workspace pulls datasets from producer "Store1" (defined elsewhere)
    into a platform-managed container. One group requests live-only data,
    the other requests full forensic (SCD2-style) history; both at
    minutes-level latency.

    Args:
        team: The Team to attach the Workspace to.
    """
    # Define a Workspace for a consumer
    team.add(
        Workspace(
            "Consumer1",
            # The platform will manage the container for this workspace
            DataPlatformManagedDataContainer("Consumer1 container"),
            # Group 1: Real-time Live Data
            DatasetGroup(
                "LiveDSG",
                sinks=[
                    DatasetSink("Store1", "customers"),
                    DatasetSink("Store1", "addresses")
                ],
                platform_chooser=WorkspacePlatformConfig(
                    hist=ConsumerRetentionRequirements(
                        r=DataMilestoningStrategy.LIVE_ONLY,  # Just the latest values
                        latency=DataLatency.MINUTES
                    )
                ),
            ),
            # Group 2: Forensic History Data (SCD2)
            DatasetGroup(
                "ForensicDSG",
                sinks=[
                    DatasetSink("Store1", "customers")
                ],
                platform_chooser=WorkspacePlatformConfig(
                    hist=ConsumerRetentionRequirements(
                        r=DataMilestoningStrategy.FORENSIC,  # Full history tracking
                        latency=DataLatency.MINUTES
                    )
                )
            )
        )
    )
## Defining a Data Transformer
Transformers consume data from a Workspace and produce new Datasets. They use Environment Maps to manage code versions and configuration across Prod/QA.
def create_transformer_model(team: Team):
    """Register a DataTransformer that consumes Store1 and emits masked data.

    First adds EnvironmentMaps binding the release selector "custMaskRev"
    and the config map "mask_customer_dt" to QA/Prod values, then adds a
    Workspace whose DataTransformer reads "Store1"/"customers" and writes
    an anonymized "MaskedCustomers" Datastore on a 1-minute cron schedule.

    Args:
        team: The Team to attach the EnvironmentMaps and Workspace to.
    """
    # 1. Define Environment Maps for Code Versioning & Configuration.
    # QA Environment: Uses QA code tag and QA keys
    team.add(EnvironmentMap(
        keyword="qa",
        dtReleaseSelectors={
            "custMaskRev": VersionPatternReleaseSelector("v1.0-qa", ReleaseType.STABLE_ONLY)
        },
        configMaps=[
            StaticConfigMap("mask_customer_dt", {"mask_key": "qa_secret_key"})
        ]
    ))
    # Prod Environment: Uses Prod code tag and Prod keys
    team.add(EnvironmentMap(
        keyword="prod",
        dtReleaseSelectors={
            "custMaskRev": VersionPatternReleaseSelector("v1.0-prod", ReleaseType.STABLE_ONLY)
        },
        configMaps=[
            StaticConfigMap("mask_customer_dt", {"mask_key": "prod_secret_key"})
        ]
    ))

    # 2. Define the Transformer Workspace
    team.add(
        Workspace(
            "MaskingWorkspace",
            DataPlatformManagedDataContainer("MaskingContainer"),
            # Input Data
            DatasetGroup("InputData", sinks=[DatasetSink("Store1", "customers")]),
            # Transformer Definition
            DataTransformer(
                name="MaskedCustomerGenerator",
                # Code Reference (Resolves via EnvMap 'custMaskRev')
                code=PythonRepoCodeArtifact(
                    VersionedRepository(
                        GitHubRepository("myorg/transforms", "main"),
                        EnvRefReleaseSelector("custMaskRev")
                    )
                ),
                # Configuration Reference (Resolves via EnvMap 'mask_customer_dt')
                kv=EnvRefConfigMap("mask_customer_dt"),
                trigger=CronTrigger("Every 1 minute", "*/1 * * * *"),
                # Output Schema
                store=Datastore(
                    name="MaskedCustomers",
                    documentation=PlainTextDocumentation("Anonymized Customer Data"),
                    datasets=[
                        Dataset(
                            "customers",
                            schema=DDLTable(
                                columns=[
                                    DDLColumn("id", VarChar(20), nullable=NullableStatus.NOT_NULLABLE, primary_key=PrimaryKeyStatus.PK),
                                    DDLColumn("hashed_name", VarChar(100), nullable=NullableStatus.NOT_NULLABLE)
                                ]
                            ),
                            classifications=[SimpleDC(SimpleDCTypes.PUB, "Public")]
                        )
                    ]
                )
            )
        )
    )
## Defining Runtime Environments (RTEs)
RTEs bind logical environments to specific infrastructure and code versions. This snippet shows how to define QA and Prod environments.
def create_runtime_environments(eco: Ecosystem, psp_prod: PlatformServiceProvider, psp_qa: PlatformServiceProvider):
    """Configure the "prod" and "qa" RuntimeEnvironments of the Ecosystem.

    Each RTE is bound to a version-pattern release selector ("-prod" /
    "-qa" suffixed, stable-only), a PSP declaration, a production status,
    and its PlatformServiceProvider.

    Args:
        eco: Ecosystem that already declares RTEs named "prod" and "qa"
             (getRuntimeEnvironmentOrThrow raises otherwise).
        psp_prod: PSP that runs production workloads.
        psp_qa: PSP that runs QA workloads.
    """
    # 1. Define the Production RTE
    rte_prod = eco.getRuntimeEnvironmentOrThrow("prod")
    # Configure Prod: Use 'prod' tagged models, run on the Production PSP
    rte_prod.configure(
        VersionPatternReleaseSelector(VersionPatterns.VN_N_N + "-prod", ReleaseType.STABLE_ONLY),
        [PSPDeclaration(psp_prod.name, eco.owningRepo)],  # Use the Prod PSP
        ProductionStatus.PRODUCTION
    )
    rte_prod.setPSP(psp_prod)

    # 2. Define the QA RTE
    rte_qa = eco.getRuntimeEnvironmentOrThrow("qa")
    # Configure QA: Use 'qa' tagged models, run on the QA PSP
    rte_qa.configure(
        VersionPatternReleaseSelector(VersionPatterns.VN_N_N + "-qa", ReleaseType.STABLE_ONLY),
        [PSPDeclaration(psp_qa.name, eco.owningRepo)],
        ProductionStatus.NOT_PRODUCTION
    )
    rte_qa.setPSP(psp_qa)