
Apache Hadoop

An overview of Apache Hadoop (HDFS, MapReduce, and YARN), followed by a hands-on walkthrough of running a Dockerised Hadoop cluster and executing a word count MapReduce job.


Hadoop is a highly scalable, open-source, distributed computing platform for storing and processing large amounts of data. It can store and analyse data from a variety of sources, including databases, web servers, and file systems.

Hadoop scales by distributing both storage and processing across a large number of machines, which allows very large data sets to be processed far faster than on a single server.

Components

HDFS follows a master/slave architecture. The master node runs the NameNode, which keeps track of the status of all the DataNodes through periodic heartbeats; if a DataNode goes down, the NameNode arranges for the blocks it held to be re-replicated onto healthy nodes. The slave nodes run DataNodes, which store the blocks and serve read and write requests on their own. Because blocks are constantly replicated between nodes, HDFS benefits from a fast, low-latency network between the machines in the cluster.

DataNodes report back to the NameNode at regular, predetermined intervals through heartbeats and block reports, so the NameNode always has a current view of the whole cluster. This design scales well and makes HDFS a good fit for data warehouse and business intelligence workloads, as well as for high-volume batch analysis.

When a client wants to read or write a file, it first contacts the NameNode. The NameNode looks the path up in its namespace and returns the list of blocks that make up the file, together with the DataNodes that hold each replica.

The client then reads from, and writes to, the DataNodes directly. A data block is simply a fixed-size chunk of a file, so blocks can hold any kind of content: text, images, video, and so on. The NameNode tracks the replication status of every block, so if a DataNode goes down the client can ask the NameNode for an up-to-date list of replicas and carry on with the DataNodes that remain available.

DataNode work is storage- and I/O-intensive, so DataNodes should be provisioned with plenty of disk and network bandwidth and placed close to the data they serve. All nodes in the cluster must run the same Java version, and when there are multiple DataNodes they work in tandem to deliver the required throughput.
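As a quick way to see this master/slave view in practice (these commands assume you are inside one of the cluster containers set up later on this page, for example the namenode):

# Show overall capacity plus the status and last heartbeat of every DataNode
hdfs dfsadmin -report
# List the NameNode(s) the cluster is configured to use
hdfs getconf -namenodes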

A file stored in HDFS is split into blocks; the default block size is 128 MB, and 256 MB is also common.

Block size matters because the NameNode keeps an in-memory entry for every block in the cluster. If the block size is too small for the volume of data being stored, the number of blocks, and therefore the amount of NameNode metadata, grows excessively. With the standard 128 MB default you are usually good to go and won't have to think about it.

If you store very large volumes of data, consider a larger block size to keep the metadata small, bearing in mind that each block is processed by a single map task, so very large blocks also mean coarser parallelism.
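As a small sketch of working with block sizes (run inside the namenode container from the walkthrough below; the file and path are the ones used there, so treat them as examples):

# Show the configured default block size in bytes (134217728 = 128 MB)
hdfs getconf -confKey dfs.blocksize
# Write a single file with a 256 MB block size, overriding the default just for this copy
hdfs dfs -D dfs.blocksize=268435456 -put pg2383.txt /user/root/input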


Replication Management

Every block is stored on more than one DataNode; this is known as replication. When a DataNode fails, the blocks it held are copied from the surviving replicas to other healthy nodes, so the data is not lost and remains highly available. Even so, if HDFS is the primary storage for your data you should still have a proper backup and restore mechanism in place: replication protects against node failure, not against accidental deletion or corruption.

The replication factor is set per file; it can be as low as 1, but the default of 3 means a file consumes three times its size in storage. The NameNode keeps track of each DataNode's block report and, whenever a block is under- or over-replicated, it adds or removes replicas accordingly.
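As a sketch of how replication is managed in practice (the path is the example file from the walkthrough below):

# Change the replication factor of one file to 2 and wait for it to take effect
hdfs dfs -setrep -w 2 /user/root/input/pg2383.txt
# Report block health, including under- and over-replicated blocks and their locations
hdfs fsck / -files -blocks -locations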


Rack Awareness

HDFS is rack aware: the NameNode knows which rack each DataNode sits in and uses that knowledge when placing block replicas. With the default placement policy, the replicas of a block are spread across more than one rack, so the failure of an entire rack (a shared switch or power feed, for example) cannot take out every copy of a block, and the rack awareness information helps the NameNode pick sensible targets when it re-replicates the blocks of a failed node.

This provides failover capability and high availability across the storage network. Because the data stored in HDFS is massive, spreading it over many storage nodes also pays off in speed.

HDFS uses this distributed data store architecture to provide high-speed access: data is laid out across many DataNodes, so it can be read and processed in parallel, which is what makes storing and scanning very large data sets fast.
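To see what HDFS thinks the topology looks like (in the single-host Docker cluster below every node reports the default rack, so this is mainly useful on a real multi-rack deployment):

# Print the rack to which each DataNode is assigned
hdfs dfsadmin -printTopology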

MapReduce

MapReduce is a programming model and software framework for processing large amounts of data reliably and efficiently. It is batch oriented: the input is partitioned into splits and each split is processed independently, which is what lets the work be spread across the cluster and lets each piece of data be touched only where it is needed.

A MapReduce job consists of several map functions and reduce functions. Each map function parses, transforms, and filters its share of the input and emits intermediate key/value pairs; the reduce functions then group, aggregate, and partition that intermediate data into the final result. Map tasks are scheduled, wherever possible, on the node (or at least the rack) that already holds the input block, so the computation moves to the data rather than the data to the computation.

The output of a map task is intermediate data that exists only to feed the reducers, whereas the output of a reduce task is the final result, written back to HDFS in a form suitable for further analysis. MapReduce is the right tool when the volume of data is too large to process on a single machine in a reasonable time.
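The word count job run later on this page is the canonical example. As a rough single-machine analogy of the map, shuffle, and reduce phases, the same logic can be expressed as a shell pipeline over the downloaded text (this is only an illustration, not how Hadoop executes it):

# "map": emit one word per line; "shuffle": sort groups identical words; "reduce": count each group
cat pg2383.txt | tr -s ' ' '\n' | sort | uniq -c | sort -rn | head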

YARN

YARN (Yet Another Resource Negotiator) is the resource management and job scheduling/monitoring layer of Hadoop. It separates resource management from job scheduling and monitoring: a global ResourceManager oversees resources for the entire cluster, while a per-application ApplicationMaster manages the lifecycle of a single application, where an application may be a job or a DAG of jobs.

The ResourceManager arbitrates resources such as CPU, memory, disk, and network among all the competing applications in the cluster. A NodeManager runs on each worker node, monitors the resource usage of the containers on that node, and reports it back to the ResourceManager. To run and monitor an application, the ApplicationMaster negotiates containers with the ResourceManager and works with the NodeManagers to launch and manage them.
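Once the Docker cluster below is up and a job has been submitted, the same picture is available from the command line (run inside one of the cluster containers):

# List the NodeManagers registered with the ResourceManager
yarn node -list
# List applications and their state; the word count job from the walkthrough appears here
yarn application -list -appStates ALL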

This section is for Reference only.

Run Hadoop containers

  1. Before you begin, ensure Docker & Docker Compose have been installed & configured.

docker-compose --version
  2. Run the Docker containers using docker-compose.

cd
cd ~/Hadoop
docker-compose up -d
[+] Running 28/5
 ✔ datanode Pulled                                                        32.7s 
 ✔ namenode Pulled                                                        32.6s 
 ✔ nodemanager1 Pulled                                                    32.5s 
 ✔ resourcemanager Pulled                                                 32.3s 
 ✔ historyserver Pulled                                                   32.5s 
[+] Running 9/9
 ✔ Network hadoop_default                Creat...                          0.5s 
 ✔ Volume "hadoop_hadoop_datanode"       Created                           0.0s 
 ✔ Volume "hadoop_hadoop_historyserver"  Created                           0.0s 
 ✔ Volume "hadoop_hadoop_namenode"       Created                           0.0s 
 ✔ Container datanode                    Started                           3.8s 
 ✔ Container namenode                    Started                           3.9s 
 ✔ Container nodemanager                 Starte...                         3.9s 
 ✔ Container historyserver               Star...                           3.8s 
 ✔ Container resourcemanager             St...                             3.9s 
... 
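Optionally, confirm that all five containers (namenode, datanode, resourcemanager, nodemanager, historyserver) are up before moving on:

# Show the state of the containers in this compose project
docker-compose ps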

Access the Cluster

  1. You can log in to any node by specifying the container.

docker exec -it datanode /bin/bash 
  2. Navigate to the mapped data volume.

cd hadoop/dfs/
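Inside the container, this mapped volume is where HDFS keeps its on-disk state; the exact layout depends on the Docker image, but typically a datanode keeps block files under data/ while the namenode keeps its fsimage and edit logs under name/:

# Peek at the on-disk HDFS state (directory names are image-specific)
ls -R . | head -n 20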

Accessing the UI

The Namenode UI can be accessed at: http://localhost:9870/dfshealth.html#tab-overview

The ResourceManager UI can be accessed at: http://localhost:8088/

The History Server UI can be accessed at: http://localhost:8188/applicationhistory


Shutdown Cluster

To shut down the cluster.

docker-compose down
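To also discard the HDFS data, remove the named volumes created earlier (hadoop_hadoop_namenode and friends) in the same command:

# Stop the containers and delete the named volumes, wiping the cluster state
docker-compose down -v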

Time to check that we can run some Hadoop jobs.

We're going to run a job that counts the number of times each word appears in the Canterbury Tales (the Project Gutenberg text pg2383.txt used below).

Test - Word Count Algorithm

  1. List all the files in our HDFS system.

hdfs dfs -ls /
  2. Create the /user/root/ directory.

hdfs dfs -mkdir -p /user/root
  3. Verify the directory.

hdfs dfs -ls /user/
Found 1 items
drwxr-xr-x   - root supergroup          0 2024-08-10 13:59 /user/root
  4. Download the hadoop-mapreduce-examples-3.2.1-sources.jar file from Maven Central (org/apache/hadoop/hadoop-mapreduce-examples/3.2.1).

We will use a .jar file containing the classes needed to execute the MapReduce algorithm.

  5. Save hadoop-mapreduce-examples-3.2.1-sources.jar, along with the text to analyse (pg2383.txt), to ~/Hadoop/assets, which is where the next step copies the files from.

  6. Copy the files into the namenode container.

cd
cd ~/Hadoop/assets
docker cp hadoop-mapreduce-examples-3.2.1-sources.jar namenode:/tmp
docker cp pg2383.txt namenode:/tmp
  7. Connect to the namenode container.

docker exec -it namenode bash
  8. Create the input folder.

hdfs dfs -mkdir /user/root/input
  9. Copy /tmp/pg2383.txt to /user/root/input.

cd
cd /tmp
hdfs dfs -put pg2383.txt /user/root/input
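Optionally, confirm the file landed in HDFS before launching the job:

hdfs dfs -ls /user/root/input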

  10. Run MapReduce.

hadoop jar hadoop-mapreduce-examples-3.2.1-sources.jar org.apache.hadoop.examples.WordCount input output
2024-08-10 14:10:15,533 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.18.0.6:8032
2024-08-10 14:10:15,702 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.18.0.3:10200
2024-08-10 14:10:15,879 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1723287966223_0001
2024-08-10 14:10:15,969 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,068 INFO input.FileInputFormat: Total input files to process : 1
2024-08-10 14:10:16,088 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,101 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,107 INFO mapreduce.JobSubmitter: number of splits:1
2024-08-10 14:10:16,189 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,200 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1723287966223_0001
2024-08-10 14:10:16,200 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-08-10 14:10:16,345 INFO conf.Configuration: resource-types.xml not found
2024-08-10 14:10:16,346 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2024-08-10 14:10:16,813 INFO impl.YarnClientImpl: Submitted application application_1723287966223_0001
2024-08-10 14:10:16,867 INFO mapreduce.Job: The url to track the job: http://resourcemanager:8088/proxy/application_1723287966223_0001/
2024-08-10 14:10:16,868 INFO mapreduce.Job: Running job: job_1723287966223_0001
2024-08-10 14:10:23,970 INFO mapreduce.Job: Job job_1723287966223_0001 running in uber mode : false
2024-08-10 14:10:23,971 INFO mapreduce.Job:  map 0% reduce 0%
2024-08-10 14:10:30,048 INFO mapreduce.Job:  map 100% reduce 0%
2024-08-10 14:10:34,065 INFO mapreduce.Job:  map 100% reduce 100%
2024-08-10 14:10:35,074 INFO mapreduce.Job: Job job_1723287966223_0001 completed successfully
2024-08-10 14:10:35,163 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=187024
		FILE: Number of bytes written=832593
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1692663
		HDFS: Number of bytes written=438623
		HDFS: Number of read operations=8
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=1
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=10968
		Total time spent by all reduces in occupied slots (ms)=16448
		Total time spent by all map tasks (ms)=2742
		Total time spent by all reduce tasks (ms)=2056
		Total vcore-milliseconds taken by all map tasks=2742
		Total vcore-milliseconds taken by all reduce tasks=2056
		Total megabyte-milliseconds taken by all map tasks=11231232
		Total megabyte-milliseconds taken by all reduce tasks=16842752
	Map-Reduce Framework
		Map input records=36758
		Map output records=282822
		Map output bytes=2691784
		Map output materialized bytes=187016
		Input split bytes=112
		Combine input records=282822
		Combine output records=41330
		Reduce input groups=41330
		Reduce shuffle bytes=187016
		Reduce input records=41330
		Reduce output records=41330
		Spilled Records=82660
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=237
		CPU time spent (ms)=4180
		Physical memory (bytes) snapshot=862277632
		Virtual memory (bytes) snapshot=13577064448
		Total committed heap usage (bytes)=1277165568
		Peak Map Physical memory (bytes)=608587776
		Peak Map Virtual memory (bytes)=5115801600
		Peak Reduce Physical memory (bytes)=253689856
		Peak Reduce Virtual memory (bytes)=8461262848
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1692551
	File Output Format Counters 
		Bytes Written=438623
  11. View the output.

hdfs dfs -cat /user/root/output/*
...
reserved,	1
reserved.	1
reserves,	1
resided	1
residence,	1
resign	1
resign,	1
resign.	1
resignation,	1
resist	1
resisted;	1
resistence,	1
resolution	4
resolved	8
resolves	1
resolving	2
resort,	3
resort;	1
resounded	1
resources	1
respect	2
respect.	1
respective	3
...
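To see the most frequent words rather than an alphabetical slice, sort the output on the count column as a quick sanity check (still inside the namenode container):

# Sort by the second (count) field, descending, and show the top entries
hdfs dfs -cat /user/root/output/part-r-00000 | sort -k2 -rn | head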
  12. Check the results by accessing the output folder.

hdfs dfs -ls /user/root/output
  13. Save the output to a text file and copy it to the host.

hdfs dfs -cat /user/root/output/part-r-00000 > /tmp/pg2383_wc.txt
exit
docker cp namenode:/tmp/pg2383_wc.txt .
