| 1 | # Copyright (C) 2006 The University of Waikato |
|---|
| 2 | # |
|---|
| 3 | # This file is part of crcnetd - CRCnet Configuration System Daemon |
|---|
| 4 | # |
|---|
| 5 | # Status Module - Keeps track of the status of hosts in the system |
|---|
| 6 | # |
|---|
| 7 | # Author: Matt Brown <matt@crc.net.nz> |
|---|
| 8 | # Version: $Id$ |
|---|
| 9 | # |
|---|
| 10 | # crcnetd is free software; you can redistribute it and/or modify it under the |
|---|
| 11 | # terms of the GNU General Public License version 2 as published by the Free |
|---|
| 12 | # Software Foundation. |
|---|
| 13 | # |
|---|
| 14 | # crcnetd is distributed in the hope that it will be useful, but WITHOUT ANY |
|---|
| 15 | # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 16 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
|---|
| 17 | # details. |
|---|
| 18 | # |
|---|
| 19 | # You should have received a copy of the GNU General Public License along with |
|---|
| 20 | # crcnetd; if not, write to the Free Software Foundation, Inc., |
|---|
| 21 | # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
|---|
| 22 | import os.path |
|---|
| 23 | import threading |
|---|
| 24 | from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher |
|---|
| 25 | from pysnmp.carrier.asynsock.dgram import udp |
|---|
| 26 | from pyasn1.codec.ber import encoder, decoder |
|---|
| 27 | from pysnmp.proto import api |
|---|
| 28 | from time import time |
|---|
| 29 | import random |
|---|
| 30 | |
|---|
| 31 | from crcnetd._utils.ccsd_common import * |
|---|
| 32 | from crcnetd._utils.ccsd_log import * |
|---|
| 33 | from crcnetd._utils.ccsd_events import * |
|---|
| 34 | from crcnetd._utils.ccsd_session import getSession, getSessionE |
|---|
| 35 | from crcnetd._utils.ccsd_server import exportViaXMLRPC, initThread |
|---|
| 36 | from crcnetd._utils.ccsd_cfengine import getCfengineHostStatus |
|---|
| 37 | from crcnetd.modules.ccs_host import ccs_host, getHostList |
|---|
| 38 | |
|---|
# Module type marker for this crcnetd module; CCSD_SERVER comes from the
# star imports above (presumably read by the daemon's module loader).
ccs_mod_type = CCSD_SERVER

# Default number of seconds between status checks of a host (three minutes).
# Passed as the fallback value to config_get("status", "interval", ...).
DEFAULT_CHECK_INTERVAL = 180
|---|
| 42 | |
|---|
# SNMP OIDs requested by ccs_host_status when polling a host. All of them sit
# under the 1.3.6.1.4.1.15120 private-enterprise arc.
#
# NOTE(review): all five constants resolve to the very same OID, so the agent
# is asked for one value five times and only the CCS_REVISION_OID branch of
# the response parser can ever match. The load-average and uptime OIDs look
# like unfinished placeholders -- confirm the real values against the agent's
# MIB before relying on currentLoad/uptime.
_CRCNET_STATUS_BASE = (1, 3, 6, 1, 4, 1, 15120)

CCS_REVISION_OID = _CRCNET_STATUS_BASE + (1, 3, 1)
LOAD_AVG1_OID = _CRCNET_STATUS_BASE + (1, 3, 1)
LOAD_AVG5_OID = _CRCNET_STATUS_BASE + (1, 3, 1)
LOAD_AVG15_OID = _CRCNET_STATUS_BASE + (1, 3, 1)
UPTIME_OID = _CRCNET_STATUS_BASE + (1, 3, 1)
|---|
| 48 | |
|---|
class ccs_status_error(ccsd_error):
    """Error raised by the status module, e.g. when an SNMP status request
    times out or when status is requested for a host that is not tracked."""
|---|
| 51 | |
|---|
class ccs_host_status(ccs_class):
    """Monitors the status of a host in the system.

    update() refreshes the reachability flag and, when the host is up, the
    operating revision, load averages and uptime fetched over SNMP. Every
    attribute read goes through __getattribute__, which holds self.lock (an
    RLock). A reader in another thread therefore waits for any update() in
    progress rather than seeing a half-applied one.
    """

    def __init__(self, session_id, host_id, interval):
        """Initialises a new status monitor for the specified host.

        session_id must be a valid session with appropriate access to the
        database. host_id is passed to ccs_host to load the host record.
        interval is the check interval in seconds. It is used here only to
        stagger the first check.

        Raises ccs_status_error if session_id is not a valid session.
        """
        # The module-level "from time import time" binds the function, not the
        # module, so bring the module into scope explicitly.
        import time

        # Must exist before any attribute read: __getattribute__ locks it.
        self.lock = threading.RLock()

        session = getSession(session_id)
        if session is None:
            # ccs_host_error is not among this module's explicit imports, so
            # raising it would surface as a NameError; use our own error.
            raise ccs_status_error("Invalid session id")
        self._session_id = session_id

        # See if the specified host id makes sense
        self.host = ccs_host(session_id, host_id)

        # Initialise the status information
        self.reachable = False
        self.operatingRevision = -1
        self.currentLoad = (-1, -1, -1)
        self.uptime = 0
        self.infoUpdatedAt = 0

        # Naive staggering so every host is not checked at once. We pretend
        # the last check happened between half and a whole interval ago, so
        # the first real check falls within the first half of the interval.
        # Floor division keeps both randint() bounds integral.
        delay = random.randint(interval // 2, interval)
        self.lastCheckedAt = time.time() - delay
        log_debug("Initialised host status monitor for %s with %d sec "
                "stagger" % (self.host["host_name"], interval - delay))

    def update(self):
        """Refreshes this host's status; called by the status thread.

        If isHostUp() reports the host down, only the reachable flag is
        cleared. Otherwise an SNMPv1 GET is sent (see _query()) and the
        returned values are stored.

        lastCheckedAt is set on every call, including failed ones. A host
        whose agent does not answer is therefore retried once per interval,
        not on every pass of the status thread. infoUpdatedAt is only set
        after a completed exchange.

        Errors from the SNMP exchange propagate to the caller. This includes
        the ccs_status_error raised by the dispatcher callbacks on timeout or
        on an agent-reported error; pysnmp may wrap errors raised inside its
        receive loop.
        """
        import time

        ip = self.host["ip_address"]

        # Grab the lock (re-entrant: attribute reads below re-acquire it)
        lock = self.lock
        lock.acquire()
        try:
            try:
                # Check that the host is reachable
                if not isHostUp(ip):
                    self.reachable = False
                    return
                self.reachable = True
                self._query(ip)
                # Record that the details were updated
                self.infoUpdatedAt = time.time()
                log_info("Updated status details for %s" %
                        self.host["host_name"])
            finally:
                self.lastCheckedAt = time.time()
        finally:
            lock.release()

    def _query(self, ip):
        """Sends one SNMPv1 GET (community 'public') for the status OIDs to
        UDP port 161 on ip and stores the reply via _applyVarBinds().

        Raises ccs_status_error if no reply arrives within 3 seconds or the
        agent reports an error status.
        """
        import time

        pMod = api.protoModules[api.protoVersion1]

        # Build the request PDU asking for every status OID
        reqPDU = pMod.GetRequestPDU()
        pMod.apiPDU.setDefaults(reqPDU)
        pMod.apiPDU.setVarBinds(
            reqPDU, ((CCS_REVISION_OID, pMod.Null()),
                     (LOAD_AVG1_OID, pMod.Null()),
                     (LOAD_AVG5_OID, pMod.Null()),
                     (LOAD_AVG15_OID, pMod.Null()),
                     (UPTIME_OID, pMod.Null()))
        )
        # Build message
        reqMsg = pMod.Message()
        pMod.apiMessage.setDefaults(reqMsg)
        pMod.apiMessage.setCommunity(reqMsg, 'public')
        pMod.apiMessage.setPDU(reqMsg, reqPDU)

        transportDispatcher = AsynsockDispatcher()
        transportDispatcher.registerTransport(
            udp.domainName, udp.UdpSocketTransport().openClientMode()
        )
        try:
            startedAt = time.time()

            def cbTimerFun(timeNow):
                # Abort the exchange if no reply has arrived within 3 seconds
                if timeNow - startedAt > 3:
                    transportDispatcher.jobFinished(1)
                    raise ccs_status_error("Request timed out")

            def cbRecvFun(transportDispatcher, transportDomain,
                    transportAddress, wholeMsg, reqPDU=reqPDU):
                # Decode every message in the received data in turn
                while wholeMsg:
                    rspMsg, wholeMsg = decoder.decode(wholeMsg,
                            asn1Spec=pMod.Message())
                    rspPDU = pMod.apiMessage.getPDU(rspMsg)
                    # Ignore responses that do not match our request
                    if pMod.apiPDU.getRequestID(reqPDU) != \
                            pMod.apiPDU.getRequestID(rspPDU):
                        continue
                    # Check for SNMP errors reported
                    errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
                    if errorStatus:
                        transportDispatcher.jobFinished(1)
                        raise ccs_status_error(errorStatus.prettyPrint())
                    self._applyVarBinds(pMod, rspPDU)
                    transportDispatcher.jobFinished(1)
                return wholeMsg

            transportDispatcher.registerRecvCbFun(cbRecvFun)
            transportDispatcher.registerTimerCbFun(cbTimerFun)
            transportDispatcher.sendMessage(
                encoder.encode(reqMsg), udp.domainName, (ip, 161)
            )
            transportDispatcher.jobStarted(1)
            transportDispatcher.runDispatcher()
        finally:
            # Always release the UDP transport. This covers success, the
            # errors raised by the callbacks above, and any other failure in
            # sendMessage/runDispatcher.
            transportDispatcher.closeDispatcher()

    def _applyVarBinds(self, pMod, rspPDU):
        """Copies the values from an SNMP response PDU onto this object.

        Load averages missing from the response are stored as 0.
        """
        la1 = 0
        la5 = 0
        la15 = 0
        for oid, val in pMod.apiPDU.getVarBinds(rspPDU):
            # NOTE(review): the module's *_OID constants currently all hold
            # the same value. The first branch therefore always matches, and
            # the load/uptime branches stay unreachable until they are fixed.
            if oid == CCS_REVISION_OID:
                self.operatingRevision = val
            elif oid == LOAD_AVG1_OID:
                la1 = val
            elif oid == LOAD_AVG5_OID:
                la5 = val
            elif oid == LOAD_AVG15_OID:
                la15 = val
            elif oid == UPTIME_OID:
                self.uptime = val
        self.currentLoad = (la1, la5, la15)

    def __getattribute__(self, name):
        """Override the default accessor to put a lock around all accesses.

        The lock itself is fetched with object.__getattribute__, since going
        through this method would recurse. The lock is an RLock, so reads
        made while update() already holds it in the same thread do not
        deadlock.
        """
        lock = object.__getattribute__(self, "lock")
        lock.acquire()
        try:
            return object.__getattribute__(self, name)
        finally:
            lock.release()
|---|
| 189 | |
|---|
def getHostStatus(host_name):
    """Returns the ccs_host_status object tracked for host_name.

    Raises ccs_status_error if no status information exists for the host.
    """
    # Only reads the module-level dict, so no "global" statement is needed.
    # A direct lookup avoids building a key list just to test membership.
    try:
        return _statusInfo[host_name]
    except KeyError:
        raise ccs_status_error("No status information for %s" % host_name)
|---|
| 198 | |
|---|
@catchEvent("shutdown")
def shutdownHandler(*args, **kwargs):
    """Handles the "shutdown" event by telling statusThread() to stop.

    Any event arguments are accepted and ignored. statusThread() loops only
    while _runStatus is true, so clearing it ends the monitor loop.
    """
    global _runStatus
    _runStatus = False
|---|
| 203 | |
|---|
# Host name -> ccs_host_status, populated by statusThread().
_statusInfo = dict()
# Loop flag for statusThread(); cleared by shutdownHandler().
_runStatus = True
|---|
def statusThread():
    """Runs as a thread to keep the status information up to date.

    Creates a ccs_host_status for every active host. It then loops until
    shutdownHandler() clears _runStatus. Roughly every 30 seconds it updates
    each host whose last check is older than the configured interval.
    Failures for an individual host are logged and do not stop monitoring of
    the others.
    """
    # The module-level "from time import time" binds the function, not the
    # module; sys is otherwise only available via the star imports.
    import sys
    import time

    try:
        # What interval (in seconds) shall we check hosts at. int() guards
        # against the config layer returning a string, which would make every
        # "> interval" comparison below false under Python 2.
        # NOTE(review): confirm what config_get returns for configured values.
        interval = int(config_get("status", "interval",
                DEFAULT_CHECK_INTERVAL))

        # Initialise the host status information
        for host in getHostList(ADMIN_SESSION_ID):
            if not host["host_active"]:
                continue
            name = host["host_name"]
            try:
                _statusInfo[name] = ccs_host_status(ADMIN_SESSION_ID,
                        host["host_id"], interval)
            except Exception:
                # One bad host record should not disable monitoring of
                # every other host.
                log_error("Failed to initialise status monitor for %s" %
                        name, sys.exc_info())

        # Loop until shutdown, reading status as appropriate. Start with
        # lastCheck in the past so the first pass runs immediately.
        lastCheck = time.time() - 31
        while _runStatus:
            # If less than 30 seconds since the last check, wait a bit
            if time.time() - lastCheck < 30:
                time.sleep(2)
                continue
            # See which hosts need checking now
            for host_name, host_status in _statusInfo.items():
                # Stop promptly on shutdown instead of finishing the pass
                if not _runStatus:
                    break
                if time.time() - host_status.lastCheckedAt > interval:
                    try:
                        host_status.update()
                    except Exception:
                        log_error("Failed to update status of %s" %
                                host_name, sys.exc_info())
            # Record when we last ran some checks
            lastCheck = time.time()
    except:
        # Last-resort catch-all for the thread: log instead of dying silently
        log_error("Exception in status monitor thread!", sys.exc_info())

    log_info("Exiting status monitor thread")
|---|
| 244 | |
|---|
def ccs_init():
    """Starts the status monitoring thread (statusThread) via initThread."""
    initThread(statusThread)
|---|