Spaces:
Running
Running
File size: 6,644 Bytes
5690e11 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# Copyright (c) Alibaba, Inc. and its affiliates.
import logging
import threading
from enum import Enum, unique
from queue import Queue
from . import logging, token, websocket
from .exception import InvalidParameter, ConnectionTimeout, ConnectionUnavailable
__URL__ = 'wss://nls-gateway.cn-shanghai.aliyuncs.com/ws/v1'
__HEADER__ = [
'Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==',
'Sec-WebSocket-Version: 13',
]
__FORMAT__ = '%(asctime)s - %(levelname)s - %(message)s'
#__all__ = ['NlsCore']
def core_on_msg(ws, message, args):
print('core_on_msg:{}'.format(message))
if not args:
logging.error('callback core_on_msg with null args')
return
nls = args[0]
nls._NlsCore__issue_callback('on_message', [message])
def core_on_error(ws, message, args):
print('core_on_error:{}'.format(message))
if not args:
logging.error('callback core_on_error with null args')
return
nls = args[0]
nls._NlsCore__issue_callback('on_error', [message])
def core_on_close(ws, close_status_code, close_msg, args):
print('core_on_close')
if not args:
logging.error('callback core_on_close with null args')
return
nls = args[0]
nls._NlsCore__issue_callback('on_close')
def core_on_open(ws, args):
print('core_on_open:{}'.format(args))
if not args:
print('callback with null args')
ws.close()
elif len(args) != 2:
print('callback args not 2')
ws.close()
nls = args[0]
nls._NlsCore__notify_on_open()
nls.start(args[1], nls._NlsCore__ping_interval, nls._NlsCore__ping_timeout)
nls._NlsCore__issue_callback('on_open')
def core_on_data(ws, data, opcode, flag, args):
print('core_on_data opcode={}'.format(opcode))
if not args:
logging.error('callback core_on_data with null args')
return
nls = args[0]
nls._NlsCore__issue_callback('on_data', [data, opcode, flag])
@unique
class NlsConnectionStatus(Enum):
Disconnected = 0
Connected = 1
class NlsCore:
"""
NlsCore
"""
def __init__(self,
url=__URL__,
token=None,
on_open=None, on_message=None, on_close=None,
on_error=None, on_data=None, asynch=False, callback_args=[]):
self.__url = url
self.__async = asynch
if not token:
raise InvalidParameter('Must provide a valid token!')
else:
self.__token = token
self.__callbacks = {}
if on_open:
self.__callbacks['on_open'] = on_open
if on_message:
self.__callbacks['on_message'] = on_message
if on_close:
self.__callbacks['on_close'] = on_close
if on_error:
self.__callbacks['on_error'] = on_error
if on_data:
self.__callbacks['on_data'] = on_data
if not on_open and not on_message and not on_close and not on_error:
raise InvalidParameter('Must provide at least one callback')
print('callback args:{}'.format(callback_args))
self.__callback_args = callback_args
self.__header = __HEADER__ + ['X-NLS-Token: {}'.format(self.__token)]
websocket.enableTrace(True)
self.__ws = websocket.WebSocketApp(self.__url,
self.__header,
on_message=core_on_msg,
on_data=core_on_data,
on_error=core_on_error,
on_close=core_on_close,
callback_args=[self])
self.__ws.on_open = core_on_open
self.__lock = threading.Lock()
self.__cond = threading.Condition()
self.__connection_status = NlsConnectionStatus.Disconnected
def start(self, msg, ping_interval, ping_timeout):
self.__lock.acquire()
self.__ping_interval = ping_interval
self.__ping_timeout = ping_timeout
if self.__connection_status == NlsConnectionStatus.Disconnected:
self.__ws.update_args(self, msg)
self.__lock.release()
self.__connect_before_start(ping_interval, ping_timeout)
else:
self.__lock.release()
self.__ws.send(msg)
def __notify_on_open(self):
print('notify on open')
with self.__cond:
self.__connection_status = NlsConnectionStatus.Connected
self.__cond.notify()
def __issue_callback(self, which, exargs=[]):
if which not in self.__callbacks:
logging.error('no such callback:{}'.format(which))
return
if which is 'on_close':
with self.__cond:
self.__connection_status = NlsConnectionStatus.Disconnected
self.__cond.notify()
args = exargs+self.__callback_args
self.__callbacks[which](*args)
def send(self, msg, binary):
self.__lock.acquire()
if self.__connection_status == NlsConnectionStatus.Disconnected:
self.__lock.release()
logging.error('start before send')
raise ConnectionUnavailable('Must call start before send!')
else:
self.__lock.release()
if binary:
self.__ws.send(msg, opcode=websocket.ABNF.OPCODE_BINARY)
else:
print('send {}'.format(msg))
self.__ws.send(msg)
def shutdown(self):
self.__ws.close()
def __run(self, ping_interval, ping_timeout):
print('ws run...')
self.__ws.run_forever(ping_interval=ping_interval,
ping_timeout=ping_timeout)
with self.__lock:
self.__connection_status = NlsConnectionStatus.Disconnected
print('ws exit...')
def __connect_before_start(self, ping_interval, ping_timeout):
with self.__cond:
self.__th = threading.Thread(target=self.__run,
args=[ping_interval, ping_timeout])
self.__th.start()
if self.__connection_status == NlsConnectionStatus.Disconnected:
print('wait cond wakeup')
if not self.__async:
if self.__cond.wait(timeout=10):
print('wakeup without timeout')
return self.__connection_status == NlsConnectionStatus.Connected
else:
print('wakeup with timeout')
raise ConnectionTimeout('Wait response timeout! Please check local network!')
|