ingestr¶
About¶
ingestr is a versatile data I/O framework and command-line application to copy data between any source and any destination. It supports many data sources, destinations, and data loading strategies out of the box.
Adapters for CrateDB let you migrate data from any proprietary enterprise data warehouse or database to CrateDB or CrateDB Cloud, to consolidate infrastructure and save operational costs.
Install¶
uv tool install --prerelease=allow --upgrade 'cratedb-toolkit[io-ingestr]'
Synopsis¶
Load data from BigQuery into CrateDB.
ctk load table \
"bigquery://<project-name>?credentials_path=/path/to/service/account.json&location=<location>?table=<table-name>" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/bigquery_demo"
Load data from Databricks into CrateDB.
ctk load table \
"databricks://token:<access_token>@<server_hostname>?http_path=<http_path>&catalog=<catalog>&schema=<schema>&table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/databricks_demo"
Load data from PostgreSQL into CrateDB.
ctk load table \
"postgresql://<username>:<password>@postgresql.example.org:5432/postgres?table=information_schema.tables" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/postgresql_tables"
Load data from Salesforce into CrateDB.
ctk load table \
"salesforce://?username=<username>&password=<password>&token=<token>&table=opportunity" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/salesforce_opportunity"
Coverage¶
ingestr supports migration from 20-plus databases, data platforms, analytics engines, and other services; see sources supported by ingestr and databases supported by SQLAlchemy.
Databases
Actian Data Platform, Vector, Actian X, Ingres, Amazon Athena, Amazon Redshift, Amazon S3, Apache Drill, Apache Druid, Apache Hive and Presto, Apache Solr, ClickHouse, CockroachDB, CrateDB, Databend, Databricks, Denodo, DuckDB, EXASOL DB, Elasticsearch, Firebird, Firebolt, Google BigQuery, Google Sheets, Greenplum, HyperSQL (hsqldb), IBM DB2 and Informix, IBM Netezza Performance Server, Impala, Kinetica, Microsoft Access, Microsoft SQL Server, MonetDB, MongoDB, MySQL and MariaDB, OpenGauss, OpenSearch, Oracle, PostgreSQL, Rockset, SAP ASE, SAP HANA, SAP Sybase SQL Anywhere, Snowflake, SQLite, Teradata Vantage, TiDB, YDB, YugabyteDB.
Brokers
Amazon Kinesis, Apache Kafka (Amazon MSK, Confluent Kafka, Redpanda, RobustMQ)
File formats
CSV, JSONL/NDJSON, Parquet
Object stores
Amazon S3, Google Cloud Storage
Services
Airtable, Asana, GitHub, Google Ads, Google Analytics, Google Sheets, HubSpot, Notion, Personio, Salesforce, Slack, Stripe, Zendesk, etc.
Configure¶
For hands-on examples of the configuration parameters enumerated here, see the usage section below.
Data source and destination
ingestr uses four parameters to address source and destination, while ctk load table uses just two, embedding the source and destination table names into the address URLs themselves via the table query parameter.
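For illustration, the two invocations below are meant to be roughly equivalent. This is a sketch assuming ingestr's documented --source-uri, --source-table, --dest-uri, and --dest-table options; hostnames and credentials are placeholders.
# Four parameters with ingestr: URIs and table names are separate.
ingestr ingest \
    --source-uri "postgresql://<username>:<password>@postgresql.example.org:5432/postgres" \
    --source-table "information_schema.tables" \
    --dest-uri "cratedb://crate:na@localhost:4200/" \
    --dest-table "testdrive.postgresql_tables"
# Two URLs with ctk: table names are embedded via the table query parameter.
ctk load table \
    "postgresql://<username>:<password>@postgresql.example.org:5432/postgres?table=information_schema.tables" \
    --cluster-url="crate://crate:na@localhost:4200/testdrive/postgresql_tables"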
Batch size
Because the underlying framework uses dlt, you configure parameters like the batch size in your .dlt/config.toml file.
[data_writer]
buffer_max_items=1_000
file_max_items=100_000
file_max_bytes=50_000
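Alternatively, dlt can read the same settings from environment variables, where double underscores separate section and key. A minimal sketch, assuming dlt's standard environment-variable configuration:
# Equivalent to the [data_writer] settings above
# (assumption: dlt's standard env-var configuration with __ separators).
export DATA_WRITER__BUFFER_MAX_ITEMS=1000
export DATA_WRITER__FILE_MAX_ITEMS=100000
export DATA_WRITER__FILE_MAX_BYTES=50000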
Incremental loading
ingestr supports incremental loading: you can choose to append, merge, or delete+insert data into the destination table using different strategies. Incremental loading ingests only the new rows from the source table into the destination table, so you don't have to load the entire table every time you run the data migration procedure.
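When driving ingestr directly, an incremental run might look like the following sketch. It assumes ingestr's documented --incremental-strategy, --incremental-key, and --primary-key options; the table name public.orders and the column names updated_at and id are placeholders.
# Merge only new/changed rows, tracked via the updated_at column.
ingestr ingest \
    --source-uri "postgresql://<username>:<password>@postgresql.example.org:5432/postgres" \
    --source-table "public.orders" \
    --dest-uri "cratedb://crate:na@localhost:4200/" \
    --dest-table "testdrive.orders" \
    --incremental-strategy merge \
    --incremental-key updated_at \
    --primary-key id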
Credentials
Source: Data pipeline source elements each use their own way of configuring access credentials, with individual parameters.
Destination: CrateDB, as the pipeline sink element, uses the same way to specify credentials across the board, within the --cluster-url CLI option. Please note that you must specify a password; if your account doesn't use a password, use an arbitrary string like na.
Custom queries
ingestr provides custom queries for SQL sources by using the query: prefix on the source table name. The feature can be used for partial table loading, to limit the import to a subset of the source columns, or for general data filtering and aggregation purposes. Example:
ctk load table \
"crate://crate:na@localhost:4200/?table=query:SELECT * FROM sys.summits WHERE height > 4000" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/cratedb_summits"
Learn¶
ingestr
A few video tutorials about using ingestr with Google Analytics, Shopify, and Kafka.
Ingest data from Google Analytics with Ingestr
Ingest data from Shopify with Ingestr
Ingest data from Kafka with Ingestr
dlt
Like many other adapters, the CrateDB destination adapter for ingestr is implemented using dlt. If you need to embed an ETL pipeline into your application, visit the dlt CrateDB examples to get an idea of what this looks like.
Usage¶
Using the framework is straightforward. Either use the ingestr ingest CLI directly, or use the universal ctk load table interface.
Tip
When using ingestr ingest, please note you must address CrateDB like PostgreSQL, but using the cratedb:// URL scheme instead of postgresql://. Please install ingestr v0.13.61 or higher, e.g. by using pip install 'ingestr>=0.13.61'.
When using ctk load table as outlined below, you will use the --cluster-url CLI option to address CrateDB using an SQLAlchemy URL with the crate:// URL scheme.
Amazon Kinesis to CrateDB¶
ctk load table \
"kinesis://?aws_access_key_id=${AWS_ACCESS_KEY_ID}&aws_secret_access_key=${AWS_SECRET_ACCESS_KEY}®ion_name=eu-central-1&table=arn:aws:kinesis:eu-central-1:831394476016:stream/testdrive" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/kinesis_demo"
Source URL template: kinesis://?aws_access_key_id=<aws-access-key-id>&aws_secret_access_key=<aws-secret-access-key>&region_name=<region-name>&table=arn:aws:kinesis:<region-name>:<aws-account-id>:stream/<stream-name>
Amazon Redshift to CrateDB¶
ctk load table \
"redshift+psycopg2://<username>:<password>@host.amazonaws.com:5439/database?table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/redshift_demo"
Amazon S3 to CrateDB¶
ctk load table \
"s3://?access_key_id=${AWS_ACCESS_KEY_ID}&secret_access_key=${AWS_SECRET_ACCESS_KEY}&table=openaq-fetches/realtime/2023-02-25/1677351953_eea_2aa299a7-b688-4200-864a-8df7bac3af5b.ndjson#jsonl" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/s3_ndjson_demo"
See the documentation about ingestr and Amazon S3 for details about the URI format, file globbing patterns, compression options, and file type hinting options.
Source URL template: s3://?access_key_id=<aws-access-key-id>&secret_access_key=<aws-secret-access-key>&table=<bucket-name>/<file-glob>
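For instance, loading all NDJSON files below a prefix with a file type hint might look like the following sketch; the bucket name and prefix are placeholders.
# Glob over a prefix; the #jsonl fragment hints the file type.
ctk load table \
    "s3://?access_key_id=${AWS_ACCESS_KEY_ID}&secret_access_key=${AWS_SECRET_ACCESS_KEY}&table=my-bucket/data/*.jsonl#jsonl" \
    --cluster-url="crate://crate:na@localhost:4200/testdrive/s3_glob_demo"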
Apache Kafka to CrateDB¶
ctk load table \
"kafka://?bootstrap_servers=localhost:9092&group_id=test_group&security_protocol=SASL_SSL&sasl_mechanisms=PLAIN&sasl_username=example_username&sasl_password=example_secret&batch_size=1000&batch_timeout=3&table=my-topic" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/kafka_demo"
Apache Solr to CrateDB¶
ctk load table \
"solr://<username>:<password>@<host>:<port>/solr/<collection>?table=<collection>" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/solr_demo"
ClickHouse to CrateDB¶
ctk load table \
"clickhouse://<username>:<password>@<host>:<port>?secure=<secure>&table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/clickhouse_demo"
CrateDB to CrateDB¶
ctk load table \
"crate://crate:na@localhost:4200/?table=sys.summits" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/cratedb_summits"
ctk load table \
"crate://crate:na@localhost:4200/?table=information_schema.tables" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/cratedb_tables"
CSV to CrateDB¶
ctk load table \
"csv://./examples/cdc/postgresql/diamonds.csv?table=sample" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/csv_diamonds"
Databricks to CrateDB¶
ctk load table \
"databricks://token:<access_token>@<server_hostname>?http_path=<http_path>&catalog=<catalog>&schema=<schema>&table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/databricks_demo"
DuckDB to CrateDB¶
ctk load table \
"duckdb:////path/to/demo.duckdb?table=information_schema.tables" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/duckdb_tables"
EXASOL DB to CrateDB¶
ctk load table \
"exa+websocket://sys:exasol@127.0.0.1:8888?table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/exasol_demo"
Elasticsearch to CrateDB¶
ctk load table \
"elasticsearch://<username>:<password>@es.example.org:9200?secure=false&verify_certs=false&table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/elastic_demo"
GitHub to CrateDB¶
ctk load table \
"github://?access_token=${GH_TOKEN}&owner=crate&repo=cratedb-toolkit&table=issues" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/github_ctk_issues"
Google Cloud Storage to CrateDB¶
ctk load table \
"gs://?credentials_path=/path/to/service-account.json?table=<bucket-name>/<file-glob>" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/gcs_demo"
Google BigQuery to CrateDB¶
ctk load table \
"bigquery://<project-name>?credentials_path=/path/to/service/account.json&location=<location>?table=<table-name>" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/bigquery_demo"
Google Sheets to CrateDB¶
ctk load table \
"gsheets://?credentials_path=/path/to/service/account.json&table=fkdUQ2bjdNfUq2CA.Sheet1" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/gsheets_demo"
HubSpot to CrateDB¶
ctk load table \
"hubspot://?api_key=<api-key-here>&table=deals" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/hubspot_deals"
See HubSpot entities for the labels you can use in the table parameter of the source URL.
MySQL to CrateDB¶
ctk load table \
"mysql://<username>:<password>@host:port/dbname?table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/mysql_demo"
Oracle to CrateDB¶
ctk load table \
"oracle+cx_oracle://<username>:<password>@<hostname>:<port>/dbname?table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/oracle_demo"
PostgreSQL to CrateDB¶
ctk load table \
"postgresql://<username>:<password>@postgresql.example.org:5432/postgres?table=information_schema.tables" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/postgresql_tables"
Salesforce to CrateDB¶
ctk load table \
"salesforce://?username=<username>&password=<password>&token=<token>&table=opportunity" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/salesforce_opportunity"
See Salesforce entities for the labels you can use in the table parameter of the source URL.
Slack to CrateDB¶
ctk load table \
"slack://?api_key=${SLACK_TOKEN}&table=channels" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/slack_channels"
See Slack entities for the labels you can use in the table parameter of the source URL.
Snowflake to CrateDB¶
ctk load table \
"snowflake://<username>:<password>@account/dbname?warehouse=COMPUTE_WH&role=data_scientist&table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/snowflake_demo"
SQLite to CrateDB¶
ctk load table \
"sqlite:////path/to/demo.sqlite?table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/sqlite_demo"
Teradata to CrateDB¶
ctk load table \
"teradatasql://guest:please@teradata.example.com/?table=demo" \
--cluster-url="crate://crate:na@localhost:4200/testdrive/teradata_demo"