Maintains a pool of object instances which are expected to be communicating with “the outside world” in some way. A message sent to the pool is replicated to all the communicators in that pool. Some communicators are fast, which means they are called synchronously and expected to return very quickly. Some communicators are slow, which means they are called asynchronously through a work queue.

See add for more information.

Constants
MAX_SLOW_QUEUE_SIZE = 50

Hoodoo::Communicators::Slow subclass communicators are called in their own Threads via a processing Queue. A flood of communications could cause the queue to back up considerably, so a maximum number of messages is defined. If the queue size is equal to or greater than this amount when a message arrives, the message is dropped and a 'dropped message' count is incremented.
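The drop policy described above can be illustrated with a minimal sketch. BoundedInbox and its methods are hypothetical stand-ins written for this example only; they are not part of Hoodoo, and the real pool keeps its counter per worker Thread rather than in an object like this.

```ruby
# Sketch only: BoundedInbox is a hypothetical stand-in, not a Hoodoo class.
class BoundedInbox
  MAX_SIZE = 50 # mirrors the value of MAX_SLOW_QUEUE_SIZE

  attr_reader :dropped

  def initialize
    @queue   = Queue.new
    @dropped = 0
  end

  # Queue the message unless the queue is already at or over the limit.
  # Returns true if the message was queued, false if it was dropped.
  def push( message )
    if @queue.size < MAX_SIZE
      @queue << message
      true
    else
      @dropped += 1
      false
    end
  end

  def size
    @queue.size
  end
end

inbox = BoundedInbox.new

# Nothing consumes the queue here, so the last ten messages are dropped.
60.times { | i | inbox.push( "message #{ i }" ) }

puts inbox.size    # 50
puts inbox.dropped # 10
```

Dropping new arrivals rather than blocking the caller keeps the sender responsive when a slow communicator falls behind, at the cost of losing messages.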

THREAD_EXIT_TIMEOUT = 5

When asking slow communicator threads to exit, a timeout must be used in case the thread doesn't seem to be responsive. This is the timeout value in seconds - it can take a floating point or integer value.
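As an illustration of how a timeout of this kind behaves, the sketch below uses Ruby's standard Thread#join with a limit. This is an assumption about one way to apply such a timeout, not a statement of how the pool is implemented; the thread and variable names are invented for the example.

```ruby
# Thread#join( limit ) returns the thread if it finished within the limit,
# or nil if the limit (integer or float seconds) expired first.

gate = Queue.new

responsive   = Thread.new { :done }     # finishes almost immediately
unresponsive = Thread.new { gate.pop }  # blocks until something is pushed

responsive_exited   = ! responsive.join( 1 ).nil?
unresponsive_exited = ! unresponsive.join( 0.1 ).nil?

puts responsive_exited   # true
puts unresponsive_exited # false

# Release the blocked demonstration thread so that it can finish normally.
gate << :go
unresponsive.join
```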

THREAD_WAIT_TIMEOUT = 5

Analogous to THREAD_EXIT_TIMEOUT but used when waiting for a processing Thread to drain its Queue, without asking it to exit.

Attributes
[RW] group

Retrieve the ThreadGroup instance managing the collection of slow communicator threads. This is mostly used for testing purposes and has little general purpose utility.

Class Public methods
new()

Create a new pool of communicators. Instances of subclasses of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow are added with add and called with communicate.

# File lib/hoodoo/communicators/pool.rb, line 57
def initialize
  @pool  = {}
  @group = ::ThreadGroup.new
end
Instance Public methods
add( communicator )

Add a communicator instance to the pool. Future calls to communicate will call the same-named method in that instance.

Subclasses of Hoodoo::Communicators::Slow are called within a processing Thread. Subclasses of Hoodoo::Communicators::Fast are called inline. The instances are called in the order of addition, but since each slow communicator runs in its own Thread, the execution order is indeterminate for such instances.

If a slow communicator's inbound message queue length matches or exceeds MAX_SLOW_QUEUE_SIZE, messages for that specific communicator will start being dropped until the communicator clears the backlog and at least one space opens on the queue. Slow communicators can detect when this has happened by implementing Hoodoo::Communicators::Slow#dropped in the subclass.

If you pass the same instance more than once, the subsequent calls are ignored. You can add many instances of the same class if that's useful for any reason.

Returns the passed-in communicator instance parameter, for convenience.

communicator

Instance to be added to the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.
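The two rules above (subclass instances only, repeated instances ignored) can be sketched with plain Ruby. FastBase, SlowBase, StdoutWriter and acceptable? are hypothetical names used only for this illustration; the Hash stands in for the pool's internal storage.

```ruby
# Sketch only: hypothetical stand-ins for the Hoodoo base classes.
class FastBase; end
class SlowBase; end
class StdoutWriter < FastBase; end

def acceptable?( communicator )
  # Module#< is true only for a strict descendant. It is false for the
  # class itself and nil for unrelated classes, so an instance of the base
  # class is rejected just like an unrelated object.
  ( communicator.class < FastBase || communicator.class < SlowBase ) == true
end

puts acceptable?( StdoutWriter.new ) # true
puts acceptable?( FastBase.new )     # false
puts acceptable?( 'a string' )       # false

# Plain objects used as Hash keys are compared by identity, so adding the
# same instance twice has no effect while a second instance of the same
# class gets its own entry.
writer = StdoutWriter.new
pool   = {}

pool[ writer ] = :first  unless pool.has_key?( writer )
pool[ writer ] = :second unless pool.has_key?( writer ) # ignored
pool[ StdoutWriter.new ] = :third

puts pool.size # 2
```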

# File lib/hoodoo/communicators/pool.rb, line 88
def add( communicator )
  unless ( communicator.class < Hoodoo::Communicators::Fast ||
           communicator.class < Hoodoo::Communicators::Slow )
    raise "Hoodoo::Communicators::Pool\#add must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
  end

  return if @pool.has_key?( communicator )

  if communicator.is_a?( Hoodoo::Communicators::Fast )
    add_fast_communicator( communicator )
  else
    add_slow_communicator( communicator )
  end

  return communicator
end
communicate( object )

Call the communicate method on each communicator instance added via add. Each instance receives messages in the same order as the corresponding calls are made to the pool. Across instances, fast communicators are called in the order they were added to the pool, but since each slow communicator runs in its own Thread, execution order is indeterminate.

object

Parameter passed to the communicator subclass instance communicate methods.
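The ordering guarantees can be seen in a small stand-alone sketch. It is not the Hoodoo implementation: the lambdas stand in for a fast and a slow communicator, and the :exit marker is an invented way of stopping the demonstration worker.

```ruby
# Sketch only: one inline ("fast") receiver and one queued ("slow") receiver.
results    = Queue.new
work_queue = Queue.new

fast = ->( object ) { results << [ :fast, object ] }
slow = ->( object ) { results << [ :slow, object ] }

# The worker Thread pulls messages off the queue in arrival order.
worker = Thread.new do
  while ( entry = work_queue.pop ) != :exit
    slow.call( entry )
  end
end

[ 'one', 'two' ].each do | message |
  fast.call( message )  # called inline; returns before the next line runs
  work_queue << message # handed to the worker; runs at some later point
end

work_queue << :exit
worker.join

all = []
all << results.pop until results.empty?

# Each receiver saw the messages in the order they were sent, even though
# the interleaving between the two receivers is not predictable.
fast_order = all.select { | kind, _ | kind == :fast }.map( &:last )
slow_order = all.select { | kind, _ | kind == :slow }.map( &:last )

puts fast_order.inspect # ["one", "two"]
puts slow_order.inspect # ["one", "two"]
```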

# File lib/hoodoo/communicators/pool.rb, line 147
def communicate( object )
  @pool.each do | communicator, item |

    if item.has_key?( :fast )
      begin
        communicator.communicate( object )
      rescue => exception
        handle_exception( exception, communicator )
      end

    else
      data       = item[ :slow       ]
      thread     = data[ :thread     ]
      work_queue = data[ :work_queue ]

      # This is inaccurate if one or more "dropped messages" reports are
      # on the queue, but since some communicators might report them in
      # the same way as other messages, it's not necessarily incorrect
      # either.
      #
      if work_queue.size < MAX_SLOW_QUEUE_SIZE
        dropped = thread[ :dropped_messages ]

        if dropped > 0
          thread[ :dropped_messages ] = 0

          # Opposite of comment above on MAX_SLOW_QUEUE_SIZE check...
          # Yes, this takes up a queue entry and the payload addition
          # afterwards might take it one above max size, but that's OK
          # since this is just a "dropped messages" report and though
          # some communicators might deal with them slowly, others may
          # just ignore them.
          #
          work_queue << QueueEntry.new( dropped: dropped )
        end

        work_queue << QueueEntry.new( payload: object )

      else
        thread[ :dropped_messages ] += 1

      end
    end

  end
end
remove( communicator )

Remove a communicator previously added by add. See add for details.

It is harmless to try and remove communicator instances more than once or to try to remove something that was never added in the first place; the call simply has no side effects.

If removing a slow communicator, its thread will be terminated with the default timeout value of THREAD_EXIT_TIMEOUT seconds. For this reason, removing a slow communicator may take a long time.

Returns the passed-in communicator instance parameter, for convenience.

communicator

Instance to be removed from the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.

# File lib/hoodoo/communicators/pool.rb, line 121
def remove( communicator )
  unless ( communicator.class < Hoodoo::Communicators::Fast ||
           communicator.class < Hoodoo::Communicators::Slow )
    raise "Hoodoo::Communicators::Pool\#remove must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
  end

  return unless @pool.has_key?( communicator )

  if communicator.is_a?( Hoodoo::Communicators::Fast )
    remove_fast_communicator( communicator )
  else
    remove_slow_communicator( communicator )
  end

  return communicator
end
terminate( per_instance_timeout: THREAD_EXIT_TIMEOUT )

The communication pool is “emptied” by this call, going back to a clean state as if just initialised. New workers can be added via add and then called via communicate if you so wish.

Hoodoo::Communicators::Fast subclass instances are removed immediately without complications.

Hoodoo::Communicators::Slow subclass instances in the communication pool are called via a worker Thread; this method shuts down all such worker Threads, clearing their work queues and asking each one to exit (politely). There is no mechanism (other than overall Ruby process exit) available to shut down the Threads by force, so some Threads may not respond and time out.

When this method returns, every worker will have either exited or timed out. A worker that timed out may still be running but is considered too slow or dead, and no further communications are made to it.

The following named parameters are supported:

per_instance_timeout

Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_EXIT_TIMEOUT. For example, if there are three slow communicators in the pool and all three reach a 5 second timeout, the termination method will not return for 15 seconds (3 * 5 seconds full timeout).
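The worst case described above is simple multiplication, since the timeouts are applied one communicator after another. The variable names and the overall budget below are illustrative only and are not part of the Hoodoo API.

```ruby
# Worst case: every slow communicator uses its full timeout in turn.
slow_communicators   = 3
per_instance_timeout = 5 # seconds, the value of THREAD_EXIT_TIMEOUT

worst_case = slow_communicators * per_instance_timeout
puts worst_case # 15

# Working backwards from a hypothetical overall shutdown budget gives a
# per-instance value that could be passed as per_instance_timeout.
overall_budget = 6.0 # seconds, chosen only for this example
per_instance   = overall_budget / slow_communicators
puts per_instance # 2.0
```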

# File lib/hoodoo/communicators/pool.rb, line 275
def terminate( per_instance_timeout: THREAD_EXIT_TIMEOUT )
  loop do
    klass, item = @pool.shift() # Hash#shift -> remove a key/value pair.
    break if klass.nil?

    next unless item.has_key?( :slow )
    data = item[ :slow ]

    request_termination_for(
      thread:     data[ :thread     ],
      work_queue: data[ :work_queue ],
      timeout:    per_instance_timeout
    )
  end
end
wait( per_instance_timeout: THREAD_WAIT_TIMEOUT, communicator: nil )

This method is only useful if there are any Hoodoo::Communicators::Slow subclass instances in the communication pool. Each instance is called via a worker Thread; this method waits for each communicator to drain its queue before returning. This is useful if you have a requirement to wait for all communications to finish on all threads, presumably for wider synchronisation reasons.

Since fast communicators are called synchronously there is never any need to wait for them, so this call ignores such pool entries.

The following named parameters are supported:

per_instance_timeout

Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_WAIT_TIMEOUT.

communicator

If you want to wait for a specific instance only (see add), pass it here. If the instance is a fast communicator, or any object not added to the pool, then no error is raised; the method simply returns immediately.
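One way to wait for a worker to drain its queue is to push a marker behind the pending work and block, with a timeout, until the worker acknowledges it. The sketch below shows that general technique using two queues; the :sync and :exit markers and the exact mechanics are assumptions made for illustration and may differ from what the pool does internally.

```ruby
require 'timeout'

# Sketch only: a worker that acknowledges a :sync marker on a second queue.
work_queue = Queue.new
sync_queue = Queue.new
processed  = []

worker = Thread.new do
  loop do
    entry = work_queue.pop

    case entry
    when :sync then sync_queue << :drained # everything before this is done
    when :exit then break
    else processed << entry
    end
  end
end

3.times { | i | work_queue << "message #{ i }" }

# The marker sits behind the three messages, so its acknowledgement means
# that the earlier entries have all been processed.
work_queue << :sync

drained = begin
  Timeout.timeout( 5 ) { sync_queue.pop }
  true
rescue Timeout::Error
  false
end

puts drained        # true
puts processed.size # 3

# Stop the demonstration worker.
work_queue << :exit
worker.join
```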

# File lib/hoodoo/communicators/pool.rb, line 216
def wait( per_instance_timeout: THREAD_WAIT_TIMEOUT,
          communicator:         nil )

  if communicator.nil?
    @pool.each do | communicator, item |
      next unless item.has_key?( :slow )
      data = item[ :slow ]

      wait_for(
        work_queue: data[ :work_queue ],
        sync_queue: data[ :sync_queue ],
        timeout:    per_instance_timeout
      )
    end

  else
    return unless @pool.has_key?( communicator )
    item = @pool[ communicator ]

    return unless item.has_key?( :slow )
    data = item[ :slow ]

    wait_for(
      work_queue: data[ :work_queue ],
      sync_queue: data[ :sync_queue ],
      timeout:    per_instance_timeout
    )

  end
end