[prev in list] [next in list] [prev in thread] [next in thread] 

List:       ruby-talk
Subject:    [suggestion+code] timed deq for the Queue class?
From:       "Andrew S. Townley" <andrew.townley () bearingpoint ! com>
Date:       2005-08-31 20:10:02
Message-ID: 1125518993.4187.4.camel () macross
[Download RAW message or body]

Hi,

Based on my earlier emails and further thinking, I've implemented the
following methods which seem to work and which other people might find
useful.  No guarantees that they're 100% correct, but it seems to do
what I want.  Would it be possible to add something like this to the
standard Queue?

$ cat fu.rb
require 'thread'
require 'timeout'
 
class TimedReadQueue < Queue
   
  # Waits for the specified timeout (seconds) and returns,
  # waking up the accessing thread.  This method avoids a
  # problem with using Timeout and Queue together to achieve
  # the same result.
 
  def tdeq(timeout)
    data = nil
     
    if(empty?)
      mt = Thread.current
 
      # This thread will wait for the timeout.  If it is still
      # alive, it will remove the read thread (rt) from the
      # waiting list, kill it and restart the main thread.
 
      rt = nil
      tt = Thread.new do
        Thread.stop; sleep(timeout); @waiting.delete(rt); mt.wakeup
        __timeout
      end
       
      # This thread will actually try and read the data.  If
      # it gets data, it will kill the timer and wake up the
      # main thread.
 
      rt = Thread.new do
        Thread.stop; data = deq; __arnold(tt); mt.wakeup
        __read
      end
 
      begin
        tt.run
        rt.run
        Thread.stop
      ensure
        mt.wakeup
        __arnold(tt)
        __arnold(rt)
      end
    else
      # just read the data
      begin
        data = deq(true)
      rescue ThreadError
        # don't care
        data = nil
      end
    end
 
    # return the data value
    data
  end
 
private
 
  # This method takes a thread and unconditionally terminates
  # it, completely and utterly
 
  def __arnold(thread)
    thread.kill if thread && thread.alive?
  end
 
  def __timeout
    puts("TIMEOUT:  #{length} elements; #{num_waiting} threads waiting.")
  end
 
  def __read
    puts("READ:     #{length} elements; #{num_waiting} threads waiting.")
  end
end

The entire program (with minimal, manual tests) is also attached.

Any/all feedback more than welcome.

Cheers,

ast


***************************************************************************************************
 The information in this email is confidential and may be legally privileged.  Access \
to this email by anyone other than the intended addressee is unauthorized.  If you \
are not the intended recipient of this message, any review, disclosure, copying, \
distribution, retention, or any action taken or omitted to be taken in reliance on it \
is prohibited and may be unlawful.  If you are not the intended recipient, please \
reply to or forward a copy of this message to the sender and delete the message, any \
                attachments, and any copies thereof from your system.
***************************************************************************************************



["fu.rb" (fu.rb)]

require 'thread'
require 'timeout'

class TimedReadQueue < Queue
  
  # Waits for the specified timeout (seconds) and returns,
  # waking up the accessing thread.  This method avoids a
  # problem with using Timeout and Queue together to achieve
  # the same result.

  def tdeq(timeout)
    data = nil
    
    if(empty?)
      mt = Thread.current

      # This thread will wait for the timeout.  If it is still
      # alive, it will remove the read thread (rt) from the
      # waiting list, kill it and restart the main thread.

      rt = nil
      tt = Thread.new do
        Thread.stop; sleep(timeout); @waiting.delete(rt); mt.wakeup
        __timeout
      end
      
      # This thread will actually try and read the data.  If
      # it gets data, it will kill the timer and wake up the
      # main thread.

      rt = Thread.new do
        Thread.stop; data = deq; __arnold(tt); mt.wakeup
        __read
      end

      begin
        tt.run
        rt.run
        Thread.stop
      ensure
        mt.wakeup
        __arnold(tt)
        __arnold(rt)
      end
    else
      # just read the data
      begin
        data = deq(true)
      rescue ThreadError
        # don't care
        data = nil
      end
    end

    # return the data value
    data
  end

private

  # This method takes a thread and unconditionally terminates
  # it, completely and utterly

  def __arnold(thread)
    thread.kill if thread && thread.alive?
  end

  def __timeout
    puts("TIMEOUT:  #{length} elements; #{num_waiting} threads waiting.")
  end

  def __read
    puts("READ:     #{length} elements; #{num_waiting} threads waiting.")
  end
end


@queue = Queue.new

def read(timeout)
  begin
    Timeout::timeout(timeout) do
      puts("READ:   #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
      return @queue.deq
    end
  rescue Timeout::Error
    puts("TIMEOUT:  #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
  end
end

puts "Normal queue and timer..."
read(1); read(1); read(1); read(1); read(1); read(1)
@queue << "one"
read(1); read(1); read(1); read(1); read(1); read(1)

puts "Timed read queue..."
q2 = TimedReadQueue.new
q2.tdeq(1); q2.tdeq(1); q2.tdeq(1); q2.tdeq(1); q2.tdeq(1); q2.tdeq(1)
q2 << "one"
q2.tdeq(1); q2.tdeq(1); q2.tdeq(1); q2.tdeq(1); q2.tdeq(1); q2.tdeq(1)
puts("Q2: #{q2.length} elements; #{q2.num_waiting} threads waiting.")

puts Thread.list.join(", ")


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic