Examples

This example copies the file test.txt from the local path ./data/ into the HDFS directory /user/hdfs/.


stream<boolean succeed> copyFromLocal = HDFS2FileCopy()
{
    param
        localFile                : "test.txt";
        hdfsFile                 : "/user/hdfs/test.txt";
        deleteSourceFile         : false;
        overwriteDestinationFile : true;
        direction                : copyFromLocalFile;
}

This example copies all files from the local path /tmp/work into the HDFS directory /user/hdfs/work.

// DirectoryScan operator with an absolute directory argument. 
// The output port returns all local file names in directory /tmp/work. 
stream<rstring localFile> DirScan = DirectoryScan() 
{  
    param 
        directory       : "/tmp/work"; 
        initDelay       : 1.0; 
} 

// Copies all incoming files from the input port into the /user/hdfs/work directory.
// The output port returns the result of the copy action and the elapsed time.
stream<rstring message, uint64 elapsedTime> CopyFromLocal = HDFS2FileCopy(DirScan)
{ 
    param
          hdfsUser                 : "hdfs"; 
          localFileAttrName        : "localFile"; 
          hdfsFile                 : "/user/hdfs/work/"; 
          deleteSourceFile         : false; 
          overwriteDestinationFile : true; 
          direction                : copyFromLocalFile; 
}
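
The result tuples can be inspected with a downstream operator. The following is a minimal sketch, not part of the original example: it assumes the CopyFromLocal stream and its schema (message, elapsedTime) exactly as declared above, and the sink name PrintCopyResult is hypothetical.

```spl
// Hypothetical sink: prints every result tuple emitted by CopyFromLocal.
() as PrintCopyResult = Custom(CopyFromLocal)
{
    logic
        onTuple CopyFromLocal :
        {
            // message and elapsedTime are the attributes declared
            // in the output schema of CopyFromLocal.
            printStringLn("copy result: " + message
                          + ", elapsed time: " + (rstring) elapsedTime);
        }
}
```

The same pattern can be attached to any stream with this schema, such as the output of a copy in the opposite direction.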

This example copies all files from the HDFS directory /user/hdfs/work into the local directory /tmp/work2.

You have to perform the following steps for Kerberos configuration:
  • Copy the Kerberos keytab file of your HDFS user from the HDFS server into the local etc directory.
  • Replace the Kerberos principal (authPrincipal) with your HDFS principal.
  • Copy the core-site.xml file from your HDFS server into the local etc directory.
  • Copy the Kerberos configuration file krb5.conf into the main /etc directory of your Streams server.
The output port of the HDFS2FileCopy operator returns the result of the copy action and the elapsed time.

// HDFS2DirectoryScan operator with an absolute directory argument and with Kerberos authentication.
// The output port returns all HDFS file names. 
stream<rstring hdfsFile> HdfsDirScan = HDFS2DirectoryScan() 
{  
    param 
        configPath               : "etc"; 
        authKeytab               : "etc/hdfs.headless.keytab"; 
        authPrincipal            : "hdfs-hdpcluster@HDP2.COM"; 
        directory                : "/user/hdfs/work"; 
        sleepTime                : 2.0; 
        vmArg                    : "-Djava.security.krb5.conf=/etc/krb5.conf"; 
} 


// CopyToLocal copies all incoming HDFS files from the input port into the local directory /tmp/work2.
stream<rstring message, uint64 elapsedTime> CopyToLocal = HDFS2FileCopy(HdfsDirScan) 
{ 
    param
        configPath               : "etc"; 
        authKeytab               : "etc/hdfs.headless.keytab"; 
        authPrincipal            : "hdfs-hdpcluster@HDP2.COM"; 
        hdfsFileAttrName         : "hdfsFile"; 
        localFile                : "/tmp/work2"; 
        deleteSourceFile         : false; 
        overwriteDestinationFile : true; 
        direction                : copyToLocalFile; 
        vmArg                    : "-Djava.security.krb5.conf=/etc/krb5.conf"; 
}