Operator HDFS2FileSink

Primitive operator image not displayed. Problem loading file: ../../image/tk$com.teracloud.streams.hdfs/op$com.teracloud.streams.hdfs$HDFS2FileSink.svg

The HDFS2FileSink operator writes files to a Hadoop Distributed File System.

The HDFS2FileSink operator is similar to the FileSink operator. This operator writes tuples that arrive on its input port to the output file that is named by the file parameter. You can optionally control whether the operator closes the current output file and creates a new file for writing based on the size of the file in bytes, the number of tuples that are written to the file, or the time in seconds that the file is open for writing,

or when the operator receives a punctuation marker.

Behavior in a consistent region

The HDFS2FileSink operator can participate in a consistent region, however this is not supported when connecting to IBM Analytics Engine on IBM Cloud. The operator can be part of a consistent region, but cannot be at the start of a consistent region. The operator guarantees that tuples are written to a file in HDFS at least once, but duplicated tuples can be written to the file if application failure occurs.

For the operator to support consistent region, the Hadoop Distributed File System must be configured with file append enabled. For information about how to properly enable this feature, refer to the documentation of your Hadoop distribution.

On drain, the operator flushes its internal buffer to the file. On checkpoint, the operator stores the current file name, file size, tuple count, and file number to the checkpoint. On reset, the operator closes the current file, and opens the file from checkpoint. File states like file size and tuple count are reset to the file. The file is opened in append mode, and data is written to the end of the file.

Exceptions

The HDFS2FileSink operator terminates in the following cases:
  • The operator cannot connect to HDFS.
  • The file cannot be written.
Examples

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 25 parameters.

Optional: appConfigName, authKeytab, authPrincipal, bytesPerFile, closeOnPunct, configPath, credFile, credentials, encoding, file, fileAttributeName, hdfsPassword, hdfsUri, hdfsUser, keyStorePassword, keyStorePath, libPath, policyFilePath, reconnectionBound, reconnectionInterval, reconnectionPolicy, tempFile, timeFormat, timePerFile, tuplesPerFile

Metrics
This operator does not report any metrics.

Properties

Implementation
Java

Input Ports

Ports (0)

The HDFS2FileSink operator has one input port, which writes the contents of the input stream to the file that you specified. The input port is non-mutating, and its punctuation mode is Oblivious . The HDFS2FileSink supports writing data into HDFS in two formats. For line format, the schema of the input port is tuple<rstring line> or tuple<ustring line>, which specifies a single rstring or ustring attribute that represents a line to be written to the file. For binary format, the schema of the input port is tuple<blob data>, which specifies a block of data to be written to the file.

Properties

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

The HDFS2FileSink operator is configurable with an optional output port. The output port is non-mutating and its punctuation mode is Free . The schema of the output port is <string fileName, uint64 fileSize>, which specifies the name and size of files that are written to HDFS.

Properties

Parameters

This operator supports 25 parameters.

Optional: appConfigName, authKeytab, authPrincipal, bytesPerFile, closeOnPunct, configPath, credFile, credentials, encoding, file, fileAttributeName, hdfsPassword, hdfsUri, hdfsUser, keyStorePassword, keyStorePath, libPath, policyFilePath, reconnectionBound, reconnectionInterval, reconnectionPolicy, tempFile, timeFormat, timePerFile, tuplesPerFile

appConfigName

This optional parameter specifies the name of the application configuration that contains HDFS connection related configuration parameter credentials. The credentials is a JSON string that contains key/value pairs for user and password and webhdfs . If a value is specified in the application configuration and as operator parameter, the application configuration parameter value takes precedence. An application configuration can be created in the Streams Console or using the streamtool mkappconfig ... <configObject name>.

Properties
authKeytab

This optional parameter specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Properties
authPrincipal

This parameter specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Properties
bytesPerFile

This parameter specifies the approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters at a time.

Properties
closeOnPunct

This parameter specifies whether the operator closes the current output file and creates a new file when a punctuation marker is received. The default value is false .

Properties
configPath

This optional parameter specifies the path to the directory that contains the HDFS configuration file core-site.xml .

If you have extra HDFS configuration in your hdfs-site.xml file, you have to copy also this configuration file from your Hadoop server into configPath directory.

If this parameter is not specified, by default the operator looks for the core-site.xml and hdfs-site.xml files in the following locations:
  • $HADOOP_HOME/etc/hadoop
  • $HADOOP_HOME/conf
  • $HADOOP_HOME/lib
  • $HADOOP_HOME/
Note: For connections to Hadoop instances via webhdfs, the $HADOOP_HOME environment variable is not supported and can not be used.
Properties
credFile

This optional parameter specifies a file that contains login credentials. The credentials are used to connect to WEBHDF remotely by using the schema: webhdfs://hdfshost:webhdfsport The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.

Properties
credentials

This optional parameter specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs .

This parameter can also be specified in an application configuration.

The JSON string must to have the following format:


{
    "user"     : "clsadmin",
    "password" : "IAE-password",
    "webhdfs"  : "webhdfs://ip-address:8443"
}
It is possible to use hdfsUser or user for hdfs user and hdfsPassword or password for hdfs passwor and webhdfs or hdfsUri for hdfs URL in jSON string.
Properties
encoding

This optional parameter specifies the encoding to use when reading files. The default value is UTF-8 .

Properties
file
This parameter specifies the name of the file that the operator writes to. The file parameter can optionally contain the following variables, which the operator evaluates at runtime to generate the file name:
  • %HOST The host that is running the processing element (PE) of this operator.
  • %FILENUM The file number, which starts at 0 and counts up as a new file is created for writing.
  • %PROCID The process ID of the processing element.
  • %PEID The processing element ID.
  • %PELAUNCHNUM The PE launch count.
  • %TIME The time when the file is created. If the timeFormat parameter is not specified, the default time format is yyyyMMdd_HHmmss .

For example, if you specify a file parameter of myFile%FILENUM%TIME.txt, and the first three files are created in the afternoon on November 30, 2014, the file names are myFile020141130_132443.txt, myfile120141130_132443.txt, and myFile220141130_132443.txt .

Important: If the %FILENUM specification is not included, the file is overwritten every time a new file is created.

Properties
fileAttributeName

If set, this points to an attribute containing the filename. The operator will close a file when value of this attribute changes. If the string contains substitutions, the check for a change happens before substituations, and the filename contains the substitutions based on the first tuple.

Properties
hdfsPassword

This parameter specifies the password to use when you connecting to a Hadoop instance deployed on IBM Analytics Engine. If this parameter is not specified, attempts to connect to a Hadoop instance deployed on IBM Analytics Engine will cause an exception.

Properties
hdfsUri
This parameter specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system. The URI has the following format:
  • To access HDFS locally or remotely, use hdfs://hdfshost:hdfsport
  • To access GPFS locally, use gpfs:/// .
  • To access GPFS remotely, use webhdfs://hdfshost:webhdfsport .
  • To access HDFS via a web connection for HDFS deployed on IBM Analytics Engine, use webhdfs://webhdfshost:webhdfsport .

If this parameter is not specified, the operator expects that the HDFS URI is specified as the fs.defaultFS or fs.default.name property in the core-site.xml HDFS configuration file. The operator expects the core-site.xml file to be in $HADOOP_HOME/../hadoop-conf or $HADOOP_HOME/etc/hadoop or in the directory specified by the configPath parameter. Note: For connections to HDFS on IBM Analytics Engine, the $HADOOP_HOME environment variable is not supported and so either hdfsUri or configPath must be specified.

Properties
hdfsUser

This parameter specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS. When connecting to Hadoop instances on IBM Analytics Engine, this parameter must be specified otherwise the connection will be unsuccessful. When you use Kerberos authentication, the operator authenticates with the Hadoop file system as the instance owner by using the values that are specified in the authPrincipal and authKeytab parameters. After successful authentication, the operator uses the user ID that is specified by the hdfsUser parameter to perform all other operations on the file system.

Properties
keyStorePassword

This optional parameter is only supported when connecting to a Hadoop instance deployed on IBM Analytics Engine. It specifies the password for the keystore file. This attribute is specified when the keyStore attribute is specified and the keystore file is protected by a password. If the keyStorePassword is invalid the operator terminates.

Properties
keyStorePath

This optional parameter is only supported when connecting to a Hadoop instance deployed on IBM Analytics Engine. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to. Note: If this parameter is omitted, invalid certificates for secure connections will be accepted. If the keystore file does not exist, or if the certificate it contains is invalid, the operator terminates. The location of the keystore file can be absolute path on the filesystem or a path that is relative to the application directory. See the section on SSL Configuration in the main page of this toolkit's documentation for information on how to configure the keystore. The location of the keystore file can be absolute path on the filesystem or a path that is relative to the application directory.

Properties
libPath

This optional parameter specifies the absolute path to the directory that contains the Hadoop library files. If this parameter is omitted and $HADOOP_HOME is not set, the apache hadoop specific libraries within the impl/lib/ext folder of the toolkit will be used. When specified, this parameter takes precedence over the $HADOOP_HOME environment variable and the libraries within the folder indicated by $HADOOP_HOME will not be used.

Properties
policyFilePath

This optional parameter is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar). The policy files enable the Java operators to use encryption with key sizes beyond the limits specified by the JDK. See the section on Policy file configuration in the main page of this toolkit's documentation for information on how to configure the policy files. If this parameter is omitted the JVM default policy files will be used. When specified, this parameter takes precedence over the JVM default policy files.

Note: This parameter changes a JVM property. If you set this property, be sure it is set to the same value in all HDFS operators that are in the same PE. The location of the policy file directory can be absolute path on the file system or a path that is relative to the application directory.

Properties
reconnectionBound

This optional parameter specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5 .

Properties
reconnectionInterval

This optional parameter specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; othewise, it is ignored. The default value is 10 .

Properties
reconnectionPolicy

This optional parameter specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are: NoRetry, InfiniteRetry , and BoundedRetry . The default value is BoundedRetry . If NoRetry is specified and a HDFS connection failure occurs, the operator does not try to connect to the HDFS again. The operator shuts down at startup time if the initial connection attempt fails. If BoundedRetry is specified and a HDFS connection failure occurs, the operator tries to connect to the HDFS again up to a maximum number of times. The maximum number of connection attempts is specified in the reconnectionBound parameter. The sequence of connection attempts occurs at startup time. If a connection does not exist, the sequence of connection attempts also occurs before each operator is run. If InfiniteRetry is specified, the operator continues to try and connect indefinitely until a connection is made. This behavior blocks all other operator operations while a connection is not successful. For example, if an incorrect connection password is specified in the connection configuration document, the operator remains in an infinite startup loop until a shutdown is requested.

Properties
tempFile
This parameter specifies the name of the file that the operator writes to. When the file is closed the file is renamed to the final filename defined by the file parameter or fileAttributeName parameter. The tempFile parameter can optionally contain the following variables, which the operator evaluates at runtime to generate the file name:
  • %HOST The host that is running the processing element (PE) of this operator.
  • %PROCID The process ID of the processing element.
  • %PEID The processing element ID.
  • %PELAUNCHNUM The PE launch count.
  • %TIME The time when the file is created. If the timeFormat parameter is not specified, the default time format is yyyyMMdd_HHmmss .
Important: This parameter must not be used in a consistent region.
Properties
timeFormat

This parameter specifies the time format to use when the file parameter value contains %TIME . The parameter value must contain conversion specifications that are supported by the java.text.SimpleDateFormat. The default format is yyyyMMdd_HHmmss .

Properties
timePerFile

This parameter specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters.

Properties
tuplesPerFile

This parameter specifies the maximum number of tuples that can be received for each output file. When the specified number of tuples are received, the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters at a time.

Properties

Libraries

Operator class library
Library Path: ../../impl/lib/com.teracloud.streams.hdfs.jar