/** A linearizability tester for total queues (i.e. queues that do not * block, and such that dequeues on the empty queue return null). */ import scala.collection.immutable.Queue import ox.cads.util.Profiler import ox.cads.testing._ /** Object to perform linearizability testing on a queue. */ object QueueTest{ var iters = 200 // Number of iterations by each worker val MaxVal = 20 // Maximum value placed in the queue var enqueueProb = 0.3 // probability of doing an enqueue var queueType = "unbounded" // which queue type are we using? var tsLog = true // use timestamp-based log? // Maximum number of iterations to be performed by the WG Graph Search, the // JIT Graph search and a tree search algorithm, respectively. val MaxSizeWGG = 15000000 // 1300000 // 1.3 * 10^6 val MaxSizeJITG = 15000000 // 3000000 // 10 * 10^6 val MaxSizeT = 10000000000L // 10 billion // Value used to signal a timeout. val ErrorSignal = ox.cads.experiments.Experiments.ErrorSignal // Types of immutable sequential specification queue, undoable sequential // specification queue, and concurrent queue. type SeqQueue = scala.collection.immutable.Queue[Int] type USeqQueue = UndoableQueue[Int] type ConcQueue = ox.cads.collection.Queue[Int] def seqEnqueue(x: Int)(q: Queue[Int]) : (Unit, Queue[Int]) = ((), q.enqueue(x)) def seqDequeue(q: Queue[Int]) : (Option[Int], Queue[Int]) = if(q.isEmpty) (None,q) else{ val (r,q1) = q.dequeue; (Some(r), q1) } /** A worker for testers based on an immutable sequential datatype. */ def worker(me: Int, log: GenericThreadLog[SeqQueue, ConcQueue]) = { val random = new scala.util.Random(scala.util.Random.nextInt+me*45207) for(i <- 0 until iters) if(random.nextFloat <= enqueueProb){ val x = random.nextInt(MaxVal) log.log(_.enqueue(x), "enqueue("+x+")", seqEnqueue(x)) } else log.log(_.dequeue, "dequeue", seqDequeue) } /** A worker for the testers based on an undoable sequential datatype. */ def uWorker(me: Int, log: GenericThreadLog[USeqQueue, ConcQueue]) = { val random = new scala.util.Random(scala.util.Random.nextInt+me*45207) for(i <- 0 until iters) if(random.nextFloat <= enqueueProb){ val x = random.nextInt(MaxVal) log.log(_.enqueue(x), "enqueue "+x, _.enqueue(x)) } else log.log(_.dequeue, "dequeue ", _.dequeue) } /** A worker for the QueueLinTester */ def QLTWorker(me: Int, log: QueueThreadLog[Int, ConcQueue]) = { val random = new scala.util.Random(scala.util.Random.nextInt+me*45207) if(queueType == "SHC"){ for(i <- 0 until iters){ if(me%2 == 0){ // just do enqueues val x = random.nextInt(MaxVal); log.logEnqueue(x, _.enqueue(x)) } else log.logDequeue(_.dequeue) } } else{ for(i <- 0 until iters) if(random.nextFloat <= enqueueProb){ val x = random.nextInt(MaxVal); log.logEnqueue(x, _.enqueue(x)) } else log.logDequeue(_.dequeue) } } type CompLog = CompetitionThreadLog[SeqQueue, USeqQueue, ConcQueue] /** A worker for a competition linearizability tester using an immutable * and an undoable sequential specification. */ def compWorker(me: Int, log: CompLog) = { val random = new scala.util.Random(scala.util.Random.nextInt+me*45207) for(i <- 0 until iters) if(random.nextFloat <= enqueueProb){ val x = random.nextInt(MaxVal) log.log(_.enqueue(x), "enqueue "+x, seqEnqueue(x), _.enqueue(x)) } else log.log(_.dequeue, "dequeue ", seqDequeue, _.dequeue) } type QueueCompLog = QueueCompetitionThreadLog[Int, USeqQueue, ConcQueue] /** A worker for a competition linearizability tester using the * queue-oriented algorithm and an undoable sequential specification. */ def QCWorker(me: Int, log: QueueCompLog) = { val random = new scala.util.Random(scala.util.Random.nextInt+me*45207) for(i <- 0 until iters) if(random.nextFloat <= enqueueProb){ val x = random.nextInt(MaxVal) log.logEnqueue(x, _.enqueue(x), _.enqueue(x)) } else log.logDequeue(_.dequeue, _.dequeue) } /** List of testers */ val testers0 = List( // Tree search "WGTree", "JITTree", // "DFSJIT", // Graph search "WGGraph", "JITGraph", "QLT", "BFSJIT", // Competition; tree-search algorithm name comes first "WG-WG-Comp", "JIT-WG-Comp", "WG-QLT-Comp", "JIT-QLT-Comp", "WG-JIT-Comp", "JIT-JIT-Comp" ) // List of options for defining testers val testers = testers0.map("--" + _) // List of queues val queues0 = List("lockFree", "recycle", "unbounded", "SHC", "lockedQueue") // "memoryBuggy", "syncChan" val queues = queues0.map("--" + _) val usage = """|scala -J-Xmx10g QueueTest | [""" + queues.mkString(" | ") + "]\n" + """| [""" + testers.mkString(" | ") + "]\n" + """| [--sharedLog] [--iters n] [--reps n] [--enqueueProb p]""" def main(args: Array[String]) = { // parse arguments var verbose = false; var i = 0 var timing = false var testerSt = "QLT" // which tester to use var reps = 1250 // Number of repetitions var capacity = 100 // capacity for bounded queue var p = 4 // Number of workers // var maxSize = -1 // max # iters while(i < args.length){ if(args(i) == "--timing"){ timing = true; i += 1 } else if(testers.contains(args(i))){ testerSt = args(i).drop(2); i += 1 } else if(queues.contains(args(i))){ queueType = args(i).drop(2); i += 1 } else if(args(i) == "-p"){ p = args(i+1).toInt; i += 2 } else if(args(i) == "--iters"){ iters = args(i+1).toInt; i += 2 } else if(args(i) == "--reps"){ reps = args(i+1).toInt; i += 2 } // else if(args(i) == "--maxSize"){ maxSize = args(i+1).toInt; i += 2 } else if(args(i) == "--sharedLog"){ tsLog = false; i += 1 } else if(args(i) == "--enqueueProb"){ enqueueProb = args(i+1).toDouble; i += 2 } else sys.error("Usage:\n"+usage.stripMargin) } // if(maxSize < 0) // set maxSize to default // maxSize = // if(testerStr == "WGGraph") MaxSizeWGG // else if(testerStr == "JITGraph") MaxSizeJITG // else if(testerStr == "QLT") maxSize else MaxSizeT // val invocs = p*iters // Number of invocations // --------- Parameters of each test // The shared concurrent queue def concQueue : ConcQueue = queueType match{ case "lockFree" => new ox.cads.collection.LockFreeQueue[Int] // case "memoryBuggy" => // new ox.cads.collection.LockFreeQueueMemoryBuggy[Int](p) // The above throws null-pointer exceptions more frequently than // giving linearization failures. case "recycle" => new ox.cads.collection.LockFreeQueueRecycle[Int](p) case "unbounded" => new ox.cads.collection.UnboundedQueue[Int] // case "syncChan" => new ox.cads.collection.SynchronousChannel[Int] case "SHC" => new SHCQueue[Int](capacity) // case "lockedQueue" => new LockedQueue[Int](10) } // The undoable and immutable specififation datatypes def uSeqQueue : USeqQueue = new UndoableQueue[Int] def imSeqQueue = Queue[Int]() // The testers; sequential def WGTreeT = LinearizabilityTester.WGTree[USeqQueue,ConcQueue]( uSeqQueue, concQueue, p, uWorker, iters, tsLog, MaxSizeT) // def DFSJITLinT = new DFSJITLinTester( // *** // imSeqQueue, concQueue, p, worker _, invocs, 10000000) def JITTree = LinearizabilityTester.JITTree[USeqQueue, ConcQueue]( uSeqQueue, concQueue, p, uWorker, iters, tsLog, MaxSizeT) def BFSJIT = LinearizabilityTester.BFSJIT[SeqQueue, ConcQueue]( imSeqQueue, concQueue, p, worker, iters, tsLog, MaxSizeT) def WGGraph = LinearizabilityTester.WGGraph[SeqQueue, ConcQueue]( imSeqQueue, concQueue, p, worker, iters, tsLog, MaxSizeWGG) def JITGraph = LinearizabilityTester.JITGraph[SeqQueue, ConcQueue]( imSeqQueue, concQueue, p, worker, iters, tsLog, MaxSizeJITG) def QLT = QueueLinTester[Int, ConcQueue]( concQueue, p, QLTWorker, iters, tsLog) // Competition testers; tree-search algorithm name comes first def WGWGComp = CompetitionTester.WGWG( compWorker, p, iters, concQueue, imSeqQueue, uSeqQueue, tsLog, MaxSizeT, MaxSizeWGG) def WGJITComp = CompetitionTester.WGJIT( // WG Tree, JIT Graph compWorker, p, iters, concQueue, imSeqQueue, uSeqQueue, tsLog, MaxSizeT, MaxSizeJITG) def JITWGComp = CompetitionTester.JITWG( // JIT Tree, WG Graph compWorker, p, iters, concQueue, imSeqQueue, uSeqQueue, tsLog, MaxSizeT, MaxSizeWGG) def JITJITComp = CompetitionTester.JITJIT( compWorker, p, iters, concQueue, imSeqQueue, uSeqQueue, tsLog, MaxSizeT, MaxSizeJITG) def WGQLTComp = QueueCompetitionTester.WG[Int, USeqQueue, ConcQueue]( QCWorker, p, iters, concQueue, uSeqQueue, tsLog) def JITQLTComp = QueueCompetitionTester.JIT[Int,USeqQueue,ConcQueue]( QCWorker, p, iters, concQueue, uSeqQueue, tsLog) // Now run the tests val t0 = java.lang.System.nanoTime var r = 0 var result = 1 while(r < reps && result > 0){ // Create and run the tester object result = testerSt match{ // Tree searches case "WGTree" => { val tester = WGTreeT; tester() } // case "DFSJIT" => { val tester = DFSJITLinT; assert(tester() >= 0) } case "JITTree" => { val tester = JITTree; tester() } // Graph searches case "BFSJIT" => { val tester = BFSJIT; tester() } case "WGGraph" => { val tester = WGGraph; tester() } case "JITGraph" => { val tester = JITGraph; tester() } // Special-purpose case "QLT" => { val tester = QLT; tester() } // Competition testers case "WG-WG-Comp" => { val tester = WGWGComp; tester() } case "WG-JIT-Comp" => { val tester = WGJITComp; tester() } case "JIT-WG-Comp" => { val tester = JITWGComp; tester() } case "JIT-JIT-Comp" => { val tester = JITJITComp; tester() } case "WG-QLT-Comp" => { val tester = WGQLTComp; tester() } case "JIT-QLT-Comp" => { val tester = JITQLTComp; tester() } } // end of testerSt match //assert(result > 0) r += 1 if(!timing && r%100 == 0) print(".") } // end of for loop val t1 = java.lang.System.nanoTime if(timing && result <= 0) println(ErrorSignal) else if(timing) println(t1-t0) else println("\n"+(t1-t0)/1000000) Profiler.report } }