File size: 6,644 Bytes
5690e11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright (c) Alibaba, Inc. and its affiliates.

import logging
import threading

from enum import Enum, unique
from queue import Queue

from . import logging, token, websocket
from .exception import InvalidParameter, ConnectionTimeout, ConnectionUnavailable

# Default websocket endpoint; NlsCore.__init__ uses it as the default ``url``.
__URL__ = 'wss://nls-gateway.cn-shanghai.aliyuncs.com/ws/v1'
# Base handshake headers handed to websocket.WebSocketApp. NlsCore.__init__
# appends an 'X-NLS-Token: <token>' entry to a copy of this list.
# NOTE(review): Sec-WebSocket-Key is a fixed literal here rather than a
# per-connection value -- confirm that this is intentional.
__HEADER__ = [
    'Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==',
    'Sec-WebSocket-Version: 13',
]

# logging-style record format string; nothing in the visible part of this
# module references it -- presumably consumed elsewhere, TODO confirm.
__FORMAT__ = '%(asctime)s - %(levelname)s - %(message)s'
#__all__ = ['NlsCore']

def core_on_msg(ws, message, args):
    """
    WebSocket ``on_message`` handler: relay a message to the owning object.

    :param ws: the websocket application object (unused here).
    :param message: the received message; forwarded unchanged.
    :param args: callback arguments; element 0 must be the object whose
        ``_NlsCore__issue_callback`` is invoked. When empty or None the
        event is logged as an error and dropped.
    """
    print('core_on_msg:{}'.format(message))
    if args:
        owner = args[0]
        owner._NlsCore__issue_callback('on_message', [message])
    else:
        logging.error('callback core_on_msg with null args')

def core_on_error(ws, message, args):
    """
    WebSocket ``on_error`` handler: relay an error to the owning object.

    :param ws: the websocket application object (unused here).
    :param message: the error payload; forwarded unchanged.
    :param args: callback arguments; element 0 must be the object whose
        ``_NlsCore__issue_callback`` is invoked. When empty or None the
        event is logged as an error and dropped.
    """
    print('core_on_error:{}'.format(message))
    if args:
        owner = args[0]
        owner._NlsCore__issue_callback('on_error', [message])
    else:
        logging.error('callback core_on_error with null args')

def core_on_close(ws, close_status_code, close_msg, args):
    """
    WebSocket ``on_close`` handler: tell the owning object the socket closed.

    The close status code and close message are received but not forwarded.

    :param ws: the websocket application object (unused here).
    :param close_status_code: close code reported by the websocket (unused).
    :param close_msg: close reason reported by the websocket (unused).
    :param args: callback arguments; element 0 must be the object whose
        ``_NlsCore__issue_callback`` is invoked. When empty or None the
        event is logged as an error and dropped.
    """
    print('core_on_close')
    if args:
        owner = args[0]
        owner._NlsCore__issue_callback('on_close')
    else:
        logging.error('callback core_on_close with null args')

def core_on_open(ws, args):
    """
    WebSocket ``on_open`` handler: finish the pending start on the owner.

    ``args`` must contain exactly two elements: the owning object and the
    message to send once the connection is open. On valid input the owner
    is marked as connected, the pending message is passed to ``start`` with
    the owner's stored ping settings, and the user ``on_open`` callback is
    issued.

    If ``args`` is empty/None or does not have exactly two elements, the
    socket is closed and the handler returns without touching the owner.
    (Previously execution fell through after ``ws.close()`` and crashed on
    ``args[0]``/``args[1]`` or started a session on a closed socket.)

    :param ws: the websocket application object; closed on invalid ``args``.
    :param args: ``[owner, start_message]``.
    """
    print('core_on_open:{}'.format(args))
    if not args:
        print('callback with null args')
        ws.close()
        return
    if len(args) != 2:
        print('callback args not 2')
        ws.close()
        return
    nls = args[0]
    # Order matters: flag the connection as open first so that start()
    # takes its "already connected" branch and sends the message directly.
    nls._NlsCore__notify_on_open()
    nls.start(args[1], nls._NlsCore__ping_interval, nls._NlsCore__ping_timeout)
    nls._NlsCore__issue_callback('on_open')

def core_on_data(ws, data, opcode, flag, args):
    """
    WebSocket ``on_data`` handler: relay a data frame to the owning object.

    :param ws: the websocket application object (unused here).
    :param data: frame payload; forwarded unchanged.
    :param opcode: frame opcode; printed and forwarded unchanged.
    :param flag: third value supplied by the websocket layer; forwarded
        unchanged.
    :param args: callback arguments; element 0 must be the object whose
        ``_NlsCore__issue_callback`` is invoked. When empty or None the
        event is logged as an error and dropped.
    """
    print('core_on_data opcode={}'.format(opcode))
    if args:
        owner = args[0]
        owner._NlsCore__issue_callback('on_data', [data, opcode, flag])
    else:
        logging.error('callback core_on_data with null args')

@unique
class NlsConnectionStatus(Enum):
    """Connection state tracked by NlsCore for its websocket."""
    # Initial state; also restored on 'on_close' and when the run loop exits.
    Disconnected = 0
    # Set when the websocket reports that the connection has been opened.
    Connected = 1


class NlsCore:
    """
    NlsCore

    Wraps a ``websocket.WebSocketApp`` and routes its events to user
    callbacks. The module-level ``core_on_*`` functions are installed as the
    WebSocketApp handlers; each one calls back into this object, which then
    invokes the matching user callback with the event arguments followed by
    ``callback_args``.
    """
    def __init__(self,
                 url=__URL__,
                 token=None,
                 on_open=None, on_message=None, on_close=None,
                 on_error=None, on_data=None, asynch=False, callback_args=None):
        """
        Build the websocket application object; no connection is made yet.

        :param url: websocket endpoint to connect to.
        :param token: access token, sent as the ``X-NLS-Token`` header.
        :param on_open: called as ``on_open(*callback_args)``.
        :param on_message: called as ``on_message(message, *callback_args)``.
        :param on_close: called as ``on_close(*callback_args)``.
        :param on_error: called as ``on_error(message, *callback_args)``.
        :param on_data: called as
            ``on_data(data, opcode, flag, *callback_args)``.
        :param asynch: when true, ``start`` does not block waiting for the
            connection to open.
        :param callback_args: extra arguments appended to every callback
            invocation; defaults to no extra arguments.
        :raises InvalidParameter: if ``token`` is empty or no callback at
            all is supplied.
        """
        self.__url = url
        self.__async = asynch
        if not token:
            raise InvalidParameter('Must provide a valid token!')
        self.__token = token

        supplied = (
            ('on_open', on_open),
            ('on_message', on_message),
            ('on_close', on_close),
            ('on_error', on_error),
            ('on_data', on_data),
        )
        self.__callbacks = {name: handler for name, handler in supplied if handler}
        # on_data counts as a callback too; the old check ignored it and
        # rejected an instance that supplied only on_data.
        if not self.__callbacks:
            raise InvalidParameter('Must provide at least one callback')

        # A fresh list per instance: a literal [] default would be one list
        # shared by every NlsCore created without callback_args.
        if callback_args is None:
            callback_args = []
        print('callback args:{}'.format(callback_args))
        self.__callback_args = callback_args
        self.__header = __HEADER__ + ['X-NLS-Token: {}'.format(self.__token)]
        # NOTE(review): this switches websocket tracing on globally for
        # every instance -- confirm this is wanted outside of debugging.
        websocket.enableTrace(True)
        self.__ws = websocket.WebSocketApp(self.__url,
                                           self.__header,
                                           on_message=core_on_msg,
                                           on_data=core_on_data,
                                           on_error=core_on_error,
                                           on_close=core_on_close,
                                           callback_args=[self])
        self.__ws.on_open = core_on_open
        self.__lock = threading.Lock()
        self.__cond = threading.Condition()
        self.__connection_status = NlsConnectionStatus.Disconnected
        # Defined up front so core_on_open can always read them; the real
        # values are stored by start().
        self.__ping_interval = None
        self.__ping_timeout = None
        self.__th = None

    def start(self, msg, ping_interval, ping_timeout):
        """
        Send ``msg``, connecting first if the socket is not connected.

        When disconnected, ``msg`` is handed to the websocket object via
        ``update_args`` and a connection is started; ``core_on_open`` then
        calls ``start`` again with the stored message once the socket is
        open. When already connected, ``msg`` is sent immediately.

        :param msg: message to send.
        :param ping_interval: forwarded to ``run_forever``.
        :param ping_timeout: forwarded to ``run_forever``.
        :raises ConnectionTimeout: in synchronous mode, if the connection
            does not report open or closed within the wait period.
        """
        # 'with' guarantees the lock is released even if update_args raises.
        with self.__lock:
            self.__ping_interval = ping_interval
            self.__ping_timeout = ping_timeout
            must_connect = (self.__connection_status ==
                            NlsConnectionStatus.Disconnected)
            if must_connect:
                self.__ws.update_args(self, msg)
        # Connecting / sending happens outside the lock, as before.
        if must_connect:
            self.__connect_before_start(ping_interval, ping_timeout)
        else:
            self.__ws.send(msg)

    def __notify_on_open(self):
        """Mark the connection as open and wake a thread waiting in start."""
        print('notify on open')
        with self.__cond:
            self.__connection_status = NlsConnectionStatus.Connected
            self.__cond.notify()

    def __issue_callback(self, which, exargs=None):
        """
        Invoke the user callback registered under ``which``.

        For ``'on_close'`` the connection status is reset and any waiter is
        notified first, whether or not a user ``on_close`` callback exists.

        :param which: callback name, e.g. ``'on_message'``.
        :param exargs: event-specific arguments placed before
            ``callback_args``; defaults to none.
        """
        # Compare by value: 'is' on a string literal only works by accident
        # of interning. The status reset must also not depend on the user
        # having registered an on_close callback.
        if which == 'on_close':
            with self.__cond:
                self.__connection_status = NlsConnectionStatus.Disconnected
                self.__cond.notify()
        if which not in self.__callbacks:
            logging.error('no such callback:{}'.format(which))
            return
        event_args = [] if exargs is None else list(exargs)
        self.__callbacks[which](*(event_args + list(self.__callback_args)))

    def send(self, msg, binary):
        """
        Send ``msg`` over the open connection.

        :param msg: payload to send.
        :param binary: when true the payload is sent with the binary
            opcode; otherwise it is sent with the default opcode.
        :raises ConnectionUnavailable: if the connection is not open.
        """
        with self.__lock:
            disconnected = (self.__connection_status ==
                            NlsConnectionStatus.Disconnected)
        if disconnected:
            logging.error('start before send')
            raise ConnectionUnavailable('Must call start before send!')
        if binary:
            self.__ws.send(msg, opcode=websocket.ABNF.OPCODE_BINARY)
        else:
            print('send {}'.format(msg))
            self.__ws.send(msg)

    def shutdown(self):
        """Close the underlying websocket."""
        self.__ws.close()

    def __run(self, ping_interval, ping_timeout):
        """
        Thread body: run the websocket loop until it exits, then mark the
        connection as disconnected.
        """
        print('ws run...')
        self.__ws.run_forever(ping_interval=ping_interval,
                              ping_timeout=ping_timeout)
        with self.__lock:
            self.__connection_status = NlsConnectionStatus.Disconnected
        print('ws exit...')

    def __connect_before_start(self, ping_interval, ping_timeout):
        """
        Start the websocket thread and, in synchronous mode, wait for it.

        The thread is started while the condition is held, so an open/close
        notification cannot be issued before this method starts waiting.

        :returns: in synchronous mode, True if the connection is open after
            the wake-up and False if it was closed instead; None in
            asynchronous mode.
        :raises ConnectionTimeout: if no notification arrives within 10
            seconds in synchronous mode.
        """
        with self.__cond:
            self.__th = threading.Thread(target=self.__run,
                                         args=[ping_interval, ping_timeout])
            self.__th.start()
            if self.__connection_status != NlsConnectionStatus.Disconnected:
                return None
            print('wait cond wakeup')
            if self.__async:
                return None
            if self.__cond.wait(timeout=10):
                print('wakeup without timeout')
                return self.__connection_status == NlsConnectionStatus.Connected
            print('wakeup with timeout')
            raise ConnectionTimeout('Wait response timeout! Please check local network!')