
Commit 37a236d

rebase

1 parent 8b215af
4 files changed: 56 additions & 40 deletions


docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -659,7 +659,7 @@ methods for creating DStreams from files and Akka actors as input sources.
 	+ Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

 For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.
-If Spark Streaming monitor the directory in nested directories, there is an easier method `streamingContext.textFileStream(dataDirectory, depth)`.
+If Spark Streaming needs to monitor nested directories, there is an easier method `streamingContext.textFileStream(dataDirectory, depth)`.

 <span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
 `fileStream` is not available in the Python API, only `textFileStream` is available.
```
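As a usage illustration for the nested-directory overload described above, here is a minimal sketch. The `depth` argument is the overload this change documents; the application name, master, and directory path are placeholder values:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NestedDirStreamExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Watch "/data/logs" and its subdirectories up to 2 levels deep.
    // ("/data/logs" and depth = 2 are placeholder values.)
    val lines = ssc.textFileStream("/data/logs", 2)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```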

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 4 additions & 3 deletions

```diff
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.Map
 import scala.collection.mutable.Queue
-import scala.language.implicitConversions
 import scala.reflect.ClassTag

 import akka.actor.{Props, SupervisorStrategy}
@@ -531,9 +530,11 @@ object StreamingContext extends Logging {

   private[streaming] val DEFAULT_CLEANER_TTL = 3600

-  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+  @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
+  def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-    new PairDStreamFunctions[K, V](stream)
+    DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
   }

   /**
```
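The deprecation above follows from moving the implicit conversion into the `DStream` companion object. A minimal self-contained sketch of that pattern, using hypothetical `MyStream`/`PairOps` names rather than Spark's actual classes: implicits defined in a type's companion object are found through implicit scope, so callers no longer need `import StreamingContext._`.

```scala
object CompanionImplicitDemo {
  class MyStream[T](val values: Seq[T])

  // Extra operations available only on streams of pairs.
  class PairOps[K, V](stream: MyStream[(K, V)]) {
    def keys: Seq[K] = stream.values.map(_._1)
  }

  object MyStream {
    import scala.language.implicitConversions
    // Lives in the companion object, so the compiler finds it via
    // implicit scope whenever a MyStream[(K, V)] needs pair methods,
    // with no explicit import at the call site.
    implicit def toPairOps[K, V](stream: MyStream[(K, V)]): PairOps[K, V] =
      new PairOps(stream)
  }

  def main(args: Array[String]): Unit = {
    val s = new MyStream(Seq(1 -> "a", 2 -> "b"))
    println(s.keys) // List(1, 2)
  }
}
```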

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 10 additions & 5 deletions

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.dstream

 import java.io.{FileNotFoundException, IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap

 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -77,13 +78,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     newFilesOnly: Boolean = true)
   extends InputDStream[(K, V)](ssc_) {

+  // This is a def so that it works during checkpoint recovery:
+  private def clock = ssc.scheduler.clock
+
   require(depth >= 1, "nested directories depth must >= 1")
   // Data to be saved as part of the streaming checkpoints
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

   // Initial ignore threshold based on which old, existing files in the directory (at the time of
   // starting the streaming application) will be ignored or considered
-  private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
+  private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L

   /*
    * Make sure that the information of files selected in the last few batches are remembered.
@@ -95,8 +99,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   remember(durationToRemember)

   // Map of batch-time to selected file info for the remembered batches
+  // This is a concurrent map because it's also accessed in unit tests
   @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]]
+    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]

   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -159,7 +164,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    */
   private def findNewFiles(currentTime: Long): Array[String] = {
     try {
-      lastNewFileFindingTime = System.currentTimeMillis
+      lastNewFileFindingTime = clock.currentTime()

       // Calculate ignore threshold
       val modTimeIgnoreThreshold = math.max(
@@ -212,7 +217,6 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       }.flatMap(fs.listStatus(_)).toSeq

       val newFiles = path.flatMap(dfs(_)).map(_.getPath.toString).toArray
-
       val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)
@@ -321,7 +325,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
-    batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
+    batchTimeToSelectedFiles =
+      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
     lastFoundDirs = new mutable.HashSet[Path]()
```
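The `System.currentTimeMillis` to `clock.currentTime()` substitutions above route all time lookups through Spark Streaming's pluggable clock, so tests can drive time deterministically. A simplified sketch of that abstraction, modeled loosely on `org.apache.spark.streaming.util` (the real classes differ in detail):

```scala
// Simplified sketch, not Spark's actual implementation.
trait Clock {
  def currentTime(): Long
}

// Production behavior: wall-clock time.
class SystemClock extends Clock {
  def currentTime(): Long = System.currentTimeMillis()
}

// Test behavior: time only moves when the test says so, which removes
// the Thread.sleep calls and timing races the old test relied on.
class ManualClock extends Clock {
  private var time = 0L
  def currentTime(): Long = synchronized { time }
  def setTime(t: Long): Unit = synchronized { time = t }
  def addToTime(delta: Long): Unit = synchronized { time += delta }
}
```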

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 41 additions & 31 deletions

```diff
@@ -28,7 +28,6 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
 import scala.language.postfixOps

 import com.google.common.io.Files
@@ -121,7 +120,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     testFileStream(newFilesOnly = false, 3)
   }

-
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10
@@ -251,49 +249,61 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }

   def testFileStream(newFilesOnly: Boolean, depth: Int = 1) {
-    var ssc: StreamingContext = null
     val testDir: File = null
     try {
+      val batchDuration = Seconds(2)
       var testDir = Utils.createTempDir()
       for (i <- 2 until depth) {
         testDir = Utils.createTempDir(testDir.toString)
       }
+      // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)

-      Thread.sleep(1000)
       // Set up the streaming context and input streams
-      val newConf = conf.clone.set(
-        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-      ssc = new StreamingContext(newConf, batchDuration)
-      val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
-        testDir.toString, (x: Path) => true,
-        newFilesOnly = newFilesOnly, depth).map(_._2.toString)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(fileStream, outputBuffer)
-      outputStream.register()
-      ssc.start()
-
-      // Create files in the directory
-      val input = Seq(1, 2, 3, 4, 5)
-      input.foreach { i =>
-        Thread.sleep(batchDuration.milliseconds)
-        val file = new File(testDir, i.toString)
-        Files.write(i + "\n", file, Charset.forName("UTF-8"))
-        logInfo("Created file " + file)
-      }
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+        clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+          testDir.toString, (x: Path) => true,
+          newFilesOnly = newFilesOnly, depth).map(_._2.toString)
+        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance the clock so that the files are created after StreamingContext starts, but
+        // not enough to trigger a batch
+        clock.addToTime(batchDuration.milliseconds / 2)
+
+        // Over time, create files in the directory
+        val input = Seq(1, 2, 3, 4, 5)
+        input.foreach { i =>
+          val file = new File(testDir, i.toString)
+          Files.write(i + "\n", file, Charset.forName("UTF-8"))
+          assert(file.setLastModified(clock.currentTime()))
+          assert(file.lastModified === clock.currentTime)
+          logInfo("Created file " + file)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === i)
+          }
+        }

-      // Verify that all the files have been read
-      val expectedOutput = if (newFilesOnly) {
-        input.map(_.toString).toSet
-      } else {
-        (Seq(0) ++ input).map(_.toString).toSet
-      }
-      eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+        // Verify that all the files have been read
+        val expectedOutput = if (newFilesOnly) {
+          input.map(_.toString).toSet
+        } else {
+          (Seq(0) ++ input).map(_.toString).toSet
+        }
         assert(outputBuffer.flatten.toSet === expectedOutput)
       }
     } finally {
-      if (ssc != null) ssc.stop()
       if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }
```
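The rewritten test wraps the context in `withStreamingContext`, replacing the `var ssc ... finally ssc.stop()` bookkeeping. A minimal sketch of such a loan-pattern helper, assuming a TestSuiteBase-style definition (the real helper may differ):

```scala
import org.apache.spark.streaming.StreamingContext

// Loan pattern: the helper owns the context's lifecycle, so the context
// is stopped even when the test body throws.
def withStreamingContext[R](ssc: StreamingContext)(body: StreamingContext => R): R = {
  try {
    body(ssc)
  } finally {
    ssc.stop(stopSparkContext = true)
  }
}
```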
