File size: 3,476 Bytes
c19ca42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from ctypes import *
from ctypes.wintypes import *
from typing import NamedTuple, TypeVar
from .apis import PdhExpandWildCardPathW, PdhOpenQueryW, PdhAddEnglishCounterW, PdhCollectQueryData, PdhGetFormattedCounterValue, PdhGetFormattedCounterArrayW, PdhCloseQuery
from .structures import PDH_HQUERY, PDH_HCOUNTER, PDH_FMT_COUNTERVALUE, PPDH_FMT_COUNTERVALUE_ITEM_W
from .defines import *
from .msvcrt import malloc
from .errors import PDHError


class __InternalAbstraction(NamedTuple):
    flag: int
    attr_name: str


_type_map = {
    int: __InternalAbstraction(PDH_FMT_LARGE, "largeValue"),
    float: __InternalAbstraction(PDH_FMT_DOUBLE, "doubleValue"),
}


def expand_wildcard_path(path: str) -> list[str]:
    listLength = DWORD(0)
    if PdhExpandWildCardPathW(None, LPCWSTR(path), None, byref(listLength), PDH_NOEXPANDCOUNTERS) != PDH_MORE_DATA:
        raise PDHError("Something went wrong.")
    expanded = (WCHAR * listLength.value)()
    if PdhExpandWildCardPathW(None, LPCWSTR(path), expanded, byref(listLength), PDH_NOEXPANDCOUNTERS) != PDH_OK:
        raise PDHError(f"Couldn't expand wildcard path '{path}'")
    result = []
    cur = ""
    for c in expanded:
        if c == '\0':
            result.append(cur)
            cur = ""
        else:
            cur += c
    result.pop()
    return result


T = TypeVar("T", *_type_map.keys())


class HCounter(PDH_HCOUNTER):
    def get_formatted_value(self, typ: T) -> T:
        if typ not in _type_map:
            raise PDHError(f"Invalid value type: {typ}")
        flag, attr_name = _type_map[typ]
        value = PDH_FMT_COUNTERVALUE()
        if PdhGetFormattedCounterValue(self, DWORD(flag | PDH_FMT_NOSCALE), None, byref(value)) != PDH_OK:
            raise PDHError("Couldn't get formatted counter value.")
        return getattr(value.u, attr_name)

    def get_formatted_dict(self, typ: T) -> dict[str, T]:
        if typ not in _type_map:
            raise PDHError(f"Invalid value type: {typ}")
        flag, attr_name = _type_map[typ]
        bufferSize = DWORD(0)
        itemCount = DWORD(0)
        if PdhGetFormattedCounterArrayW(self, DWORD(flag | PDH_FMT_NOSCALE), byref(bufferSize), byref(itemCount), None) != PDH_MORE_DATA:
            raise PDHError("Something went wrong.")
        itemBuffer = cast(malloc(c_size_t(bufferSize.value)), PPDH_FMT_COUNTERVALUE_ITEM_W)
        if PdhGetFormattedCounterArrayW(self, DWORD(flag | PDH_FMT_NOSCALE), byref(bufferSize), byref(itemCount), itemBuffer) != PDH_OK:
            raise PDHError("Couldn't get formatted counter array.")
        result: dict[str, T] = dict()
        for i in range(0, itemCount.value):
            item = itemBuffer[i]
            result[item.szName] = getattr(item.FmtValue.u, attr_name)
        return result


class HQuery(PDH_HQUERY):
    def __init__(self):
        super().__init__()
        if PdhOpenQueryW(None, None, byref(self)) != PDH_OK:
            raise PDHError("Couldn't open PDH query.")

    def add_counter(self, path: str) -> HCounter:
        hCounter = HCounter()
        if PdhAddEnglishCounterW(self, LPCWSTR(path), None, byref(hCounter)) != PDH_OK:
            raise PDHError("Couldn't add counter query.")
        return hCounter

    def collect_data(self):
        if PdhCollectQueryData(self) != PDH_OK:
            raise PDHError("Couldn't collect query data.")

    def close(self):
        if PdhCloseQuery(self) != PDH_OK:
            raise PDHError("Couldn't close PDH query.")