AWS EC2 — Twitter data flow using NiFi, Kafka and ClickHouse.

Paweł Klimiuk
8 min read · Feb 27, 2021

The main purpose of this article is to show the reader how to create a data streaming system for gathering tweets in real time using NiFi, Kafka and ClickHouse. The solution was built in the AWS cloud on EC2 machines. I also used Docker to set up NiFi; Kafka and ClickHouse were installed from scratch. If you want to repeat all the steps, you need an AWS account and a Twitter developer account (to get keys for the Twitter API).

Security group

First of all, we need to create a security group which allows the EC2 machines to communicate with each other. To do this, go to the EC2 service and, on the left side under Network & Security, choose Security Groups. Name your security group and add a rule under Inbound rules:

  • SSH, source: My IP. Save the rule.

Edit the Inbound rules once again and add:

  • All traffic, source: your security group, and one more rule:
  • All traffic, source: My IP.

Now save these rules.
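If you prefer the command line, the same group can be created with the AWS CLI. This is only a sketch and makes some assumptions: the group name twitter-flow is made up, <your-ip> stands for your own public IP, and referencing groups by name like this works in the default VPC (in a non-default VPC you would pass group IDs instead):

aws ec2 create-security-group --group-name twitter-flow --description "NiFi, Kafka and ClickHouse demo"
aws ec2 authorize-security-group-ingress --group-name twitter-flow --protocol tcp --port 22 --cidr <your-ip>/32
aws ec2 authorize-security-group-ingress --group-name twitter-flow --protocol all --source-group twitter-flow
aws ec2 authorize-security-group-ingress --group-name twitter-flow --protocol all --cidr <your-ip>/32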

AWS EC2 — NiFi

Choose the EC2 service. EC2 is the AWS service which gives the user the ability to rent virtual machines, set up their own operating system and install the applications they want on them. EC2 will be our main component. These steps are the same for the machines where we will set up NiFi, Kafka and ClickHouse:

  1. On EC2, click Launch Instances.
  2. For the purpose of this article we will choose Ubuntu Server 20.04 LTS (64-bit x86).
  3. Choose an m5.large machine and click Next.
  4. In the configuration details we change nothing. Just click Next until you reach the Configure Security Group step.
  5. Choose the security group you created earlier.
  6. In the Review step, after you click Launch you will be asked if you want to create a key pair. Create a new key and give it a name. Download the created key and launch the EC2 machine. A .pem key is for Mac/Linux and a .ppk key is for when your local machine is Windows.

Now you have created your first EC2 instance. If you want, you can give it a name (on the main page of EC2).
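For reference, the same launch can be scripted with the AWS CLI. This is a sketch under assumptions: the AMI ID for Ubuntu Server 20.04 is region-specific and left as a placeholder, and the key and group names are the ones used in this article (in a non-default VPC you would use --security-group-ids with the group ID):

aws ec2 run-instances --image-id <ubuntu-20.04-ami-id> --instance-type m5.large --key-name mysecretkey --security-groups twitter-flow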

OK, now we have a running EC2 machine with Ubuntu and a configured security group. Let's get to our EC2 machine. For this we will use our terminal. I am working on a Mac, so I have no problem with an SSH client. If you use Windows, you have to work with PuTTY or MobaXterm. If you work on Linux, you have an SSH client built in. Now you have to configure the CLI to work with AWS. Instructions on how to do this can be found here: https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html

Great. Now, in these quick steps, we will connect to our EC2 machine:

1. Open a terminal and move to the directory where your key is:

cd [where is your key]

2. Now let's connect to our EC2 instance:

ssh -i mysecretkey.pem ubuntu@[your Public IPv4 DNS you get in AWS instance summary]

Sometimes you have to change the permissions on your key file. Just type:

chmod 400 your_key.pem

You will probably be asked if you want to continue. Just type yes and confirm with Enter. Great, now you are connected to your EC2 machine in AWS!

To set up NiFi we will use Docker. Once connected, type the commands below to download the newest version of Docker and install it on the EC2 machine:

curl -fsSL get.docker.com -o get-docker.sh
sudo sh get-docker.sh

After installing Docker, we have to download the NiFi Docker image and run it in a container:

sudo docker pull apache/nifi
sudo docker run --name nifi -p 8080:8080 -d apache/nifi:latest
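Optionally, you can confirm the container is up before opening the UI:

sudo docker ps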

Now copy the Public IPv4 address from your EC2 summary in AWS and paste it into your web browser, adding port 8080 (because we mapped port 8080 to communicate with NiFi in Docker).
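For example, with a made-up public IP of 3.120.10.20, the UI of this NiFi image is served over plain HTTP at the /nifi path (note that newer NiFi images default to HTTPS on port 8443 instead):

http://3.120.10.20:8080/nifi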

OK. Now we have NiFi set up on our EC2 machine. Let's create a flow to get data from Twitter (I assume that you have managed to create your Twitter developer account and you know the access keys for this account):

  1. Drag and drop the processor icon onto the dashboard.
  2. Find the GetTwitter processor.
  3. In the properties of this processor, fill in the Consumer Key, Consumer Secret, Access Token and Access Token Secret fields (you get these in your Twitter developer account settings).
  4. For Twitter Endpoint, select Filter Endpoint.
  5. Choose a Twitter ID to follow. I chose Joe Biden (939091).
  6. In the Settings of this processor, check Automatically Terminate Relationships.

Start the processor and see what happens. Choose View data provenance and you should see your tweets streaming in.

Choose one record (the i icon on the left side), then choose Content and View. You should get the JSON of your tweet.

Now we have to prepare the NiFi flow to suit the Kafka processor (Kafka will be set up on another EC2 machine).

Add another processor, JoltTransformJSON, and go to its configuration, Advanced tab. You have to add a Jolt specification for your tweets, for example the sketch below.
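The original screenshot of the spec is not reproduced here, so this is a minimal shift spec under one assumption: the output field names twid_id and twitt_text are chosen to match the ClickHouse table created later in this article.

[
  {
    "operation": "shift",
    "spec": {
      "id": "twid_id",
      "text": "twitt_text"
    }
  }
]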

In my example I chose the ID of the tweet and the text of the tweet. In the Settings of this processor you have to check Automatically Terminate Relationships for failure and success. Join the two processors by dragging a connection between them.

You can stop your processors to stop collecting tweets if you want. OK, for now we leave NiFi for a while to set up Kafka on a different EC2 machine. We will come back to NiFi later to connect the stream of tweets to Kafka.

AWS EC2 — Kafka

The process of setting up a new EC2 machine is the same as it was for NiFi. You have to choose the security group you created and the same SSH key. I assume you have managed to launch the new EC2 machine. Now we will set up Kafka on it. Open a new terminal and connect to the newly created EC2 machine using SSH (the same as you did in the NiFi section).

In this case we do not use Docker; we will install Kafka from scratch. However, you could also install Kafka using Docker.

1. In the terminal, update the package index:

sudo apt-get update

2. Install Java 8 (because Kafka requires Java 8):

sudo apt-get install -y openjdk-8-jdk

3. Download Kafka:

wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.12-2.6.0.tgz

4. Extract file:

tar -xzf kafka_2.12-2.6.0.tgz

5. Run ZooKeeper (the trailing & runs it in the background):

kafka_2.12-2.6.0/bin/zookeeper-server-start.sh kafka_2.12-2.6.0/config/zookeeper.properties &

Press Enter to get your prompt back.

6. Run Kafka:

kafka_2.12-2.6.0/bin/kafka-server-start.sh kafka_2.12-2.6.0/config/server.properties &

Again, press Enter to get back to the prompt.

7. Create a Kafka topic:

kafka_2.12-2.6.0/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic twits
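To verify the topic was created, you can list the topics on the broker:

kafka_2.12-2.6.0/bin/kafka-topics.sh --list --bootstrap-server localhost:9092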

8. Run Kafka consumer:

kafka_2.12-2.6.0/bin/kafka-console-consumer.sh --topic twits --from-beginning --bootstrap-server localhost:9092
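Before wiring up NiFi, you can sanity-check the topic locally: open one more terminal, start the console producer below, type a test message, and watch it appear in the consumer terminal.

kafka_2.12-2.6.0/bin/kafka-console-producer.sh --topic twits --bootstrap-server localhost:9092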

Now go back to NiFi and add a new processor, PublishKafka_1_0.

In this processor properties:

  1. Kafka Brokers: the Private IPv4 address of the Kafka EC2 machine, followed by :9092.
  2. Topic Name: twits (topic names are case-sensitive, so this must match the topic created above exactly).
  3. Delivery Guarantee: Guarantee Single Node Delivery.
  4. Use Transactions: false.

In Settings, check Automatically Terminate Relationships for failure and success.

Now run all your processors in NiFi and look at the terminal with the running Kafka consumer. You should see something like the sample below.
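Each consumed message should be a one-line JSON object shaped by the Jolt spec, with invented values along these lines:

{"twid_id": 1365678901234567890, "twitt_text": "example tweet text"}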

Great. Now we have NiFi talking to Kafka. Time for ClickHouse.

AWS EC2 — ClickHouse

ClickHouse will be our database, where we can run SQL queries to access the stored Twitter data.

Set up a new EC2 machine the same way as you did the previous ones. After that, connect to the EC2 machine through SSH.

There are a couple of approaches you can use to connect Kafka with ClickHouse. We will connect them using two tables and a materialized view between them. The first table will work as a Kafka consumer; the second table will store our tweets. The materialized view between the two tables is necessary because a Kafka engine table only reads each message once; without the view continuously copying rows into the storage table, we would lose our streamed tweets. More information about that can be found in this article: https://medium.com/streamthoughts/how-to-build-a-real-time-analytical-platform-using-kafka-ksqldb-and-clickhouse-bfabd65d05e4

In the terminal:

  • add the key to connect to the repository:
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4
  • define the list of sources:
echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
  • update apt:
sudo apt update
  • install the ClickHouse server and client:
sudo DEBIAN_FRONTEND=noninteractive apt install --allow-unauthenticated --yes clickhouse-server clickhouse-client
  • start the ClickHouse server:
sudo systemctl start clickhouse-server
  • create a database in ClickHouse:
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS twits"
  • create the first table, twit_stream, which will be our Kafka consumer:
clickhouse-client --query "CREATE TABLE twits.twit_stream (twid_id Int64, twitt_text String) ENGINE = Kafka SETTINGS kafka_broker_list = '<EC2 Kafka Private IPv4 addresses>:9092', kafka_topic_list = 'twits', kafka_group_name = 'ch-group', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 1, kafka_num_consumers = 1"
  • create our second table, where we will store the Twitter data:
clickhouse-client --query "CREATE TABLE twits.twit_final AS twits.twit_stream ENGINE = MergeTree() PARTITION BY twid_id ORDER BY twid_id"
  • and finally our materialized view (the bridge between the first and second tables):
clickhouse-client --query "CREATE MATERIALIZED VIEW twits.consumer TO twits.twit_final AS SELECT * FROM twits.twit_stream"
  • open the ClickHouse client:
clickhouse-client
  • switch to our tweets database:
use twits
  • run a simple select on our second table:
select * from twit_final

After that you will see your tweets in the ClickHouse table.
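While the NiFi flow is still running, a quick way to confirm that rows keep arriving is to repeat a simple count inside clickhouse-client:

select count() from twit_final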

Conclusions

The article showed how to set up a streaming system on AWS EC2 machines. Unfortunately, I was not able to connect Superset on another EC2 machine to the ClickHouse EC2 machine to build dashboards on the real-time stream. If you have had success connecting ClickHouse with Superset for visualization, I would be grateful if you shared it with the Medium community.
