source: ccsd/trunk/crcnetd/modules/ccs_status.py @ 991

Last change on this file since 991 was 991, checked in by mglb1, 7 years ago

Add the initial structure of the host status module that runs in a
separate thread and regularly collects information from each host
about its current view of the world

  • Property svn:keywords set to Id
File size: 9.5 KB
Line 
1# Copyright (C) 2006  The University of Waikato
2#
3# This file is part of crcnetd - CRCnet Configuration System Daemon
4#
5# Status Module - Keeps track of the status of hosts in the system
6#
7# Author:       Matt Brown <matt@crc.net.nz>
8# Version:      $Id$
9#
10# crcnetd is free software; you can redistribute it and/or modify it under the
11# terms of the GNU General Public License version 2 as published by the Free
12# Software Foundation.
13#
14# crcnetd is distributed in the hope that it will be useful, but WITHOUT ANY
15# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
17# details.
18#
19# You should have received a copy of the GNU General Public License along with
20# crcnetd; if not, write to the Free Software Foundation, Inc.,
21# 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
22import os.path
23import threading
24from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
25from pysnmp.carrier.asynsock.dgram import udp
26from pyasn1.codec.ber import encoder, decoder
27from pysnmp.proto import api
28from time import time
29import random
30
31from crcnetd._utils.ccsd_common import *
32from crcnetd._utils.ccsd_log import *
33from crcnetd._utils.ccsd_events import *
34from crcnetd._utils.ccsd_session import getSession, getSessionE
35from crcnetd._utils.ccsd_server import exportViaXMLRPC, initThread
36from crcnetd._utils.ccsd_cfengine import getCfengineHostStatus
37from crcnetd.modules.ccs_host import ccs_host, getHostList
38
ccs_mod_type = CCSD_SERVER

# Default number of seconds between two status checks of the same host
DEFAULT_CHECK_INTERVAL = 3 * 60

# SNMP object identifiers queried on each host.
# NOTE(review): all five identifiers below resolve to the same OID value, so
# a response varbind can not be told apart by OID. This looks like a set of
# placeholders waiting for the real per-value OIDs -- confirm and fill in.
_SHARED_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
CCS_REVISION_OID = _SHARED_OID
LOAD_AVG1_OID = _SHARED_OID
LOAD_AVG5_OID = _SHARED_OID
LOAD_AVG15_OID = _SHARED_OID
UPTIME_OID = _SHARED_OID
48
class ccs_status_error(ccsd_error):
    """Error raised by the host status module"""
51
class ccs_host_status(ccs_class):
    """Monitors the status of a host in the system

    One instance tracks one host. The status fields (reachable,
    operatingRevision, currentLoad, uptime, infoUpdatedAt, lastCheckedAt)
    are refreshed by update() and every attribute read goes through the
    instance lock (see __getattribute__).
    """

    def __init__(self, session_id, host_id, interval):
        """Initialises a new class for a specified host.

        The specified session must be valid and have appropriate access to
        the database.

        session_id -- session used to look the host up
        host_id    -- id of the host to monitor
        interval   -- number of seconds between checks, used here only to
                      stagger the first check

        Raises ccs_status_error if the session id is not valid.
        """
        # The module level "from time import time" binds the name "time" to
        # the function, so import the module locally under another name.
        import time as _time

        # The lock must exist before any attribute is read, because
        # __getattribute__ acquires it on every access.
        self.lock = threading.RLock()

        session = getSession(session_id)
        if session is None:
            raise ccs_status_error("Invalid session id")
        self._session_id = session_id

        # See if the specified host id makes sense
        self.host = ccs_host(session_id, host_id)

        # Initialise the status information
        self.reachable = False
        self.operatingRevision = -1
        self.currentLoad = (-1, -1, -1)
        self.uptime = 0
        self.infoUpdatedAt = 0

        # Pretend the last check happened between half an interval and a
        # full interval ago, so that the first real check of this host falls
        # somewhere in the next half interval. This is naive staggering to
        # avoid checking every host at once.
        delay = random.randint(interval // 2, interval)
        self.lastCheckedAt = _time.time() - delay
        log_debug("Initialised host status monitor for %s with %d sec "
                "stagger" % (self.host["host_name"], interval - delay))

    def update(self):
        """Called by the processing thread to trigger an update of the data

        If the host is not reachable only reachable and lastCheckedAt are
        updated. Otherwise a single SNMP v1 GetRequest (community 'public',
        UDP port 161) is sent and the answer is stored in operatingRevision,
        currentLoad and uptime.

        Raises ccs_status_error from the dispatcher callbacks if no answer
        arrives within 3 seconds or the agent reports an SNMP error. In
        that case lastCheckedAt is left untouched.
        """
        # See __init__ for why the time module is imported locally
        import time as _time

        ip = self.host["ip_address"]

        # Hold the lock for the whole update so readers never see a
        # half-updated set of values
        lock = self.lock
        lock.acquire()
        try:
            # Check that the host is reachable
            if not isHostUp(ip):
                self.reachable = False
                self.lastCheckedAt = _time.time()
                return
            self.reachable = True

            # Build the request PDU asking for all status values at once
            pMod = api.protoModules[api.protoVersion1]
            reqPDU = pMod.GetRequestPDU()
            pMod.apiPDU.setDefaults(reqPDU)
            pMod.apiPDU.setVarBinds(
                    reqPDU, ((CCS_REVISION_OID, pMod.Null()),
                        (LOAD_AVG1_OID, pMod.Null()),
                        (LOAD_AVG5_OID, pMod.Null()),
                        (LOAD_AVG15_OID, pMod.Null()),
                        (UPTIME_OID, pMod.Null())
                        )
                    )
            # Build message
            reqMsg = pMod.Message()
            pMod.apiMessage.setDefaults(reqMsg)
            pMod.apiMessage.setCommunity(reqMsg, 'public')
            pMod.apiMessage.setPDU(reqMsg, reqPDU)

            # Dispatch Request. The dispatcher owns a UDP socket, so it is
            # closed in a finally clause no matter how the exchange ends.
            transportDispatcher = AsynsockDispatcher()
            try:
                transportDispatcher.registerTransport(
                        udp.domainName,
                        udp.UdpSocketTransport().openClientMode()
                )

                startedAt = _time.time()

                def cbTimerFun(timeNow):
                    # Give up when no matching answer arrived in 3 seconds
                    if timeNow - startedAt > 3:
                        transportDispatcher.jobFinished(1)
                        raise ccs_status_error("Request timed out")

                def cbRecvFun(transportDispatcher, transportDomain,
                        transportAddress, wholeMsg, reqPDU=reqPDU):
                    while wholeMsg:
                        rspMsg, wholeMsg = decoder.decode(wholeMsg,
                                asn1Spec=pMod.Message())
                        rspPDU = pMod.apiMessage.getPDU(rspMsg)
                        # Ignore anything that is not the answer to our
                        # request
                        if pMod.apiPDU.getRequestID(reqPDU) != \
                                pMod.apiPDU.getRequestID(rspPDU):
                            continue
                        # Check for SNMP errors reported
                        errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
                        if errorStatus:
                            transportDispatcher.jobFinished(1)
                            raise ccs_status_error(errorStatus.prettyPrint())
                        # NOTE(review): the OID constants defined at module
                        # level are currently all equal, in which case only
                        # the first branch below can ever match -- confirm
                        # the real OIDs.
                        la1 = 0
                        la5 = 0
                        la15 = 0
                        for oid, val in pMod.apiPDU.getVarBinds(rspPDU):
                            if oid == CCS_REVISION_OID:
                                self.operatingRevision = val
                            elif oid == LOAD_AVG1_OID:
                                la1 = val
                            elif oid == LOAD_AVG5_OID:
                                la5 = val
                            elif oid == LOAD_AVG15_OID:
                                la15 = val
                            elif oid == UPTIME_OID:
                                self.uptime = val
                        self.currentLoad = (la1, la5, la15)
                        transportDispatcher.jobFinished(1)
                    return wholeMsg

                transportDispatcher.registerRecvCbFun(cbRecvFun)
                transportDispatcher.registerTimerCbFun(cbTimerFun)
                transportDispatcher.sendMessage(
                        encoder.encode(reqMsg), udp.domainName, (ip, 161)
                )
                transportDispatcher.jobStarted(1)
                transportDispatcher.runDispatcher()
            finally:
                transportDispatcher.closeDispatcher()

            # Record that the details were updated
            now = _time.time()
            self.infoUpdatedAt = now
            self.lastCheckedAt = now
            log_info("Updated status details for %s" % self.host["host_name"])
        finally:
            lock.release()

    def __getattribute__(self, name):
        """Override the default accessor to put a lock around all accesses"""

        # Fetch the lock through object.__getattribute__ to avoid recursing
        # into this method
        lock = object.__getattribute__(self, "lock")
        lock.acquire()
        try:
            return object.__getattribute__(self, name)
        finally:
            lock.release()
189   
def getHostStatus(host_name):
    """Returns the host status object for the specified host

    host_name -- name of the host as used as key in the status table

    Raises ccs_status_error if no status information is held for that name.
    """
    # Test membership on the dictionary itself rather than on a list of
    # its keys
    if host_name not in _statusInfo:
        raise ccs_status_error("No status information for %s" % host_name)

    return _statusInfo[host_name]
198
@catchEvent("shutdown")
def shutdownHandler(*args, **kwargs):
    """Handles the "shutdown" event by clearing the module run flag

    The arguments passed with the event are ignored.
    """
    global _runStatus
    # statusThread() tests this flag at the top of its loop
    _runStatus = False
203
# Host name -> ccs_host_status object, filled in by statusThread()
_statusInfo = dict()
# Set to False by shutdownHandler() to make statusThread() leave its loop
_runStatus = True
def statusThread():
    """Runs as a thread to keep the status information up to date

    Creates a ccs_host_status object in _statusInfo for every active host
    and then, until _runStatus becomes false, makes a pass over all hosts at
    most once every 30 seconds, updating each host whose last check is
    older than the configured interval.

    Exceptions never leave this function: a failed update of one host is
    logged and the pass continues, any other exception is logged and ends
    the thread.
    """
    # "sys" is not imported explicitly by this module and the module level
    # "from time import time" binds "time" to the function, so import both
    # modules locally.
    import sys
    import time as _time

    try:
        # What interval shall we check hosts at
        # NOTE(review): the value is used in arithmetic and comparisons
        # below -- confirm config_get returns a number here.
        interval = config_get("status", "interval", DEFAULT_CHECK_INTERVAL)

        # Initialise the host status information for every active host
        for host in getHostList(ADMIN_SESSION_ID):
            if not host["host_active"]:
                continue
            name = host["host_name"]
            _statusInfo[name] = ccs_host_status(ADMIN_SESSION_ID,
                    host["host_id"], interval)

        # Start as if the previous pass was 31 seconds ago so the first
        # pass runs immediately
        lastCheck = _time.time() - 31
        while _runStatus:
            # If less than 30 seconds since the last check, wait a bit.
            # Sleeping in short steps keeps the reaction to a shutdown quick.
            if _time.time() - lastCheck < 30:
                _time.sleep(2)
                continue
            # See which hosts need checking now
            for host_name, host_status in _statusInfo.items():
                if _time.time() - host_status.lastCheckedAt > interval:
                    try:
                        host_status.update()
                    except:
                        # One failing host must not stop the others from
                        # being checked
                        log_error("Failed to update status of %s" %
                                host_name, sys.exc_info())
            # Record when we last ran some checks
            lastCheck = _time.time()
    except:
        # Thread boundary: log whatever went wrong instead of dying silently
        log_error("Exception in status monitor thread!", sys.exc_info())

    log_info("Exiting status monitor thread")
244       
def ccs_init():
    """Module initialisation hook, starts the status information thread"""
    initThread(statusThread)
Note: See TracBrowser for help on using the repository browser.