Apache Hadoop
Big Data stuff ..
Last updated
Big Data stuff ..
Last updated
x
Hadoop is a highly scalable, open-source, distributed computing platform that allows you to store and process large amounts of data. It is used to store and analyze data from a variety of sources, including databases, web servers, and file systems.
Hadoop is designed to be scalable by distributing the processing of data across a large number of computers. It also allows you to store and analyze data in a way that is faster than traditional methods. Hadoop is used to store and analyze data from a variety of sources, including databases, web servers, and file systems.
The master node keeps track of the status of all the data nodes. If a data node goes down, the master node takes over the processing of that block. The slave nodes process the data on their own. HDFS requires a high-speed Internet connection. It is usually best to have at least a 10 Mbps network connection.
HDFS works on a time-based algorithm, which means that every block is processed in a predetermined time interval. This provides a high degree of scalability as all the nodes process the data in real time. HDFS is a great solution for data warehouse and business intelligence tasks. It is also a good solution for high-volume, high-frequency data analysis.
When a client connection is received from a master server, the NameNode running on the master server looks into both the local namespace and the master server’s namespace to find the matching records. The NameNode running on the master server then executes the lookup and returns a list of records that match the query.
DataNodes then get the records and start storing them. A data block is a record of data. Data nodes use the data blocks to store different types of data, such as text, images, videos, etc. NameNode maintains data node connections with clients based on the replication status of DataNodes. If a DataNode goes down, the client can still communicate with the NameNode. The client then gets the latest list of data blocks from the NameNode and communicates with the DataNode that has the newest data block.
Thus, DataNode is a compute-intensive task. It is therefore recommended to use the smallest possible DataNode. As DataNode stores data, it is recommended to choose a node that is close to the centre of the data. In a distributed system, all the nodes have to run the same version of Java, so it is recommended to use open-source Java. If there are multiple DataNodes, then they are expected to work in tandem to achieve a certain level of performance.
A file stored in Hadoop has the default 128 MB or 256 MB block size.
We have to ensure that the storage consumed by our application doesn’t exceed the level of data storage. If the storage consumed by our application is too much, then we have to choose our block size to avoid excessive metadata growth. If you are using a standard block size of 16KB, then you are good to go. You won’t even have to think about it.
Our HDFS block size will be chosen automatically. However, if we are using a large block size, then we have to think about the metadata explosion. We can either keep the metadata as small as possible or keep it as large as possible. HDFS has the option of keeping the metadata as large as possible.
Replication Management
When a node fails, the data stored on it is copied to another healthy node. This process is known as “replication”. If a DataNode fails, the data stored on it is not lost. It is simply stored on another DataNode. This is a good thing because it helps in the high availability of data. When you are using a DataNode as the primary storage for your data, you must keep in mind that it is a highly-available resource. If the primary storage for your data is a DataNode, you must make sure to have a proper backup and restoration mechanism in place.
A given file can have a replication factor of 3 or 1, but it will require 3 times the storage if we keep using a replication factor of 3. The NameNode keeps track of each data node’s block report and whenever a block is under-or over-replicated, it adds or removes replicas accordingly.
A given file can have a replication factor of 3 or 1, but it will require 3 times the storage if we keep using a replication factor of 3. The NameNode keeps track of each data node’s block report and whenever a block is under-or over-replicated, it adds or removes replicas accordingly.
Rack Awareness
When a block is deleted from a rack, the next available block will be placed on the rack. When a block is updated, the previous block is automatically updated in the same rack. This ensures data consistency across the storage network. In case of a fault in any of the data storage nodes, other storage nodes can be alerted through the rack awareness algorithm to take over the responsibility of the failed node.
This helps in providing failover capability across the storage network. This helps in providing high availability of data storage. As data stored in HDFS is massive, it makes sense to use multiple storage nodes for high-speed data access.
HDFS uses a distributed data store architecture to provide high-speed data access to its users. This distributed data store architecture allows for parallel processing of data in multiple data nodes. This parallel processing of data allows for high-speed storage of large data sets.
MapReduce is a data processing language and software framework that allows you to process large amounts of data in a reliable and efficient manner. It is a great fit for data-intensive, real-time and/or streaming applications. Basically, MapReduce allows you to partition your data and process items only when they are needed.
A map-reduce job consists of several maps and reduces functions. Each map function generates, parses, transforms, and filters data before passing it on to the next function. The reduced function groups, aggregates, and partitioning of this intermediate data from the map functions. The map task runs on the same node as the input source. Map tasks are responsible for generating summary statistics about data in the form of a report. The report can be viewed on a web browser or printed.
The output of a map task is the same as that of a reduced task. The only difference is that the map task returns a result whereas the reduced task returns a data structure that is applicable for further analysis. A map task is usually repetitive and is triggered when the data volume on the source is greater than the volume of data that can be processed in a short period of time.
YARN, also known as Yet Another Resource Negotiator, is a resource management and job scheduling/monitoring daemon in Hadoop. A resource manager in YARN isolates resource management from job scheduling and monitoring. A global Resource Manager oversees operations for the entire YARN network, including per-application Application Master. A job or a DAG of jobs may be defined as an application.
The Resource Manager manages resources for all the competing applications in the YARN framework. The Node Manager monitors resource usage by the container and passes it on to Resource Manger. There are resources such as CPU, memory, disk, and connectivity, among others. To perform and monitor the application, the Applcation Master talks to the ResourceManager and the Node Manager to handle and manage resources.
This section is for Reference only.
Run Hadoop containers
Before you begin ensure Docker & Docker Compose have been installed & configured.
docker-compose --version
Run the docker containers using docker-compose
cd
cd ~/Hadoop
docker-compose up -d
[+] Running 28/5
✔ datanode Pulled 32.7s
✔ namenode Pulled 32.6s
✔ nodemanager1 Pulled 32.5s
✔ resourcemanager Pulled 32.3s
✔ historyserver Pulled 32.5s
[+] Running 9/9
✔ Network hadoop_default Creat... 0.5s
✔ Volume "hadoop_hadoop_datanode" Created 0.0s
✔ Volume "hadoop_hadoop_historyserver" Created 0.0s
✔ Volume "hadoop_hadoop_namenode" Created 0.0s
✔ Container datanode Started 3.8s
✔ Container namenode Started 3.9s
✔ Container nodemanager Starte... 3.9s
✔ Container historyserver Star... 3.8s
✔ Container resourcemanager St... 3.9s
...
Access the Cluster
Can login into any node by specifying the container.
docker exec -it datanode /bin/bash
Navigate to mapped data volume.
cd hadoop/dfs/
Accessing the UI
The Namenode UI can be accessed at:
ResourceManager UI can be accessed at:
History Server UI can be accessed at:
Shutdown Cluster
To shut down the cluster.
docker-compose down
Time to check we can run some Hadoop Jobs.
So we're going to run a Job that counts the number of instances a word appears in the Canterbury Tales.
List all the files in our HDFS system.
hdfs dfs -l /
Create a /user/root/ file.
hdfs dfs -mkdir -p /user/root
Verify directory.
hdfs dfs -ls /user/
Found 1 items
drwxr-xr-x - root supergroup 0 2024-08-10 13:59 /user/root
Download the hadoop-mapreduce-examples-3.2.1-sources.jar file
We will use a .jar file containing the classes needed to execute MapReduce algorithm.
Save - hadoop-mapreduce-examples-3.2.1-sources.jar to: ~/Hadoop
Copy the files into the namenode container.
cd
cd ~/Hadoop/assets
docker cp hadoop-mapreduce-examples-3.2.1-sources.jar namenode:/tmp
docker cp pg2383.txt namenode:/tmp
Create the Input folder.
docker exec -it namenode bash
hdfs dfs -mkdir /user/root/input
Copy over /tmp/pg2383.txt to /user/root/input.
cd
cd /tmp
hdfs dfs -put pg2383.txt /user/root/input
10 . Run MapReduce
hadoop jar hadoop-mapreduce-examples-3.2.1-sources.jar org.apache.hadoop.examples.WordCount input output
2024-08-10 14:10:15,533 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.18.0.6:8032
2024-08-10 14:10:15,702 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.18.0.3:10200
2024-08-10 14:10:15,879 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1723287966223_0001
2024-08-10 14:10:15,969 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,068 INFO input.FileInputFormat: Total input files to process : 1
2024-08-10 14:10:16,088 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,101 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,107 INFO mapreduce.JobSubmitter: number of splits:1
2024-08-10 14:10:16,189 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2024-08-10 14:10:16,200 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1723287966223_0001
2024-08-10 14:10:16,200 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-08-10 14:10:16,345 INFO conf.Configuration: resource-types.xml not found
2024-08-10 14:10:16,346 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2024-08-10 14:10:16,813 INFO impl.YarnClientImpl: Submitted application application_1723287966223_0001
2024-08-10 14:10:16,867 INFO mapreduce.Job: The url to track the job: http://resourcemanager:8088/proxy/application_1723287966223_0001/
2024-08-10 14:10:16,868 INFO mapreduce.Job: Running job: job_1723287966223_0001
2024-08-10 14:10:23,970 INFO mapreduce.Job: Job job_1723287966223_0001 running in uber mode : false
2024-08-10 14:10:23,971 INFO mapreduce.Job: map 0% reduce 0%
2024-08-10 14:10:30,048 INFO mapreduce.Job: map 100% reduce 0%
2024-08-10 14:10:34,065 INFO mapreduce.Job: map 100% reduce 100%
2024-08-10 14:10:35,074 INFO mapreduce.Job: Job job_1723287966223_0001 completed successfully
2024-08-10 14:10:35,163 INFO mapreduce.Job: Counters: 54
File System Counters
FILE: Number of bytes read=187024
FILE: Number of bytes written=832593
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1692663
HDFS: Number of bytes written=438623
HDFS: Number of read operations=8
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
HDFS: Number of bytes read erasure-coded=0
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=10968
Total time spent by all reduces in occupied slots (ms)=16448
Total time spent by all map tasks (ms)=2742
Total time spent by all reduce tasks (ms)=2056
Total vcore-milliseconds taken by all map tasks=2742
Total vcore-milliseconds taken by all reduce tasks=2056
Total megabyte-milliseconds taken by all map tasks=11231232
Total megabyte-milliseconds taken by all reduce tasks=16842752
Map-Reduce Framework
Map input records=36758
Map output records=282822
Map output bytes=2691784
Map output materialized bytes=187016
Input split bytes=112
Combine input records=282822
Combine output records=41330
Reduce input groups=41330
Reduce shuffle bytes=187016
Reduce input records=41330
Reduce output records=41330
Spilled Records=82660
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=237
CPU time spent (ms)=4180
Physical memory (bytes) snapshot=862277632
Virtual memory (bytes) snapshot=13577064448
Total committed heap usage (bytes)=1277165568
Peak Map Physical memory (bytes)=608587776
Peak Map Virtual memory (bytes)=5115801600
Peak Reduce Physical memory (bytes)=253689856
Peak Reduce Virtual memory (bytes)=8461262848
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1692551
File Output Format Counters
Bytes Written=438623
View the output.
hdfs dfs -cat /user/root/output/*
...
reserved, 1
reserved. 1
reserves, 1
resided 1
residence, 1
resign 1
resign, 1
resign. 1
resignation, 1
resist 1
resisted; 1
resistence, 1
resolution 4
resolved 8
resolves 1
resolving 2
resort, 3
resort; 1
resounded 1
resources 1
respect 2
respect. 1
respective 3
...
Check the results accesing to the output folder.
hdfs dfs -ls /user/root/output
Output the text file.
hdfs dfs -cat /user/root/output/part-r-00000 > /tmp/pg2383_wc.txt
exit
docker cp namenode:/tmp/pg2383_wc.txt
x
Download & Save text file - or