202 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python
# Copyright (c) 2015 David Lundgren
#
# Python WMI Client
#
# Can be used in place of wmic when using check_wmi_plus.pl with Nagios.
#
# @author David Lundgren (@drlundgren)
#
# https://github.com/ProjectPatatoe/py-wmi-client/
#
# need additional packages:
# sudo pip3 install natsort
# sudo pip3 install impacket
import argparse
import re
import sys
import configparser
from natsort import natsorted, ns
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.dcom import wmi
from impacket.dcerpc.v5.dcomrt import DCOMConnection
APP_VERSION = '0.1.0'
class WmiClient(object):
"""WMI Client"""
def __init__(self, auth, host):
"""
:param auth:
:param host:
"""
self.auth = auth
self.host = host
def get_language(self, lang):
"""
Retrieve the language passed in from int to string
:param lang: string
:return:
"""
if lang == 552:
return 'en-US'
return '??-??'
def format_value(self, value, cimtype, type):
"""
Formats the value based on the cimtype and type
:param value:
:param cimtype: string
:param type: int
:return:
"""
if cimtype == 'string':
if value == 0:
return '(null)'
else:
return str(value).strip()
elif cimtype == 'boolean': # boolean
if value == 'True':
return 'True'
else:
return 'False'
elif value is None:
if cimtype == 'uint32' or cimtype == 'uint64':
return '0'
else:
return ('%s' % value).strip()
def print_results(self, queryObject, delimiter):
"""
Prints the results in the classObject as wmic.c would
:param queryObject: IEnumWbemClassObject
:param delimiter: string
:return:
"""
while True:
try:
classObject = queryObject.Next(0xffffffff, 1)[0]
print('CLASS: %s' % classObject.getClassName())
record = classObject.getProperties()
keys = []
for name in record:
keys.append(name.strip())
keys = natsorted(keys, alg=ns.IGNORECASE)
print(delimiter.join(keys))
tmp = []
for key in keys:
if key == 'MUILanguages':
vals = []
for v in record[key]['value']:
vals.append(self.get_language(v))
record[key]['value'] = vals
if isinstance(record[key]['value'], list):
values = []
for v in record[key]['value']:
values.append(
self.format_value(v, record[key]['qualifiers']['CIMTYPE'], record[key]['type']))
tmp.append('(%s)' % ','.join(values))
else:
tmp.append('%s' % self.format_value(record[key]['value'], record[key]['qualifiers']['CIMTYPE'],
record[key]['type']))
print(delimiter.join(tmp))
except Exception as e:
if e.get_error_code() != wmi.WBEMSTATUS.WBEM_S_FALSE:
raise
else:
break
def query_and_print(self, wql, **kwargs):
"""
Querys and prints the results
:param wql:
:param kwargs:
:return:
"""
namespace = '//./root/cimv2'
delimiter = '|'
conn = None
classObject = None
wmiService = None
wmiLogin = None
if 'namespace' in kwargs:
namespace = kwargs['namespace']
if 'delimiter' in kwargs:
delimiter = kwargs['delimiter']
try:
conn = DCOMConnection(self.host, self.auth['username'], self.auth['password'], self.auth['domain'], '', '',
None, oxidResolver=True, doKerberos=False)
wmiInterface = conn.CoCreateInstanceEx(wmi.CLSID_WbemLevel1Login, wmi.IID_IWbemLevel1Login)
wmiLogin = wmi.IWbemLevel1Login(wmiInterface)
wmiService = wmiLogin.NTLMLogin(namespace, NULL, NULL)
wmiLogin.RemRelease()
queryObject = wmiService.ExecQuery(wql.strip('\n'),
wmi.WBEM_FLAG_RETURN_IMMEDIATELY | wmi.WBEM_FLAG_ENSURE_LOCATABLE)
self.print_results(queryObject, delimiter)
queryObject.RemRelease()
wmiService.RemRelease()
conn.disconnect()
except Exception as e:
if classObject is not None:
classObject.RemRelease()
if wmiLogin is not None:
wmiLogin.RemRelease()
if wmiService is not None:
wmiService.RemRelease()
if conn is not None:
conn.disconnect()
raise Exception("Could not connect to %s: %s" % (self.host, str(e)))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="WMI client")
parser.add_argument('-U', '--user', dest='user', help="[DOMAIN\]USERNAME[%%PASSWORD]")
parser.add_argument('-A', '--authentication-file', dest='authfile', help="Authentication file")
parser.add_argument('--delimiter', default='|', help="delimiter, default: |")
parser.add_argument('--namespace', default='//./root/cimv2', help='namespace name (default //./root/cimv2)')
parser.add_argument('host', metavar="//host")
parser.add_argument('wql', metavar="query")
options = parser.parse_args()
auth = {
'username': '',
'password': '',
'domain': ''
}
if options.authfile is not None:
authfile = '[root]\n' + open(options.authfile, 'r').read()
config = configparser.ConfigParser()
config.read_string(authfile)
auth['domain'] = config.get('root', 'domain', fallback='WORKGROUP')
auth['username'] = config.get('root', 'username')
auth['password'] = config.get('root', 'password')
elif options.user is not None:
auth['domain'], auth['username'], auth['password'] = re.compile(
'(?:(?:([^/\\\\%]*)[/\\\\])?([^%]*))(?:%(.*))?').match(options.user).groups('')
else:
print("Missing user information")
sys.exit(1)
if auth['domain'] == '':
auth['domain'] = 'WORKGROUP'
WmiClient(auth, options.host[2:]).query_and_print(options.wql, namespace=options.namespace,
delimiter=options.delimiter)