-
Notifications
You must be signed in to change notification settings - Fork 68
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Change ETLv2 file locking to use flock (#167)
* Documentation and logging improvements * Use flock to manage ETLv2 lockfiles
- Loading branch information
Showing
1 changed file
with
134 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,9 @@ | ||
<?php | ||
|
||
/* ========================================================================================== | ||
* Handling of ETL lock files. In order to allow multiple ETL processes to be run concurrently, the | ||
* lock files are not solely pid-based, but also take into account the actions that are being | ||
* executed. This allows multiple etl pipelines to be executed concurrently as long as the actions | ||
* being performed do not overlap. | ||
* Manage ETL lock files. In order to allow multiple ETL processes to be run concurrently, | ||
* the lock files are not solely pid-based, but also take into account the actions that | ||
* are being executed. This allows multiple ETL pipelines to be executed concurrently as | ||
* long as the actions being performed do not overlap. | ||
* | ||
* @author Steve Gallo <[email protected]> | ||
* @date 2016-12-19 | ||
|
@@ -13,25 +12,56 @@ | |
|
||
namespace ETL; | ||
|
||
use \Exception; | ||
use Exception; | ||
use Log; | ||
|
||
class LockFile extends Loggable | ||
{ | ||
// Process ID of the current process | ||
/** | ||
* Current process PID | ||
* | ||
* @var integer|null | ||
*/ | ||
|
||
protected $pid = null; | ||
|
||
// Directory where lock files are stored, read from the configuration file | ||
/** | ||
* Directory where lock files are stored, read from the configuration file | ||
* | ||
* @var string|null | ||
*/ | ||
|
||
protected $lockDir = null; | ||
|
||
// Optional prefix for lock files, read from the configuration file | ||
/** | ||
* Optional prefix for lock files, read from the configuration file | ||
* | ||
* @var string|null | ||
*/ | ||
|
||
protected $lockFilePrefix = null; | ||
|
||
/* ------------------------------------------------------------------------------------------ | ||
* Set the provided logger or instantiate a null logger if one was not provided. The null handler | ||
* consumes log events and does nothing with them. | ||
/** | ||
* File handle to the current lockfile | ||
* | ||
* @param $logger A PEAR Log object or null to use the null logger. | ||
* @var resource|null | ||
*/ | ||
|
||
protected $lockFileHandle = null; | ||
|
||
/** | ||
* Path to the current lockfile | ||
* | ||
* @var string|null | ||
*/ | ||
|
||
protected $lockFile = null; | ||
|
||
/** ----------------------------------------------------------------------------------------- | ||
* Set the provided logger or instantiate a null logger if one was not provided. The | ||
* null handler consumes log events and does nothing with them. | ||
* | ||
* @param Log $logger A PEAR Log object or null to use the null logger. | ||
* ------------------------------------------------------------------------------------------ | ||
*/ | ||
|
||
|
@@ -41,8 +71,7 @@ public function __construct($lockDir, $lockPrefix = null, Log $logger = null) | |
|
||
if ( null === $lockDir || "" === $lockDir ) { | ||
$lockDir = getcwd(); | ||
$msg = "Empty lock directory specified, using current directory '$lockDir'"; | ||
$this->logger->info($msg); | ||
$this->logger->info("Empty lock directory specified, using current directory: $lockDir"); | ||
} | ||
|
||
$this->lockDir = $lockDir; | ||
|
@@ -53,19 +82,20 @@ public function __construct($lockDir, $lockPrefix = null, Log $logger = null) | |
$this->logger->info("Creating lock directory '" . $this->lockDir . "'"); | ||
if ( false === @mkdir($this->lockDir) ) { | ||
$error = error_get_last(); | ||
$msg = "Error creating lock directory '" . $this->lockDir . "': " . $error['message']; | ||
$this->logAndThrowException($msg); | ||
$this->logAndThrowException( | ||
sprintf("Error creating lock directory '%s': %s", $this->lockDir, $error['message']) | ||
); | ||
} | ||
} // if ( ! is_dir($this->lockDir) ) | ||
|
||
} // __construct() | ||
|
||
/* ------------------------------------------------------------------------------------------ | ||
/** ----------------------------------------------------------------------------------------- | ||
* Generate a lock file name. | ||
* | ||
* @param $pid An optional PID to use rather than the current PID | ||
* @param integer $pid An optional PID to use rather than the current PID | ||
* | ||
* @return A fully qualified path to the lock file. | ||
* @return string A fully qualified path to the lock file. | ||
* ------------------------------------------------------------------------------------------ | ||
*/ | ||
|
||
|
@@ -79,35 +109,13 @@ protected function generateLockfileName($pid = null) | |
); | ||
} // generateLockfileName() | ||
|
||
/* ------------------------------------------------------------------------------------------ | ||
* Check if the specified process is running. | ||
* | ||
* @param $pid PID to check | ||
* | ||
* @return TRUE if a process with the specified PID is is running, FALSE otherwise. | ||
* ------------------------------------------------------------------------------------------ | ||
*/ | ||
|
||
protected function isProcessRunning($pid = null) | ||
{ | ||
$pid = ( null === $pid ? $this->pid : $pid ); | ||
$pidList = explode(PHP_EOL, shell_exec("ps -A | awk '{print $1}'")); | ||
|
||
if ( in_array($pid, $pidList) ) { | ||
return true; | ||
} | ||
|
||
return false; | ||
|
||
} // isProcessRunning() | ||
|
||
/* ------------------------------------------------------------------------------------------ | ||
* Generate a lock file for the current process and action list. If any of the actions are | ||
* present in any other lock files, we cannot generate the lock. | ||
/** ----------------------------------------------------------------------------------------- | ||
* Generate a lock file for the current process and action list. If any of the actions | ||
* are present in any other lock files, we cannot generate the lock. | ||
* | ||
* @param $actionList An array of action names to be executed by this ETL process. | ||
* @param array $actionList A list of action names to be executed by this ETL process. | ||
* | ||
* @return TRUE if the lock was generated, FALSE otherwise. | ||
* @return boolean TRUE if the lock was generated, FALSE otherwise. | ||
* ------------------------------------------------------------------------------------------ | ||
*/ | ||
|
||
|
@@ -119,30 +127,43 @@ public function lock(array $actionList) | |
|
||
if ( false === ($dh = opendir($this->lockDir)) ) { | ||
$error = error_get_last(); | ||
$msg = "Error opening lock directory '" . $this->lockDir . "': " . $error['message']; | ||
$this->logger->warning($msg); | ||
$this->logger->warning( | ||
sprintf("Error opening lock directory '%s': %s", $this->lockDir, $error['message']) | ||
); | ||
return false; | ||
} | ||
|
||
// Cleanup any lockfiles not associated with a running process | ||
|
||
while ( ($file = readdir($dh) ) !== false ) { | ||
if ( '.' == $file || '..' == $file ) { | ||
continue; | ||
} elseif ( null !== $this->lockFilePrefix && 0 !== strpos($file, $this->lockFilePrefix) ) { | ||
// If the prefix is set, the file must match the prefix | ||
continue; | ||
} | ||
|
||
$file = $this->lockDir . '/' . $file; | ||
|
||
// If the proecess is not running, remove this lock file and continue to the next. | ||
|
||
if ( $this->_cleanup($file) ) { | ||
if ( $this->unlock($file) ) { | ||
continue; | ||
} | ||
|
||
// Ensure that there are no overlapping actions with a running ETL process | ||
|
||
$lockFileActionList = explode(PHP_EOL, file_get_contents($file)); | ||
$pid = array_shift($lockFileActionList); | ||
$actionIntersection = array_intersect($lockFileActionList, $actionList); | ||
if ( 0 != count($actionIntersection) ) { | ||
$msg = "Cannot obtain lock. Process '$pid' already running and executing overlapping actions (" . implode(", ", $actionIntersection) . ")"; | ||
$this->logAndThrowException($msg); | ||
$this->logAndThrowException( | ||
sprintf( | ||
"Cannot obtain lock. Process '%d' already running and executing overlapping actions (%s)", | ||
$pid, | ||
implode(", ", $actionIntersection) | ||
) | ||
); | ||
} | ||
} // while ( ($file = readdir($dh) ) !== false ) | ||
|
||
|
@@ -152,106 +173,92 @@ public function lock(array $actionList) | |
|
||
$contents = implode(PHP_EOL, array_merge(array($this->pid), $actionList)); | ||
|
||
if ( false === @file_put_contents($lockFile, $contents) ) { | ||
if ( false === ($fp = @fopen($lockFile, 'w')) ) { | ||
$error = error_get_last(); | ||
$msg = "Error creating lock file '$lockFile': " . $error['message']; | ||
$this->logger->warning($msg); | ||
$this->logger->warning( | ||
sprintf("Error creating lock file '%s': %s", $lockFile, $error['message']) | ||
); | ||
return false; | ||
} | ||
|
||
// Obtain an advisory lock. This advisory will be memoved if the process dies or | ||
// the file is closed. This appears to be contrary to the PHP docs at | ||
// http://php.net/manual/en/function.flock.php stating "5.3.2 The automatic | ||
// unlocking when the file's resource handle is closed was removed. Unlocking now | ||
// always has to be done manually." because the OS releases the lock automatically | ||
// when the file is closed. | ||
|
||
if ( ! flock($fp, LOCK_EX | LOCK_NB) ) { | ||
$this->logAndThrowException( | ||
sprintf("Unexpected failure to obtain lock for process %d on file %s", $this->pid, $lockFile) | ||
); | ||
} | ||
fwrite($fp, $contents); | ||
fflush($fp); | ||
|
||
$this->lockFile = $lockFile; | ||
$this->lockFileHandle = $fp; | ||
|
||
return true; | ||
|
||
} // lock() | ||
|
||
/* ------------------------------------------------------------------------------------------ | ||
* Release the lock for the specified PID. | ||
/** ----------------------------------------------------------------------------------------- | ||
* Release the specified lock file. If no file is specified, release then lockfile for | ||
* the current current process. | ||
* | ||
* @param $pid PID to check, NULL to use the PID of the current process. | ||
* @param string $file The name of the file to release, or NULL to release the current | ||
* lockfile. | ||
* | ||
* @return TRUE if the lock was released, FALSE otherwise. | ||
* @return boolean TRUE if the lock was released, FALSE otherwise. | ||
* ------------------------------------------------------------------------------------------ | ||
*/ | ||
|
||
public function unlock($pid = null) | ||
public function unlock($file = null) | ||
{ | ||
$lockFile = $this->generateLockfileName($pid); | ||
$isRunning = $this->isProcessRunning($pid); | ||
|
||
$pid = ( null === $pid ? $this->pid : $pid ); | ||
|
||
if ( file_exists($lockFile) ) { | ||
if ( null === $file && null !== $this->lockFile ) { | ||
|
||
if ( ! $isRunning ) { | ||
$msg = "Process '$pid' is not running"; | ||
$this->logger->warning($msg); | ||
} | ||
@flock($this->lockFileHandle, LOCK_UN); | ||
@fclose($this->lockFileHandle); | ||
$file = $this->lockFile; | ||
$this->lockFileHandle = null; | ||
$this->lockFile = null; | ||
|
||
$this->logger->info("Releasing lock '$lockFile'"); | ||
} elseif ( null !== $file ) { | ||
|
||
if ( false === @unlink($lockFile) ) { | ||
if ( false === ($fp = @fopen($file, 'r')) ) { | ||
$error = error_get_last(); | ||
$msg = "Error removing lock file '$lockFile': " . $error['message']; | ||
$this->logger->warning($msg); | ||
$this->logger->warning( | ||
sprintf("Error opening file '%s': %s", $file, $error['message']) | ||
); | ||
return false; | ||
} | ||
|
||
} // if ( file_exists($lockFile) ) | ||
|
||
return true; | ||
|
||
} // unlock() | ||
|
||
/* ------------------------------------------------------------------------------------------ | ||
* Clean up any lock files that do not have corresponding running processes. | ||
* | ||
* @return TRUE on success, FALSE if there was an error. | ||
* ------------------------------------------------------------------------------------------ | ||
*/ | ||
|
||
public function cleanup() | ||
{ | ||
|
||
if ( false === ($dh = opendir($this->lockDir)) ) { | ||
$error = error_get_last(); | ||
$msg = "Error opening lock directory '" . $this->lockDir . "': " . $error['message']; | ||
$this->logger->warning($msg); | ||
return false; | ||
} | ||
|
||
while ( ($file = readdir($dh) ) !== false ) { | ||
if ( '.' == $file || '..' == $file ) { | ||
continue; | ||
if ( flock($fp, LOCK_EX | LOCK_NB) ) { | ||
$pid = trim(fgets($fp)); | ||
$this->logger->warning("Process '$pid' is not running, releasing lock file."); | ||
flock($fp, LOCK_UN); | ||
fclose($fp); | ||
} else { | ||
fclose($fp); | ||
return false; | ||
} | ||
$file = $this->lockDir . '/' . $file; | ||
$this->_cleanup($file); | ||
} // while ( ($file = readdir($dh) ) !== false ) | ||
|
||
closedir($dh); | ||
|
||
return true; | ||
|
||
} // cleanup() | ||
|
||
/* ------------------------------------------------------------------------------------------ | ||
* Check that the process associated with the lock file is running, if not then clean up the | ||
* lock file. | ||
* | ||
* @param $file Lock file to check | ||
* | ||
* @return TRUE if the lock was released, FALSE otherwise. | ||
* ------------------------------------------------------------------------------------------ | ||
*/ | ||
} | ||
|
||
private function _cleanup($file) | ||
{ | ||
$contents = explode(PHP_EOL, file_get_contents($file)); | ||
$pid = array_shift($contents); | ||
// Guard against the case that someone calls unlock() before lock() | ||
|
||
if ( ! $this->isProcessRunning($pid) ) { | ||
return $this->unlock($pid); | ||
if ( null !== $file ) { | ||
$this->logger->info("Releasing lock file '$file'"); | ||
if ( null !== $file && false === @unlink($file) ) { | ||
$error = error_get_last(); | ||
$this->logger->warning( | ||
sprintf("Error removing lock file '%s': %s", $file, $error['message']) | ||
); | ||
return false; | ||
} | ||
} | ||
return true; | ||
|
||
return false; | ||
|
||
} // _cleanup() | ||
} // unlock() | ||
} // class Lockfile |