diff --git a/website/docs/clustering.md b/website/docs/clustering.md index d2ceb196d023..8eb0dfbfaa1e 100644 --- a/website/docs/clustering.md +++ b/website/docs/clustering.md @@ -283,17 +283,17 @@ hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run hoodie.clustering.plan.strategy.sort.columns=column1,column2 ``` -### HoodieDeltaStreamer +### HoodieStreamer -This brings us to our users' favorite utility in Hudi. Now, we can trigger asynchronous clustering with DeltaStreamer. +This brings us to our users' favorite utility in Hudi. Now, we can trigger asynchronous clustering with Hudi Streamer. Just set the `hoodie.clustering.async.enabled` config to true and specify other clustering config in properties file -whose location can be pased as `—props` when starting the deltastreamer (just like in the case of HoodieClusteringJob). +whose location can be passed as `--props` when starting the Hudi Streamer (just like in the case of HoodieClusteringJob). -A sample spark-submit command to setup HoodieDeltaStreamer is as below: +A sample spark-submit command to set up HoodieStreamer is as below: ```bash spark-submit \ ---class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ +--class org.apache.hudi.utilities.streamer.HoodieStreamer \ /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \ --props /path/to/config/clustering_kafka.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ diff --git a/website/docs/compaction.md b/website/docs/compaction.md index a6249b7ae7c4..9f7b119db431 100644 --- a/website/docs/compaction.md +++ b/website/docs/compaction.md @@ -45,14 +45,14 @@ import org.apache.spark.sql.streaming.ProcessingTime; writer.trigger(new ProcessingTime(30000)).start(tablePath); ``` -### DeltaStreamer Continuous Mode -Hudi DeltaStreamer provides continuous ingestion mode where a single long running spark application +### Hudi Streamer Continuous Mode +Hudi Streamer provides continuous ingestion mode where a single long running spark application ingests data to Hudi table continuously from upstream sources. In this mode, Hudi supports managing asynchronous compactions. Here is an example snippet for running in continuous mode with async compactions ```properties spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \ ---class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ +--class org.apache.hudi.utilities.streamer.HoodieStreamer \ --table-type MERGE_ON_READ \ --target-base-path \ --target-table \ @@ -76,7 +76,7 @@ you may want Synchronous compaction, which means that as a commit is written it Compaction is run synchronously by passing the flag "--disable-compaction" (Meaning to disable async compaction scheduling). When both ingestion and compaction is running in the same spark context, you can use resource allocation configuration -in DeltaStreamer CLI such as ("--delta-sync-scheduling-weight", +in Hudi Streamer CLI such as ("--delta-sync-scheduling-weight", "--compact-scheduling-weight", ""--delta-sync-scheduling-minshare", and "--compact-scheduling-minshare") to control executor allocation between ingestion and compaction.
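As a rough sketch of how the scheduling flags mentioned above might be combined in a single continuous-mode run: the jar path, properties file, table names, and the specific weight/minshare values below are illustrative assumptions rather than recommendations, and pool weights generally only take effect when Spark's FAIR scheduler is enabled.

```bash
# Illustrative sketch only: continuous ingestion with async compaction,
# splitting executors between the delta-sync and compaction scheduler pools.
spark-submit \
  --conf spark.scheduler.mode=FAIR \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  /path/to/hudi-utilities-bundle.jar \
  --props /path/to/kafka-source.properties \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --table-type MERGE_ON_READ \
  --target-base-path /path/to/hudi-table \
  --target-table my_table \
  --continuous \
  --delta-sync-scheduling-weight 2 \
  --compact-scheduling-weight 1 \
  --delta-sync-scheduling-minshare 1 \
  --compact-scheduling-minshare 1
```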
diff --git a/website/docs/concurrency_control.md b/website/docs/concurrency_control.md index efa7f3212e25..7a014d16140c 100644 --- a/website/docs/concurrency_control.md +++ b/website/docs/concurrency_control.md @@ -5,7 +5,7 @@ toc: true last_modified_at: 2021-03-19T15:59:57-04:00 --- -In this section, we will cover Hudi's concurrency model and describe ways to ingest data into a Hudi Table from multiple writers; using the [DeltaStreamer](#deltastreamer) tool as well as +In this section, we will cover Hudi's concurrency model and describe ways to ingest data into a Hudi Table from multiple writers; using the [Hudi Streamer](#hudi-streamer) tool as well as using the [Hudi datasource](#datasource-writer). ## Supported Concurrency Controls @@ -19,7 +19,7 @@ between multiple table service writers and readers. Additionally, using MVCC, Hu the same Hudi Table. Hudi supports `file level OCC`, i.e., for any 2 commits (or writers) happening to the same table, if they do not have writes to overlapping files being changed, both writers are allowed to succeed. This feature is currently *experimental* and requires either Zookeeper or HiveMetastore to acquire locks. -It may be helpful to understand the different guarantees provided by [write operations](/docs/write_operations/) via Hudi datasource or the delta streamer. +It may be helpful to understand the different guarantees provided by [write operations](/docs/write_operations/) via Hudi datasource or the Hudi Streamer. ## Single Writer Guarantees @@ -171,21 +171,21 @@ inputDF.write.format("hudi") .save(basePath) ``` -## DeltaStreamer +## Hudi Streamer -The `HoodieDeltaStreamer` utility (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities. +The `HoodieStreamer` utility (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities. -Using optimistic_concurrency_control via delta streamer requires adding the above configs to the properties file that can be passed to the -job. For example below, adding the configs to kafka-source.properties file and passing them to deltastreamer will enable optimistic concurrency. -A deltastreamer job can then be triggered as follows: +Using optimistic_concurrency_control via Hudi Streamer requires adding the above configs to the properties file that can be passed to the +job. For example below, adding the configs to kafka-source.properties file and passing them to Hudi Streamer will enable optimistic concurrency. 
+A Hudi Streamer job can then be triggered as follows: ```java -[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ - --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \ +[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ + --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ - --target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \ + --target-base-path file:\/\/\/tmp/hudi-streamer-op \ --target-table uber.impressions \ --op BULK_INSERT ``` diff --git a/website/docs/deployment.md b/website/docs/deployment.md index 6d26df1a5d54..088f99834ad2 100644 --- a/website/docs/deployment.md +++ b/website/docs/deployment.md @@ -16,19 +16,19 @@ Specifically, we will cover the following aspects. ## Deploying All in all, Hudi deploys with no long running servers or additional infrastructure cost to your data lake. In fact, Hudi pioneered this model of building a transactional distributed storage layer -using existing infrastructure and its heartening to see other systems adopting similar approaches as well. Hudi writing is done via Spark jobs (DeltaStreamer or custom Spark datasource jobs), deployed per standard Apache Spark [recommendations](https://spark.apache.org/docs/latest/cluster-overview). +using existing infrastructure and it's heartening to see other systems adopting similar approaches as well. Hudi writing is done via Spark jobs (Hudi Streamer or custom Spark datasource jobs), deployed per standard Apache Spark [recommendations](https://spark.apache.org/docs/latest/cluster-overview). Querying Hudi tables happens via libraries installed into Apache Hive, Apache Spark or PrestoDB and hence no additional infrastructure is necessary. A typical Hudi data ingestion can be achieved in 2 modes. In a single run mode, Hudi ingestion reads next batch of data, ingest them to Hudi table and exits. In continuous mode, Hudi ingestion runs as a long-running service executing ingestion in a loop. With Merge_On_Read Table, Hudi ingestion needs to also take care of compacting delta files. Again, compaction can be performed in an asynchronous-mode by letting compaction run concurrently with ingestion or in a serial fashion with one after another. -### DeltaStreamer +### Hudi Streamer -[DeltaStreamer](/docs/hoodie_deltastreamer#deltastreamer) is the standalone utility to incrementally pull upstream changes +[Hudi Streamer](/docs/hoodie_deltastreamer#hudi-streamer) is the standalone utility to incrementally pull upstream changes from varied sources such as DFS, Kafka and DB Changelogs and ingest them to hudi tables. It runs as a spark application in two modes. -To use DeltaStreamer in Spark, the `hudi-utilities-bundle` is required, by adding +To use Hudi Streamer in Spark, the `hudi-utilities-bundle` is required, by adding `--packages org.apache.hudi:hudi-utilities-bundle_2.11:0.13.0` to the `spark-submit` command. From 0.11.0 release, we start to provide a new `hudi-utilities-slim-bundle` which aims to exclude dependencies that can cause conflicts and compatibility issues with different versions of Spark.
The `hudi-utilities-slim-bundle` should be used along with a Hudi Spark bundle @@ -36,7 +36,7 @@ corresponding to the Spark version used, e.g., `--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.13.0,org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.0`, if using `hudi-utilities-bundle` solely in Spark encounters compatibility issues. - - **Run Once Mode** : In this mode, Deltastreamer performs one ingestion round which includes incrementally pulling events from upstream sources and ingesting them to hudi table. Background operations like cleaning old file versions and archiving hoodie timeline are automatically executed as part of the run. For Merge-On-Read tables, Compaction is also run inline as part of ingestion unless disabled by passing the flag "--disable-compaction". By default, Compaction is run inline for every ingestion run and this can be changed by setting the property "hoodie.compact.inline.max.delta.commits". You can either manually run this spark application or use any cron trigger or workflow orchestrator (most common deployment strategy) such as Apache Airflow to spawn this application. See command line options in [this section](/docs/hoodie_deltastreamer#deltastreamer) for running the spark application. + - **Run Once Mode** : In this mode, Hudi Streamer performs one ingestion round which includes incrementally pulling events from upstream sources and ingesting them to hudi table. Background operations like cleaning old file versions and archiving hoodie timeline are automatically executed as part of the run. For Merge-On-Read tables, Compaction is also run inline as part of ingestion unless disabled by passing the flag "--disable-compaction". By default, Compaction is run inline for every ingestion run and this can be changed by setting the property "hoodie.compact.inline.max.delta.commits". You can either manually run this spark application or use any cron trigger or workflow orchestrator (most common deployment strategy) such as Apache Airflow to spawn this application. See command line options in [this section](/docs/hoodie_deltastreamer#hudi-streamer) for running the spark application. Here is an example invocation for reading from kafka topic in a single-run mode and writing to Merge On Read table type in a yarn cluster. @@ -74,7 +74,7 @@ Here is an example invocation for reading from kafka topic in a single-run mode --conf spark.sql.catalogImplementation=hive \ --conf spark.sql.shuffle.partitions=100 \ --driver-class-path $HADOOP_CONF_DIR \ - --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ + --class org.apache.hudi.utilities.streamer.HoodieStreamer \ --table-type MERGE_ON_READ \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --source-ordering-field ts \ @@ -84,7 +84,7 @@ Here is an example invocation for reading from kafka topic in a single-run mode --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider ``` - - **Continuous Mode** : Here, deltastreamer runs an infinite loop with each round performing one ingestion round as described in **Run Once Mode**. The frequency of data ingestion can be controlled by the configuration "--min-sync-interval-seconds". For Merge-On-Read tables, Compaction is run in asynchronous fashion concurrently with ingestion unless disabled by passing the flag "--disable-compaction". Every ingestion run triggers a compaction request asynchronously and this frequency can be changed by setting the property "hoodie.compact.inline.max.delta.commits". 
As both ingestion and compaction is running in the same spark context, you can use resource allocation configuration in DeltaStreamer CLI such as ("--delta-sync-scheduling-weight", "--compact-scheduling-weight", ""--delta-sync-scheduling-minshare", and "--compact-scheduling-minshare") to control executor allocation between ingestion and compaction. + - **Continuous Mode** : Here, Hudi Streamer runs an infinite loop with each round performing one ingestion round as described in **Run Once Mode**. The frequency of data ingestion can be controlled by the configuration "--min-sync-interval-seconds". For Merge-On-Read tables, Compaction is run in asynchronous fashion concurrently with ingestion unless disabled by passing the flag "--disable-compaction". Every ingestion run triggers a compaction request asynchronously and this frequency can be changed by setting the property "hoodie.compact.inline.max.delta.commits". As both ingestion and compaction are running in the same spark context, you can use resource allocation configuration in Hudi Streamer CLI such as ("--delta-sync-scheduling-weight", "--compact-scheduling-weight", "--delta-sync-scheduling-minshare", and "--compact-scheduling-minshare") to control executor allocation between ingestion and compaction. Here is an example invocation for reading from kafka topic in a continuous mode and writing to Merge On Read table type in a yarn cluster. @@ -122,7 +122,7 @@ Here is an example invocation for reading from kafka topic in a continuous mode --conf spark.sql.catalogImplementation=hive \ --conf spark.sql.shuffle.partitions=100 \ --driver-class-path $HADOOP_CONF_DIR \ - --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ + --class org.apache.hudi.utilities.streamer.HoodieStreamer \ --table-type MERGE_ON_READ \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --source-ordering-field ts \ @@ -161,7 +161,7 @@ As general guidelines, - We strive to keep all changes backwards compatible (i.e new code can read old data/timeline files) and when we cannot, we will provide upgrade/downgrade tools via the CLI - We cannot always guarantee forward compatibility (i.e old code being able to read data/timeline files written by a greater version). This is generally the norm, since no new features can be built otherwise. However any large such changes, will be turned off by default, for smooth transition to newer release. After a few releases and once enough users deem the feature stable in production, we will flip the defaults in a subsequent release. - - Always upgrade the query bundles (mr-bundle, presto-bundle, spark-bundle) first and then upgrade the writers (deltastreamer, spark jobs using datasource). This often provides the best experience and it's easy to fix + - Always upgrade the query bundles (mr-bundle, presto-bundle, spark-bundle) first and then upgrade the writers (Hudi Streamer, spark jobs using datasource). This often provides the best experience and it's easy to fix any issues by rolling forward/back the writer code (which typically you might have more control over) - With large, feature rich releases we recommend migrating slowly, by first testing in staging environments and running your own tests. Upgrading Hudi is no different than upgrading any database system. diff --git a/website/docs/docker_demo.md b/website/docs/docker_demo.md index 4d811925de55..0564bce20a7c 100644 --- a/website/docs/docker_demo.md +++ b/website/docs/docker_demo.md @@ -249,7 +249,7 @@ kcat -b kafkabroker -L -J | jq .
### Step 2: Incrementally ingest data from Kafka topic -Hudi comes with a tool named DeltaStreamer. This tool can connect to variety of data sources (including Kafka) to +Hudi comes with a tool named Hudi Streamer. This tool can connect to a variety of data sources (including Kafka) to pull changes and apply to Hudi table using upsert/insert primitives. Here, we will use the tool to download json data from kafka topic and ingest to both COW and MOR tables we initialized in the previous step. This tool automatically initializes the tables in the file-system if they do not exist yet. @@ -257,9 +257,9 @@ automatically initializes the tables in the file-system if they do not exist yet ```java docker exec -it adhoc-2 /bin/bash -# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow table in HDFS +# Run the following spark-submit command to execute the Hudi Streamer and ingest to stock_ticks_cow table in HDFS spark-submit \ - --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \ + --class org.apache.hudi.utilities.streamer.HoodieStreamer $HUDI_UTILITIES_BUNDLE \ --table-type COPY_ON_WRITE \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --source-ordering-field ts \ @@ -267,9 +267,9 @@ spark-submit \ --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider -# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor table in HDFS +# Run the following spark-submit command to execute the Hudi Streamer and ingest to stock_ticks_mor table in HDFS spark-submit \ - --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \ + --class org.apache.hudi.utilities.streamer.HoodieStreamer $HUDI_UTILITIES_BUNDLE \ --table-type MERGE_ON_READ \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --source-ordering-field ts \ @@ -279,7 +279,7 @@ spark-submit \ --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --disable-compaction -# As part of the setup (Look at setup_demo.sh), the configs needed for DeltaStreamer is uploaded to HDFS. The configs +# As part of the setup (Look at setup_demo.sh), the configs needed for Hudi Streamer are uploaded to HDFS. The configs # contain mostly Kafa connectivity settings, the avro-schema to be used for ingesting along with key and partitioning fields. exit @@ -743,9 +743,9 @@ Splits: 17 total, 17 done (100.00%) trino:default> exit ``` -### Step 5: Upload second batch to Kafka and run DeltaStreamer to ingest +### Step 5: Upload second batch to Kafka and run Hudi Streamer to ingest -Upload the second batch of data and ingest this batch using delta-streamer. As this batch does not bring in any new +Upload the second batch of data and ingest this batch using Hudi Streamer.
As this batch does not bring in any new partitions, there is no need to run hive-sync ```java cat docker/demo/data/batch_2.json | kcat -b kafkabroker -t stock_ticks -P # Within Docker container, run the ingestion command docker exec -it adhoc-2 /bin/bash -# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow table in HDFS +# Run the following spark-submit command to execute the Hudi Streamer and ingest to stock_ticks_cow table in HDFS spark-submit \ - --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \ + --class org.apache.hudi.utilities.streamer.HoodieStreamer $HUDI_UTILITIES_BUNDLE \ --table-type COPY_ON_WRITE \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --source-ordering-field ts \ @@ -765,9 +765,9 @@ spark-submit \ --props /var/demo/config/kafka-source.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider -# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor table in HDFS +# Run the following spark-submit command to execute the Hudi Streamer and ingest to stock_ticks_mor table in HDFS spark-submit \ - --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \ + --class org.apache.hudi.utilities.streamer.HoodieStreamer $HUDI_UTILITIES_BUNDLE \ --table-type MERGE_ON_READ \ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ --source-ordering-field ts \ @@ -780,7 +780,7 @@ spark-submit \ exit ``` -With Copy-On-Write table, the second ingestion by DeltaStreamer resulted in a new version of Parquet file getting created. +With Copy-On-Write table, the second ingestion by Hudi Streamer resulted in a new version of Parquet file getting created. See `http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_cow/2018/08/31` With Merge-On-Read table, the second ingestion merely appended the batch to an unmerged delta (log) file. diff --git a/website/docs/faq.md b/website/docs/faq.md index 0043aa97a2fa..d4801ae10f62 100644 --- a/website/docs/faq.md +++ b/website/docs/faq.md @@ -111,7 +111,7 @@ for even more flexibility and get away from Hive-style partition evol route. ### What are some ways to write a Hudi dataset? -Typically, you obtain a set of partial updates/inserts from your source and issue [write operations](https://hudi.apache.org/docs/write_operations/) against a Hudi dataset. If you ingesting data from any of the standard sources like Kafka, or tailing DFS, the [delta streamer](https://hudi.apache.org/docs/hoodie_deltastreamer#deltastreamer) tool is invaluable and provides an easy, self-managed solution to getting data written into Hudi. You can also write your own code to capture data from a custom source using the Spark datasource API and use a [Hudi datasource](https://hudi.apache.org/docs/writing_data/#spark-datasource-writer) to write into Hudi. +Typically, you obtain a set of partial updates/inserts from your source and issue [write operations](https://hudi.apache.org/docs/write_operations/) against a Hudi dataset. If you are ingesting data from any of the standard sources like Kafka, or tailing DFS, the [Hudi Streamer](https://hudi.apache.org/docs/hoodie_deltastreamer#hudi-streamer) tool is invaluable and provides an easy, self-managed solution to getting data written into Hudi.
You can also write your own code to capture data from a custom source using the Spark datasource API and use a [Hudi datasource](https://hudi.apache.org/docs/writing_data/#spark-datasource-writer) to write into Hudi. ### How is a Hudi job deployed? @@ -135,13 +135,13 @@ Limitations: Note that currently the reading realtime view natively out of the Spark datasource is not supported. Please use the Hive path below ``` -if Hive Sync is enabled in the [deltastreamer](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/docker/demo/sparksql-incremental.commands#L50) tool or [datasource](https://hudi.apache.org/docs/configurations#hoodiedatasourcehive_syncenable), the dataset is available in Hive as a couple of tables, that can now be read using HiveQL, Presto or SparkSQL. See [here](https://hudi.apache.org/docs/querying_data/) for more. +if Hive Sync is enabled in the [Hudi Streamer](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/docker/demo/sparksql-incremental.commands#L50) tool or [datasource](https://hudi.apache.org/docs/configurations#hoodiedatasourcehive_syncenable), the dataset is available in Hive as a couple of tables, that can now be read using HiveQL, Presto or SparkSQL. See [here](https://hudi.apache.org/docs/querying_data/) for more. ### How does Hudi handle duplicate record keys in an input? When issuing an `upsert` operation on a dataset and the batch of records provided contains multiple entries for a given key, then all of them are reduced into a single final value by repeatedly calling payload class's [preCombine()](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java#L40) method . By default, we pick the record with the greatest value (determined by calling .compareTo()) giving latest-write-wins style semantics. [This FAQ entry](https://hudi.apache.org/learn/faq#can-i-implement-my-own-logic-for-how-input-records-are-merged-with-record-on-storage) shows the interface for HoodieRecordPayload if you are interested. -For an insert or bulk_insert operation, no such pre-combining is performed. Thus, if your input contains duplicates, the dataset would also contain duplicates. If you don't want duplicate records either issue an upsert or consider specifying option to de-duplicate input in either [datasource](https://hudi.apache.org/docs/configurations#hoodiedatasourcewriteinsertdropduplicates) or [deltastreamer](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L229). +For an insert or bulk_insert operation, no such pre-combining is performed. Thus, if your input contains duplicates, the dataset would also contain duplicates. If you don't want duplicate records either issue an upsert or consider specifying option to de-duplicate input in either [datasource](https://hudi.apache.org/docs/configurations#hoodiedatasourcewriteinsertdropduplicates) or [Hudi Streamer](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java#L229). ### Can I implement my own logic for how input records are merged with record on storage? @@ -214,7 +214,7 @@ Hudi provides built in support for rewriting your entire dataset into Hudi one-t ### How can I pass hudi configurations to my spark job? 
-Hudi configuration options covering the datasource and low level Hudi write client (which both deltastreamer & datasource internally call) are [here](https://hudi.apache.org/docs/configurations/). Invoking *--help* on any tool such as DeltaStreamer would print all the usage options. A lot of the options that control upsert, file sizing behavior are defined at the write client level and below is how we pass them to different options available for writing data. +Hudi configuration options covering the datasource and low level Hudi write client (which both Hudi Streamer & datasource internally call) are [here](https://hudi.apache.org/docs/configurations/). Invoking *--help* on any tool such as Hudi Streamer would print all the usage options. A lot of the options that control upsert, file sizing behavior are defined at the write client level and below is how we pass them to different options available for writing data. - For Spark DataSource, you can use the "options" API of DataFrameWriter to pass in these configs. @@ -227,7 +227,7 @@ inputDF.write().format("org.apache.hudi") - When using `HoodieWriteClient` directly, you can simply construct HoodieWriteConfig object with the configs in the link you mentioned. - - When using HoodieDeltaStreamer tool to ingest, you can set the configs in properties file and pass the file as the cmdline argument "*--props*" + - When using HoodieStreamer tool to ingest, you can set the configs in properties file and pass the file as the cmdline argument "*--props*" ### How to create Hive style partition folder structure? @@ -253,7 +253,7 @@ set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat ### Can I register my Hudi dataset with Apache Hive metastore? -Yes. This can be performed either via the standalone [Hive Sync tool](https://hudi.apache.org/docs/syncing_metastore#hive-sync-tool) or using options in [deltastreamer](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/docker/demo/sparksql-incremental.commands#L50) tool or [datasource](https://hudi.apache.org/docs/configurations#hoodiedatasourcehive_syncenable). +Yes. This can be performed either via the standalone [Hive Sync tool](https://hudi.apache.org/docs/syncing_metastore#hive-sync-tool) or using options in [Hudi Streamer](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/docker/demo/sparksql-incremental.commands#L50) tool or [datasource](https://hudi.apache.org/docs/configurations#hoodiedatasourcehive_syncenable). ### How does the Hudi indexing work & what are its benefits? @@ -277,13 +277,13 @@ The Hudi cleaner process often runs right after a commit and deltacommit and goe ### What's Hudi's schema evolution story? -Hudi uses Avro as the internal canonical representation for records, primarily due to its nice [schema compatibility & evolution](https://docs.confluent.io/platform/current/schema-registry/avro.html) properties. This is a key aspect of having reliability in your ingestion or ETL pipelines. As long as the schema passed to Hudi (either explicitly in DeltaStreamer schema provider configs or implicitly by Spark Datasource's Dataset schemas) is backwards compatible (e.g no field deletes, only appending new fields to schema), Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date. 
+Hudi uses Avro as the internal canonical representation for records, primarily due to its nice [schema compatibility & evolution](https://docs.confluent.io/platform/current/schema-registry/avro.html) properties. This is a key aspect of having reliability in your ingestion or ETL pipelines. As long as the schema passed to Hudi (either explicitly in Hudi Streamer schema provider configs or implicitly by Spark Datasource's Dataset schemas) is backwards compatible (e.g no field deletes, only appending new fields to schema), Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date. ### How do I run compaction for a MOR dataset? Simplest way to run compaction on MOR dataset is to run the [compaction inline](https://hudi.apache.org/docs/configurations#hoodiecompactinline), at the cost of spending more time ingesting; This could be particularly useful, in common cases where you have small amount of late arriving data trickling into older partitions. In such a scenario, you may want to just aggressively compact the last N partitions while waiting for enough logs to accumulate for older partitions. The net effect is that you have converted most of the recent data, that is more likely to be queried to optimized columnar format. -That said, for obvious reasons of not blocking ingesting for compaction, you may want to run it asynchronously as well. This can be done either via a separate [compaction job](https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java) that is scheduled by your workflow scheduler/notebook independently. If you are using delta streamer, then you can run in [continuous mode](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L241) where the ingestion and compaction are both managed concurrently in a single spark run time. +That said, for obvious reasons of not blocking ingesting for compaction, you may want to run it asynchronously as well. This can be done either via a separate [compaction job](https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java) that is scheduled by your workflow scheduler/notebook independently. If you are using Hudi Streamer, then you can run in [continuous mode](https://github.com/apache/hudi/blob/d3edac4612bde2fa9deca9536801dbc48961fb95/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java#L241) where the ingestion and compaction are both managed concurrently in a single spark run time. ### What options do I have for asynchronous/offline compactions on MOR dataset? @@ -292,7 +292,7 @@ There are a couple of options depending on how you write to Hudi. But first let - Execution: In this step the compaction plan is read and file slices are compacted. Execution doesnt need the same level of coordination with other writers as Scheduling step and can be decoupled from ingestion job easily. Depending on how you write to Hudi these are the possible options currently. -- DeltaStreamer: +- Hudi Streamer: - In Continuous mode, asynchronous compaction is achieved by default. Here scheduling is done by the ingestion job inline and compaction execution is achieved asynchronously by a separate parallel thread. - In non continuous mode, only inline compaction is possible. 
- Please note in either mode, by passing --disable-compaction compaction is completely disabled @@ -359,7 +359,7 @@ b) **[Clustering](https://hudi.apache.org/blog/2021/01/27/hudi-clustering-intro) Hudi runs cleaner to remove old file versions as part of writing data either in inline or in asynchronous mode (0.6.0 onwards). Hudi Cleaner retains at-least one previous commit when cleaning old file versions. This is to prevent the case when concurrently running queries which are reading the latest file versions suddenly see those files getting deleted by cleaner because a new file version got added . In other words, retaining at-least one previous commit is needed for ensuring snapshot isolation for readers. -### How do I use DeltaStreamer or Spark DataSource API to write to a Non-partitioned Hudi dataset ? +### How do I use Hudi Streamer or Spark DataSource API to write to a Non-partitioned Hudi dataset ? Hudi supports writing to non-partitioned datasets. For writing to a non-partitioned Hudi dataset and performing hive table syncing, you need to set the below configurations in the properties passed: @@ -483,8 +483,8 @@ sudo ln -sf hudi-timeline-server-bundle-0.7.0.jar hudi-timeline-server-bundle.ja sudo ln -sf hudi-utilities-bundle_2.12-0.7.0.jar hudi-utilities-bundle.jar ``` -**Using the overriden jar in Deltastreamer:** -When invoking DeltaStreamer specify the above jar location as part of spark-submit command. +**Using the overridden jar in Hudi Streamer:** +When invoking Hudi Streamer, specify the above jar location as part of the spark-submit command. ### Why partition fields are also stored in parquet files in addition to the partition path ? diff --git a/website/docs/gcp_bigquery.md b/website/docs/gcp_bigquery.md index 58ef43435c0e..52823d7fc143 100644 --- a/website/docs/gcp_bigquery.md +++ b/website/docs/gcp_bigquery.md @@ -9,7 +9,7 @@ now, the Hudi-BigQuery integration only works for hive-style partitioned Copy-On ## Configurations -Hudi uses `org.apache.hudi.gcp.bigquery.BigQuerySyncTool` to sync tables. It works with `HoodieDeltaStreamer` via +Hudi uses `org.apache.hudi.gcp.bigquery.BigQuerySyncTool` to sync tables. It works with `HoodieStreamer` via setting sync tool class. A few BigQuery-specific configurations are required. | Config | Notes | @@ -33,22 +33,22 @@ hoodie.partition.metafile.use.base.format = 'true' ## Example -Below shows an example for running `BigQuerySyncTool` with `HoodieDeltaStreamer`. +Below shows an example for running `BigQuerySyncTool` with `HoodieStreamer`. ```shell spark-submit --master yarn \ --packages com.google.cloud:google-cloud-bigquery:2.10.4 \ --jars /opt/hudi-gcp-bundle-0.13.0.jar \ ---class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ +--class org.apache.hudi.utilities.streamer.HoodieStreamer \ /opt/hudi-utilities-bundle_2.12-0.13.0.jar \ --target-base-path gs://my-hoodie-table/path \ --target-table mytable \ --table-type COPY_ON_WRITE \ --base-file-format PARQUET \ -# ... other deltastreamer options +# ...
other Hudi Streamer options --enable-sync \ --sync-tool-classes org.apache.hudi.gcp.bigquery.BigQuerySyncTool \ ---hoodie-conf hoodie.deltastreamer.source.dfs.root=gs://my-source-data/path \ +--hoodie-conf hoodie.streamer.source.dfs.root=gs://my-source-data/path \ --hoodie-conf hoodie.gcp.bigquery.sync.project_id=hudi-bq \ --hoodie-conf hoodie.gcp.bigquery.sync.dataset_name=rxusandbox \ --hoodie-conf hoodie.gcp.bigquery.sync.dataset_location=asia-southeast1 \ diff --git a/website/docs/hoodie_deltastreamer.md b/website/docs/hoodie_deltastreamer.md index bb95bd52b161..9342b5c53bb4 100644 --- a/website/docs/hoodie_deltastreamer.md +++ b/website/docs/hoodie_deltastreamer.md @@ -1,11 +1,24 @@ --- title: Streaming Ingestion -keywords: [hudi, deltastreamer, hoodiedeltastreamer] +keywords: [hudi, streamer, hoodiestreamer] --- -## DeltaStreamer +## Hudi Streamer +:::danger Breaking Change -The `HoodieDeltaStreamer` utility (part of `hudi-utilities-bundle`) provides the way to ingest from different sources such as DFS or Kafka, with the following capabilities. +The following classes were renamed and relocated to the `org.apache.hudi.utilities.streamer` package. +- `DeltastreamerMultiWriterCkptUpdateFunc` is renamed to `StreamerMultiWriterCkptUpdateFunc` +- `DeltaSync` is renamed to `StreamSync` +- `HoodieDeltaStreamer` is renamed to `HoodieStreamer` +- `HoodieDeltaStreamerMetrics` is renamed to `HoodieStreamerMetrics` +- `HoodieMultiTableDeltaStreamer` is renamed to `HoodieMultiTableStreamer` + +To maintain backward compatibility, the original classes are still present in the org.apache.hudi.utilities.deltastreamer +package, but have been deprecated. + +::: + +The `HoodieStreamer` utility (part of `hudi-utilities-bundle`) provides the way to ingest from different sources such as DFS or Kafka, with the following capabilities. - Exactly once ingestion of new events from Kafka, [incremental imports](https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide#_incremental_imports) from Sqoop or output of `HiveIncrementalPuller` or files under a DFS folder - Support json, avro or a custom record types for the incoming data @@ -16,11 +29,11 @@ The `HoodieDeltaStreamer` utility (part of `hudi-utilities-bundle`) provides the Command line options describe capabilities in more detail ```java -[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help +[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help Usage:
[options] Options: --checkpoint - Resume Delta Streamer from this checkpoint. + Resume Hudi Streamer from this checkpoint. --commit-on-errors Commit even when some records failed to be written Default: false @@ -33,7 +46,7 @@ Options: https://spark.apache.org/docs/latest/job-scheduling Default: 1 --continuous - Delta Streamer runs in continuous mode running source-fetch -> Transform + Hudi Streamer runs in continuous mode running source-fetch -> Transform -> Hudi Write in loop Default: false --delta-sync-scheduling-minshare @@ -91,7 +104,7 @@ Options: hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, referto individual classes, for supported properties. - Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties + Default: file:///Users/vinoth/bin/hoodie/src/test/resources/streamer-config/dfs-source.properties --schemaprovider-class subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach schemas to input & target table data, built in options: @@ -135,7 +148,7 @@ Options: ``` The tool takes a hierarchically composed property file and has pluggable interfaces for extracting data, key generation and providing schema. Sample configs for ingesting from kafka and dfs are -provided under `hudi-utilities/src/test/resources/delta-streamer-config`. +provided under `hudi-utilities/src/test/resources/streamer-config`. For e.g: once you have Confluent Kafka, Schema registry up & running, produce some test data using ([impressions.avro](https://docs.confluent.io/current/ksql/docs/tutorials/generate-custom-test-data) provided by schema-registry repo) @@ -146,12 +159,12 @@ For e.g: once you have Confluent Kafka, Schema registry up & running, produce so and then ingest it as follows. ```java -[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ - --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \ +[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ + --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ - --target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \ + --target-base-path file:\/\/\/tmp/hudi-streamer-op \ --target-table uber.impressions \ --op BULK_INSERT ``` @@ -163,61 +176,61 @@ From 0.11.0 release, we start to provide a new `hudi-utilities-slim-bundle` whic cause conflicts and compatibility issues with different versions of Spark. The `hudi-utilities-slim-bundle` should be used along with a Hudi Spark bundle corresponding the Spark version used to make utilities work with Spark, e.g., `--packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.13.0,org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.0`, -if using `hudi-utilities-bundle` solely to run `HoodieDeltaStreamer` in Spark encounters compatibility issues. +if using `hudi-utilities-bundle` solely to run `HoodieStreamer` in Spark encounters compatibility issues. 
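To make the slim-bundle pairing above concrete, here is a hedged sketch of the Kafka ingestion command from this page launched against `hudi-utilities-slim-bundle`. Passing the slim bundle as a local application jar (path is a placeholder) while pulling the matching Spark bundle via `--packages` is one possible arrangement assumed for illustration, not a prescription from this patch.

```bash
# Illustrative sketch: the Kafka ingestion example above, using the slim
# utilities bundle plus a matching Spark bundle (paths/versions are placeholders).
spark-submit \
  --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.13.0 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  /path/to/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
  --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field impresssiontime \
  --target-base-path file:///tmp/hudi-streamer-op \
  --target-table uber.impressions \
  --op BULK_INSERT
```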
-#### MultiTableDeltaStreamer +#### MultiTableStreamer -`HoodieMultiTableDeltaStreamer`, a wrapper on top of `HoodieDeltaStreamer`, enables one to ingest multiple tables at a single go into hudi datasets. Currently it only supports sequential processing of tables to be ingested and COPY_ON_WRITE storage type. The command line options for `HoodieMultiTableDeltaStreamer` are pretty much similar to `HoodieDeltaStreamer` with the only exception that you are required to provide table wise configs in separate files in a dedicated config folder. The following command line options are introduced +`HoodieMultiTableStreamer`, a wrapper on top of `HoodieStreamer`, enables one to ingest multiple tables at a single go into hudi datasets. Currently it only supports sequential processing of tables to be ingested and COPY_ON_WRITE storage type. The command line options for `HoodieMultiTableStreamer` are pretty much similar to `HoodieStreamer` with the only exception that you are required to provide table wise configs in separate files in a dedicated config folder. The following command line options are introduced ```java * --config-folder the path to the folder which contains all the table wise config files --base-path-prefix - this is added to enable users to create all the hudi datasets for related tables under one path in FS. The datasets are then created under the path - //. However you can override the paths for every table by setting the property hoodie.deltastreamer.ingestion.targetBasePath + this is added to enable users to create all the hudi datasets for related tables under one path in FS. The datasets are then created under the path - //. However you can override the paths for every table by setting the property hoodie.streamer.ingestion.targetBasePath ``` -The following properties are needed to be set properly to ingest data using `HoodieMultiTableDeltaStreamer`. +The following properties are needed to be set properly to ingest data using `HoodieMultiTableStreamer`. ```java -hoodie.deltastreamer.ingestion.tablesToBeIngested +hoodie.streamer.ingestion.tablesToBeIngested comma separated names of tables to be ingested in the format ., for example db1.table1,db1.table2 -hoodie.deltastreamer.ingestion.targetBasePath +hoodie.streamer.ingestion.targetBasePath if you wish to ingest a particular table in a separate path, you can mention that path here -hoodie.deltastreamer.ingestion..
.configFile +hoodie.streamer.ingestion..
.configFile path to the config file in dedicated config folder which contains table overridden properties for the particular table to be ingested. ``` -Sample config files for table wise overridden properties can be found under `hudi-utilities/src/test/resources/delta-streamer-config`. The command to run `HoodieMultiTableDeltaStreamer` is also similar to how you run `HoodieDeltaStreamer`. +Sample config files for table wise overridden properties can be found under `hudi-utilities/src/test/resources/streamer-config`. The command to run `HoodieMultiTableStreamer` is also similar to how you run `HoodieStreamer`. ```java -[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ - --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \ +[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ + --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \ --config-folder file://tmp/hudi-ingestion-config \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ - --base-path-prefix file:\/\/\/tmp/hudi-deltastreamer-op \ + --base-path-prefix file:\/\/\/tmp/hudi-streamer-op \ --target-table uber.impressions \ --op BULK_INSERT ``` -For detailed information on how to configure and use `HoodieMultiTableDeltaStreamer`, please refer [blog section](/blog/2020/08/22/ingest-multiple-tables-using-hudi). +For detailed information on how to configure and use `HoodieMultiTableStreamer`, please refer [blog section](/blog/2020/08/22/ingest-multiple-tables-using-hudi). ### Concurrency Control -The `HoodieDeltaStreamer` utility (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities. +The `HoodieStreamer` utility (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities. -Using optimistic_concurrency_control via delta streamer requires adding the above configs to the properties file that can be passed to the -job. For example below, adding the configs to kafka-source.properties file and passing them to deltastreamer will enable optimistic concurrency. -A deltastreamer job can then be triggered as follows: +Using optimistic_concurrency_control via Hudi Streamer requires adding the above configs to the properties file that can be passed to the +job. For example below, adding the configs to kafka-source.properties file and passing them to Hudi Streamer will enable optimistic concurrency. 
+A Hudi Streamer job can then be triggered as follows: ```java -[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ - --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \ +[hoodie]$ spark-submit --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \ + --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ - --target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \ + --target-base-path file:\/\/\/tmp/hudi-streamer-op \ --target-table uber.impressions \ --op BULK_INSERT ``` @@ -225,14 +238,14 @@ A deltastreamer job can then be triggered as follows: Read more in depth about concurrency control in the [concurrency control concepts](/docs/concurrency_control) section ## Checkpointing -`HoodieDeltaStreamer` uses checkpoints to keep track of what data has been read already so it can resume without needing to reprocess all data. +`HoodieStreamer` uses checkpoints to keep track of what data has been read already so it can resume without needing to reprocess all data. When using a Kafka source, the checkpoint is the [Kafka Offset](https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management) When using a DFS source, the checkpoint is the 'last modified' timestamp of the latest file read. -Checkpoints are saved in the .hoodie commit file as `deltastreamer.checkpoint.key`. +Checkpoints are saved in the .hoodie commit file as `streamer.checkpoint.key`. If you need to change the checkpoints for reprocessing or replaying data you can use the following options: -- `--checkpoint` will set `deltastreamer.checkpoint.reset_key` in the commit file to overwrite the current checkpoint. +- `--checkpoint` will set `streamer.checkpoint.reset_key` in the commit file to overwrite the current checkpoint. - `--source-limit` will set a maximum amount of data to read from the source. For DFS sources, this is max # of bytes read. For Kafka, this is the max # of events to read. @@ -249,35 +262,35 @@ When fetching schemas from a registry, you can specify both the source schema an |Config|Description|Example| |---|---|---| -|hoodie.deltastreamer.schemaprovider.registry.url|The schema of the source you are reading from|https://foo:bar@schemaregistry.org| -|hoodie.deltastreamer.schemaprovider.registry.targetUrl|The schema of the target you are writing to|https://foo:bar@schemaregistry.org| +|hoodie.streamer.schemaprovider.registry.url|The schema of the source you are reading from|https://foo:bar@schemaregistry.org| +|hoodie.streamer.schemaprovider.registry.targetUrl|The schema of the target you are writing to|https://foo:bar@schemaregistry.org| -The above configs are passed to DeltaStreamer spark-submit command like: -```--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://foo:bar@schemaregistry.org``` +The above configs are passed to Hudi Streamer spark-submit command like: +```--hoodie-conf hoodie.streamer.schemaprovider.registry.url=https://foo:bar@schemaregistry.org``` ### JDBC Schema Provider You can obtain the latest schema through a JDBC connection. 
|Config|Description|Example| |---|---|---| -|hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url|The JDBC URL to connect to. You can specify source specific connection properties in the URL|jdbc:postgresql://localhost/test?user=fred&password=secret| -|hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type|The class name of the JDBC driver to use to connect to this URL|org.h2.Driver| -|hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username|username for the connection|fred| -|hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password|password for the connection|secret| -|hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable|The table with the schema to reference|test_database.test1_table or test1_table| -|hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout|The number of seconds the driver will wait for a Statement object to execute to the given number of seconds. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.|0| -|hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable|If true, all columns are nullable|true| +|hoodie.streamer.schemaprovider.source.schema.jdbc.connection.url|The JDBC URL to connect to. You can specify source specific connection properties in the URL|jdbc:postgresql://localhost/test?user=fred&password=secret| +|hoodie.streamer.schemaprovider.source.schema.jdbc.driver.type|The class name of the JDBC driver to use to connect to this URL|org.h2.Driver| +|hoodie.streamer.schemaprovider.source.schema.jdbc.username|username for the connection|fred| +|hoodie.streamer.schemaprovider.source.schema.jdbc.password|password for the connection|secret| +|hoodie.streamer.schemaprovider.source.schema.jdbc.dbtable|The table with the schema to reference|test_database.test1_table or test1_table| +|hoodie.streamer.schemaprovider.source.schema.jdbc.timeout|The number of seconds the driver will wait for a Statement object to execute to the given number of seconds. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.|0| +|hoodie.streamer.schemaprovider.source.schema.jdbc.nullable|If true, all columns are nullable|true| -The above configs are passed to DeltaStreamer spark-submit command like: -```--hoodie-conf hoodie.deltastreamer.jdbcbasedschemaprovider.connection.url=jdbc:postgresql://localhost/test?user=fred&password=secret``` +The above configs are passed to Hudi Streamer spark-submit command like: +```--hoodie-conf hoodie.streamer.jdbcbasedschemaprovider.connection.url=jdbc:postgresql://localhost/test?user=fred&password=secret``` ### File Based Schema Provider You can use a .avsc file to define your schema. You can then point to this file on DFS as a schema provider. 
|Config|Description|Example| |---|---|---| -|hoodie.deltastreamer.schemaprovider.source.schema.file|The schema of the source you are reading from|[example schema file](https://github.com/apache/hudi/blob/a8fb69656f522648233f0310ca3756188d954281/docker/demo/config/test-suite/source.avsc)| -|hoodie.deltastreamer.schemaprovider.target.schema.file|The schema of the target you are writing to|[example schema file](https://github.com/apache/hudi/blob/a8fb69656f522648233f0310ca3756188d954281/docker/demo/config/test-suite/target.avsc)| +|hoodie.streamer.schemaprovider.source.schema.file|The schema of the source you are reading from|[example schema file](https://github.com/apache/hudi/blob/a8fb69656f522648233f0310ca3756188d954281/docker/demo/config/test-suite/source.avsc)| +|hoodie.streamer.schemaprovider.target.schema.file|The schema of the target you are writing to|[example schema file](https://github.com/apache/hudi/blob/a8fb69656f522648233f0310ca3756188d954281/docker/demo/config/test-suite/target.avsc)| ### Hive Schema Provider @@ -285,10 +298,10 @@ You can use hive tables to fetch source and target schema. |Config| Description | |---|-------------------------------------------------------| -|hoodie.deltastreamer.schemaprovider.source.schema.hive.database| Hive database from where source schema can be fetched | -|hoodie.deltastreamer.schemaprovider.source.schema.hive.table| Hive table from where source schema can be fetched | -|hoodie.deltastreamer.schemaprovider.target.schema.hive.database| Hive database from where target schema can be fetched | -|hoodie.deltastreamer.schemaprovider.target.schema.hive.table| Hive table from where target schema can be fetched | +|hoodie.streamer.schemaprovider.source.schema.hive.database| Hive database from where source schema can be fetched | +|hoodie.streamer.schemaprovider.source.schema.hive.table| Hive table from where source schema can be fetched | +|hoodie.streamer.schemaprovider.target.schema.hive.database| Hive database from where target schema can be fetched | +|hoodie.streamer.schemaprovider.target.schema.hive.table| Hive table from where target schema can be fetched | ### Schema Provider with Post Processor @@ -297,7 +310,7 @@ then will apply a post processor to change the schema before it is used. You can this class: https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java ## Sources -Hoodie DeltaStreamer can read data from a wide variety of sources. The following are a list of supported sources: +Hudi Streamer can read data from a wide variety of sources. The following is a list of supported sources: ### Distributed File System (DFS) See the storage configurations page to see some examples of DFS applications Hudi can read from. The following are the @@ -314,10 +327,10 @@ other formats and then write data as Hudi format.) For DFS sources the following behaviors are expected: - For JSON DFS source, you always need to set a schema. If the target Hudi table follows the same schema as from the source file, you just need to set the source schema. If not, you need to set schemas for both source and target. -- `HoodieDeltaStreamer` reads the files under the source base path (`hoodie.deltastreamer.source.dfs.root`) directly, and it won't use the partition paths under this base path as fields of the dataset. Detailed examples can be found [here](https://github.com/apache/hudi/issues/5485).
+- `HoodieStreamer` reads the files under the source base path (`hoodie.streamer.source.dfs.root`) directly, and it won't use the partition paths under this base path as fields of the dataset. Detailed examples can be found [here](https://github.com/apache/hudi/issues/5485).

### Kafka
-Hudi can read directly from Kafka clusters. See more details on `HoodieDeltaStreamer` to learn how to setup streaming
+Hudi can read directly from Kafka clusters. See more details on `HoodieStreamer` to learn how to set up streaming
ingestion with exactly once semantics, checkpointing, and plugin transformations. The following formats are supported
when reading data from Kafka:
@@ -334,9 +347,9 @@ to trigger/processing of new or changed data as soon as it is available on S3.
1. Enable S3 Event Notifications https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html
2. Download the aws-java-sdk-sqs jar.
3. Find the queue URL and Region to set these configurations:
- 1. hoodie.deltastreamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url
- 2. hoodie.deltastreamer.s3.source.queue.region=us-west-2
-4. start the S3EventsSource and S3EventsHoodieIncrSource using the `HoodieDeltaStreamer` utility as shown in sample commands below:
+ 1. hoodie.streamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url
+ 2. hoodie.streamer.s3.source.queue.region=us-west-2
+4. Start the S3EventsSource and S3EventsHoodieIncrSource using the `HoodieStreamer` utility as shown in the sample commands below:

Sample commands for setting up both sources are available in this blog post: https://hudi.apache.org/blog/2021/08/23/s3-events-source/#configuration-and-setup (a hedged sketch of the two jobs is also included at the end of this section).

@@ -345,27 +358,27 @@ Hudi can read from a JDBC source with a full fetch of a table, or Hudi can even
|Config|Description|Example|
|---|---|---|
-|hoodie.deltastreamer.jdbc.url|URL of the JDBC connection|jdbc:postgresql://localhost/test|
-|hoodie.deltastreamer.jdbc.user|User to use for authentication of the JDBC connection|fred|
-|hoodie.deltastreamer.jdbc.password|Password to use for authentication of the JDBC connection|secret|
-|hoodie.deltastreamer.jdbc.password.file|If you prefer to use a password file for the connection||
-|hoodie.deltastreamer.jdbc.driver.class|Driver class to use for the JDBC connection||
-|hoodie.deltastreamer.jdbc.table.name||my_table|
-|hoodie.deltastreamer.jdbc.table.incr.column.name|If run in incremental mode, this field will be used to pull new data incrementally||
-|hoodie.deltastreamer.jdbc.incr.pull|Will the JDBC connection perform an incremental pull?||
-|hoodie.deltastreamer.jdbc.extra.options.|How you pass extra configurations that would normally by specified as spark.read.option()|hoodie.deltastreamer.jdbc.extra.options.fetchSize=100 hoodie.deltastreamer.jdbc.extra.options.upperBound=1 hoodie.deltastreamer.jdbc.extra.options.lowerBound=100|
-|hoodie.deltastreamer.jdbc.storage.level|Used to control the persistence level|Default = MEMORY_AND_DISK_SER|
-|hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch|Boolean which if set true makes an incremental fetch fallback to a full fetch if there is any error in the incremental read|FALSE|
+|hoodie.streamer.jdbc.url|URL of the JDBC connection|jdbc:postgresql://localhost/test|
+|hoodie.streamer.jdbc.user|User to use for authentication of the JDBC connection|fred|
+|hoodie.streamer.jdbc.password|Password to use for authentication of the JDBC connection|secret|
+|hoodie.streamer.jdbc.password.file|If you prefer to use a password file for the connection||
+|hoodie.streamer.jdbc.driver.class|Driver class to use for the JDBC connection||
+|hoodie.streamer.jdbc.table.name||my_table|
+|hoodie.streamer.jdbc.table.incr.column.name|If run in incremental mode, this field will be used to pull new data incrementally||
+|hoodie.streamer.jdbc.incr.pull|Will the JDBC connection perform an incremental pull?||
+|hoodie.streamer.jdbc.extra.options.|How you pass extra configurations that would normally be specified as spark.read.option()|hoodie.streamer.jdbc.extra.options.fetchSize=100 hoodie.streamer.jdbc.extra.options.upperBound=1 hoodie.streamer.jdbc.extra.options.lowerBound=100|
+|hoodie.streamer.jdbc.storage.level|Used to control the persistence level|Default = MEMORY_AND_DISK_SER|
+|hoodie.streamer.jdbc.incr.fallback.to.full.fetch|Boolean which if set true makes an incremental fetch fall back to a full fetch if there is any error in the incremental read|FALSE|

### SQL Source
SQL Source that reads from any table, used mainly for backfill jobs which will process specific partition dates.
-This won't update the deltastreamer.checkpoint.key to the processed commit, instead it will fetch the latest successful
+This won't update the streamer.checkpoint.key to the processed commit, instead it will fetch the latest successful
checkpoint key and set that value as this backfill commit's checkpoint so that it won't interrupt the regular incremental
-processing. To fetch and use the latest incremental checkpoint, you need to also set this hoodie_conf for deltastremer
-jobs: `hoodie.write.meta.key.prefixes = 'deltastreamer.checkpoint.key'`
+processing. To fetch and use the latest incremental checkpoint, you need to also set this hoodie_conf for Hudi Streamer
+jobs: `hoodie.write.meta.key.prefixes = 'streamer.checkpoint.key'`

Spark SQL should be configured using this hoodie config:
-hoodie.deltastreamer.source.sql.sql.query = 'select * from source_table'
+hoodie.streamer.source.sql.sql.query = 'select * from source_table'

## Flink Ingestion

@@ -545,7 +558,7 @@ There are many use cases that user put the full history data set onto the messag
| `write.rate.limit` | `false` | `0` | Default disable the rate limit |

## Kafka Connect Sink
-If you want to perform streaming ingestion into Hudi format similar to `HoodieDeltaStreamer`, but you don't want to depend on Spark,
+If you want to perform streaming ingestion into Hudi format similar to `HoodieStreamer`, but you don't want to depend on Spark,
try out the new experimental release of Hudi Kafka Connect Sink. Read the [ReadMe](https://github.com/apache/hudi/tree/master/hudi-kafka-connect)
for full documentation.
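Returning to the S3 events source steps above: the linked blog post has the complete commands, and the following is only a hedged sketch of the two `HoodieStreamer` jobs. The bundle path, bucket names, table names, and the `hoodie.streamer.source.hoodieincr.path` key and value are illustrative placeholders; verify them against your Hudi version and the blog post before use.

```bash
# Sketch 1 (assumed setup): ingest S3 event notifications from SQS into a Hudi "S3 metadata" table.
spark-submit \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  /path/to/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.S3EventsSource \
  --target-base-path s3a://my-bucket/s3_meta_table \
  --target-table s3_meta_table \
  --continuous \
  --hoodie-conf hoodie.streamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url \
  --hoodie-conf hoodie.streamer.s3.source.queue.region=us-west-2

# Sketch 2 (assumed setup): incrementally read the S3 metadata table and ingest the referenced
# objects into the target Hudi table.
spark-submit \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  /path/to/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource \
  --target-base-path s3a://my-bucket/target_table \
  --target-table target_table \
  --continuous \
  --hoodie-conf hoodie.streamer.source.hoodieincr.path=s3a://my-bucket/s3_meta_table
```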
diff --git a/website/docs/key_generation.md b/website/docs/key_generation.md
index 570e086ebe23..0c636a094224 100644
--- a/website/docs/key_generation.md
+++ b/website/docs/key_generation.md
@@ -106,9 +106,9 @@ Configs to be set:

| Config | Meaning/purpose |
| ------------- | -------------|
-| ```hoodie.deltastreamer.keygen.timebased.timestamp.type``` | One of the timestamp types supported(UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR) |
-| ```hoodie.deltastreamer.keygen.timebased.output.dateformat```| Output date format |
-| ```hoodie.deltastreamer.keygen.timebased.timezone```| Timezone of the data format|
-| ```oodie.deltastreamer.keygen.timebased.input.dateformat```| Input date format |
+| ```hoodie.streamer.keygen.timebased.timestamp.type``` | One of the timestamp types supported (UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR) |
+| ```hoodie.streamer.keygen.timebased.output.dateformat```| Output date format |
+| ```hoodie.streamer.keygen.timebased.timezone```| Timezone of the data format|
+| ```hoodie.streamer.keygen.timebased.input.dateformat```| Input date format |

Let's go over some example values for TimestampBasedKeyGenerator.

@@ -117,9 +117,9 @@ Let's go over some example values for TimestampBasedKeyGenerator.

| Config field | Value |
| ------------- | -------------|
-|```hoodie.deltastreamer.keygen.timebased.timestamp.type```| "EPOCHMILLISECONDS"|
-|```hoodie.deltastreamer.keygen.timebased.output.dateformat``` | "yyyy-MM-dd hh" |
-|```hoodie.deltastreamer.keygen.timebased.timezone```| "GMT+8:00" |
+|```hoodie.streamer.keygen.timebased.timestamp.type```| "EPOCHMILLISECONDS"|
+|```hoodie.streamer.keygen.timebased.output.dateformat``` | "yyyy-MM-dd hh" |
+|```hoodie.streamer.keygen.timebased.timezone```| "GMT+8:00" |

Input Field value: “1578283932000L”
Partition path generated from key generator: “2020-01-06 12” @@ -131,10 +131,10 @@ Partition path generated from key generator: “1970-01-01 08” | Config field | Value | | ------------- | -------------| -|```hoodie.deltastreamer.keygen.timebased.timestamp.type```| "DATE_STRING" | -|```hoodie.deltastreamer.keygen.timebased.output.dateformat```| "yyyy-MM-dd hh" | -|```hoodie.deltastreamer.keygen.timebased.timezone```| "GMT+8:00" | -|```hoodie.deltastreamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd hh:mm:ss" | +|```hoodie.streamer.keygen.timebased.timestamp.type```| "DATE_STRING" | +|```hoodie.streamer.keygen.timebased.output.dateformat```| "yyyy-MM-dd hh" | +|```hoodie.streamer.keygen.timebased.timezone```| "GMT+8:00" | +|```hoodie.streamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd hh:mm:ss" | Input field value: “2020-01-06 12:12:12”
Partition path generated from key generator: “2020-01-06 12” @@ -147,10 +147,10 @@ Partition path generated from key generator: “1970-01-01 12:00:00” | Config field | Value | | ------------- | -------------| -|```hoodie.deltastreamer.keygen.timebased.timestamp.type```| "SCALAR"| -|```hoodie.deltastreamer.keygen.timebased.output.dateformat```| "yyyy-MM-dd hh" | -|```hoodie.deltastreamer.keygen.timebased.timezone```| "GMT" | -|```hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit```| "days" | +|```hoodie.streamer.keygen.timebased.timestamp.type```| "SCALAR"| +|```hoodie.streamer.keygen.timebased.output.dateformat```| "yyyy-MM-dd hh" | +|```hoodie.streamer.keygen.timebased.timezone```| "GMT" | +|```hoodie.streamer.keygen.timebased.timestamp.scalar.time.unit```| "days" | Input field value: “20000L”
Partition path generated from key generator: “2024-10-04 12” @@ -162,12 +162,12 @@ Partition path generated from key generator: “1970-01-02 12” | Config field | Value | | ------------- | -------------| -|```hoodie.deltastreamer.keygen.timebased.timestamp.type```| "DATE_STRING"| -|```hoodie.deltastreamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ss.SSSZ" | -|```hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | -|```hoodie.deltastreamer.keygen.timebased.input.timezone```| "" | -|```hoodie.deltastreamer.keygen.timebased.output.dateformat```| "yyyyMMddHH" | -|```hoodie.deltastreamer.keygen.timebased.output.timezone```| "GMT" | +|```hoodie.streamer.keygen.timebased.timestamp.type```| "DATE_STRING"| +|```hoodie.streamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ss.SSSZ" | +|```hoodie.streamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | +|```hoodie.streamer.keygen.timebased.input.timezone```| "" | +|```hoodie.streamer.keygen.timebased.output.dateformat```| "yyyyMMddHH" | +|```hoodie.streamer.keygen.timebased.output.timezone```| "GMT" | Input field value: "2020-04-01T13:01:33.428Z"
Partition path generated from key generator: "2020040113" @@ -176,12 +176,12 @@ Partition path generated from key generator: "2020040113" | Config field | Value | | ------------- | -------------| -|```hoodie.deltastreamer.keygen.timebased.timestamp.type```| "DATE_STRING"| -|```hoodie.deltastreamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ" | -|```hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | -|```hoodie.deltastreamer.keygen.timebased.input.timezone```| "" | -|```hoodie.deltastreamer.keygen.timebased.output.dateformat```| "yyyyMMddHH" | -|```hoodie.deltastreamer.keygen.timebased.output.timezone```| "UTC" | +|```hoodie.streamer.keygen.timebased.timestamp.type```| "DATE_STRING"| +|```hoodie.streamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ" | +|```hoodie.streamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | +|```hoodie.streamer.keygen.timebased.input.timezone```| "" | +|```hoodie.streamer.keygen.timebased.output.dateformat```| "yyyyMMddHH" | +|```hoodie.streamer.keygen.timebased.output.timezone```| "UTC" | Input field value: "2020-04-01T13:01:33.428Z"
Partition path generated from key generator: "2020040113" @@ -190,12 +190,12 @@ Partition path generated from key generator: "2020040113" | Config field | Value | | ------------- | -------------| -|```hoodie.deltastreamer.keygen.timebased.timestamp.type```| "DATE_STRING"| -|```hoodie.deltastreamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ" | -|```hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | -|```hoodie.deltastreamer.keygen.timebased.input.timezone```| "" | -|```hoodie.deltastreamer.keygen.timebased.output.dateformat```| "yyyyMMddHH" | -|```hoodie.deltastreamer.keygen.timebased.output.timezone```| "UTC" | +|```hoodie.streamer.keygen.timebased.timestamp.type```| "DATE_STRING"| +|```hoodie.streamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ" | +|```hoodie.streamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | +|```hoodie.streamer.keygen.timebased.input.timezone```| "" | +|```hoodie.streamer.keygen.timebased.output.dateformat```| "yyyyMMddHH" | +|```hoodie.streamer.keygen.timebased.output.timezone```| "UTC" | Input field value: "2020-04-01T13:01:33-**05:00**"
Partition path generated from key generator: "2020040118" @@ -204,12 +204,12 @@ Partition path generated from key generator: "2020040118" | Config field | Value | | ------------- | -------------| -|```hoodie.deltastreamer.keygen.timebased.timestamp.type```| "DATE_STRING"| -|```hoodie.deltastreamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ,yyyyMMdd" | -|```hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | -|```hoodie.deltastreamer.keygen.timebased.input.timezone```| "UTC" | -|```hoodie.deltastreamer.keygen.timebased.output.dateformat```| "MM/dd/yyyy" | -|```hoodie.deltastreamer.keygen.timebased.output.timezone```| "UTC" | +|```hoodie.streamer.keygen.timebased.timestamp.type```| "DATE_STRING"| +|```hoodie.streamer.keygen.timebased.input.dateformat```| "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ,yyyyMMdd" | +|```hoodie.streamer.keygen.timebased.input.dateformat.list.delimiter.regex```| "" | +|```hoodie.streamer.keygen.timebased.input.timezone```| "UTC" | +|```hoodie.streamer.keygen.timebased.output.dateformat```| "MM/dd/yyyy" | +|```hoodie.streamer.keygen.timebased.output.timezone```| "UTC" | Input field value: "20200401"
Partition path generated from key generator: "04/01/2020" diff --git a/website/docs/metadata_indexing.md b/website/docs/metadata_indexing.md index 3a785c41d6c9..85a10dcec8d9 100644 --- a/website/docs/metadata_indexing.md +++ b/website/docs/metadata_indexing.md @@ -25,7 +25,7 @@ feature, please check out [this blog](https://www.onehouse.ai/blog/asynchronous- ## Setup Async Indexing -First, we will generate a continuous workload. In the below example, we are going to start a [deltastreamer](/docs/hoodie_deltastreamer#deltastreamer) which will continuously write data +First, we will generate a continuous workload. In the below example, we are going to start a [Hudi Streamer](/docs/hoodie_deltastreamer#hudi-streamer) which will continuously write data from raw parquet to Hudi table. We used the widely available [NY Taxi dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/), whose setup details are as below:
Ingestion write config @@ -35,9 +35,9 @@ from raw parquet to Hudi table. We used the widely available [NY Taxi dataset](h hoodie.datasource.write.recordkey.field=VendorID hoodie.datasource.write.partitionpath.field=tpep_dropoff_datetime hoodie.datasource.write.precombine.field=tpep_dropoff_datetime -hoodie.deltastreamer.source.dfs.root=/Users/home/path/to/data/parquet_files/ -hoodie.deltastreamer.schemaprovider.target.schema.file=/Users/home/path/to/schema/schema.avsc -hoodie.deltastreamer.schemaprovider.source.schema.file=/Users/home/path/to/schema/schema.avsc +hoodie.streamer.source.dfs.root=/Users/home/path/to/data/parquet_files/ +hoodie.streamer.schemaprovider.target.schema.file=/Users/home/path/to/schema/schema.avsc +hoodie.streamer.schemaprovider.source.schema.file=/Users/home/path/to/schema/schema.avsc // set lock provider configs hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.zookeeper.url= @@ -50,12 +50,12 @@ hoodie.write.lock.zookeeper.base_path=
- Run deltastreamer
+ Run Hudi Streamer

```bash spark-submit \ ---class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /Users/home/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.13.0.jar` \ +--class org.apache.hudi.utilities.streamer.HoodieStreamer `ls /Users/home/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.13.0.jar` \ --props `ls /Users/home/path/to/write/config.properties` \ --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --source-ordering-field tpep_dropoff_datetime \ @@ -71,7 +71,7 @@ spark-submit \

-From version 0.11.0 onwards, Hudi metadata table is enabled by default and the files index will be automatically created. While the deltastreamer is running in continuous mode, let +From version 0.11.0 onwards, Hudi metadata table is enabled by default and the files index will be automatically created. While the Hudi Streamer is running in continuous mode, let us schedule the indexing for COLUMN_STATS index. First we need to define a properties file for the indexer. ### Configurations diff --git a/website/docs/metrics.md b/website/docs/metrics.md index eddba2a45d05..c6f2833f5ada 100644 --- a/website/docs/metrics.md +++ b/website/docs/metrics.md @@ -73,7 +73,7 @@ hoodie.metrics.datadog.metric.prefix= * `hoodie.metrics.datadog.metric.prefix` will help segregate metrics by setting different prefixes for different jobs. Note that it will use `.` to delimit the prefix and the metric name. For example, if the prefix is set to `foo`, then `foo.` will be prepended to the metric name. #### Demo -In this demo, we ran a `HoodieDeltaStreamer` job with `HoodieMetrics` turned on and other configurations set properly. +In this demo, we ran a `HoodieStreamer` job with `HoodieMetrics` turned on and other configurations set properly.
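The exact properties used in that demo are not reproduced in the docs; a hedged sketch of what enabling the Datadog reporter typically involves is shown below. The API key and prefix values are placeholders.

```properties
# Hedged sketch: turn on HoodieMetrics and report to Datadog (values are placeholders).
hoodie.metrics.on=true
hoodie.metrics.reporter.type=DATADOG
hoodie.metrics.datadog.api.site=US
hoodie.metrics.datadog.api.key=<your-datadog-api-key>
hoodie.metrics.datadog.metric.prefix=foo
```

With `hoodie.metrics.datadog.metric.prefix=foo`, the metric names listed below show up in Datadog with a `foo.` prefix, matching the delimiting behavior described above.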
hudi_datadog_metrics.png

@@ -85,7 +85,7 @@ In this demo, we ran a `HoodieDeltaStreamer` job with `HoodieMetrics` turned on
* `<prefix>.<table_name>.clean.duration`
* `<prefix>.<table_name>.index.lookup.duration`

- as well as `HoodieDeltaStreamer`-specific metrics
+ as well as `HoodieStreamer`-specific metrics

* `<prefix>.<table_name>.deltastreamer.duration`
* `<prefix>.<table_name>.deltastreamer.hiveSyncDuration`

diff --git a/website/docs/migration_guide.md b/website/docs/migration_guide.md
index 449d65c376a5..31cbdbcb9561 100644
--- a/website/docs/migration_guide.md
+++ b/website/docs/migration_guide.md
@@ -36,15 +36,15 @@ Import your existing table into a Hudi managed table. Since all the data is Hudi
There are a few options when choosing this approach.

**Option 1**
-Use the HoodieDeltaStreamer tool. HoodieDeltaStreamer supports bootstrap with --run-bootstrap command line option. There are two types of bootstrap,
+Use the HoodieStreamer tool. HoodieStreamer supports bootstrap with --run-bootstrap command line option. There are two types of bootstrap,
METADATA_ONLY and FULL_RECORD. METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset.
FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.

-Here is an example for running FULL_RECORD bootstrap and keeping hive style partition with HoodieDeltaStreamer.
+Here is an example for running FULL_RECORD bootstrap and keeping hive style partition with HoodieStreamer.
```
spark-submit --master local \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
---class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
+--class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--run-bootstrap \
--target-base-path /tmp/hoodie/bootstrap_table \
--target-table bootstrap_table \
diff --git a/website/docs/precommit_validator.md b/website/docs/precommit_validator.md
index f7466002f89a..6dbd3354305d 100644
--- a/website/docs/precommit_validator.md
+++ b/website/docs/precommit_validator.md
@@ -5,7 +5,7 @@ keywords: [ hudi, quality, expectations, pre-commit validator]

Data quality refers to the overall accuracy, completeness, consistency, and validity of data. Ensuring data quality is vital for accurate analysis and reporting, as well as for compliance with regulations and maintaining trust in your organization's data infrastructure.

-Hudi offers **Pre-Commit Validators** that allow you to ensure that your data meets certain data quality expectations as you are writing with DeltaStreamer or Spark Datasource writers.
+Hudi offers **Pre-Commit Validators** that allow you to ensure that your data meets certain data quality expectations as you are writing with Hudi Streamer or Spark Datasource writers.

To configure pre-commit validators, use this setting `hoodie.precommit.validators=`.

diff --git a/website/docs/querying_data.md b/website/docs/querying_data.md
index b743fec3893c..f0f72f267eec 100644
--- a/website/docs/querying_data.md
+++ b/website/docs/querying_data.md
@@ -226,7 +226,7 @@ table like any other Hive table.

### Incremental query
`HiveIncrementalPuller` allows incrementally extracting changes from large fact/dimension tables via HiveQL, combining the benefits of Hive (reliably process complex SQL queries) and
incremental primitives (speed up querying tables incrementally instead of scanning fully). The tool uses Hive JDBC to run the hive query and saves its results in a temp table.
-that can later be upserted. Upsert utility (`HoodieDeltaStreamer`) has all the state it needs from the directory structure to know what should be the commit time on the target table.
+that can later be upserted.
Upsert utility (`HoodieStreamer`) has all the state it needs from the directory structure to know what should be the commit time on the target table. e.g: `/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}`.The Delta Hive table registered will be of the form `{tmpdb}.{source_table}_{last_commit_included}`. The following are the configuration options for HiveIncrementalPuller diff --git a/website/docs/quick-start-guide.md b/website/docs/quick-start-guide.md index 8de81f2856fc..a23ce2753942 100644 --- a/website/docs/quick-start-guide.md +++ b/website/docs/quick-start-guide.md @@ -328,7 +328,7 @@ location '/tmp/hudi/hudi_cow_pt_tbl'; **Create Table for an existing Hudi Table** -We can create a table on an existing hudi table(created with spark-shell or deltastreamer). This is useful to +We can create a table on an existing hudi table(created with spark-shell or Hudi Streamer). This is useful to read/write to/from a pre-existing hudi table. ```sql diff --git a/website/docs/s3_hoodie.md b/website/docs/s3_hoodie.md index 044b201c7cba..37f79ae75342 100644 --- a/website/docs/s3_hoodie.md +++ b/website/docs/s3_hoodie.md @@ -62,7 +62,7 @@ Alternatively, add the required configs in your core-site.xml from where Hudi ca ``` -Utilities such as hudi-cli or deltastreamer tool, can pick up s3 creds via environmental variable prefixed with `HOODIE_ENV_`. For e.g below is a bash snippet to setup +Utilities such as hudi-cli or Hudi Streamer tool, can pick up s3 creds via environmental variable prefixed with `HOODIE_ENV_`. For e.g below is a bash snippet to setup such variables and then have cli be able to work on datasets stored in s3 ```java diff --git a/website/docs/syncing_aws_glue_data_catalog.md b/website/docs/syncing_aws_glue_data_catalog.md index 0d9075993ec6..3ab47deeab77 100644 --- a/website/docs/syncing_aws_glue_data_catalog.md +++ b/website/docs/syncing_aws_glue_data_catalog.md @@ -10,7 +10,7 @@ and send them to AWS Glue. ### Configurations There is no additional configuration for using `AwsGlueCatalogSyncTool`; you just need to set it as one of the sync tool -classes for `HoodieDeltaStreamer` and everything configured as shown in [Sync to Hive Metastore](syncing_metastore) will +classes for `HoodieStreamer` and everything configured as shown in [Sync to Hive Metastore](syncing_metastore) will be passed along. ```shell diff --git a/website/docs/syncing_datahub.md b/website/docs/syncing_datahub.md index da7fdc876c6f..40fcd1d1891e 100644 --- a/website/docs/syncing_datahub.md +++ b/website/docs/syncing_datahub.md @@ -7,7 +7,7 @@ keywords: [hudi, datahub, sync] obeservability, federated governance, etc. Since Hudi 0.11.0, you can now sync to a DataHub instance by setting `DataHubSyncTool` as one of the sync tool classes -for `HoodieDeltaStreamer`. +for `HoodieStreamer`. The target Hudi table will be sync'ed to DataHub as a `Dataset`. The Hudi table's avro schema will be sync'ed, along with the commit timestamp when running the sync. @@ -29,18 +29,18 @@ the URN creation. ### Example -The following shows an example configuration to run `HoodieDeltaStreamer` with `DataHubSyncTool`. +The following shows an example configuration to run `HoodieStreamer` with `DataHubSyncTool`. -In addition to `hudi-utilities-bundle` that contains `HoodieDeltaStreamer`, you also add `hudi-datahub-sync-bundle` to +In addition to `hudi-utilities-bundle` that contains `HoodieStreamer`, you also add `hudi-datahub-sync-bundle` to the classpath. 
```shell spark-submit --master yarn \ --jars /opt/hudi-datahub-sync-bundle-0.13.0.jar \ ---class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ +--class org.apache.hudi.utilities.streamer.HoodieStreamer \ /opt/hudi-utilities-bundle_2.12-0.13.0.jar \ --target-table mytable \ -# ... other HoodieDeltaStreamer's configs +# ... other HoodieStreamer's configs --enable-sync \ --sync-tool-classes org.apache.hudi.sync.datahub.DataHubSyncTool \ --hoodie-conf hoodie.meta.sync.datahub.emitter.server=http://url-to-datahub-instance:8080 \ diff --git a/website/docs/syncing_metastore.md b/website/docs/syncing_metastore.md index 10e6b9b88861..d1600be39670 100644 --- a/website/docs/syncing_metastore.md +++ b/website/docs/syncing_metastore.md @@ -5,7 +5,7 @@ keywords: [hudi, hive, sync] ## Hive Sync Tool -Writing data with [DataSource](/docs/writing_data) writer or [HoodieDeltaStreamer](/docs/hoodie_deltastreamer) supports syncing of the table's latest schema to Hive metastore, such that queries can pick up new columns and partitions. +Writing data with [DataSource](/docs/writing_data) writer or [HoodieStreamer](/docs/hoodie_deltastreamer) supports syncing of the table's latest schema to Hive metastore, such that queries can pick up new columns and partitions. In case, it's preferable to run this from commandline or in an independent jvm, Hudi provides a `HiveSyncTool`, which can be invoked as below, once you have built the hudi-hive module. Following is how we sync the above Datasource Writer written table to Hive metastore. diff --git a/website/docs/transforms.md b/website/docs/transforms.md index 4b594f7b8019..e03f58b0ece3 100644 --- a/website/docs/transforms.md +++ b/website/docs/transforms.md @@ -12,12 +12,12 @@ You can pass a SQL Query to be executed during write. ```scala --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer ---hoodie-conf hoodie.deltastreamer.transformer.sql=SELECT a.col1, a.col3, a.col4 FROM a +--hoodie-conf hoodie.streamer.transformer.sql=SELECT a.col1, a.col3, a.col4 FROM a ``` ### SQL File Transformer You can specify a File with a SQL script to be executed during write. The SQL file is configured with this hoodie property: -hoodie.deltastreamer.transformer.sql.file +hoodie.streamer.transformer.sql.file The query should reference the source as a table named "\" @@ -51,7 +51,7 @@ If you wish to use multiple transformers together, you can use the Chained trans Example below first flattens the incoming records and then does sql projection based on the query specified: ```scala --transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.SqlQueryBasedTransformer ---hoodie-conf hoodie.deltastreamer.transformer.sql=SELECT a.col1, a.col3, a.col4 FROM a +--hoodie-conf hoodie.streamer.transformer.sql=SELECT a.col1, a.col3, a.col4 FROM a ``` ### AWS DMS Transformer diff --git a/website/docs/use_cases.md b/website/docs/use_cases.md index f3fabdf04d40..4efb3bc4736e 100644 --- a/website/docs/use_cases.md +++ b/website/docs/use_cases.md @@ -29,7 +29,7 @@ are needed if ingestion is to keep up with the typically high update volumes. Even for immutable data sources like [Kafka](https://kafka.apache.org), there is often a need to de-duplicate the incoming events against what's stored on DFS. Hudi achieves this by [employing indexes](http://hudi.apache.org/blog/2020/11/11/hudi-indexing-mechanisms/) of different kinds, quickly and efficiently. 
-All of this is seamlessly achieved by the Hudi DeltaStreamer tool, which is maintained in tight integration with rest of the code +All of this is seamlessly achieved by the Hudi Streamer tool, which is maintained in tight integration with rest of the code and we are always trying to add more capture sources, to make this easier for the users. The tool also has a continuous mode, where it can self-manage clustering/compaction asynchronously, without blocking ingestion, significantly improving data freshness. diff --git a/website/docs/write_operations.md b/website/docs/write_operations.md index 746a93d057b2..fc0791bcf202 100644 --- a/website/docs/write_operations.md +++ b/website/docs/write_operations.md @@ -32,7 +32,7 @@ Hudi supports implementing two types of deletes on data stored in Hudi tables, b - **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways. - Using DataSource, set `OPERATION_OPT_KEY` to `DELETE_OPERATION_OPT_VAL`. This will remove all the records in the DataSet being submitted. - Using DataSource, set `PAYLOAD_CLASS_OPT_KEY` to `"org.apache.hudi.EmptyHoodieRecordPayload"`. This will remove all the records in the DataSet being submitted. - - Using DataSource or DeltaStreamer, add a column named `_hoodie_is_deleted` to DataSet. The value of this column must be set to `true` for all the records to be deleted and either `false` or left null for any records which are to be upserted. + - Using DataSource or Hudi Streamer, add a column named `_hoodie_is_deleted` to DataSet. The value of this column must be set to `true` for all the records to be deleted and either `false` or left null for any records which are to be upserted. ## Writing path The following is an inside look on the Hudi write path and the sequence of events that occur during a write. diff --git a/website/docs/writing_data.md b/website/docs/writing_data.md index 51679a6cc88a..3cc0f3d22643 100644 --- a/website/docs/writing_data.md +++ b/website/docs/writing_data.md @@ -9,7 +9,7 @@ import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; In this section, we will cover ways to ingest new changes from external sources or even other Hudi tables. -The two main tools available are the [DeltaStreamer](/docs/hoodie_deltastreamer#deltastreamer) tool, as well as the [Spark Hudi datasource](#spark-datasource-writer). +The two main tools available are the [Hudi Streamer](/docs/hoodie_deltastreamer#hudi-streamer) tool, as well as the [Spark Hudi datasource](#spark-datasource-writer). ## Spark Datasource Writer
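As a concrete illustration of the `_hoodie_is_deleted` hard-delete flow described in the write_operations changes above, a hedged Spark datasource sketch follows. The table path, table name, and the `uuid`/`ts`/`partitionpath` fields are placeholders taken from the quick start example, not required names.

```scala
// Hedged sketch: mark a few existing records as deleted and upsert them back,
// which physically removes them from the Hudi table.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

val basePath = "/tmp/hudi_trips_cow"

// Read some records, drop the Hudi metadata columns, and flag them for deletion.
val toDelete = spark.read.format("hudi").load(basePath)
  .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
        "_hoodie_partition_path", "_hoodie_file_name")
  .limit(2)
  .withColumn("_hoodie_is_deleted", lit(true))

toDelete.write.format("hudi")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.table.name", "hudi_trips_cow")
  .mode(SaveMode.Append)
  .save(basePath)
```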