\documentclass[10pt, conference, compsocconf]{IEEEtran}
\usepackage[utf8]{inputenc} % set input encoding (not needed with XeLaTeX)
%%% PAGE DIMENSIONS
%\usepackage{geometry} % to change the page dimensions
%\geometry{a4paper} % or letterpaper (US) or a5paper or....
% \geometry{margins=2in} % for example, change the margins to 2 inches all round
% \geometry{landscape} % set up the page for landscape
% read geometry.pdf for detailed page layout information
\usepackage{graphicx} % support the \includegraphics command and options
% \usepackage[parfill]{parskip} % Activate to begin paragraphs with an empty line rather than an indent
%%% PACKAGES
\usepackage{booktabs} % for much better looking tables
\usepackage{array} % for better arrays (eg matrices) in maths
\usepackage{paralist} % very flexible & customisable lists (eg. enumerate/itemize, etc.)
\usepackage{verbatim} % adds environment for commenting out blocks of text & for better verbatim
\usepackage{subfig} % make it possible to include more than one captioned figure/table in a single float
% These packages are all incorporated in the memoir class to one degree or another...
\usepackage{algorithm}
\usepackage{algpseudocode}
\usepackage{amsmath}
\usepackage{amsthm}
%%% HEADERS & FOOTERS
\usepackage{fancyhdr} % This should be set AFTER setting up the page geometry
\pagestyle{fancy} % options: empty , plain , fancy
\renewcommand{\headrulewidth}{0pt} % customise the layout...
\lhead{}\chead{}\rhead{}
\lfoot{}\cfoot{\thepage}\rfoot{}
%%% SECTION TITLE APPEARANCE
%\usepackage{sectsty}
%\allsectionsfont{\sffamily\mdseries\upshape} % (See the fntguide.pdf for font help)
% (This matches ConTeXt defaults)
%%% ToC (table of contents) APPEARANCE
%\usepackage[nottoc,notlof,notlot]{tocbibind} % Put the bibliography in the ToC
%\usepackage[titles,subfigure]{tocloft} % Alter the style of the Table of Contents
%\renewcommand{\cftsecfont}{\rmfamily\mdseries\upshape}
%\renewcommand{\cftsecpagefont}{\rmfamily\mdseries\upshape} % No bold!
%%% END Article customizations
%%% The "real" document content comes below...
\title{MapReduce on a Chord Distributed Hash Table}
\author{\IEEEauthorblockN{Andrew Rosen \qquad Brendan Benshoof \qquad Matt Erwin \qquad Robert W. Harrison \qquad Anu G. Bourgeois}
\IEEEauthorblockA{Department of Computer Science\\
Georgia State University\\
Atlanta, Georgia\\
}}
%\author{
%Andrew Rosen \qquad Brendan Benshoof \qquad Matt Erwin \qquad Robert Harrison \qquad Anu Bourgeois \\Department of Computer Science, Georgia State University\\ 34 Peachtree St NW \\ Atlanta, Georgia 30303\\ [email protected] }
%\date{} % Activate to display a given date or no date (if empty),
% otherwise the current date is printed
\hyphenation{op-tical net-works semi-conduc-tor Chord-Reduce Map-Reduce Data-Nodes Name-Nodes}
\begin{document}
\maketitle
\begin{abstract}
This paper proposes ChordReduce, a novel implementation of Chord that acts as middleware for creating and running MapReduce jobs. ChordReduce satisfies the desired properties for a distributed MapReduce platform. Chord is a peer-to-peer networking protocol for distributed storage and file sharing that provides $O(\log_{2} n)$ lookup time for any particular file or node. Files and nodes are evenly distributed across a ring overlay and organized such that the responsibilities of a failed node are automatically reassigned. ChordReduce leverages these features to distribute Map and Reduce tasks evenly among nodes and maintain a high degree of robustness during execution. The loss of a single node or a group of nodes during execution does not affect the correctness of the results, as their tasks are automatically reassigned. In addition, nodes that join the ring during runtime are automatically assigned work.
MapReduce frameworks are generally hierarchical, with the responsibility of scheduling work, distributing data and tasks, and tracking progress at the top. This leads to centralized MapReduce implementations having a single point of failure. A MapReduce framework with both responsibility and work distributed among its members would eliminate the need for a central source of coordination. An ideal MapReduce framework would need to be highly scalable, fault-tolerant during execution, and able to handle a high degree of churn, while minimizing the traffic generated by maintaining the network.
Our experiments show that ChordReduce is an efficient implementation of MapReduce. The applications are far-reaching, especially for big data problems and those that are massively parallel. Implementing MapReduce on a Chord peer-to-peer network demonstrates that the Chord network is an excellent platform for distributed and concurrent programming in cloud and loosely coupled environments.
\end{abstract}
\begin{IEEEkeywords}
MapReduce; P2P; Parallel Processing; Peer-to-Peer Computing; Cloud Computing; Middleware;
\end{IEEEkeywords}
\section{Introduction}
Google's MapReduce \cite{mapreduce} paradigm has rapidly become an integral part of data processing and is capable of efficiently executing numerous Big Data programming and data-reduction tasks. By using MapReduce, a user can take a large problem, split it into small, equivalent tasks, and send those tasks to other processors for computation. The results are sent back to the user and combined into one answer. Many popular platforms for MapReduce, such as Hadoop \cite{Hadoop}, utilize a central source of coordination and organization to store and operate on data. The hierarchical structure of Hadoop results in a single point of failure at the node that concentrates the results and also requires a complicated scheme for handling node failures.
We have developed a system, called ChordReduce, which employs a less hierarchical structure. It scales, is fault tolerant, has minimal latency, and distributes tasks evenly. ChordReduce leverages the underlying protocol from Chord \cite{Chord} to distribute Map and Reduce tasks to nodes evenly, provide greater data redundancy, and improve fault tolerance. Rather than viewing Chord solely as a means for sharing files, we see it as a means for distributing work. This paper establishes the effectiveness of using Chord as a framework for distributed programming. At the same time, ChordReduce avoids the architectural and file system constraints of systems like Hadoop.
Chord \cite{Chord} is a peer-to-peer (P2P) protocol for file sharing and distributed storage that guarantees, with high probability, $O(\log_{2} n)$ lookup time for a particular node or file in the network. It is highly tolerant of node failures and of churn, the constant joining and leaving of nodes. It scales extremely well, and the network requires little maintenance to handle individual nodes. Files in the network are distributed evenly among its members.
Our experiments demonstrate that the ChordReduce framework is highly scalable, solving problems significantly faster when distributed. The larger the problem is, the greater the speedup gained by incorporating more nodes into the problem. Our framework also provides a high level of robustness during execution; we can lose many nodes to churn and still process jobs successfully. If we find a job requires more computational power, we can add more nodes to the job during runtime.
While Chord could be used to implement a grid computing environment similar to BOINC \cite{boinc} or Folding@home \cite{folding}, it would be difficult to efficiently implement the MapReduce algorithm using grid computing tools. MapReduce uses persistent data to amortize the cost of distributing the data across the nodes over many calls, while grid environments typically transmit a transient bundle of data and computation to a volunteer node. BOINC and Folding@home use a master-slave architecture with a critical point of failure similar to Hadoop's, while ChordReduce is a true peer-based approach.
Section II covers the background of Chord and MapReduce. Related work is discussed in Section III. Details of ChordReduce's implementation and code are described in Section IV, while our experiments and their results are covered in Section V. Lastly, Section VI presents our conclusions and discusses potential avenues for future research.
\section{Background}
Chord and MapReduce are integral parts of ChordReduce. We summarize these frameworks in this section.
\subsection{Chord}
Chord \cite{Chord} is a P2P protocol for file sharing that uses a hash function to assign addresses to nodes and files on a ring overlay. The Chord protocol takes in some key and returns the identity (ID) of the node responsible for that key. These keys are generated by hashing a value of the node, such as its IP address and port, or by hashing the filename of a file. The hashing process creates an $m$-bit hash identifier.
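A minimal Python sketch of this hashing step follows. It assumes SHA-1 as the hash function, as in the original Chord design; this paper does not state which hash ChordReduce uses. The function \texttt{chord\_id} and the sample address and filename are hypothetical.

```python
import hashlib

M = 160  # SHA-1 digests are 160 bits long (assumed hash function)


def chord_id(value: str, m: int = M) -> int:
    """Hash a node address ("ip:port") or a filename to an m-bit identifier."""
    digest = hashlib.sha1(value.encode("utf-8")).digest()
    # Read the digest as a big-endian integer and reduce it modulo 2**m,
    # which keeps the low m bits and places the key on the identifier ring.
    return int.from_bytes(digest, "big") % (2 ** m)


node_key = chord_id("10.0.0.5:9000")   # a node, hashed from its IP and port
file_key = chord_id("report.txt")      # a file, hashed from its name
toy_key = chord_id("report.txt", m=8)  # the same file on a toy 2**8 ring
print(node_key < 2 ** M, toy_key < 2 ** 8)  # True True
```

Nodes and files share one identifier space, which is what lets a single successor rule decide where every file lives.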
The nodes are then arranged in a ring from the lowest hash value to the highest. Chord places each file at the node whose identifier equals the file's hashed key. If no such node exists, the node with the first identifier that follows this value is selected. Since the overlay is a circle, this assignment is computed modulo $2^m$.
The node responsible for the key $\kappa$ is called the $successor$ of $\kappa$, or $successor(\kappa)$. For example, if there were some portion of the network with nodes 20, 25, and 27, node 25 would be responsible for the files with keys 21 through 25. If node 25 were to leave the network, its absence would be detected by node 27, which would then be responsible for all the keys node 25 was covering, in addition to its own keys.
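The assignment rule and the example above can be checked with a short Python sketch. The helper \texttt{successor} is hypothetical, and node 3 is added only so that the toy ring, with $m = 5$ and identifiers 0 to 31, is complete.

```python
from bisect import bisect_left


def successor(key, node_ids, m):
    """First node identifier equal to or following key on the 2**m ring."""
    ring = sorted(node_ids)
    i = bisect_left(ring, key % 2 ** m)
    return ring[i] if i < len(ring) else ring[0]  # wrap past the top of the ring


m = 5  # toy identifier space: 0..31
nodes = [3, 20, 25, 27]
owned_by_25 = [k for k in range(2 ** m) if successor(k, nodes, m) == 25]
print(owned_by_25)  # [21, 22, 23, 24, 25]

nodes.remove(25)                # node 25 leaves the ring
print(successor(23, nodes, m))  # 27
```

No data structure has to be updated when node 25 leaves: the same rule, applied to the smaller set of nodes, already names node 27.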
\begin{figure}
\includegraphics[width=\linewidth]{chordreal}
\caption{A Chord ring with 16 nodes. The bold lines are incoming edges. Each node has a connection to its successor, as well as 4 fingers, some of which are duplicates.}
\label{chordreal}
\end{figure}
With this scheme, we can reliably find the node responsible for some key by asking the next node in the circle for the information, which passes the request along the circle until the successor is found. We can then connect directly to the successor to retrieve the file. This naive approach is inefficient, requiring $O(n)$ hops in the worst case, and it simplifies the actual lookup process, but it is the basis of how Chord works.
To speed up lookups, each node builds and maintains a \emph{finger table}, which contains the locations of up to $m$ other nodes in the ring. The $i$th entry of node $n$'s \emph{finger table} is $successor((n+2^{i-1}) \bmod 2^m)$. Because hash values are not perfectly distributed, the \emph{finger table} can contain duplicate entries. An example Chord network with fingers is shown in Fig. \ref{chordreal}.
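A sketch of finger table construction under the definition above follows. It uses a brute-force \texttt{successor} over a known list of node identifiers, which is a simplification: a real node learns its fingers through lookups. The identifiers on the $m = 6$ ring are chosen for illustration, and both function names are hypothetical.

```python
from bisect import bisect_left


def successor(key, node_ids, m):
    """First node identifier equal to or following key on the 2**m ring."""
    ring = sorted(node_ids)
    i = bisect_left(ring, key % 2 ** m)
    return ring[i] if i < len(ring) else ring[0]


def finger_table(n, node_ids, m):
    """Entry i (for i = 1..m) is successor((n + 2**(i-1)) mod 2**m)."""
    return [successor((n + 2 ** (i - 1)) % 2 ** m, node_ids, m)
            for i in range(1, m + 1)]


ring = [1, 8, 14, 21, 32, 38, 42, 48, 51, 56]  # node identifiers, m = 6
print(finger_table(8, ring, 6))  # [14, 14, 14, 21, 32, 42]
```

Three of node 8's six fingers point to node 14, illustrating the duplicate entries: the nearby targets 9, 10, and 12 all fall in the gap before node 14.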
\begin{figure}
\includegraphics[width=\linewidth]{abcd1}
\caption{Alice has incorrectly determined that Carol is her appropriate successor. When Alice stabilizes, Carol will let her know about Bob.}
\label{abcd1}
\end{figure}
\begin{figure}
\includegraphics[width=\linewidth]{abcd2}
\caption{After completing stabilize, Alice makes Bob her successor and notifies him. Bob then makes Alice his predecessor.}
\label{abcd2}
\end{figure}
When a node $n$ is asked to find the successor of some key, $n$ checks whether the key lies between $n$ and $successor(n)$; if so, it returns $successor(n)$'s information to the requester. If not, it looks in its finger table for the closest preceding node $n'$ it knows of and asks $n'$ to find the successor. Each step can cover up to half the remaining distance to the key, giving $O(\log_2 n)$ lookup time. Because nodes can constantly join and leave the network, each entry in the table is periodically checked and updated during a finger maintenance period.
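The lookup procedure can be sketched as the iterative routine below, run over a static, fully stabilized ring built by a hypothetical \texttt{build\_ring} helper. Names such as \texttt{closest\_preceding\_node} follow common Chord pseudocode but are not taken from the ChordReduce code \cite{code}; joins, failures, and network messaging are omitted.

```python
def in_open(x, a, b, m):
    """True if x lies strictly inside the ring interval (a, b) modulo 2**m."""
    x, a, b = x % 2 ** m, a % 2 ** m, b % 2 ** m
    if a < b:
        return a < x < b
    return x > a or x < b  # the interval wraps past zero


def in_half_open(x, a, b, m):
    """True if x lies in the ring interval (a, b]."""
    return in_open(x, a, b, m) or x % 2 ** m == b % 2 ** m


class Node:
    def __init__(self, ident, m):
        self.id, self.m = ident, m
        self.successor = self
        self.fingers = []  # fingers[i - 1] is successor(n + 2**(i - 1))

    def closest_preceding_node(self, key):
        # Try the farthest finger first: it skips the most of the ring.
        for finger in reversed(self.fingers):
            if in_open(finger.id, self.id, key, self.m):
                return finger
        return self.successor

    def find_successor(self, key):
        """Iterative lookup; returns (node responsible for key, hop count)."""
        node, hops = self, 0
        while not in_half_open(key, node.id, node.successor.id, node.m):
            node, hops = node.closest_preceding_node(key), hops + 1
        return node.successor, hops


def build_ring(ids, m):
    """Wire up a static ring with correct successors and finger tables."""
    ids = sorted(ids)
    nodes = {i: Node(i, m) for i in ids}

    def succ(k):
        k %= 2 ** m
        return nodes[next((i for i in ids if i >= k), ids[0])]

    for n in nodes.values():
        n.fingers = [succ(n.id + 2 ** (i - 1)) for i in range(1, m + 1)]
        n.successor = n.fingers[0]
    return nodes


ring = build_ring([1, 8, 14, 21, 32, 38, 42, 48, 51, 56], m=6)
owner, hops = ring[8].find_successor(54)
print(owner.id, hops)  # 56 2
```

Node 8 jumps to finger 42, node 42 jumps to finger 51, and node 51 finds key 54 between itself and its successor 56, so the lookup crosses most of the ring in two hops instead of walking node by node.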
To join the network, node $n$ first asks some node $n'$ already in the ring to find $successor(n)$. Node $n$ uses this information to set its successor, but the other nodes in the ring will not acknowledge $n$'s presence yet. Node $n$ relies on the stabilize routine to fully integrate into the ring.
The stabilize routine helps the network integrate new nodes and route around nodes that have left the network. Each node periodically asks its successor who the successor's predecessor is. In a static network, this would be the checking node itself. However, if the checking node gets back a different node, it looks at that returned node's hash value and changes its own successor if needed. Regardless of whether the checking node changes its successor, it then notifies the (possibly new) successor, which checks whether it needs to change its predecessor based on this new information. Although complex, the stabilization process is no more expensive than a heartbeat function. Consider a concrete example.
Suppose Alice, Bob, Carol, and Dan are members of the ring and everyone is ordered alphabetically (Fig. \ref{abcd1}). Alice believes that Carol is her successor. Alice asks Carol who her predecessor is, and Carol says Bob is. Since Bob lies between Alice and Carol, Alice changes her successor to Bob and notifies him.
When Bob receives that notification, he sees that Alice is closer than his previous predecessor and sets Alice as his predecessor. During the next stabilization cycle, Alice will see that she is still Bob's predecessor and notify him that she is still there (Fig. \ref{abcd2}).
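The exchange between Alice and Bob can be replayed with a minimal Python sketch of \texttt{stabilize} and \texttt{notify}. The numeric identifiers (Alice 10, Bob 20, Carol 30, Dan 40 on a $2^8$ ring) are assumptions chosen to match the alphabetical ordering, and all remote calls are modeled as local method calls.

```python
def between(x, a, b, m=8):
    """True if x lies strictly inside the ring interval (a, b) modulo 2**m."""
    x, a, b = x % 2 ** m, a % 2 ** m, b % 2 ** m
    return a < x < b if a < b else (x > a or x < b)


class Node:
    def __init__(self, name, ident):
        self.name, self.id = name, ident
        self.successor = self
        self.predecessor = None

    def stabilize(self):
        # Ask the successor for its predecessor; adopt it if it sits between us.
        x = self.successor.predecessor
        if x is not None and between(x.id, self.id, self.successor.id):
            self.successor = x
        self.successor.notify(self)

    def notify(self, candidate):
        # Accept the candidate if we have no predecessor or it is closer.
        if self.predecessor is None or between(candidate.id, self.predecessor.id, self.id):
            self.predecessor = candidate


alice, bob, carol, dan = (Node(n, i) for n, i in
                          [("Alice", 10), ("Bob", 20), ("Carol", 30), ("Dan", 40)])
# State from the example: Bob has just joined between Alice and Carol.
alice.successor, alice.predecessor = carol, dan
bob.successor = carol                          # Bob knows his successor
carol.successor, carol.predecessor = dan, bob  # and Carol already knows Bob
dan.successor, dan.predecessor = alice, carol

alice.stabilize()
print(alice.successor.name, bob.predecessor.name)  # Bob Alice
```

Running \texttt{stabilize} again leaves every pointer unchanged, which matches the steady state in which stabilization acts only as a heartbeat.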
To prevent loss of data due to churn, each node sends a backup of its data to its successor. Section IV discusses the implementation of the backup process in ChordReduce and expands upon it for backing up Map and Reduce tasks.
%\subsection{Extensions of Chord}
%The Cooperative File System (CFS) is an anonymous, distributed file sharing system built on top of Chord \cite{CFS}. In CFS, rather than storing an entire file at a single node, the file is split up into multiple chunks around 10 kilobytes in size. These chunks are each assigned a hash and stored in nodes corresponding to their hash in the same way that whole files are. The node that would normally store the whole file instead stores a \emph{key block}, which holds the hash address of the chunks of the file.
%The chunking allows for numerous advantages. First, it promotes load balancing. Each piece of the overall file would (ideally) be stored in a different node, each with a different backup or backups. This would prevent any single node from becoming overwhelmed from fulfilling multiple requests for a large file. It would also prevent retrieval from being bottlenecked by a node with a relatively low bandwidth. Finally, when Chord uses some sort of caching scheme like that described in CFS \cite{CFS}, caching chunks as opposed to the entire file resulted in about 1000 times less storage overhead.
%Mutable files and IRM, which is short for Integrated File Replication and Consistency Maintenance, has nodes keep track of file requests they initiate or forward. If they find they are frequently forwarding a request for a particular file, they store that file locally until it is no longer requested frequently. What makes IRM unique is that it combines caching with a
%Chunking also opens up the options for implementing additional redundancy such as erasure codes \cite{rizzo1997effective}. With erasure codes, redundant chunks are created but any combination of a particular number of chunks is sufficient to recreate the file. For example, a file that would normally be split into 10 chunks might be split into 15 encoded chunks. The retrieval of any 10 of those 15 chunks is enough to recreate the file. Implementing erasure codes would presumably make the network more fault tolerant, but that is an exercise left for future work.
%Generally, related files should be kept together; Chord, however, just hashes the filename to find the responsible node and sends it to that location without any thought to organization. Our solution to this is to allow the file owner to select the first 80 bits of a file's hash, then generate the remaining least significant bits by hashing the filename. It does not matter if a file owner, in some infinitesimally small coincidence, chooses the same 80 bit prefix as another file owner, as the purpose is to keep related files together.
\subsection{MapReduce and Hadoop}
At its core, MapReduce \cite{mapreduce} is a system for division of labor, providing a layer of separation between the programmer and the more complicated parts of concurrent processing. The programmer sends a large task to a master node, which then divides that task among slave nodes (which may further divide the task). This task has two distinct parts: Map and Reduce. Map performs some operation on each piece of the data and produces a result for each piece. These results are then reduced: Reduce combines sets of results into a single set, which is further combined with other sets. This process continues until one set of data remains.
The classic example given for MapReduce is counting the occurrence of each word in a collection of documents. The master node splits up the documents into multiple blocks and sends them off to workers. Each worker then goes through its blocks and creates a small word frequency list. These lists are then used by other workers, who combine them into larger and larger lists, until the master node is left with a word frequency list of all the words in the documents.
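The word-count example can be expressed as one Map function and one Reduce function, sketched below in Python within a single process; a real deployment would run the Map calls on different workers and the Reduce calls as partial lists arrive. The function names are hypothetical.

```python
from collections import Counter
from functools import reduce


def map_block(block):
    """Map: build a small word-frequency list for one block of text."""
    return Counter(block.lower().split())


def reduce_counts(a, b):
    """Reduce: merge two word-frequency lists into a larger one."""
    return a + b


documents = ["the quick brown fox", "the lazy dog", "The fox"]
partials = [map_block(doc) for doc in documents]   # one Map per block
total = reduce(reduce_counts, partials, Counter())  # repeated Reduce
print(total["the"], total["fox"])  # 3 2
```

Because merging frequency lists is associative and commutative, partial lists can be combined in any grouping or order, which is what allows the reduction to be spread across many workers.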
%The most popular platform for MapReduce is Hadoop \cite{Hadoop}. Hadoop is an open-source Java implementation developed by Apache and Yahoo! \cite{pavlo2009comparison}. Hadoop has two components, the Hadoop Distributed File System (HDFS) and the Hadoop MapReduce Framework \cite{mrsurvey}. Under HDFS, nodes are arranged in a hierarchical tree, with a master node, called the NameNode, at the top. The NameNode is responsible for keeping track of which DataNodes possess which files as well as other metadata essential for controlling the network. MOVE THIS DOWN
One very popular open-source implementation of MapReduce is Apache's Hadoop \cite{Hadoop}. Hadoop serves as both a distributed file system and framework for MapReduce \cite{shvachko2010hadoop}. However, Hadoop's MapReduce framework is very strongly tied to the Hadoop Distributed File System (HDFS) and the hierarchy of servers that is used by it. Hadoop is centralized around the NameNode. The NameNode's job is to organize and distribute information to the slave nodes, called DataNodes. This makes the NameNode a single point of failure \cite{shvachko2010hadoop} in the network, as well as a potential bottleneck for the system \cite{hadoop-bottle}.
To do work on Hadoop, the user stores their data on the network. This is handled by the NameNode, which equally apportions the data among the DataNodes. When a user wants to run some analysis on the data or some subset of the data, that function is sent by the NameNode to each of the DataNodes responsible for the indicated data. After the DataNodes finish processing, the results are handled by other nodes, called Reducers, which collect and reduce the results of multiple DataNodes.
\section{Related Work}
We have identified two papers that focus on combining P2P concepts with MapReduce. Both papers are similar to our research, but differ in crucial ways, as described below.
%DEFINE ISSUES OF CHURN AND RELIABILITY
\subsection{P2P-MapReduce}
Marozzo et al. \cite{marozzo2012p2p} investigated the issue of fault tolerance in centralized MapReduce architectures such as Hadoop. They focused on creating a new P2P-based MapReduce architecture built on JXTA \cite{935182} called P2P-MapReduce. P2P-MapReduce is designed to be more robust at handling node and job failures during execution.
Rather than use a single master node, P2P-MapReduce employs multiple master nodes, each responsible for some job. If one of those master nodes fails, another will be ready as a backup to take its place and manage the slave nodes assigned to that job. This avoids the single point of failure that Hadoop is vulnerable to. Failures of the slave nodes are handled by the master node responsible for them.
Experimental results were gathered via simulation and compared P2P-MapReduce to a centralized framework. Their results showed that while P2P-MapReduce generated an order of magnitude more messages than a centralized approach, the difference shrank rapidly at higher rates of churn. When looking at the actual amount of data being passed around the network, the bandwidth required by the centralized approach greatly increased as a function of churn, while the bandwidth used by the distributed approach remained relatively static.
They concluded that P2P-MapReduce would, in general, use more network resources than a centralized approach. However, this was an acceptable cost as the P2P-MapReduce would lose less time from node and job failures \cite{marozzo2012p2p}.
While P2P-MapReduce is decentralized, it still relies on a strict master/slave hierarchy for organization, computations, and scaling.
During simulation, 1\% of the entire network was assigned as master nodes. For a simulation of 40000 nodes, this means 400 nodes were required to organize and coordinate jobs, rendering them unable to do any processing. In addition, a loosely-consistent distributed hash table (DHT) such as the one used by JXTA can be much slower and does not maintain the same level of guarantees as an actual DHT, such as Chord \cite{5359174}.
%The loosely-consistent DHT can be much slower than using an actual DHT such as Chord .
%Scalability. A huge issue would be tracking all the nodes and coordinating them. Scalability is handled by maintaining a ratio of masters to slaves
%Evaluated for low rates of churn. Such low rates also mean master nodes are barely affected by churn.
%Simulations
%Our work differs from Marozzo et al.'s in that P2P-MapReduce does not examine using the underlying strengths of a particular P2P protocol or group of protocols, which would have made the architecture simpler. P2P-MapReduce is decentralized, but still relies on a very definite master/slave hierarchy, while all nodes in ChordReduce are both workers and masters. We also implemented our MapReduce system rather than simulating the work.
\subsection{MapReduce using Symphony}
Lee et al.'s work \cite{leemap} draws attention to the fact that a P2P network can be much more than a way to distribute files and demonstrates how to accomplish different tasks using Map and Reduce functions over a P2P network. Rather than using Chord, Lee et al. used Symphony \cite{symphony}, another DHT protocol with a ring topology. To run a MapReduce job over the Symphony ring, a node is selected by the user to effectively act as the master. This ad-hoc master then performs a bounded broadcast over a subsection of the ring. Each node repeats this broadcast over a subsection of that subsection, resulting in a tree with the first node at the top. Map tasks are disseminated evenly throughout the tree, and their results are reduced on the way back up to the ad-hoc master node. Unlike Hadoop, this allows the ring to disseminate Map and Reduce tasks without the need for a coordinator responsible for distributing these tasks and keeping track of them.
Their experimental results showed that the latency experienced by a centralized configuration is similar to the latency experienced in a completely distributed framework. However, there are no mechanisms in place to handle churn in the network. If a node joins during a MapReduce job, it will be unable to contribute any of its resources to the problem. If a node in the bounded broadcast tree fails or, worse, if the ad-hoc master node fails, the data that node is responsible for is lost.
\section{ChordReduce}
Marozzo et al. \cite{marozzo2012p2p} show that adding fault-tolerance features to a MapReduce architecture is worth the added cost of maintenance, as the time lost due to node failures is greatly reduced. However, Marozzo et al. do not explore the benefits of leveraging the properties of a P2P protocol to reduce the complexity of the architecture and completely distribute the responsibility of the task across the network. As a result, P2P-MapReduce still relies on a ratio of masters to slaves to coordinate and organize the network, meaning a percentage of the network is unable to contribute processing power to the actual solving of a problem.
Lee et al. \cite{leemap} explore the benefits of building a MapReduce module to run on top of Symphony \cite{symphony}, a P2P protocol. Unlike Hadoop, this allows the MapReduce tasks to be executed without the need for a central source of coordination by distributing tasks over a bounded broadcast tree created at runtime. The Symphony-based MapReduce architecture would be greatly improved by the addition of components to handle the failure of nodes during execution. As it stands now, if a node crashes, the job will fail due to the loss of data.
While both of these papers have promising results and support the feasibility of our own framework, both look at P2P networks solely for the purpose of routing data and organizing the network. Neither examines using a P2P network as a means of efficiently distributing responsibility throughout the network and using existing features to add robustness to nodes working on Map and Reduce tasks.
ChordReduce uses Chord to act as a completely distributed topology for MapReduce, negating the need to assign any explicit roles to nodes or have a scheduler or coordinator. ChordReduce does not need to assign specific nodes the task of backing up work; nodes back up their tasks using the same process that would be used for any other data being sent around the ring. Finally, results work their way back to a specified hash address, rather than to a specific node, eliminating any single point of failure in the network. These features help prevent a bottleneck from occurring. The result is a simple, distributed, and highly robust architecture for MapReduce.
\subsection{Handling Node Failures in Chord}
Due to the potentially volatile nature of a peer-to-peer network, Chord has to be able to handle (or at the very least, tolerate) an arbitrary amount of churn. Section II described how Chord gradually guides nodes into their correct locations after they join the network. The same is true for when a node leaves the network; the stabilize procedure will guide nodes to their correct successors and predecessors. However, we can exert more control over how to handle nodes leaving the network.
When a node $n$ changes its successor, $n$ asks whether the successor is holding any data $n$ should be responsible for. The successor finds all the data $n$ is responsible for and sends it to $n$. The successor does not have to delete this data. In fact, keeping this data as a backup benefits the network as a whole, as $n$ could decide to leave the network at any point.
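A sketch of this handoff follows. It assumes that the successor still knows its predecessor from before the join, so the joining node's range is the ring interval from that old predecessor (exclusive) to the new node (inclusive). The function names and the sample keys on a $2^5$ ring are hypothetical.

```python
def in_half_open(x, a, b, m):
    """True if x lies in the ring interval (a, b] modulo 2**m."""
    x, a, b = x % 2 ** m, a % 2 ** m, b % 2 ** m
    if a < b:
        return a < x <= b
    return x > a or x <= b  # wraps past zero; a == b covers the whole ring


def data_for_new_predecessor(store, new_id, old_pred_id, m):
    """Entries a joining node should now own: keys in (old_pred_id, new_id].
    They are copied, not removed, so the successor keeps them as a backup."""
    return {k: v for k, v in store.items() if in_half_open(k, old_pred_id, new_id, m)}


# Node 25 (predecessor 20) stores keys 21..25; node 23 joins between them.
store_25 = {k: f"file-{k}" for k in range(21, 26)}
handed = data_for_new_predecessor(store_25, new_id=23, old_pred_id=20, m=5)
print(sorted(handed), len(store_25))  # [21, 22, 23] 5
```

The successor's store still holds all five entries afterwards, so keys 21 to 23 survive even if node 23 leaves immediately.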
Chord specifies two ways a node can leave the ring. A node can either suddenly drop out of existence, or a node can tell the network he is about to leave, letting his successor and predecessor immediately perform the needed changes.
When a node politely quits, he informs both his successor and predecessor and gives them all the information they need to fill the resulting gap. He also sends all of the data he is responsible for to his successor, who becomes responsible for that data when the node leaves. Fingers that pointed to that node are corrected during the finger maintenance period. This allows the network to adjust to churn with a minimum of overhead.
It is unlikely that every time a node leaves the network, it will do so politely. If a node suddenly quits, the data it had stored would be lost. To prevent data from becoming irretrievable, a node periodically sends backups to its successor. To prevent a cascade of backups of backups, the node only passes along what it considers itself responsible for. What a node is responsible for changes as nodes enter and leave the network. If a node's successor leaves, the node sends a backup to its new successor.
Our prototype framework does not implement a polite disconnect; when a node quits, it does so quickly and abruptly. This design ensures that the system is able to handle churn under the worst of cases. A polite quit would be straightforward to add.
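The rule that a node backs up only what it is currently responsible for can be sketched as follows. \texttt{StorageNode} is a hypothetical class, and the example reuses nodes 20, 25, and 27 on a $2^5$ ring, with predecessor pointers assumed to be already stabilized.

```python
def in_half_open(x, a, b, m):
    """True if x lies in the ring interval (a, b] modulo 2**m."""
    x, a, b = x % 2 ** m, a % 2 ** m, b % 2 ** m
    if a < b:
        return a < x <= b
    return x > a or x <= b


class StorageNode:
    def __init__(self, ident, predecessor_id, m=5):
        self.id, self.predecessor_id, self.m = ident, predecessor_id, m
        self.store = {}  # keys this node owns plus backups from its predecessor

    def backup_payload(self):
        """Only keys this node is currently responsible for: (pred, self]."""
        return {k: v for k, v in self.store.items()
                if in_half_open(k, self.predecessor_id, self.id, self.m)}

    def send_backup(self, successor):
        successor.store.update(self.backup_payload())


n20, n25, n27 = StorageNode(20, 3), StorageNode(25, 20), StorageNode(27, 25)
n20.store = {5: "a", 18: "b"}
n25.store = {22: "c"}
n27.store = {26: "d"}

n20.send_backup(n25)      # node 25 now also holds 5 and 18 as backups
n25.send_backup(n27)      # only 22 travels on: 5 and 18 are not 25's keys
print(sorted(n27.store))  # [22, 26]

# Node 25 fails: node 27's range grows to (20, 27] and it already holds 22,
# and node 20 backs up to its new successor.
n27.predecessor_id = 20
n20.send_backup(n27)
print(sorted(n27.store))  # [5, 18, 22, 26]
```

Filtering on the current range is what stops backups of backups from propagating around the ring, while the range change after a failure makes the surviving backup authoritative without extra messages.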
\subsection{Implementation}
\begin{figure}
\includegraphics[width=\linewidth]{crArch}
\caption{The basic architecture of a node in ChordReduce. MapReduce runs as a service on top of each node.}
\label{crArch}
\end{figure}
ChordReduce is a fully functional Chord implementation in Python. Our installation was designed to be as simple as possible. It consists of downloading our code \cite{code} and running chord.py. A user can specify the IP address and port of a node in the ring they wish to join. The node will automatically integrate into the ring with this minimal information. The ring as implemented is stable and well organized. We created various services to run on top of the network, such as a file system and a distributed web server. Our file system is capable of storing whole files or splitting a file up among multiple nodes in the ring. Our MapReduce module is a service that runs on top of our Chord implementation, similar to the file system (Fig. \ref{crArch}). We avoided any complicated additions to the Chord architecture; instead, we used the protocol's properties to create the features we desired in our MapReduce framework.
In our implementation of MapReduce, each node takes on the responsibilities of both a worker and a master, much in the same way that a node in a P2P file-sharing service acts as both a client and a server. Jobs still must start from a single location. To start a job, the user contacts a node at a specified hash address and provides it with the tasks and data. This address can be chosen arbitrarily or can be the address of a known node in the ring. The node at this hash address is designated as the stager.
The job of this stager is to take the work and divide it into \emph{data atoms}, which are the smallest individual units that work can be done on. This might be a line of text in a document, the result of a summation for a particular intermediate value, or a subset of items to be sorted. The specifics of how to divide the work are defined by the user in a \emph{stage} function. The data atoms are then each given a random hash and sent to the node responsible for that hash address, so that they are distributed approximately evenly throughout the network. The data atoms also contain the Map function and Reduce function as defined by the user. A job ID is also included, so that data atoms from different jobs can be differentiated. Once the data atoms are sent out, the stager's job is done and it behaves like any other node in the network. The staging period is the only time ChordReduce is vulnerable to churn, and only if the stager leaves the ring in the middle of sending out data atoms. The user would get some results back, but only for the data the stager managed to send out.
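A sketch of the staging step follows, assuming a user-defined \texttt{stage} function that yields one data atom per nonblank line of text. \texttt{DataAtom}, \texttt{stage\_job}, and the 160-bit identifier space are assumptions rather than the ChordReduce code; in a real network the Map and Reduce functions would also have to be serialized before being sent.

```python
import random
import uuid
from collections import Counter
from dataclasses import dataclass
from typing import Any, Callable

M = 160  # assumed identifier size in bits


@dataclass
class DataAtom:
    job_id: str           # distinguishes atoms from different jobs
    key: int              # random address; the atom goes to successor(key)
    data: Any             # the smallest unit of work
    map_fn: Callable
    reduce_fn: Callable
    return_address: int   # where results are sent: the stager's hash address


def stage_lines(text):
    """Example user-defined stage function: one data atom per nonblank line."""
    return [line for line in text.splitlines() if line.strip()]


def stage_job(text, stage_fn, map_fn, reduce_fn, stager_address, rng=random):
    job_id = uuid.uuid4().hex
    return [DataAtom(job_id, rng.getrandbits(M), piece, map_fn, reduce_fn, stager_address)
            for piece in stage_fn(text)]


atoms = stage_job("the fox\nthe dog\n", stage_lines,
                  map_fn=lambda line: Counter(line.split()),
                  reduce_fn=lambda a, b: a + b,
                  stager_address=12345, rng=random.Random(0))
print(len(atoms))  # 2
```

Each atom carries everything a receiving node needs, so once the atoms are sent the stager holds no state that the rest of the job depends on.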
Nodes that receive data atoms apply the Map function to the data to create result data atoms, which are then sent back to the stager's hash address (or some other user-defined address). This takes $O(\log n)$ hops over Chord's fingers. At each hop, the node waits a predetermined minimal amount of time (100 milliseconds in our experiments) to accumulate additional results.
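The following toy simulation, which is not taken from our code, illustrates how a result atom travels over Chord's fingers. It makes two simplifying assumptions:
\begin{itemize}
\item an 8-bit identifier space;
\item a global sorted list of node IDs in place of real network messages.
\end{itemize}
As in the Chord protocol, \texttt{lookup} forwards each request to the closest preceding finger, and it counts the hops. Each forward more than halves the remaining distance to the target's predecessor. Hence no lookup in this sketch needs more than $M$ hops for an $M$-bit identifier space.

\begin{verbatim}
import bisect
import random

M = 8            # identifier bits (toy size)
SIZE = 2 ** M

def between(x, a, b):
    # is x in the open clockwise arc (a, b)?
    if a < b:
        return a < x < b
    return x > a or x < b

def succ(nodes, key):
    # first node ID at or after key, wrapping
    i = bisect.bisect_left(nodes, key % SIZE)
    return nodes[i % len(nodes)]

def fingers(nodes, n):
    # finger i of node n is succ(n + 2**i)
    return [succ(nodes, n + 2 ** i) for i in range(M)]

def lookup(nodes, start, key):
    """Route from start to key's owner; count hops."""
    n, hops = start, 0
    while True:
        s = succ(nodes, n + 1)      # n's successor
        if between(key, n, s) or key == s:
            return s, hops          # key in (n, s]
        nxt = n
        for f in reversed(fingers(nodes, n)):
            if between(f, n, key):  # closest preceding
                nxt = f
                break
        if nxt == n:                # defensive check
            return s, hops
        n, hops = nxt, hops + 1

rng = random.Random(1)
nodes = sorted(rng.sample(range(SIZE), 32))
worst = max(lookup(nodes, a, k)[1]
            for a in nodes for k in range(SIZE))
print("worst-case hops:", worst, "with M =", M)
\end{verbatim}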
Nodes that receive at least two results merge them using the Reduce function. The results are continually merged until only one remains at the hash address of the stager.
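A node's behavior at each hop can be sketched as a small buffer. The \texttt{HopBuffer} class below is hypothetical. The first result to arrive opens a waiting window (100 ms, as in our experiments). Once the window has elapsed, all pending results are merged with the user's Reduce function and the single merged result is forwarded. Timestamps are passed in explicitly so the logic can be exercised without a network or a real clock. How our implementation actually schedules this is not shown.

\begin{verbatim}
from functools import reduce
from typing import Any, Callable, List, Optional

WINDOW = 0.100  # seconds, as in our experiments

class HopBuffer:
    """Results waiting at one node (sketch)."""

    def __init__(self, reduce_fn: Callable,
                 window: float = WINDOW):
        self.reduce_fn = reduce_fn
        self.window = window
        self.pending: List[Any] = []
        self.opened: Optional[float] = None

    def add(self, result: Any, now: float) -> None:
        # the first arrival opens the window
        if not self.pending:
            self.opened = now
        self.pending.append(result)

    def flush(self, now: float) -> Optional[Any]:
        # merge and forward once the window expires
        if (not self.pending
                or now - self.opened < self.window):
            return None
        merged = reduce(self.reduce_fn, self.pending)
        self.pending, self.opened = [], None
        return merged
\end{verbatim}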
Once the reductions are finished, the user retrieves his results from the node at the stager's address. This may not be the stager himself, as the stager may no longer be in the network. The stager does not need to collect the results himself, since the results are sent to the stager's hash address rather than to the stager. Thus, the stager could quit the network after staging, and neither the user nor the network would be affected by the change. % Here, we are leveraging two features. First, we use the automatic assignment of responsibility to automatically route the data to the successor. %Second, the same process Chord uses to backup files is used to backup the intermediate data.
Similar precautions are taken for nodes working on Map and Reduce tasks. Those tasks are backed up by a node's successor, who will run the task if the node leaves before finishing its work (i.e., the successor loses his predecessor). The node gives each backed-up task a timeout. If the backup node detects that the responsible node has failed before the timeout expires, he starts the work and backs it up in turn to \emph{his} successor. Otherwise, the backup copy is discarded once the timeout expires. This prevents a task from being executed twice.
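The backup rule can be sketched as follows. The names here are hypothetical and used only for illustration:
\begin{itemize}
\item the \texttt{BackupStore} class;
\item its \texttt{tick} method;
\item the \texttt{pred\_alive} flag, which stands in for Chord's failure detection.
\end{itemize}
If the predecessor fails before a copy's timeout, the copy is run and backed up again. Otherwise it is dropped once the timeout passes.

\begin{verbatim}
from dataclasses import dataclass
from typing import Any, Callable, List

@dataclass
class Backup:
    atom: Any
    deadline: float   # drop the copy after this

class BackupStore:
    """Copies of the predecessor's tasks (sketch)."""

    def __init__(self) -> None:
        self.items: List[Backup] = []

    def hold(self, atom: Any, now: float,
             timeout: float) -> None:
        self.items.append(Backup(atom, now + timeout))

    def tick(self, now: float, pred_alive: bool,
             run: Callable[[Any], None],
             rebackup: Callable[[Any], None]) -> None:
        keep = []
        for b in self.items:
            if now >= b.deadline:
                continue          # expired: discard
            if pred_alive:
                keep.append(b)    # keep waiting
            else:
                run(b.atom)       # take over the task
                rebackup(b.atom)  # copy to our successor
        self.items = keep
\end{verbatim}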
An advantage of our system is the ease of development and deployment. The developer does not need to worry about distributing work evenly, nor does he have to worry about any node in the network going down. The stager does not need to keep track of the status of the network. The underlying Chord ring handles that automatically. If the user finds they need additional processing power during runtime, they can boot up additional nodes, which would automatically be assigned work based on their hash value. If a node goes down while performing an operation, his successor takes over for him. This makes the system extremely robust during runtime.
All a developer needs to do is write three functions: the staging function, Map, and Reduce. These define how to split up the work into manageable portions, the work to be performed on each portion to obtain results, and how to combine these results into a single result, respectively.
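To make the three-function contract concrete, the sketch below runs a job sequentially on one machine. This can be handy for testing user functions before submitting them. The names \texttt{run\_locally}, \texttt{stage\_squares}, \texttt{map\_squares}, and \texttt{reduce\_sum} are hypothetical. The example job, a sum of squares, is ours and not from the experiments.

\begin{verbatim}
from functools import reduce
from typing import Any, Callable, List

def run_locally(stage: Callable[[Any], List[Any]],
                map_fn: Callable[[Any], Any],
                reduce_fn: Callable[[Any, Any], Any],
                job: Any) -> Any:
    """Sequential stand-in for a ChordReduce job."""
    atoms = stage(job)                 # split the work
    results = [map_fn(a) for a in atoms]
    return reduce(reduce_fn, results)  # combine

# Example: sum of squares of 1..n, in chunks of 10
def stage_squares(n: int) -> List[range]:
    return [range(i, min(i + 10, n + 1))
            for i in range(1, n + 1, 10)]

def map_squares(r: range) -> int:
    return sum(x * x for x in r)

def reduce_sum(a: int, b: int) -> int:
    return a + b

total = run_locally(stage_squares, map_squares,
                    reduce_sum, 100)
print(total)  # 338350
\end{verbatim}

Intermediate results may be merged in any order at different hops. This sketch therefore assumes that Reduce is associative and commutative, as addition is here.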
\section{Experiments}
In order for ChordReduce to be a viable framework, we had to show these three properties:
\begin{enumerate}
\item ChordReduce provides significant speedup during a distributed job.
\item ChordReduce scales.
\item ChordReduce handles churn during execution.
\end{enumerate}
Speedup can be demonstrated by showing that a distributed job is generally performed more quickly than the same job handled by a single worker. More formally, we need to establish that $\exists n$ such that $T_{n} < T_{1}$, where $T_{n}$ is the amount of time it takes for $n$ nodes to finish the job.
To establish scalability, we need to show that the cost of distributing the work grows logarithmically with the number of workers. In addition, we need to demonstrate that the larger the job, the more nodes can work on it before overhead causes diminishing returns. This can be stated as $$T_{n} = \frac{T_{1}}{n} + k \cdot \log_{2}(n)$$ where $\frac{T_{1}}{n}$ is the time the job would take if it were distributed perfectly with no overhead, and $k \cdot \log_{2}(n)$ is the network-induced overhead, $k$ being an unknown constant dependent on network latency and available processing power.
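Under this model, and treating $n$ as continuous, we can also estimate where adding workers stops paying off. Setting the derivative of $T_{n}$ to zero gives
$$\frac{dT_{n}}{dn} = -\frac{T_{1}}{n^{2}} + \frac{k}{n \ln 2} = 0 \quad\Longrightarrow\quad n^{*} = \frac{T_{1} \ln 2}{k}.$$
Since $\frac{d^{2}T_{n}}{dn^{2}} = \frac{2T_{1}}{n^{3}} - \frac{k}{n^{2} \ln 2}$ equals $\frac{k}{n^{2} \ln 2} > 0$ at $n^{*}$, $n^{*}$ minimizes the runtime. Because $n^{*}$ grows linearly with $T_{1}$, larger jobs can use proportionally more workers before overhead dominates. Equivalently, the model's speedup is
$$S_{n} = \frac{T_{1}}{T_{n}} = \frac{n}{1 + \frac{k \, n \log_{2}(n)}{T_{1}}},$$
which approaches $n$ as $T_{1}$ grows.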
Finally, to demonstrate robustness, we need to show that ChordReduce can handle arbitrary node failure in the ring and that such failures minimally impair the overall speed of the network.
\subsection{Setup}
\begin{figure}
\includegraphics[width=\linewidth]{dartboard}
\caption{The ``dartboard.'' The computer throws a dart by choosing a random $x$ and $y$ between 0 and 1. If $x^{2} + y^{2} < 1^{2}$, the dart landed inside the circle. $A$ and $B$ are darts that landed inside the circle, while $C$ did not.}
\label{dartboard}
\end{figure}
To stress test our framework, we ran a Monte Carlo approximation of $\pi$. This process is analogous to drawing the top-right quarter of a unit circle inside a unit square (Fig. \ref{dartboard}) and then throwing darts at random locations in the square. The ratio of darts that land inside the circle to the total number of throws approximates $\frac{\pi}{4}$. The more darts thrown, i.e., the more samples taken, the more accurate the approximation\footnote{This is not intended to be a particularly good approximation of $\pi$. Because the standard error of the estimate shrinks as $1/\sqrt{N}$ for $N$ samples, each additional digit of accuracy requires increasing the number of samples by two orders of magnitude.}.
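The estimator and its error can be sketched in a few lines of Python. In this illustration the function name \texttt{estimate\_pi} and the seeds are our own choices. Each dart is treated as a Bernoulli trial, so for $N$ throws with hit fraction $p$ the standard error of the estimate is $4\sqrt{p(1-p)/N}$. Multiplying $N$ by 100 therefore shrinks the error by roughly a factor of 10.

\begin{verbatim}
import math
import random

def estimate_pi(samples: int, rng: random.Random):
    """Dartboard estimate of pi and its std. error."""
    inside = 0
    for _ in range(samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1.0:
            inside += 1
    p = inside / samples           # estimates pi / 4
    # binomial standard error, scaled by 4
    stderr = 4 * math.sqrt(p * (1 - p) / samples)
    return 4 * p, stderr

rng = random.Random(42)
for n in (10**3, 10**5):
    est, se = estimate_pi(n, rng)
    print(f"{n:>8} samples: {est:.4f} +/- {se:.4f}")
\end{verbatim}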
We chose this experiment for several reasons. The job is extremely easy to distribute, which also made it easy to test scalability: doubling the number of samples doubles the amount of work each node receives. We could also test the effectiveness of distributing the job among different numbers of workers.
Each Map job is defined by the number of throws the node must make and yields a result containing the total number of throws and the number of throws that landed inside the circular section. Reducing these results is then a matter of adding the respective fields together.
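Expressed as the three user functions, the job might look like the following sketch. The following are assumptions for illustration rather than our exact code:
\begin{itemize}
\item the names \texttt{stage\_pi}, \texttt{map\_pi}, and \texttt{reduce\_pi};
\item the dictionary result format;
\item the per-atom \texttt{seed} argument.
\end{itemize}

\begin{verbatim}
import random
from typing import Dict, List, Optional

Result = Dict[str, int]

def stage_pi(total: int, atoms: int) -> List[int]:
    # split total throws into near-equal atoms
    base, extra = divmod(total, atoms)
    return [base + (1 if i < extra else 0)
            for i in range(atoms)]

def map_pi(throws: int,
           seed: Optional[int] = None) -> Result:
    # throw darts; count those inside the circle
    rng = random.Random(seed)
    inside = 0
    for _ in range(throws):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1.0:
            inside += 1
    return {"throws": throws, "inside": inside}

def reduce_pi(a: Result, b: Result) -> Result:
    # add the respective fields together
    return {"throws": a["throws"] + b["throws"],
            "inside": a["inside"] + b["inside"]}
\end{verbatim}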
We ran our experiments using Amazon's Elastic Compute Cloud (EC2) service, which allows users to purchase an arbitrary number of virtual machines by the hour. Each node was an individual EC2 small instance \cite{amazon-instances} with a preconfigured Ubuntu 12.04 image. These instances were capable enough to provide constant computation, but weak enough that they were occasionally overwhelmed by traffic, creating a constant churn effect in the ring.
Once started, nodes retrieve the latest version of the code and run it as a service, automatically joining the network. We can choose any node as the stager and tell it to run the MapReduce process. We found that the network was robust enough that we could take the node we wanted to be the stager out of the network, modify its MapReduce test code, have it rejoin the network, and then run the new code without any problems. Since only the stager has to know how to create the Map tasks, the other nodes do not need to be updated; they simply execute the new tasks they are given.
We ran experiments on groups of 1, 10, 20, 30, and 40 workers, each of which generated a $10^{8}$-sample set and a $10^{9}$-sample set. Additionally, we gathered data on a $10^{7}$-sample set using 1, 5, 10, 20, and 30 workers. To test churn, we ran an experiment in which each node had an equal chance of leaving and joining the network, and we varied the level of churn over multiple runs.
We also used a subroutine we wrote called \texttt{plot}, which sends a message sequentially around the ring to determine how many members it has. If \texttt{plot} failed to return within one second, we took this as a sign that the ring was experiencing structural instability.
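The following sketches the idea behind \texttt{plot}. It assumes that each hop can be modeled as a call to a \texttt{successor\_of} function; in the real ring, each hop is a network message. The walk follows successor links from a starting node, counting members until it returns to the start. It gives up after the timeout, which also catches a broken ring whose successor links never lead back to the start.

\begin{verbatim}
import time
from typing import Any, Callable, Optional

def plot(start: Any,
         successor_of: Callable[[Any], Any],
         timeout: float = 1.0) -> Optional[int]:
    """Count members by walking successor links.

    Returns None if the walk does not come back to
    start within timeout (a sign of instability).
    """
    deadline = time.monotonic() + timeout
    count, node = 1, successor_of(start)
    while node != start:
        if time.monotonic() > deadline:
            return None
        count += 1
        node = successor_of(node)
    return count

# toy ring: node -> successor
ring = {1: 5, 5: 9, 9: 1}
print(plot(1, ring.__getitem__))   # 3
\end{verbatim}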
\subsection{Results}
\begin{figure}
\includegraphics[width=\linewidth]{expTime}
\caption{For a sufficiently large job, it was almost always preferable to distribute it. When the job is too small, such as with the $10^{7}$ data set, our runtime is dominated by the overhead. These results match what we would expect when overhead grows logarithmically with the number of workers.}
\label{expTime}
\end{figure}
\begin{figure}
\includegraphics[width=\linewidth]{expSpeed}
\caption{The larger the size of the job, the greater the gains of distributing with ChordReduce. In addition, the larger the job, the more workers can be added before we start seeing diminishing returns. This demonstrates that ChordReduce is scalable.}
\label{expSpeed}
\end{figure}
Fig. \ref{expTime} and Fig. \ref{expSpeed} summarize the experimental results of job duration and speedup. Our default series was the $10^{8}$-sample series. On average, it took a single node 431 seconds, or approximately 7 minutes, to generate $10^{8}$ samples. Generating the same number of samples using ChordReduce over 10, 20, 30, or 40 nodes was always quicker. The samples were generated fastest with 20 workers, with a speedup factor of 4.96, while increasing the number of workers to 30 yielded a speedup of only 4.03. At 30 nodes, the gains of distributing the work were still present, but the overhead ($k \cdot \log_{2}(n)$) had a larger impact. This effect is more pronounced at 40 workers, with a speedup of 2.25.
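These speedups imply concrete runtimes and parallel efficiencies. The short calculation below uses the hypothetical helper \texttt{implied}. It derives $T_{n} = T_{1}/S_{n}$ and the efficiency $S_{n}/n$ from $T_{1} = 431$ seconds and the speedup factors given in the text, and introduces no new measurements.

\begin{verbatim}
def implied(t1, speedups):
    """Map n -> (T_n, efficiency) from speedups."""
    # T_n = T_1 / S_n ; efficiency = S_n / n
    return {n: (t1 / s, s / n)
            for n, s in speedups.items()}

# 10^8-sample series: values from the text
table = implied(431.0, {20: 4.96, 30: 4.03, 40: 2.25})
for n, (tn, eff) in table.items():
    print(f"{n} workers: {tn:.1f} s, "
          f"efficiency {eff:.2f}")
\end{verbatim}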
Since approximating $\pi$ on one node with $10^{8}$ samples took approximately 7 minutes, collecting $10^{9}$ samples on a single node would take roughly 70 minutes. Fig. \ref{expSpeed} shows that the $10^{9}$ set gained greater benefit from being distributed than the $10^{8}$ set, with a speedup factor at 20 workers of 9.07, compared to 4.96 for the $10^{8}$ set. In addition, the gains of distributing work further increased at 30 workers and only began to decay at 40 workers, whereas the $10^{8}$ data set began its drop-off at 30 workers. This behavior demonstrates that the larger the job being distributed, the greater the gains of distributing the work using ChordReduce.
The $10^{7}$ sample set is consistent with logarithmic network overhead. At that size, running the job concurrently is not effective, and overhead becomes the dominant factor in runtime. This matches the behavior predicted by our equation, $T_{n} = \frac{T_{1}}{n} + k \cdot \log_{2}(n)$: for a small $T_{1}$, $\frac{T_{1}}{n}$ approaches 0 as $n$ grows, while the overhead term $k \cdot \log_{2}(n)$ dominates the runtime. The samples from our data set fit this behavior, supporting the conclusion that our overhead increases logarithmically with the number of workers.
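One simple way to obtain $k$ from measurements is to compute $k = (T_{n} - T_{1}/n)/\log_{2}(n)$ for each run with $n > 1$ and average the results. We assume this method here for illustration. The runtimes in the example are synthetic values generated from the model, not our measured data, and \texttt{estimate\_k} is a hypothetical helper.

\begin{verbatim}
import math

def estimate_k(t1, runs):
    """Mean of per-run k = (T_n - T_1/n) / log2(n).

    runs maps a worker count n > 1 to its runtime.
    """
    ks = [(tn - t1 / n) / math.log2(n)
          for n, tn in runs.items() if n > 1]
    return sum(ks) / len(ks)

# synthetic runs generated from T_1 = 60, k = 30
fake = {n: 60 / n + 30 * math.log2(n)
        for n in (5, 10, 20, 30)}
print(estimate_k(60, fake))  # recovers k = 30
\end{verbatim}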
\begin{figure}
\includegraphics[width=\linewidth]{projTime}
\caption{The projected runtime using ChordReduce for differently sized jobs. Each curve projects the expected behavior for a job that takes a single worker the specified amount of time.}
\label{projTime}
\end{figure}
\begin{figure}
\includegraphics[width=\linewidth]{projSpeed}
\caption{The projected speedup for differently sized jobs.}
\label{projSpeed}
\end{figure}
Taking $T_{n} = \frac{T_{1}}{n} + k \cdot \log_{2}(n)$ as our model, we can estimate how long ChordReduce would take to run a job that takes an arbitrary amount of time on a single node. Our data points indicated that the mean value of $k$ for this problem was 36.5 seconds. Fig. \ref{projTime} shows that for jobs that would take more than $10^{4}$ seconds for a single worker to complete, we can expect there would still be benefit to adding an additional worker, even when there are already 5000 workers in the ring. Fig. \ref{projSpeed} further emphasizes this. Note that as jobs become larger, the expected speedup from ChordReduce approaches linear.
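Projections of this kind can be computed directly from the model. The following minimal sketch assumes $k = 36.5$ seconds for every job size; the helper names are hypothetical.

\begin{verbatim}
import math

K = 36.5  # mean k for the pi job (seconds)

def projected_time(t1, n, k=K):
    """T_n = T_1 / n + k log2(n)."""
    return t1 / n + k * math.log2(n)

def projected_speedup(t1, n, k=K):
    return t1 / projected_time(t1, n, k)

for t1 in (1e3, 1e4, 1e5, 1e6):
    row = [round(projected_speedup(t1, n), 1)
           for n in (10, 100, 1000)]
    print(f"T_1 = {t1:.0e} s: {row}")
\end{verbatim}

For a fixed $n$, the printed speedups climb toward $n$ as $T_{1}$ grows, matching the near-linear behavior described above.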
\begin{table}
\centering
\begin{tabular}{|r|r|r|}
\hline
Churn rate (per second) & Average runtime (s) & Speedup vs.\ 0\% churn \\ \hline
0.8\% & 191.25 & 2.15 \\ \hline
0.4\% & 329.20 & 1.25 \\ \hline
0.025\% & 431.86 & 0.95 \\ \hline
0.00775\% & 445.47 & 0.92 \\ \hline
0.00250\% & 331.80 & 1.24 \\ \hline
0\% & 411.57 & 1.00 \\ \hline
\end{tabular}
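\caption{Average runtime for generating $10^{8}$ samples on a ring of 40 nodes under different rates of churn, and the resulting speedup relative to the run with no churn.}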
\label{churnSpeed}
\end{table}
Table \ref{churnSpeed} shows the experimental results for different rates of churn. These results show that the system is relatively insensitive to churn. We started with 40 nodes in the ring and generated $10^{8}$ samples while experiencing the rates of churn specified in Table \ref{churnSpeed}. At a 0.8\% rate of churn, each node has a 0.8\% chance each second of leaving the network, and each departure is followed by another node joining the network at a different location. Assuming identical joining and leaving rates is not unusual \cite{marozzo2012p2p} \cite{load}.
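The churn model can be simulated with a short sketch, using a hypothetical helper \texttt{simulate\_churn}. Each second, every node independently leaves with probability $p$ and is replaced by a node at a fresh random identifier. The ring size therefore stays constant while its membership turns over. The expected number of departures is $p \cdot n \cdot t$, e.g.\ $0.008 \times 40 \times 200 = 64$ over 200 seconds.

\begin{verbatim}
import random

SIZE = 2 ** 160   # Chord identifier space

def simulate_churn(n_nodes, rate, seconds, seed=0):
    """Each second, every node leaves with
    probability `rate` and a new node joins at a
    fresh random ID, so ring size stays constant."""
    rng = random.Random(seed)
    ring = {rng.randrange(SIZE) for _ in range(n_nodes)}
    departures = 0
    for _ in range(seconds):
        for node in list(ring):   # snapshot
            if rng.random() < rate:
                ring.discard(node)
                ring.add(rng.randrange(SIZE))
                departures += 1
    return len(ring), departures

# 40 nodes at 0.8% per second for 200 s
size, left = simulate_churn(40, 0.008, 200)
print(size, left)   # size is always 40
\end{verbatim}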
Our highest rates of churn are up to two orders of magnitude higher than the rates used in the P2P-MapReduce simulation \cite{marozzo2012p2p}. In their paper, the highest rate of churn was only 0.4\% per minute, whereas our two highest rates, 0.4\% and 0.8\% per second, correspond to 24\% and 48\% per minute. Because we were dealing with fewer nodes, we chose larger rates to demonstrate that ChordReduce could effectively handle a high level of churn.
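For comparison, we can treat each per-second leaving probability as a rate and multiply it by 60 to convert it to a per-minute rate:
$$0.8\%\,\mathrm{s}^{-1} \times 60\,\mathrm{s}\,\mathrm{min}^{-1} = 48\%\,\mathrm{min}^{-1} = 120 \times 0.4\%\,\mathrm{min}^{-1}.$$
Likewise, 0.4\% per second corresponds to 24\% per minute (60 times higher). Our lowest tested rate, 0.0025\% per second, corresponds to 0.15\% per minute. For small rates, this linear conversion closely approximates the probability that a given node leaves within a minute. At 0.8\% per second, that probability is $1 - 0.992^{60} \approx 38\%$.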
Our experiments show that ChordReduce can effectively distribute a problem, yielding a substantial speedup. Furthermore, our results show that the larger the problem, the more workers can be added before diminishing returns set in. During runtime, we experienced multiple instances where \texttt{plot} failed to run and the stager reported socket errors, indicating that it had lost its connection with a node in the ring. Despite this turbulence, the nodes managed to reestablish connections with one another and report back all the data, further demonstrating that ChordReduce handles churn in the network.
\section{Conclusion and Future Work}
We presented ChordReduce, a framework for MapReduce that is completely decentralized, scalable, load balancing, and highly tolerant to churn and node failure at any point in the network. We implemented a fully functional version of ChordReduce and performed detailed experiments to test its performance. These experiments confirmed that ChordReduce is robust and effective. ChordReduce is based on Chord, which is traditionally viewed as a P2P framework for distributing and sharing files; we demonstrated that it can also be used as a platform for distributed computation. Chord provides lookups in $O(\log n)$ hops throughout the network and has built-in mechanisms for backup, automatic assignment of responsibility, routing, and load balancing.
Our use of Chord as the middleware for ChordReduce demonstrates Chord's effectiveness for distributed and concurrent computation.
The effectiveness of Chord opens up new approaches for tackling other distributed problems, such as supporting databases and machine learning for Big Data, and exascale computations. We intend to further optimize the performance of ChordReduce and extend the middleware to other applications.
%For example, future work could incorporate processes that allow Chord to effectively share and distribute mutable files \cite{IRM} and alter them to perform distributed analysis of mutable data. These same adjustments can be used to improve the latency of the Chord network with mutable data, which was a major reason why a Chord-based distributed DNS \cite{cox2002serving} was abandoned.
%While ChordReduce is most efficient when each node is physically close in a cluster, minimizing the impact of latency, this does not exclude the option of tasks being distributed throughout the world. This setup may be more applicable to a volunteer computing framework, such as Folding@home \cite{folding}.
%Many clusters assume identical or near identical hardware, running an equal number of nodes, performing an equal amount of work. This is not always a safe assumption to make. If the hardware used for computations is not equal, then some processors can be left idling when they could be doing more work, while others may be overwhelmed, holding back the rest of the network.
%Adjustments can be easily made on the user's end. If some hardware can take more work than others, then that system can boot up more instances of nodes locally. Running two nodes locally would mean that approximately twice as much work would be assigned to that computer. Automatically balancing this load is also an avenue for future research.
%Chord could also be used to create a distributed authentication service.
%Our future plan for ChordReduce itself is to recode the library in Java to take advantage of its multiprocessing support, which is limited in Python.
\bibliographystyle{IEEEtran}
\bibliography{CHRONUS}
\end{document}