Index: /ccsd/trunk/crcnetd/modules/ccs_status.py
===================================================================
--- /ccsd/trunk/crcnetd/modules/ccs_status.py	(revision 991)
+++ /ccsd/trunk/crcnetd/modules/ccs_status.py	(revision 991)
@@ -0,0 +1,247 @@
+# Copyright (C) 2006  The University of Waikato
+#
+# This file is part of crcnetd - CRCnet Configuration System Daemon
+#
+# Status Module - Keeps track of the status of hosts in the system
+#
+# Author:       Matt Brown <matt@crc.net.nz>
+# Version:      $Id$
+#
+# crcnetd is free software; you can redistribute it and/or modify it under the
+# terms of the GNU General Public License version 2 as published by the Free
+# Software Foundation.
+#
+# crcnetd is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# crcnetd; if not, write to the Free Software Foundation, Inc., 
+# 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+import os.path
+import threading
+from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
+from pysnmp.carrier.asynsock.dgram import udp
+from pyasn1.codec.ber import encoder, decoder
+from pysnmp.proto import api
+from time import time
+import random
+
+from crcnetd._utils.ccsd_common import *
+from crcnetd._utils.ccsd_log import *
+from crcnetd._utils.ccsd_events import *
+from crcnetd._utils.ccsd_session import getSession, getSessionE
+from crcnetd._utils.ccsd_server import exportViaXMLRPC, initThread
+from crcnetd._utils.ccsd_cfengine import getCfengineHostStatus
+from crcnetd.modules.ccs_host import ccs_host, getHostList
+
+ccs_mod_type = CCSD_SERVER
+# Default number of seconds between status checks of a host (three minutes)
+DEFAULT_CHECK_INTERVAL = 3 * 60
+# NOTE(review): all five OIDs below are identical - placeholders? Confirm.
+CCS_REVISION_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
+LOAD_AVG1_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
+LOAD_AVG5_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
+LOAD_AVG15_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
+UPTIME_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
+
+class ccs_status_error(ccsd_error):
+    pass
+
+class ccs_host_status(ccs_class):
+    """Monitors the status of a host in the system.
+
+    Attribute reads are serialised on self.lock (see __getattribute__).
+    """
+    def __init__(self, session_id, host_id, interval):
+        """Initialises a status monitor for the specified host.
+
+        session_id must be a valid session with appropriate access to the
+        database. interval is the check interval in seconds and is only
+        used here to stagger the first check. Raises ccs_status_error if
+        the session id is invalid."""
+        self.lock = threading.RLock()
+        session = getSession(session_id)
+        if session is None:
+            raise ccs_status_error("Invalid session id")
+        self._session_id = session_id
+
+        # See if the specified host id makes sense
+        self.host = ccs_host(session_id, host_id)
+        # Initialise the status information
+        self.reachable = False
+        self.operatingRevision = -1
+        self.currentLoad = (-1,-1,-1)
+        self.uptime = 0
+        self.infoUpdatedAt = 0
+        # Pretend the last check happened between half an interval and a
+        # full interval ago so the first real check falls due within the next
+        # half interval; naive staggering to avoid checking every host at once
+        delay = random.randint(interval // 2, interval)
+        self.lastCheckedAt = time.time() - delay
+        log_debug("Initialised host status monitor for %s with %d sec " \
+                "stagger" % (self.host["host_name"], (interval)-delay))
+
+    def update(self):
+        """Called by the processing thread to trigger an update of the data.
+        Checks isHostUp() first, then polls the host with one SNMP v1 GET;
+        the callbacks raise ccs_status_error on SNMP error or >3s wait."""
+        ip = self.host["ip_address"]
+        lock = self.lock  # held for the whole update
+        lock.acquire()
+        try:
+            # Check that the host is reachable
+            if not isHostUp(ip):
+                self.reachable = False
+                self.lastCheckedAt = time.time()
+                return
+            self.reachable = True
+            # Retrieve host status information
+            pMod = api.protoModules[api.protoVersion1]
+            reqPDU = pMod.GetRequestPDU()
+            pMod.apiPDU.setDefaults(reqPDU)
+            pMod.apiPDU.setVarBinds(
+                    reqPDU, ((CCS_REVISION_OID, pMod.Null()),
+                        (LOAD_AVG1_OID, pMod.Null()),
+                        (LOAD_AVG5_OID, pMod.Null()),
+                        (LOAD_AVG15_OID, pMod.Null()),
+                        (UPTIME_OID, pMod.Null())
+                        )
+                    )
+            # Build message
+            reqMsg = pMod.Message()
+            pMod.apiMessage.setDefaults(reqMsg)
+            pMod.apiMessage.setCommunity(reqMsg, 'public')
+            pMod.apiMessage.setPDU(reqMsg, reqPDU)
+            # Dispatch Request
+            transportDispatcher = AsynsockDispatcher()
+            transportDispatcher.registerTransport(
+                    udp.domainName, udp.UdpSocketTransport().openClientMode()
+            )
+            # Timer callback: abandon the request once 3 seconds have passed
+            startedAt = time.time()
+            def cbTimerFun(timeNow):
+                if timeNow - startedAt > 3:
+                    transportDispatcher.jobFinished(1)
+                    transportDispatcher.closeDispatcher()
+                    raise ccs_status_error("Request timed out")
+            # Receive callback: decode replies, store values from our response
+            def cbRecvFun(transportDispatcher, transportDomain, \
+                    transportAddress, wholeMsg, reqPDU=reqPDU):
+                while wholeMsg:
+                    rspMsg, wholeMsg = decoder.decode(wholeMsg, \
+                            asn1Spec=pMod.Message())
+                    rspPDU = pMod.apiMessage.getPDU(rspMsg)
+                    # Match response to request
+                    if pMod.apiPDU.getRequestID(reqPDU) == \
+                            pMod.apiPDU.getRequestID(rspPDU):
+                        # Check for SNMP errors reported
+                        errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
+                        if errorStatus:
+                            transportDispatcher.jobFinished(1)
+                            transportDispatcher.closeDispatcher()
+                            raise ccs_status_error(errorStatus.prettyPrint())
+                        else:
+                            la1 = 0
+                            la5 = 0
+                            la15 = 0
+                            for oid, val in pMod.apiPDU.getVarBinds(rspPDU):
+                                if oid == CCS_REVISION_OID:
+                                    self.operatingRevision = val
+                                elif oid == LOAD_AVG1_OID:
+                                    la1 = val
+                                elif oid == LOAD_AVG5_OID:
+                                    la5 = val
+                                elif oid == LOAD_AVG15_OID:
+                                    la15 = val
+                                elif oid == UPTIME_OID:
+                                    self.uptime = val
+                            self.currentLoad = (la1, la5, la15)
+                        transportDispatcher.jobFinished(1)
+                return wholeMsg
+
+            transportDispatcher.registerRecvCbFun(cbRecvFun)
+            transportDispatcher.registerTimerCbFun(cbTimerFun)
+            transportDispatcher.sendMessage(
+                    encoder.encode(reqMsg), udp.domainName, (ip, 161)
+            )
+            transportDispatcher.jobStarted(1)
+            transportDispatcher.runDispatcher()
+            transportDispatcher.closeDispatcher()
+            # Record that the details were updated
+            self.infoUpdatedAt = time.time()
+            self.lastCheckedAt = time.time()
+            log_info("Updated status details for %s" % self.host["host_name"])
+        finally:
+            lock.release()
+
+    def __getattribute__(self, name):
+        """Override the default accessor to put a lock around all accesses"""
+        # Fetch the lock via object's accessor; self.lock would recurse here
+        lock = object.__getattribute__(self, "lock")
+        lock.acquire()
+        try:
+            # AttributeError for unknown names still releases the lock below
+            return object.__getattribute__(self, name)
+        finally:
+            lock.release()
+    
+def getHostStatus(host_name):
+    """Returns the host status object for the specified host.
+
+    Raises ccs_status_error if no status is being tracked for host_name."""
+    # Test the dict itself; under Python 2 .keys() builds a list on each call
+    if host_name not in _statusInfo:
+        raise ccs_status_error("No status information for %s" % host_name)
+    return _statusInfo[host_name]
+
+@catchEvent("shutdown")
+def shutdownHandler(*args, **kwargs):
+    """Clears the module's _runStatus flag, which statusThread() polls"""
+    globals()["_runStatus"] = False
+
+_statusInfo = {}    # host_name -> ccs_host_status, filled in by statusThread
+_runStatus = True   # cleared by shutdownHandler to stop statusThread
+def statusThread():
+    """Runs as a thread to keep the status information up to date.
+
+    Creates a monitor for every active host, then roughly every 30 seconds
+    updates each host whose last check is more than the configured interval
+    old. Runs until _runStatus is cleared or an exception escapes the loop;
+    exceptions are logged rather than propagated."""
+    try:
+        # Check interval in seconds; int() as randint() needs a whole number
+        interval = int(config_get("status", "interval", DEFAULT_CHECK_INTERVAL))
+        # Initialise the host status information (a single host list query)
+        for host in getHostList(ADMIN_SESSION_ID):
+            if not host["host_active"]:
+                continue
+            name = host["host_name"]
+            _statusInfo[name] = ccs_host_status(ADMIN_SESSION_ID, \
+                    host["host_id"], interval)
+        # Loop until shutdown; the -31 makes the first pass run straight away
+        lastCheck = time.time()-31
+        while _runStatus:
+            # If less than 30 seconds since the last check, wait a bit
+            if time.time() - lastCheck < 30:
+                time.sleep(2)
+                continue
+            # See which hosts need checking now
+            for host_name, host_status in _statusInfo.items():
+                if time.time() - host_status.lastCheckedAt > interval:
+                    try:
+                        host_status.update()
+                    except:
+                        log_error("Failed to update status of %s" % \
+                                host_name, sys.exc_info())
+            # Record when we last ran some checks
+            lastCheck = time.time()
+    except:
+        log_error("Exception in status monitor thread!", sys.exc_info())
+
+    log_info("Exiting status monitor thread")
+        
+def ccs_init():
+    """Starts the status information thread by handing it to initThread"""
+    initThread(statusThread)
