PHOENIX-7786 Improved handling for empty files in Replication Log Processor #2392
base: PHOENIX-7562-feature-new
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -238,7 +238,7 @@ public void processLogFile(FileSystem fs, Path filePath) throws IOException { | |
|
|
||
| if (!logFileReaderOptional.isPresent()) { | ||
| // This is an empty file, assume processed successfully and return | ||
| LOG.warn("Found empty file to process {}", filePath); | ||
| LOG.info("Ignoring zero length replication log file {}", filePath); | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -303,32 +303,33 @@ protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePa | |
| LogFileReader logFileReader = new LogFileReader(); | ||
| LogFileReaderContext logFileReaderContext = | ||
| new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath); | ||
| boolean isClosed = isFileClosed(fs, filePath); | ||
| if (isClosed) { | ||
| // As file is closed, ensure that the file has a valid header and trailer | ||
| logFileReader.init(logFileReaderContext); | ||
| return Optional.of(logFileReader); | ||
| } else { | ||
| LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath); | ||
| try { | ||
| // Ensure to recover lease first, in case file was un-closed. If it was already closed, | ||
| // recoverLease would return true immediately. | ||
| recoverLease(fs, filePath); | ||
| if (fs.getFileStatus(filePath).getLen() <= 0) { | ||
| // Found empty file, returning null LogReader | ||
| if (fs.getFileStatus(filePath).getLen() > 0) { | ||
| try { | ||
| // Acquired the lease, try to create reader with validation both header and trailer | ||
| logFileReader.init(logFileReaderContext); | ||
| return Optional.of(logFileReader); | ||
| } catch (InvalidLogTrailerException invalidLogTrailerException) { | ||
| // If trailer is missing or corrupt, create reader without trailer validation | ||
| LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException); | ||
| // close the reader first to avoid leaking socket connection | ||
| logFileReader.close(); | ||
| logFileReaderContext.setValidateTrailer(false); | ||
| logFileReader.init(logFileReaderContext); | ||
|
Contributor
There is still a leak here because the socket is created in the init call. https://github.com/apache/phoenix/blob/PHOENIX-7562-feature-new/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java#L49
Contributor
There are two init calls here, so we need to handle a failure from either one and close the reader. |
||
| return Optional.of(logFileReader); | ||
| } | ||
| } else { | ||
| // Ignore the file and returning empty LogReader. | ||
| return Optional.empty(); | ||
| } | ||
| try { | ||
| // Acquired the lease, try to create reader with validation both header and trailer | ||
| logFileReader.init(logFileReaderContext); | ||
| return Optional.of(logFileReader); | ||
| } catch (InvalidLogTrailerException invalidLogTrailerException) { | ||
| // If trailer is missing or corrupt, create reader without trailer validation | ||
| LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException); | ||
| logFileReaderContext.setValidateTrailer(false); | ||
| logFileReader.init(logFileReaderContext); | ||
| return Optional.of(logFileReader); | ||
| } catch (IOException exception) { | ||
| LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception); | ||
| throw exception; | ||
| } | ||
| } catch (IOException exception) { | ||
| LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception); | ||
| // close the reader to avoid leaking socket connection | ||
| closeReader(logFileReader); | ||
| throw exception; | ||
| } | ||
| } | ||
|
|
||
|
|
||
Unfortunately, this still doesn't completely fix the issue. While it fixes the leak, you simply cannot reuse the same LogFileReader object, because it internally maintains a `closed` state, so when you try to iterate it will not return any results. https://github.com/apache/phoenix/blob/PHOENIX-7562-feature-new/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java#L73-L74
You have to construct a new LogFileReader object. This also means you need to enhance your test to correctly capture the issue.
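
To make the suggestion concrete, here is a minimal sketch of the pattern being asked for: close the failed reader, construct a fresh one for the retry without trailer validation, and close whichever reader is live if either init call fails. It does not use the Phoenix classes. `StubReader`, `StubTrailerException`, `open`, `countRecords`, and `countAfterReuse` are hypothetical stand-ins, and the sticky `closed` flag is an assumption modeled on the behavior described above, not a copy of `LogFileReader`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

public class ReaderRetrySketch {

  /** Hypothetical stand-in for InvalidLogTrailerException. */
  public static class StubTrailerException extends IOException {
    StubTrailerException(String message) { super(message); }
  }

  /** Hypothetical stand-in for LogFileReader with a sticky closed flag. */
  public static class StubReader implements Closeable, Iterable<String> {
    private final boolean trailerCorrupt;
    private boolean closed;
    private List<String> records = Collections.emptyList();

    StubReader(boolean trailerCorrupt) { this.trailerCorrupt = trailerCorrupt; }

    void init(boolean validateTrailer) throws IOException {
      if (validateTrailer && trailerCorrupt) {
        throw new StubTrailerException("missing or corrupt trailer");
      }
      records = Arrays.asList("record-1", "record-2");
    }

    @Override
    public Iterator<String> iterator() {
      // Assumption: a closed reader yields nothing, even if init is called again.
      return closed ? Collections.<String>emptyIterator() : records.iterator();
    }

    @Override
    public void close() { closed = true; }
  }

  /** Opens a reader, retrying with a NEW instance when the trailer is invalid. */
  public static Optional<StubReader> open(boolean trailerCorrupt, long fileLength)
      throws IOException {
    if (fileLength <= 0) {
      return Optional.empty(); // zero-length file: nothing to read
    }
    StubReader reader = new StubReader(trailerCorrupt);
    try {
      reader.init(true);
      return Optional.of(reader);
    } catch (StubTrailerException e) {
      reader.close();                          // release the first reader
      reader = new StubReader(trailerCorrupt); // fresh object, not a re-init
      try {
        reader.init(false);
        return Optional.of(reader);
      } catch (IOException retryFailure) {
        reader.close();                        // second init failed: close it too
        throw retryFailure;
      }
    } catch (IOException e) {
      reader.close();                          // first init failed for another reason
      throw e;
    }
  }

  public static int countRecords(Iterable<String> reader) {
    int n = 0;
    for (String ignored : reader) {
      n++;
    }
    return n;
  }

  /** Reproduces the problem: close, then re-init the same object and iterate. */
  public static int countAfterReuse() throws IOException {
    StubReader reader = new StubReader(true);
    try {
      reader.init(true);
    } catch (StubTrailerException e) {
      reader.close();
    }
    reader.init(false);
    return countRecords(reader);
  }

  public static void main(String[] args) throws IOException {
    System.out.println("reused reader records: " + countAfterReuse());
    System.out.println("fresh reader records:  " + countRecords(open(true, 10L).get()));
  }
}
```

A test along the lines of `countAfterReuse` is the kind of check that would capture the issue: asserting only that a reader is returned passes in both cases, whereas asserting on the number of records iterated distinguishes the reused reader from the fresh one.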