[Avida-cvs] [avida-svn] r746 - in development: Avida.xcodeproj source/actions source/analyze source/platform/win32-pthread

brysonda@myxo.css.msu.edu brysonda at myxo.css.msu.edu
Mon Jun 12 12:27:24 PDT 2006


Author: brysonda
Date: 2006-06-12 15:27:24 -0400 (Mon, 12 Jun 2006)
New Revision: 746

Modified:
   development/Avida.xcodeproj/project.pbxproj
   development/source/actions/LandscapeActions.cc
   development/source/analyze/cAnalyzeJobQueue.cc
   development/source/analyze/cAnalyzeJobQueue.h
   development/source/analyze/cAnalyzeJobWorker.cc
   development/source/platform/win32-pthread/pthread.h
Log:
Improve cAnalyzeJobQueue and cAnalyzeJobWorker to implement a more complete thread pool paradigm.   

JobQueue now creates and starts the threads upon initialization.  Also, AddJob is now thread safe, allowing for calls from within executing jobs on worker threads.  This avenue should be pursued *carefully* when implementing analyze commands, though, as thread scheduling can affect the order in which jobs are placed into the queue, and thus which Random object is assigned to each job.  Non-random analyze commands will be unaffected, though, and can now be added at will.  Additionally, a new method, AddJobImmediate, can be used to wake a sleeping worker (if any exists) to handle the job upon insertion.  This is useful for bootstrapping a threaded recursive job expansion, for example.

JobWorkers now use condition variables to sleep until notified by the queue, remaining available until the queue is deleted.  Also, fixed a memory leak within the worker: job objects were not properly deleted after execution.

Modified: development/Avida.xcodeproj/project.pbxproj
===================================================================
--- development/Avida.xcodeproj/project.pbxproj	2006-06-09 20:07:35 UTC (rev 745)
+++ development/Avida.xcodeproj/project.pbxproj	2006-06-12 19:27:24 UTC (rev 746)
@@ -473,7 +473,7 @@
 		700E12610A097A0800B604CD /* CMakeLists.txt */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = text; path = CMakeLists.txt; sourceTree = "<group>"; };
 		700E12630A097A1700B604CD /* CMakeLists.txt */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = text; path = CMakeLists.txt; sourceTree = "<group>"; };
 		700E28CF0859FFD700CF158A /* tObjectFactory.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = tObjectFactory.h; sourceTree = "<group>"; };
-		700E2B83085DE50C00CF158A /* avida-viewer */ = {isa = PBXFileReference; includeInIndex = 0; lastKnownFileType = "compiled.mach-o.executable"; path = "avida-viewer"; sourceTree = BUILT_PRODUCTS_DIR; };
+		700E2B83085DE50C00CF158A /* avida-viewer */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = "avida-viewer"; sourceTree = BUILT_PRODUCTS_DIR; };
 		7013845F09028B3E0087ED2E /* cAvidaConfig.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = cAvidaConfig.h; sourceTree = "<group>"; };
 		7013846009028B3E0087ED2E /* cAvidaConfig.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = cAvidaConfig.cc; sourceTree = "<group>"; };
 		701384A10902A16F0087ED2E /* defs.h */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.c.h; path = defs.h; sourceTree = "<group>"; };
@@ -943,7 +943,7 @@
 		DCC315CF076253A5008F7A48 /* Makefile */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.make; path = Makefile; sourceTree = "<group>"; };
 		DCC315D0076253A5008F7A48 /* task_event_gen.cc */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = task_event_gen.cc; sourceTree = "<group>"; };
 		DCC315D1076253A5008F7A48 /* task_event_gen.old.cc */ = {isa = PBXFileReference; fileEncoding = 30; lastKnownFileType = sourcecode.cpp.cpp; path = task_event_gen.old.cc; sourceTree = "<group>"; };
-		DCC3164D07626CF3008F7A48 /* avida */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = avida; sourceTree = BUILT_PRODUCTS_DIR; };
+		DCC3164D07626CF3008F7A48 /* avida */ = {isa = PBXFileReference; includeInIndex = 0; lastKnownFileType = "compiled.mach-o.executable"; path = avida; sourceTree = BUILT_PRODUCTS_DIR; };
 /* End PBXFileReference section */
 
 /* Begin PBXFrameworksBuildPhase section */

Modified: development/source/actions/LandscapeActions.cc
===================================================================
--- development/source/actions/LandscapeActions.cc	2006-06-09 20:07:35 UTC (rev 745)
+++ development/source/actions/LandscapeActions.cc	2006-06-12 19:27:24 UTC (rev 746)
@@ -348,7 +348,7 @@
 void RegisterLandscapeActions(cActionLibrary* action_lib)
 {
   action_lib->Register<cActionAnalyzeLandscape>("AnalyzeLandscape");
-  action_lib->Register<cActionRandomLandscape>("FullLandscape");
+  action_lib->Register<cActionFullLandscape>("FullLandscape");
   action_lib->Register<cActionRandomLandscape>("RandomLandscape");
   action_lib->Register<cActionSampleLandscape>("SampleLandscape");
 }

Modified: development/source/analyze/cAnalyzeJobQueue.cc
===================================================================
--- development/source/analyze/cAnalyzeJobQueue.cc	2006-06-09 20:07:35 UTC (rev 745)
+++ development/source/analyze/cAnalyzeJobQueue.cc	2006-06-12 19:27:24 UTC (rev 746)
@@ -12,46 +12,89 @@
 #include "cAnalyzeJobWorker.h"
 #include "cWorld.h"
 #include "cWorldDriver.h"
-#include "tArray.h"
 
 
 #include "defs.h"
 
-#include <iostream>
-using namespace std;
 
-
-cAnalyzeJobQueue::cAnalyzeJobQueue(cWorld* world) : m_world(world), m_last_jobid(0)
+cAnalyzeJobQueue::cAnalyzeJobQueue(cWorld* world)
+: m_world(world), m_last_jobid(0), m_jobs(0), m_pending(0), m_workers(world->GetConfig().MT_CONCURRENCY.Get())
 {
   for (int i = 0; i < MT_RANDOM_POOL_SIZE; i++) {
     m_rng_pool[i] = new cRandomMT(world->GetRandom().GetInt(0x7FFFFFFF));
   }
   
   pthread_mutex_init(&m_mutex, NULL);
+  pthread_cond_init(&m_cond, NULL);
+  pthread_cond_init(&m_term_cond, NULL);
+
+  for (int i = 0; i < m_workers.GetSize(); i++) {
+    m_workers[i] = new cAnalyzeJobWorker(this);
+    m_workers[i]->Start();
+  }
 }
 
 cAnalyzeJobQueue::~cAnalyzeJobQueue()
 {
+  const int num_workers = m_workers.GetSize();
+  
+  pthread_mutex_lock(&m_mutex);
+  
+  // Clean out any waiting jobs
   cAnalyzeJob* job;
   while (job = m_queue.Pop()) delete job;
+  
+  // Set job count so that all workers receive NULL jobs
+  m_jobs = num_workers;
+  
+  pthread_mutex_unlock(&m_mutex);
+  
+  // Signal all workers to check job queue
+  pthread_cond_broadcast(&m_cond);
+  
+  for (int i = 0; i < num_workers; i++) {
+    m_workers[i]->Join();
+    delete m_workers[i];
+  }
+  
   pthread_mutex_destroy(&m_mutex);
+  pthread_cond_destroy(&m_cond);
 }
 
+void cAnalyzeJobQueue::AddJob(cAnalyzeJob* job)
+{
+  pthread_mutex_lock(&m_mutex);
+  job->SetID(m_last_jobid++);
+  m_queue.PushRear(job);
+  m_jobs++;
+  pthread_mutex_unlock(&m_mutex);
+}
+
+void cAnalyzeJobQueue::AddJobImmediate(cAnalyzeJob* job)
+{
+  pthread_mutex_lock(&m_mutex);
+  job->SetID(m_last_jobid++);
+  m_queue.PushRear(job);
+  m_jobs++;
+  pthread_mutex_unlock(&m_mutex);
+  pthread_cond_signal(&m_cond);
+}
+
+
 void cAnalyzeJobQueue::Execute()
 {
-  const int num_workers = m_world->GetConfig().MT_CONCURRENCY.Get();
-  tArray<cAnalyzeJobWorker*> workers(num_workers);
-
   if (m_world->GetConfig().VERBOSITY.Get() >= VERBOSE_DETAILS)
-    m_world->GetDriver().NotifyComment("Going Multithreaded...");
+    m_world->GetDriver().NotifyComment("waking worker threads...");
 
-  for (int i = 0; i < num_workers; i++) {
-    workers[i] = new cAnalyzeJobWorker(this);
-    workers[i]->Start();
+  pthread_cond_broadcast(&m_cond);
+  
+  // Wait for term signal
+  pthread_mutex_lock(&m_mutex);
+  while (m_jobs > 0 || m_pending > 0) {
+    pthread_cond_wait(&m_term_cond, &m_mutex);
   }
+  pthread_mutex_unlock(&m_mutex);
 
-  for (int i = 0; i < num_workers; i++) {
-    workers[i]->Join();
-    delete workers[i];
-  }
+  if (m_world->GetConfig().VERBOSITY.Get() >= VERBOSE_DETAILS)
+    m_world->GetDriver().NotifyComment("job queue complete");
 }

Modified: development/source/analyze/cAnalyzeJobQueue.h
===================================================================
--- development/source/analyze/cAnalyzeJobQueue.h	2006-06-09 20:07:35 UTC (rev 745)
+++ development/source/analyze/cAnalyzeJobQueue.h	2006-06-12 19:27:24 UTC (rev 746)
@@ -13,6 +13,9 @@
 #ifndef cAnalyzeJob
 #include "cAnalyzeJob.h"
 #endif
+#ifndef tArray_h
+#include "tArray.h"
+#endif
 #ifndef tList_h
 #include "tList.h"
 #endif
@@ -28,6 +31,7 @@
 const int MT_RANDOM_POOL_SIZE = 16;
 const int MT_RANDOM_INDEX_MASK = 0xF;
 
+
 class cAnalyzeJobQueue
 {
   friend class cAnalyzeJobWorker;
@@ -38,7 +42,15 @@
   int m_last_jobid;
   cRandomMT* m_rng_pool[MT_RANDOM_POOL_SIZE];
   pthread_mutex_t m_mutex;
+  pthread_cond_t m_cond;
+  pthread_cond_t m_term_cond;
   
+  volatile int m_jobs;      // count of waiting jobs, used in pthread_cond constructs
+  volatile int m_pending;   // count of currently executing jobs
+  
+  tArray<cAnalyzeJobWorker*> m_workers;
+
+  
   cAnalyzeJobQueue(); // @not_implemented
   cAnalyzeJobQueue(const cAnalyzeJobQueue&); // @not_implemented
   cAnalyzeJobQueue& operator=(const cAnalyzeJobQueue&); // @not_implemented
@@ -48,7 +60,8 @@
   cAnalyzeJobQueue(cWorld* world);
   ~cAnalyzeJobQueue();
 
-  void AddJob(cAnalyzeJob* job) { job->SetID(m_last_jobid++); m_queue.PushRear(job); } // @DMB - Warning: NOT thread safe
+  void AddJob(cAnalyzeJob* job);
+  void AddJobImmediate(cAnalyzeJob* job);
 
   void Execute();
   

Modified: development/source/analyze/cAnalyzeJobWorker.cc
===================================================================
--- development/source/analyze/cAnalyzeJobWorker.cc	2006-06-09 20:07:35 UTC (rev 745)
+++ development/source/analyze/cAnalyzeJobWorker.cc	2006-06-12 19:27:24 UTC (rev 746)
@@ -12,6 +12,7 @@
 #include "cAnalyzeJobQueue.h"
 #include "cAvidaContext.h"
 
+
 void cAnalyzeJobWorker::Run()
 {
   cAvidaContext ctx(NULL);
@@ -21,13 +22,29 @@
   
   while (1) {
     pthread_mutex_lock(&m_queue->m_mutex);
+    while (m_queue->m_jobs == 0) {
+      pthread_cond_wait(&m_queue->m_cond, &m_queue->m_mutex);
+    }
     job = m_queue->m_queue.Pop();
+    m_queue->m_jobs--;
+    m_queue->m_pending++; 
     pthread_mutex_unlock(&m_queue->m_mutex);
     
     if (job) {
+      // Set RNG from the waiting pool and execute the job
       ctx.SetRandom(m_queue->GetRandom(job->GetID()));
       job->Run(ctx);
+      delete job;
+      pthread_mutex_lock(&m_queue->m_mutex);
+      int pending = --m_queue->m_pending;
+      pthread_mutex_unlock(&m_queue->m_mutex);
+      if (!pending) pthread_cond_signal(&m_queue->m_term_cond);
     } else {
+      // Terminate worker on NULL job receipt
+      pthread_mutex_lock(&m_queue->m_mutex);
+      int pending = --m_queue->m_pending;
+      pthread_mutex_unlock(&m_queue->m_mutex);
+      if (!pending) pthread_cond_signal(&m_queue->m_term_cond);
       break;
     }
   }

Modified: development/source/platform/win32-pthread/pthread.h
===================================================================
--- development/source/platform/win32-pthread/pthread.h	2006-06-09 20:07:35 UTC (rev 745)
+++ development/source/platform/win32-pthread/pthread.h	2006-06-12 19:27:24 UTC (rev 746)
@@ -20,6 +20,14 @@
 #define pthread_mutex_unlock(x)
 #define pthread_mutex_destroy(x)
 
+// Just define away condition variable support
+#define pthread_cond_t int
+#define pthread_cond_init(x)
+#define pthread_cond_wait(x, y)
+#define pthread_cond_signal(x)
+#define pthread_cond_broadcast(x)
+#define pthread_cond_destroy(x)
+
 // Define away pthread support
 #define pthread_t int
 #define pthread_exit(x)




More information about the Avida-cvs mailing list