@dagster-io/dagster-pipes
v0.1.0
A TypeScript implementation of the Pipes protocol, for integration with https://github.com/dagster-io/dagster
dagster-pipes-typescript
A Pipes implementation for the TypeScript programming language.
Allows integration between any TypeScript process and the Dagster orchestrator.
Usage
Prerequisites
- Install Node.js and npm
- Install the TypeScript compiler (npm install -g typescript)
Installation
@dagster-io/dagster-pipes is available as an npm package:
npm install @dagster-io/dagster-pipes
Example
In addition to the examples below, you can find an example Dagster deployment with a TypeScript Pipes asset in the example-project/ directory.
After installing the @dagster-io/dagster-pipes npm package, your TypeScript process can import it to integrate with Dagster.
import * as dagster_pipes from '@dagster-io/dagster-pipes';

using context = dagster_pipes.openDagsterPipes()

context.logger.info("this is a log message from the external process")
context.logger.warning("This is an example warning from the external process")

context.reportAssetMaterialization(
    {
        "row_count": 100
    }
)

context.reportAssetCheck(
    "example_typescript_check", true
)

And, in the dagster orchestration code, add the following:
import subprocess
from pathlib import Path

import dagster as dg


@dg.asset(
    check_specs=[
        dg.AssetCheckSpec(
            name="example_typescript_check",
            asset="example_typescript_asset"
        )
    ]
)
def example_typescript_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    external_script_path = dg.file_relative_path(__file__, "../external_typescript_code/")

    # Install dependencies and compile the TypeScript project before running it.
    # Each argument is a separate list element; a single "npm install" string
    # would be treated as the name of the executable and fail.
    subprocess.run(["npm", "install"],
                   cwd=external_script_path).check_returncode()
    subprocess.run(["tsc"],
                   cwd=external_script_path).check_returncode()

    # Launch the compiled project with node and collect the results it reports.
    return pipes_subprocess_client.run(
        command=["node", Path(external_script_path)],
        context=context,
    ).get_materialize_result()


defs = dg.Definitions(
    assets=[example_typescript_asset],
    resources={
        "pipes_subprocess_client": dg.PipesSubprocessClient()
    },
)
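The asset above runs npm install and tsc before launching node. As a rough sketch of how those build steps could be factored out, the helper below runs a list of commands in order and stops at the first failure. Note that `run_build_steps` is a hypothetical name introduced here for illustration; it is not part of dagster or of this package. It relies only on the standard library's `subprocess.run`, where each command is a list with one element per argument.

```python
import subprocess
from pathlib import Path
from typing import Sequence


def run_build_steps(cwd: Path, steps: Sequence[Sequence[str]]) -> None:
    """Run each command in order inside cwd, raising on the first failure.

    Hypothetical helper for illustration; not part of dagster.
    """
    for step in steps:
        # check=True raises subprocess.CalledProcessError on a non-zero exit
        # code, which has the same effect as calling .check_returncode() on
        # the completed process.
        subprocess.run(list(step), cwd=cwd, check=True)
```

Inside the asset, this could be called as `run_build_steps(Path(external_script_path), [["npm", "install"], ["tsc"]])`, assuming npm and tsc are on the PATH of the process running the asset.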
Development
Compiling the development version
npm install && npm run build
Testing
npm test
uv run --all-extras pytest
Linting
npm run lint