source: ccsd/trunk/crcnetd/modules/ccs_status.py @ 992

Last change on this file since 992 was 992, checked in by mglb1, 7 years ago

Add support for displaying the status of the operating revision on
each host in the CFengine management screen.

  • Property svn:keywords set to Id
File size: 11.2 KB
Line 
1# Copyright (C) 2006  The University of Waikato
2#
3# This file is part of crcnetd - CRCnet Configuration System Daemon
4#
5# Status Module - Keeps track of the status of hosts in the system
6#
7# Author:       Matt Brown <matt@crc.net.nz>
8# Version:      $Id$
9#
10# crcnetd is free software; you can redistribute it and/or modify it under the
11# terms of the GNU General Public License version 2 as published by the Free
12# Software Foundation.
13#
14# crcnetd is distributed in the hope that it will be useful, but WITHOUT ANY
15# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
17# details.
18#
19# You should have received a copy of the GNU General Public License along with
20# crcnetd; if not, write to the Free Software Foundation, Inc.,
21# 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
22import os.path
23import threading
24from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
25from pysnmp.carrier.asynsock.dgram import udp
26from pyasn1.codec.ber import encoder, decoder
27from pysnmp.proto import api
28from time import time
29import random
30
31from crcnetd._utils.ccsd_common import *
32from crcnetd._utils.ccsd_log import *
33from crcnetd._utils.ccsd_events import *
34from crcnetd._utils.ccsd_session import getSession, getSessionE
35from crcnetd._utils.ccsd_server import exportViaXMLRPC, initThread
36from crcnetd._utils.ccsd_cfengine import getCfengineHostStatus
37from crcnetd.modules.ccs_host import ccs_host, getHostList
38
# Module type marker.  Presumably tells the daemon that this module belongs
# on the server side -- TODO confirm against the module loader.
ccs_mod_type = CCSD_SERVER

# Number of seconds between status checks of a host.  Passed to config_get()
# as the fallback for the "status" / "interval" option.
DEFAULT_CHECK_INTERVAL = 3 * 60

# SNMP object identifiers requested from every host.
# NOTE(review): all five identifiers below currently hold the same value, so
# a reply can only ever be matched against CCS_REVISION_OID.  They look like
# placeholders -- confirm the real load average / uptime OIDs.
CCS_REVISION_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
LOAD_AVG1_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
LOAD_AVG5_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
LOAD_AVG15_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
UPTIME_OID = (1, 3, 6, 1, 4, 1, 15120, 1, 3, 1)
48
class ccs_status_error(ccsd_error):
    """Error raised by the status module.

    Used when a host has no status information and when an SNMP status
    request times out or reports an error.
    """
51
class ccs_host_status(ccs_class):
    """Monitors the status of a host in the system.

    An instance tracks a single host: whether isHostUp() considers it
    reachable, the operating revision it reports over SNMP, its load
    averages and its uptime.  update() refreshes the information and is
    expected to be called periodically by the status thread.

    Every attribute read is serialised through a re-entrant lock (see
    __getattribute__), and update() holds the same lock while it runs.
    """

    def __init__(self, session_id, host_id, interval):
        """Initialises a new class for a specified host.

        The specified session must be valid and have appropriate access to
        the database.

        session_id -- id of the session used to look up the host
        host_id    -- id of the host to monitor, handed to ccs_host()
        interval   -- check interval in seconds; only used here to stagger
                      the first check

        Raises ccs_host_error if getSession() does not know the session id.
        """

        # The lock must exist before anything else is read from self, as
        # __getattribute__ fetches it on every attribute access
        self.lock = threading.RLock()

        session = getSession(session_id)
        if session is None:
            # NOTE(review): ccs_host_error is not one of the names this
            # module imports explicitly; presumably a wildcard import
            # provides it -- confirm.
            raise ccs_host_error("Invalid session id")
        self._session_id = session_id

        # See if the specified host id makes sense
        self.host = ccs_host(session_id, host_id)

        # Initialise the status information
        self.reachable = False
        self.operatingRevision = -1
        self.operatingRevisionStatus = "unknown"
        self.operatingRevisionText = "Unknown"
        self.operatingRevisionHint = "Status not yet fetched"
        self.currentLoad = (-1, -1, -1)
        self.uptime = 0
        self.infoUpdatedAt = 0

        # Pretend the last check happened between half an interval and a
        # full interval ago.  The first real check is then due after a
        # random stagger of at most half the interval, which is a naive way
        # to avoid checking every host at once.  Floor division keeps the
        # lower bound an integer, as random.randint() requires.
        delay = random.randint(interval // 2, interval)
        self.lastCheckedAt = time.time() - delay
        log_debug("Initialised host status monitor for %s with %d sec "
                "stagger" % (self.host["host_name"], interval - delay))

    def update(self):
        """Called by the processing thread to trigger an update of the data.

        If isHostUp() reports the host as down, the host is marked
        unreachable, lastCheckedAt is refreshed and nothing else happens.
        Otherwise a single SNMP v1 GET (community 'public', UDP port 161) is
        sent for the revision, load average and uptime OIDs and the reply is
        stored on the instance.

        ccs_status_error is raised from within the dispatcher callbacks when
        no matching reply arrives within 3 seconds or when the reply carries
        an SNMP error status.  In that case neither infoUpdatedAt nor
        lastCheckedAt is touched.
        """
        ip = self.host["ip_address"]

        # Hold the lock for the whole update so readers never see a
        # half-updated set of values
        lock = self.lock
        lock.acquire()
        try:
            # Check that the host is reachable
            if not isHostUp(ip):
                self.reachable = False
                self.lastCheckedAt = time.time()
                return
            self.reachable = True

            # Build the GET request for the status information
            pMod = api.protoModules[api.protoVersion1]
            reqPDU = pMod.GetRequestPDU()
            pMod.apiPDU.setDefaults(reqPDU)
            pMod.apiPDU.setVarBinds(
                    reqPDU, ((CCS_REVISION_OID, pMod.Null()),
                        (LOAD_AVG1_OID, pMod.Null()),
                        (LOAD_AVG5_OID, pMod.Null()),
                        (LOAD_AVG15_OID, pMod.Null()),
                        (UPTIME_OID, pMod.Null())
                        )
                    )
            # Build message
            reqMsg = pMod.Message()
            pMod.apiMessage.setDefaults(reqMsg)
            pMod.apiMessage.setCommunity(reqMsg, 'public')
            pMod.apiMessage.setPDU(reqMsg, reqPDU)

            # Dispatch Request
            transportDispatcher = AsynsockDispatcher()
            try:
                transportDispatcher.registerTransport(
                        udp.domainName,
                        udp.UdpSocketTransport().openClientMode()
                )

                startedAt = time.time()

                def cbTimerFun(timeNow):
                    # Give up once more than 3 seconds have passed since
                    # the request was started
                    if timeNow - startedAt > 3:
                        transportDispatcher.jobFinished(1)
                        raise ccs_status_error("Request timed out")

                def cbRecvFun(transportDispatcher, transportDomain, \
                        transportAddress, wholeMsg, reqPDU=reqPDU):
                    while wholeMsg:
                        rspMsg, wholeMsg = decoder.decode(wholeMsg, \
                                asn1Spec=pMod.Message())
                        rspPDU = pMod.apiMessage.getPDU(rspMsg)
                        # Ignore anything that is not the answer to our
                        # request
                        if pMod.apiPDU.getRequestID(reqPDU) != \
                                pMod.apiPDU.getRequestID(rspPDU):
                            continue
                        # Check for SNMP errors reported
                        errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
                        if errorStatus:
                            transportDispatcher.jobFinished(1)
                            raise ccs_status_error(errorStatus.prettyPrint())
                        la1 = 0
                        la5 = 0
                        la15 = 0
                        # NOTE(review): the five OID constants currently
                        # hold the same value, so only the first branch can
                        # match -- confirm the intended OIDs.
                        for oid, val in pMod.apiPDU.getVarBinds(rspPDU):
                            if oid == CCS_REVISION_OID:
                                self.operatingRevision = str(val)
                                self._parseOperatingRevision()
                            elif oid == LOAD_AVG1_OID:
                                la1 = val
                            elif oid == LOAD_AVG5_OID:
                                la5 = val
                            elif oid == LOAD_AVG15_OID:
                                la15 = val
                            elif oid == UPTIME_OID:
                                self.uptime = val
                        self.currentLoad = (la1, la5, la15)
                        transportDispatcher.jobFinished(1)
                    return wholeMsg

                transportDispatcher.registerRecvCbFun(cbRecvFun)
                transportDispatcher.registerTimerCbFun(cbTimerFun)
                transportDispatcher.sendMessage(
                        encoder.encode(reqMsg), udp.domainName, (ip, 161)
                )
                transportDispatcher.jobStarted(1)
                transportDispatcher.runDispatcher()
            finally:
                # Always release the transport, whether the request
                # succeeded, timed out or failed in some other way
                transportDispatcher.closeDispatcher()

            # Record that the details were updated
            self.infoUpdatedAt = time.time()
            self.lastCheckedAt = time.time()
            log_info("Updated status details for %s" % self.host["host_name"])
        finally:
            lock.release()

    def _parseOperatingRevision(self):
        """Parses the incoming operating revision and discerns what it means
        about the status of the host.

        Reads self.operatingRevision and rewrites it together with
        operatingRevisionStatus, operatingRevisionText and
        operatingRevisionHint:

        -1 or ""         -> "critical", revision cleared to ""
        contains "M"     -> "warning" (manual changes), "M" marker removed
        contains ":"     -> "warning" (mixed revisions), revision set to the
                            part before the first ":"
        anything else    -> "ok", revision left untouched

        The revision string presumably follows the svnversion output format
        -- TODO confirm.
        """
        revision = self.operatingRevision

        if revision == -1 or revision == "":
            # Host doesn't know its own revision!
            self.operatingRevisionStatus = "critical"
            self.operatingRevisionText = "- Invalid State"
            self.operatingRevisionHint = "The host is in an invalid state"
            self.operatingRevision = ""
            return

        if "M" in revision:
            self.operatingRevisionStatus = "warning"
            self.operatingRevisionText = "with manual changes!"
            self.operatingRevisionHint = "Manual changes risk being overwritten"
            # Drop the marker itself rather than blindly the last character,
            # so a revision with further trailing flags is not left with the
            # "M" still in it
            self.operatingRevision = revision.replace("M", "", 1)
            return

        if ":" in revision:
            parts = revision.split(":")
            self.operatingRevisionStatus = "warning"
            self.operatingRevisionText = "mixed with r%s" % parts[1]
            self.operatingRevisionHint = "There are configuration files on " \
                    "the host that do not come from the active revision"
            self.operatingRevision = parts[0]
            return

        self.operatingRevisionStatus = "ok"
        self.operatingRevisionText = ""
        self.operatingRevisionHint = "The host configuration is up to date"

    def __getattribute__(self, name):
        """Override the default accessor to put a lock around all accesses.

        The lock itself is fetched through object.__getattribute__ so that
        looking it up does not recurse back into this method.
        """
        lock = object.__getattribute__(self, "lock")
        lock.acquire()
        try:
            return object.__getattribute__(self, name)
        finally:
            lock.release()
226   
def getHostStatus(host_name):
    """Returns the host status object for the specified host.

    host_name -- name of the host, as used for the keys of _statusInfo

    Raises ccs_status_error if no status object is registered under that
    name.
    """
    # Test membership on the dictionary itself; going through keys() would
    # build and scan a list of every host name on each call
    if host_name not in _statusInfo:
        raise ccs_status_error("No status information for %s" % host_name)

    return _statusInfo[host_name]
235
@catchEvent("shutdown")
def shutdownHandler(*args, **kwargs):
    """Handles the "shutdown" event by clearing the run flag.

    statusThread() tests the flag at the top of its loop and exits once it
    is False.  Any arguments delivered with the event are ignored.
    """
    global _runStatus

    _runStatus = False
240
# Run flag for statusThread(); shutdownHandler() sets it to False
_runStatus = True
# Maps host name -> ccs_host_status object; filled in by statusThread()
_statusInfo = {}
def statusThread():
    """Runs as a thread to keep the status information up to date.

    Creates a ccs_host_status object for every active host returned by
    getHostList() and stores it in _statusInfo under the host name.  It then
    loops until _runStatus becomes False: roughly every 30 seconds it calls
    update() on each host whose last check is older than the configured
    interval.

    A failure to update a single host is logged and does not stop the loop.
    Any other exception is logged and ends the thread.
    """
    # Minimum number of seconds between two passes over the hosts
    passPeriod = 30

    try:
        # What interval shall we check hosts at
        interval = config_get("status", "interval", DEFAULT_CHECK_INTERVAL)

        # Initialise the host status information; the host list is fetched
        # once and inactive hosts are skipped
        for host in getHostList(ADMIN_SESSION_ID):
            if not host["host_active"]:
                continue
            name = host["host_name"]
            _statusInfo[name] = ccs_host_status(ADMIN_SESSION_ID, \
                    host["host_id"], interval)

        # Start out "overdue" so the first pass happens straight away
        lastCheck = time.time() - (passPeriod + 1)
        while _runStatus:
            # Too soon since the last pass: nap briefly so that a shutdown
            # request is noticed within a couple of seconds
            if time.time() - lastCheck < passPeriod:
                time.sleep(2)
                continue
            # See which hosts need checking now
            for host_name, host_status in _statusInfo.items():
                if time.time() - host_status.lastCheckedAt > interval:
                    try:
                        host_status.update()
                    except:
                        # Deliberately broad: one misbehaving host must not
                        # take the whole monitor down
                        log_error("Failed to update status of %s" % \
                                host_name, sys.exc_info())
            # Record when we last ran some checks
            lastCheck = time.time()
    except:
        # Top-level boundary of the thread: log whatever went wrong
        log_error("Exception in status monitor thread!", sys.exc_info())

    log_info("Exiting status monitor thread")
281       
def ccs_init():
    """Module initialisation hook: starts the status information thread.

    Hands statusThread to initThread(); presumably called by the daemon when
    the module is loaded -- TODO confirm.
    """
    initThread(statusThread)
Note: See TracBrowser for help on using the repository browser.