Quick start guide

The shortest path to start streaming data with Datamin is to connect the source you want to stream from, connect the destination you want to stream to, and build a streaming pipeline between them.

Let's create two simple pipeline examples.

The first pipeline will stream from a database to an API and will run on a schedule. The second will stream from Apache Kafka to an API and will be triggered whenever a new message arrives on a Kafka topic.

Example 1. Streaming from a database to an API

Let's imagine you have a PostgreSQL database and want to stream new invoices from it to the API of your internal financial software.

Our goal here would be to build a pipeline like this:

Before that, let's start by creating a connection to the PostgreSQL database and to the API endpoint you want to stream to.

Adding PostgreSQL integration:

  1. Click on "Add integration"

  2. Select "PostgreSQL"

  3. Fill in your connection details in the form as described here

  4. Click on "Save"

  5. To make sure that the connection works, you can also click on "Test connection" and fix the details if you get an error.

Adding API integration:

Now repeat the same procedure for the API integration.

  1. Click on "Add integration"

  2. Select "API"

  3. Fill in your connection details in the form as described here

  4. Click on "Save"

Creating a pipeline

When both integrations are created, you can create the pipeline itself.

  1. Click on "Add pipeline" -> "Create blank"

  2. Drag and drop four tasks onto the canvas: Query, Condition, For_each, and API_call, and connect them as shown in the picture below:

  1. Configure the "Query" task. Select the database integration you created on the first step and configure how you want to retrieve data from it.

For this example, we will select all invoices that were created within the last 30 minutes. We use the UI constructor, but you can, of course, use plain SQL as well.
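In plain SQL, such a query could look roughly like this (a sketch, assuming a hypothetical invoices table with a created_at timestamp column):

```sql
-- Select all invoices created within the last 30 minutes
SELECT id, amount, currency, created_at
FROM invoices
WHERE created_at >= NOW() - INTERVAL '30 minutes';
```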

  1. Configure the "Condition" task. Let's make sure that the dataset is not empty before we move it further through the pipeline:

  5. If the condition is true and the input dataset is not empty, the "For_each" task will split it into separate items and send them to the "API_call" task one by one. No additional configuration is needed for this task.

  6. Last but not least, let's configure the "API_call" task. Select the API endpoint you configured earlier and describe the JSON you want to send to it. For example, like this:
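The exact body depends on what your financial software expects; a sketch of the shape could look like the following, where the field names are placeholders and the values in practice come from each invoice item passed on by "For_each":

```json
{
  "invoice_id": 1024,
  "amount": 100.0,
  "currency": "EUR",
  "created_at": "2023-01-01T12:30:00Z"
}
```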

Running and testing the pipeline

Now that the pipeline is created, you can run it manually and check that everything works as expected.

Click on "Run pipeline" in the top right corner.

The pipeline will start running and you will see the status of each task in real time:

You can also click on "Show output log" in the same corner:

It will display the entire log in plain text format:

As you can see, in this example the "Query" task returns 0 rows, so the pipeline doesn't get past the "Condition" task. This can also be seen in the output of each task individually:

This means we need to check the "Query" or simply go to the database and make sure the right data is in place. Once that is fixed, the "Query" task will start showing the correct output:

Scheduling the pipeline

Now that we have made sure the pipeline works as expected, we can decide how to automate its execution.

Close the canvas and go to "Triggers":

Datamin is flexible software and supports multiple ways of triggering a pipeline.

For the purposes of this quick start, we won't focus on all of them; more information can be found in the pipeline documentation.

For this particular pipeline, let's say we want to run it twice per hour, at the 0th and 30th minutes. Click on "Edit pipeline schedule", configure this schedule in the form, and click on "Save".
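For reference, the same cadence expressed in standard cron notation (not necessarily the exact format Datamin's schedule form uses) would be:

```
# run at minute 0 and minute 30 of every hour
0,30 * * * *
```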

Now that the pipeline is scheduled, you can follow its statistics and execution log in the corresponding tabs:

Example 2. Streaming from Apache Kafka to an API

Now that you know how to build a simple scheduled pipeline, it will be easier to build a real-time streaming one.

Triggering a pipeline from outside of Datamin

The main change we need to make to the previous pipeline is to replace the "Query" task with the "External trigger":

With this change, we no longer need to query the database for invoices created within the last 30 minutes or run the pipeline every 30 minutes.

Instead, the pipeline can be integrated with a message broker or queue that is already used in your organization, such as Apache Kafka, RabbitMQ, Amazon SQS, or Google Pub/Sub. As soon as a new invoice is created, you send it to one of these solutions, which automatically triggers the pipeline in Datamin and forwards the JSON data to it.

Removing the schedule

First of all, we need to remove the schedule, since it is no longer needed.

  1. Click on "Edit pipeline schedule"

  2. Click on "Clear"

  3. Click on "Save"

Now the schedule is cleared and the pipeline won't be run by it anymore.

Installing and configuring the Apache Kafka Trigger library

Let's imagine that Apache Kafka is already used in your organization. The good news is that we already have an open-source listener for Kafka topics that you can install and configure to send data to this particular pipeline.

Kafka-trigger is configured with environment variables. Besides setting them in the conventional way, the configuration variables can also be specified in a .env or .env.local file.

To configure which Kafka topic should stream to this pipeline, you will need the pipeline UUID, which can be found on the preview page:

For example, if the Kafka topic you want to stream from is called test_topic and the pipeline UUID is e87ddc79-8e3f-4dae-92a8-8fff57ca81d3, the topic-to-pipeline mapping will be DTMN_KT_KAFKA_TOPIC_MAPPING="test_topic:e87ddc79-8e3f-4dae-92a8-8fff57ca81d3".
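In a .env file for kafka-trigger, that mapping is simply the line below; this sketch shows only the variable named in this guide, with the rest of kafka-trigger's configuration variables going in the same file:

```
# Map the Kafka topic to the Datamin pipeline UUID
DTMN_KT_KAFKA_TOPIC_MAPPING="test_topic:e87ddc79-8e3f-4dae-92a8-8fff57ca81d3"
```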

As soon as you have kafka-trigger installed and configured, you can start pushing data to the topic and watch how the pipeline handles it in the run statistics and logs.
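For a quick manual test, you can publish a message with Kafka's console producer. This is only a sketch: the broker address localhost:9092 and the invoice payload are placeholders, and on older Kafka versions the producer takes --broker-list instead of --bootstrap-server:

```
# Start a console producer for the topic that kafka-trigger listens on
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic
# At the ">" prompt, type the JSON payload and press Enter:
> {"invoice_id": 1024, "amount": 100.0, "currency": "EUR"}
```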

That's pretty much it for the start!

Now that you are familiar with the basic concepts of how to set up data streaming with Datamin, you can learn more in the rest of the documentation.
