
… or why do I get “Cannot assign requested address” errors?!

At some point or another, every Hadoop Operations person will have to copy large amounts of data from one cluster to another. This is a trivial task thanks to hadoop distcp.  But, it is not without its quirks and issues. I will discuss a few examples that I have encountered recently while migrating data between different clusters.

Hadoop distcp or bust!

If you have not encountered hadoop distcp before, let’s take a quick run through why it exists.  At a basic level, it functions much like rsync or a recursive cp, allowing you to replicate data from one path to another.  Unlike rsync or a recursive cp, distcp has the ability to do this copying in a massively parallel fashion.  If you have terabytes of data spread across hundreds or thousands of files, the last thing you want to do is serially copy this around.

hadoop distcp works by gathering a list of files and directories under a specified path and spawning a map-only MapReduce job to do the heavy lifting.  Each map task takes a subset of the complete file list and iterates over that list to copy each file, block by block, from the source path to the destination.
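
In its simplest form, a copy from one cluster to another looks something like this (the NameNode hostnames and paths here are placeholders):

$ hadoop distcp hdfs://source-nn/some/path hdfs://dest-nn/some/path

Everything that follows is a variation on that theme: which protocol each side should use and which flags to pass.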

For the purposes of this discussion, I am using distcp from the Hadoop 2.6.0 lineage.

I didn’t know that!

There are a few things you should know about distcp.

  1. You can use it to copy files from cluster to cluster or from one path to another path on the same cluster.  This is faster than hadoop fs -cp.
  2. If the cluster versions are the same, you can use the hdfs:// protocol.
  3. If the cluster versions are not the same, your source (and possibly destination) must use webhdfs://.
    1. This is a requirement because the Hadoop Remote Procedure Call (RPC) versions are not backwards compatible.
    2. Requires that you have WebHDFS enabled (see the quick check after this list).
  4. You can copy from an insecure to a secure (kerberized) cluster if you follow some rules.
  5. distcp should run from the destination cluster.
  6. distcp aborts on error except when you tell it not to (-i), unless the error occurs during the file listing phase; those always abort.
  7. distcp will abort if you move or delete files from the source path during the file listing phase.
  8. distcp appears to always copy directory metadata, even if it has not changed.
  9. distcp has problems copying millions of files.  I will get into this further below.
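
A quick check for the WebHDFS requirement in point 3: ask the client configuration whether it is enabled.  This is only a sketch; run it with the configuration of the cluster you are checking, and expect true if WebHDFS is on:

$ hdfs getconf -confKey dfs.webhdfs.enabled
true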

Take a few moments to digest the distcp rules as described by Cloudera’s documentation.  It will help ease any frustration you may encounter if you are doing anything fancier than copying data from one insecure cluster to another.  Go ahead and read it.  I will wait.

Using hadoop distcp like a boss!

distcp has a few modes:  full copy, update, and update-and-delete.

I use each of these modes for various phases of copying data.  I like to add the options for logging (-log /path), ignoring errors (-i), the number of mappers to use (-m 200), the copy strategy (-strategy dynamic), and preserving metadata (-p).

Note: In my experience, distcp will complete faster when using the dynamic strategy.  Read about DynamicInputFormat to understand why.

Full copy

hadoop distcp -p -i -log /tmp -strategy dynamic -m 200 \
   webhdfs://cdh4-nn/some/path/ \
   hdfs://cdh5servicename/some/path/

I use this when I start my first pass of the distcp.  In this particular case, I am copying from a secure CDH4.4.0 cluster to a secure CDH5.4.5 cluster.  Both clusters are Highly Available, but because I am using WebHDFS for the source, I have to specify the active NameNode, so distcp can find the REST interface for WebHDFS.  The number of map tasks is arbitrary, but based on how much impact I want the copying to have on the source cluster.
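
A side note on finding that active NameNode: webhdfs:// has to point at a real host rather than the HA nameservice, so I need to know which NameNode on the source cluster currently holds the active role.  Assuming a standard HA setup with NameNode IDs of nn1 and nn2 (yours may be named differently), hdfs haadmin run against the source cluster’s configuration will tell you:

$ hdfs haadmin -getServiceState nn1
active
$ hdfs haadmin -getServiceState nn2
standby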

Update and Delete

hadoop distcp -p -i -log /tmp -strategy dynamic -m 200 \
   -update -delete \
   webhdfs://cdh4-nn/some/path/ \
   hdfs://cdh5servicename/some/path/

I use this mode on later distcp runs to keep things “perfectly” replicated.  This keeps a full copy of any new changes and updates available on the destination cluster, and it removes files on the destination that have been deleted on the source.  Think of this as doing the same thing as rsync -a --delete.

Update

hadoop distcp -p -i -log /tmp -strategy dynamic -m 200 \
   -update \
   webhdfs://cdh4-nn/some/path/ \
   hdfs://cdh5servicename/some/path/

This last mode covers a special data replication case on our clusters.  It runs when I am migrating data from one cluster to another with the intention of decommissioning the source cluster at some point.  This puts me in a transition period where I may have some data updated on the source cluster and some data updated on the destination cluster in the same path.  I do not want data on the destination deleted just because it does not exist on the source, so I skip the -delete flag during the distcp.  When I run in this mode, it usually means I have a smaller subset of the data on the source, with the destination always being the canonical storage point.

Hadoop distcp ALL THE THINGS!

The short tutorial is over and we can now get back to the problem at hand.

I am migrating data from a CDH4 cluster to a CDH5 cluster.  I am going to decommission the old cluster and repurpose the hardware.  It is a few petabytes worth of data that is always updating.  I have to support both versions while I migrate the data production processes from one cluster to the other.  To do this safely, my developers stage the work so we do not have to take a gigantic outage and stop all work for days or weeks.  Production must still move on!

One of our data sources in HDFS has millions of files in it and another has only a few tens of thousands.  As I began our first full copy of the larger file set, I ran into a curious problem that was not occurring with the smaller set.

If I limited the distcp to less than about 30 thousand files, the distcp file gather phase would blaze through and the YARN/MapReduce job would start without issue.  If I started working on the larger set, the file gather phase would throw exceptions after a few moments and fail.

The exception looked something like this:

15/08/12 11:58:19 ERROR tools.DistCp: Exception encountered 
java.net.ConnectException: Cannot assign requested address
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:579)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
        at sun.net.www.http.HttpClient.New(HttpClient.java:308)
        at sun.net.www.http.HttpClient.New(HttpClient.java:326)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:996)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:932)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:850)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.connect(WebHdfsFileSystem.java:580)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.connect(WebHdfsFileSystem.java:537)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:605)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.getDelegationToken(WebHdfsFileSystem.java:1299)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.getDelegationToken(WebHdfsFileSystem.java:237)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.getAuthParameters(WebHdfsFileSystem.java:423)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.toUrl(WebHdfsFileSystem.java:444)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractFsPathRunner.getUrl(WebHdfsFileSystem.java:691)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:603)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.getHdfsFileStatus(WebHdfsFileSystem.java:838)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.getFileStatus(WebHdfsFileSystem.java:853)
        at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
        at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1625)
        at org.apache.hadoop.tools.GlobbedCopyListing.doBuildListing(GlobbedCopyListing.java:77)
        at org.apache.hadoop.tools.CopyListing.buildListing(CopyListing.java:84)
        at org.apache.hadoop.tools.DistCp.createInputFileListing(DistCp.java:353)
        at org.apache.hadoop.tools.DistCp.execute(DistCp.java:160)
        at org.apache.hadoop.tools.DistCp.run(DistCp.java:121)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.tools.DistCp.main(DistCp.java:401)

This didn’t make any sense!  Small copies should have failed as well if there was a problem doing an address lookup or making a connection request.  Since the failure was somewhere in the WebHDFS path, distcp was clearly having problems hitting the source NameNode.  A quick curl verified that the WebHDFS port was up, accepting connections, and happily responding.
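
The check was something along these lines; LISTSTATUS is one of the standard WebHDFS REST operations, the path is a placeholder, and because the source cluster is kerberized you also need a valid ticket and SPNEGO support in curl (the --negotiate -u : bit):

$ curl -i --negotiate -u : "http://cdh4-nn:50070/webhdfs/v1/some/path?op=LISTSTATUS"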

Cannot assign requested address

This error bugged me.  I have seen it before, but could not remember in what context.

As I was debugging this, I happened to check up on another MapReduce job running on the same cluster and noticed weird connection failures.  They began occurring on the ResourceManager node where I happened to run the distcp from.  They would go away 2-3 minutes after the distcp had failed.  Curiouser and curiouser!

This felt like resource exhaustion somewhere.  The system was not out of memory, nor out of CPU.  Digging around further, I looked to see if distcp was opening up more TCP connections than I expected.

When distcp was not running:

$ ss -a | awk '{print $1}' | sort | uniq -c
 5 ESTAB
 14 LISTEN
 9 TIME-WAIT

When distcp was running:

$ ss -a | awk '{print $1}' | sort | uniq -c
   25 ESTAB
   14 LISTEN
28713 TIME-WAIT

Wait, wut?  That looks strangely like port exhaustion!  But what were we exhausting?  In theory, we should have been able to make plenty of outbound connections.  The problem was that we were making 28713 connections to the same host, which meant we were running out of ephemeral ports.

When you make an outbound connection, the kernel identifies that session by a 4-tuple: the source IP, the source port, the destination IP, and the destination port.  One connection hitting the CDH4 NameNode consisted of something that looked like this:

192.168.182.2:32756 -> 192.168.192.250:50070

To keep each connection unique, the node running the distcp (on the left) has to pick a unique ephemeral source port for every connection it opens to the CDH4 NameNode (on the right).
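
You can watch the ephemeral ports disappear in real time.  A rough way to count just the sockets stuck in TIME-WAIT against the WebHDFS port (50070 here) looks like this:

$ ss -tan state time-wait | grep :50070 | wc -l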

But, why were we creating more than one connection for the file gather phase?  The easiest thing to do was to run the distcp in debug mode to see what was going on. I determined that, under the hood, distcp was opening up a new TCP connection to the WebHDFS REST interface for each file being gathered.  Holy crap!
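
For reference, one way to get that level of verbosity, assuming the stock log4j setup, is to force the client logger to DEBUG for a single run:

$ HADOOP_ROOT_LOGGER=DEBUG,console hadoop distcp ... 2>&1 | tee /tmp/distcp-debug.log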

And that ~30 thousand file boundary I kept hitting?  An easy explanation!

$ sysctl net.ipv4.ip_local_port_range
net.ipv4.ip_local_port_range = 32768 61000

I was running out of ephemeral ports on the ResourceManager node.  The kernel was only configured to use ports 32768 through 61000, which gives 61000 - 32768 + 1 = 28233 ephemeral ports.  That lines up with the ~28 thousand sockets stuck in TIME-WAIT and the roughly 30 thousand file ceiling I kept hitting.

Problem solved!  Well, not so much.  If I was exhausting the ephemeral port range because of how fast distcp was opening new connections, what could I do?  The kernel keeps TCP connections in the TIME-WAIT state to make sure the connection is fully shut down.  TIME-WAIT takes about 2 minutes to clear out and free the ephemeral port for use again.

With a few million files that needed to be copied (and later copied with updates), it would have been expensive, both in time and effort, to break the distcp into hundreds of smaller chunks.  Never mind that the data layout on HDFS just would not make that work.

I tuned the kernel and it made me feel dirty

The problem I needed to overcome was expiring connections in TIME-WAIT much more quickly.  There are a couple of sysctl tunables that let you do this: one that is “dangerous” if your server interacts with NATed clients, and one that is less dangerous but should only be set if you can guarantee the server-client connection is truly dead (meaning: the outgoing connection is not really expecting any further communication, such as a late-arriving packet).

The first tunable is net.ipv4.tcp_tw_recycle.  This affects how TIME-WAIT gets cycled out on both incoming and outgoing connections.  Turning this on has bad side effects if your server also happens to communicate with anything behind a NAT.  I could not guarantee that, so this option is quite useless to me.

The second tunable is net.ipv4.tcp_tw_reuse.  This option, combined with TCP timestamps, allows outgoing connections in TIME-WAIT to be reused much more quickly, but in a safer(?) way than tcp_tw_recycle.

Vincent Bernat has a very thorough and approachable discussion about Coping with the TCP TIME-WAIT state on busy Linux servers.  Disabling socket lingering is the preferable way to handle this.  If you cannot modify your program, enabling net.ipv4.tcp_tw_reuse is the next best thing.

I chose to enable tcp_tw_reuse.
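
For reference, flipping it on looks something like this.  tcp_tw_reuse only kicks in when TCP timestamps are enabled, which net.ipv4.tcp_timestamps is by default:

$ sudo sysctl -w net.ipv4.tcp_tw_reuse=1
net.ipv4.tcp_tw_reuse = 1
$ echo 'net.ipv4.tcp_tw_reuse = 1' | sudo tee -a /etc/sysctl.conf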

As soon as I flipped it on, my larger datasets began transferring without failure.

Unknown knowns

At this point, I have a solid workaround for the problem.  I don’t like it, but it works.  I think there is room for improvement in distcp.  Since WebHDFS is an HTTP service, one of the biggest improvements would be to have it make multiple requests over a single HTTP connection (think: HTTP/1.1 pipelining and keep-alives).

I did go digging around the code to attempt to figure out what was going on.  I have a hunch that this is opening a new FileSystem object or Configuration object for each file, which will instantiate a new connection to the WebHDFS port.

In GlobbedCopyListing.java, the following method exists (and looks suspicious):

   public void doBuildListing(Path pathToListingFile,
                             DistCpOptions options) throws IOException {

    List<Path> globbedPaths = new ArrayList<Path>();
    if (options.getSourcePaths().isEmpty()) {
      throw new InvalidInputException("Nothing to process. Source paths::EMPTY");
    }

    for (Path p : options.getSourcePaths()) {
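      // Resolve a FileSystem handle for each source path and expand any glob patterns.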
      FileSystem fs = p.getFileSystem(getConf());
      FileStatus[] inputs = fs.globStatus(p);

      if(inputs != null && inputs.length > 0) {
        for (FileStatus onePath: inputs) {
          globbedPaths.add(onePath.getPath());
        }
      } else {
        throw new InvalidInputException(p + " doesn't exist");
      }
    }

    DistCpOptions optionsGlobbed = new DistCpOptions(options);
    optionsGlobbed.setSourcePaths(globbedPaths);
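    // The per-file walk of each source tree is delegated to SimpleCopyListing here.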
    simpleListing.buildListing(pathToListingFile, optionsGlobbed);
  }

I need to dig further.  I am debating opening up an issue on this; I found a few related Hadoop JIRAs, but nothing specific to this failure mode.
