Operator DirectoryScan
The DirectoryScan operator watches a directory and generates one file name on the output for each file that it finds in the directory. The generated name is the absolute path name of the file. A file name is generated only the first time that the file is seen during a directory scan; it is generated again only if the file is re-created. The change time (ctime) is used to detect whether a file was re-created. The output clause and custom output functions can be used to specify additional information about a file. Subdirectories and all non-regular files that are found in the directory are ignored during the scan.
Note: Because the change time (ctime) of the file is used to detect whether a file was re-created, a large file that is still being written when the directory is scanned can look re-created on a later scan. In this case, the same file name can be generated multiple times if the time between scans is less than the time to write the file. To avoid this situation, write the file into a different directory on the same file system as the directory that is being scanned, and then rename it into the target directory when it is complete. If the directories are on the same file system, /bin/mv does this. If a regular expression pattern is being used to match only certain files, creating the new files under a name that fails to match the pattern, and then renaming them, also works.
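The pattern-based variant of this workaround can be sketched as follows. This is a minimal illustration, not a prescribed setup: the /tmp/work path and the .csv and .part suffixes are assumptions chosen for the example, and the writer that creates and renames the files is outside the SPL application.

```spl
composite ScanFinishedFiles {
    graph
        // Only names that end in ".csv" are submitted. A writer that creates
        // "data.csv.part" (hypothetical name) and renames it to "data.csv"
        // once the file is complete is never picked up half-written.
        stream<rstring name> Finished = DirectoryScan() {
            param
                directory : "/tmp/work";   // assumed scan directory
                pattern   : ".*\\.csv$";   // temporary ".part" names do not match
        }
}
```

The leading `.*` is there so that the expression matches the whole file name as well as its suffix.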
Before it submits the file name to the output stream, the DirectoryScan operator can optionally move processed files to a different directory by using the moveToDirectory parameter. If the moveToDirectory parameter is specified, the file (or symbolic link) is moved to the moveToDirectory directory before the output tuple is generated.
When moveToDirectory is specified, it is valid to have multiple DirectoryScan operators that read the same directory. The DirectoryScan operator ensures that each file is submitted by only one operator by creating a temporary .rename subdirectory in both the scanned directory and the moveToDirectory directory.
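As a rough sketch of that arrangement, the following composite declares two DirectoryScan operators over one directory. The paths and stream names are illustrative assumptions, and pointing both operators at the same moveToDirectory is a choice made for this example.

```spl
composite SharedDirectory {
    graph
        // Both operators scan /tmp/work (assumed path). Because
        // moveToDirectory is set, each file is submitted by only one of them.
        stream<rstring movedFile> ScanA = DirectoryScan() {
            param
                directory       : "/tmp/work";
                moveToDirectory : "/tmp/active";
            output
                ScanA : movedFile = DestinationFullPath();
        }

        stream<rstring movedFile> ScanB = DirectoryScan() {
            param
                directory       : "/tmp/work";
                moveToDirectory : "/tmp/active";
            output
                ScanB : movedFile = DestinationFullPath();
        }
}
```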
Checkpointed data
When the DirectoryScan operator is checkpointed in a consistent region, the list of files processed (or ignored), the file change time (ctime) of each file when it was processed, and logic state variables (if present) are saved in the checkpoint. When the DirectoryScan operator is checkpointed in an autonomous region, only logic state variables (if present) are saved in the checkpoint.
Behavior in a consistent region
The DirectoryScan operator can be the start operator or an operator within the reachability graph of a consistent region. When the trigger is operatorDriven, drain processing is triggered after each submitted tuple. The persisted state includes the list of files processed (or ignored), and the file change time (ctime) of each file when it was processed. Logic state variables (if present) are also automatically checkpointed and reset.
If the sleepTime parameter is specified, the elapsed time since the last scan is not part of the checkpoint state of the operator. If the DirectoryScan operator is reset without a processing element (PE) restart, the time since last scan is not updated. If the reset is due to a PE restart, the next scan is performed when the PE is restarted and the initDelay (if any) is complete.
Tip: When you use the DirectoryScan operator with a FileSource operator, make the DirectoryScan operator the start of the consistent region. In a consistent region, it is recommended that the DirectoryScan and FileSource operators be fused and have no threaded ports.
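One way to follow this tip is sketched below. It assumes an operator-driven consistent region and uses partitionColocation to fuse the two operators; the colocation tag "scanAndRead", the composite name, and the /tmp/work path are hypothetical, and the instance-level checkpoint configuration that consistent regions need is not shown.

```spl
composite ConsistentScan {
    graph
        // Applications that use consistent regions need a JobControlPlane operator.
        () as JCP = JobControlPlane() {}

        // DirectoryScan starts the consistent region.
        @consistent(trigger = operatorDriven)
        stream<rstring name> Files = DirectoryScan() {
            param
                directory : "/tmp/work";   // assumed scan directory
            config
                placement : partitionColocation("scanAndRead");
        }

        // Same colocation tag, so both operators are fused into one PE.
        // No threadedPort config is declared on the input port.
        stream<rstring line> Lines = FileSource(Files) {
            param
                format : line;
            config
                placement : partitionColocation("scanAndRead");
        }
}
```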
Checkpointing behavior in an autonomous region
When the DirectoryScan operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its internal state to its initial state, and restores logic state variables (if present) from the last checkpoint.
When the DirectoryScan operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator is restored to its initial state.
Such checkpointing behavior is subject to change in the future.
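For reference, a periodic checkpoint clause on a DirectoryScan operator in an autonomous region might look like the following sketch. The 30-second period and the path are arbitrary values picked for the example.

```spl
composite PeriodicCheckpoint {
    graph
        stream<rstring name> Files = DirectoryScan() {
            param
                directory : "/tmp/work";        // assumed scan directory
            config
                // Checkpoint every 30 seconds. As described above, only logic
                // state variables (none here) are restored after a restart;
                // the operator's internal state returns to its initial state.
                checkpoint : periodic(30.0);
        }
}
```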
Exceptions
- The directory or moveToDirectory does not exist.
- The directory or moveToDirectory is not a directory.
- The pattern is not a valid regular expression.
- The .rename directories cannot be created when moveToDirectory is specified.
Examples
These examples use the DirectoryScan operator.
composite Main {
    graph
        // DirectoryScan operator with a relative directory argument
        stream<rstring name> Dir1 = DirectoryScan()
        {
            param
                directory : "People.dir";
                initDelay : 10.0;
        }
        // DirectoryScan operator with an absolute directory argument and a file name pattern
        stream<rstring name> Dir2 = DirectoryScan()
        {
            param
                directory : "/tmp/work";
                pattern : "^work.*";
        }
        // use a FileSource operator to process the file names
        stream<rstring line> Beat6 = FileSource(Dir2)
        {
            param // note: param file is not specified
                format : line;
                deleteFile : true; // delete the file when processing is finished
        }
        // Use DirectoryScan operator to move files to a different directory.
        // Move the scanned files to the /tmp/active directory. Generate a tuple containing
        // the original filename in /tmp/work (sourceFile), and the moved filename
        // in /tmp/active (movedFile).
        // Generate the size of the file (fileSize).
        stream<rstring sourceFile, rstring movedFile, uint64 fileSize> Dir3 = DirectoryScan()
        {
            param
                directory : "/tmp/work";
                moveToDirectory : "/tmp/active";
            output Dir3 : sourceFile = FilePath(), movedFile = DestinationFilePath(),
                fileSize = Size();
        }
}
Summary
- Ports
- This operator has 0 input ports and 1 output port.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 9 parameters.
Required: directory
Optional: ignoreDotFiles, ignoreExistingFilesAtStartup, initDelay, moveToDirectory, order, pattern, sleepTime, sortBy
- Metrics
- This operator reports 1 metric.
Properties
- Implementation
- C++
- Threading
- Always - Operator always provides a single threaded execution context.
- Assignments
- This operator allows any SPL expression of the correct type to be assigned to output attributes.
- Output Functions
-
- DirectoryScanFunctions
-
- <any T> T AsIs(T)
-
Returns the argument unchanged.
- rstring FilePath()
-
Returns the relative pathname to the file in the source directory.
- rstring FullPath()
-
Returns the absolute pathname to the file in the source directory.
- rstring FileName()
-
Returns the basename of the file.
- rstring Directory()
-
Returns the pathname to the source directory.
- rstring DestinationDirectory()
-
Returns the pathname to the destination directory that contains the file.
- rstring DestinationFilePath()
-
Returns the relative pathname to the file in the destination directory.
- rstring DestinationFullPath()
-
Returns the absolute pathname to the file in the destination directory.
- uint64 Size()
-
Returns the size of the file in bytes.
- uint64 Atime()
-
Returns the access time (atime) of the file in seconds since the epoch. Note: The atime field is set from the original file in the source directory.
- uint64 Ctime()
-
Returns the change time (ctime) of the file in seconds since the epoch. Note: The ctime field is set from the original file in the source directory.
- uint64 Mtime()
-
Returns the modification time (mtime) of the file in seconds since the epoch. Note: The mtime field is set from the original file in the source directory.
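The output functions listed above are assigned to output attributes in the output clause. The following sketch combines a few of them; the attribute names (file, dir, bytes, modified) and the path are made up for the illustration.

```spl
composite FileDetails {
    graph
        stream<rstring file, rstring dir, uint64 bytes, uint64 modified> Details = DirectoryScan() {
            param
                directory : "/tmp/work";   // assumed scan directory
            output
                Details : file     = FileName(),   // basename of the file
                          dir      = Directory(),  // source directory
                          bytes    = Size(),       // size in bytes
                          modified = Mtime();      // mtime, seconds since the epoch
        }
}
```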
- Ports (0)
-
The DirectoryScan operator has a single output port, which produces tuples that contain the names of the scanned files. The output schema for the DirectoryScan operator is a tuple. The generated tuple is populated by using the output clause. If there is no output clause, or an attribute in the tuple is not assigned by using an output clause, then the attribute must be of type rstring.
- Properties
-
- Optional: false
- TupleMutationAllowed: false
- WindowPunctuationOutputMode: Free
Required: directory
Optional: ignoreDotFiles, ignoreExistingFilesAtStartup, initDelay, moveToDirectory, order, pattern, sleepTime, sortBy
- directory
-
Specifies the name of the directory to be scanned. In a consistent region, do not include more than one instance of a DirectoryScan operator that is configured with the same directory parameter.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: false
- ExpressionMode: AttributeFree
- ignoreDotFiles
-
Specifies whether the DirectoryScan operator ignores files with a leading period (.) in the directory. By default, the value is set to false and files with a leading period are processed.
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- ignoreExistingFilesAtStartup
-
Specifies whether the DirectoryScan operator ignores pre-existing files in the directory. By default, the value is set to false and all files are processed as usual. If set to true, any files present in the directory are marked as already processed, and not submitted.
If initDelay is specified, this check is done before the DirectoryScan operator delays.
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- initDelay
-
Specifies the number of seconds that the DirectoryScan operator delays before it starts to produce tuples.
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- moveToDirectory
-
Specifies the name of the directory to which files are moved before the output tuple is generated.
If the moveToDirectory parameter is specified for an operator in a consistent region, the output tuple contains the file name before the file is moved. The file is moved after the file name is submitted and a new consistent state is successfully established. Write the operators that consume the output tuples from the DirectoryScan operator so that they finish with the file before they establish the new consistent state. This parameter is not supported in periodic consistent regions.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- order
-
Controls how the sortBy parameter sorts the files. The valid values are ascending and descending. If the order parameter is not specified, the default value is set to ascending.
If sortBy is set to date, the file with the oldest change time (ctime) is generated first for ascending order. If sortBy is set to name, the file name that is lexically smallest is generated first for ascending order.
- Properties
-
- Type: SortOrder (ascending, descending)
- Optional: true
- ExpressionMode: CustomLiteral
- pattern
-
Instructs the DirectoryScan operator to ignore file names that do not match the regular expression pattern, using the same matching method as regexMatch(rstring, rstring).
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- sleepTime
-
Specifies the minimum time between scans of the directory, in seconds. If this parameter is not specified, the default is 5.0 seconds. If the time difference between the start of the last scan and the current time is less than sleepTime seconds, the DirectoryScan operator sleeps until the time since the last scan is sleepTime seconds. If more than sleepTime seconds have already passed, the next scan begins immediately.
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- sortBy
-
Determines the order in which file names are generated during a single scan of the directory when there are multiple valid files at the same time. The valid values are date and name. If the sortBy parameter is not specified, the default sort order is set to date.
- Properties
-
- Type: SortByType (date, name)
- Cardinality: 1
- Optional: true
- ExpressionMode: CustomLiteral
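Several of the optional parameters described above can be combined in one invocation. The values in this sketch are arbitrary and only meant to show the syntax of each parameter type (boolean, float64, and custom literal); the path is an assumption.

```spl
composite TunedScan {
    graph
        stream<rstring name> Files = DirectoryScan() {
            param
                directory                    : "/tmp/work";  // assumed scan directory
                ignoreDotFiles               : true;         // skip names with a leading period
                ignoreExistingFilesAtStartup : true;         // submit only files that arrive later
                initDelay                    : 2.0;          // seconds before the first tuple
                sleepTime                    : 1.0;          // minimum seconds between scans
                sortBy                       : name;         // sort by file name, not ctime
                order                        : descending;   // lexically largest name first
        }
}
```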
- DirectoryScan
-
stream<rstring name> ${outputStream} = DirectoryScan() {
    param
        directory : "${directoryToScan}";
}
- DirectoryScan with FileSource
-
stream<rstring name> ${outputStream} = DirectoryScan() {
    param
        directory : "${directoryToScan}";
}
stream<${schema}> ${fileSourceStream} = FileSource(${outputStream}) {
    param
        ${cursor}
}
- nScans - Counter
-
The number of times the directory has been scanned for files.
- spl-std-tk-lib