File size: 7,182 Bytes
0a06673 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
import unittest
import mock
import time
import json
try:
from queue import Queue
except ImportError:
from Queue import Queue
from analytics.consumer import Consumer, MAX_MSG_SIZE
from analytics.request import APIError
class TestConsumer(unittest.TestCase):
    """Unit tests for analytics.consumer.Consumer batching and retry logic.

    NOTE(review): test_upload, test_request, and test_proxies issue real
    network requests (no mock) — they assume connectivity to the API.
    """

    def test_next(self):
        """A single queued item is returned as a one-element batch."""
        q = Queue()
        consumer = Consumer(q, '')
        q.put(1)
        batch = consumer.next()
        self.assertEqual(batch, [1])

    def test_next_limit(self):
        """next() caps each batch at upload_size items."""
        q = Queue()
        upload_size = 50
        consumer = Consumer(q, '', upload_size)
        for i in range(10000):
            q.put(i)
        batch = consumer.next()
        self.assertEqual(batch, list(range(upload_size)))

    def test_dropping_oversize_msg(self):
        """A message exceeding MAX_MSG_SIZE is dropped, not batched."""
        q = Queue()
        consumer = Consumer(q, '')
        oversize_msg = {'m': 'x' * MAX_MSG_SIZE}
        q.put(oversize_msg)
        batch = consumer.next()
        self.assertEqual(batch, [])
        self.assertTrue(q.empty())

    def test_upload(self):
        """A queued track message uploads successfully (live request)."""
        q = Queue()
        consumer = Consumer(q, 'testsecret')
        track = {
            'type': 'track',
            'event': 'python event',
            'userId': 'userId'
        }
        q.put(track)
        success = consumer.upload()
        self.assertTrue(success)

    def test_upload_interval(self):
        """The consumer flushes once per elapsed upload_interval."""
        # Put _n_ items in the queue, pausing a little bit more than
        # _upload_interval_ after each one.
        # The consumer should upload _n_ times.
        q = Queue()
        upload_interval = 0.3
        consumer = Consumer(q, 'testsecret', upload_size=10,
                            upload_interval=upload_interval)
        with mock.patch('analytics.consumer.post') as mock_post:
            consumer.start()
            for i in range(0, 3):
                track = {
                    'type': 'track',
                    'event': 'python event %d' % i,
                    'userId': 'userId'
                }
                q.put(track)
                time.sleep(upload_interval * 1.1)
            self.assertEqual(mock_post.call_count, 3)

    def test_multiple_uploads_per_interval(self):
        """A backlog larger than upload_size flushes in multiple batches."""
        # Put _upload_size*2_ items in the queue at once, then pause for
        # _upload_interval_. The consumer should upload 2 times.
        q = Queue()
        upload_interval = 0.5
        upload_size = 10
        consumer = Consumer(q, 'testsecret', upload_size=upload_size,
                            upload_interval=upload_interval)
        with mock.patch('analytics.consumer.post') as mock_post:
            consumer.start()
            for i in range(0, upload_size * 2):
                track = {
                    'type': 'track',
                    'event': 'python event %d' % i,
                    'userId': 'userId'
                }
                q.put(track)
            time.sleep(upload_interval * 1.1)
            self.assertEqual(mock_post.call_count, 2)

    @classmethod
    def test_request(cls):
        """request() sends a batch without raising (live request)."""
        consumer = Consumer(None, 'testsecret')
        track = {
            'type': 'track',
            'event': 'python event',
            'userId': 'userId'
        }
        consumer.request([track])

    def _test_request_retry(self, consumer,
                            expected_exception, exception_count):
        """Drive request() against a post() that fails exception_count times.

        Asserts that request() succeeds while failures stay within
        consumer.retries, and that the original exception propagates once
        failures exceed it.
        """
        def mock_post(*args, **kwargs):
            mock_post.call_count += 1
            if mock_post.call_count <= exception_count:
                raise expected_exception
        mock_post.call_count = 0
        with mock.patch('analytics.consumer.post',
                        mock.Mock(side_effect=mock_post)):
            track = {
                'type': 'track',
                'event': 'python event',
                'userId': 'userId'
            }
            # request() should succeed if the number of exceptions raised is
            # less than the retries parameter.
            if exception_count <= consumer.retries:
                consumer.request([track])
            else:
                # if exceptions are raised more times than the retries
                # parameter, we expect the exception to be returned to
                # the caller.
                try:
                    consumer.request([track])
                except type(expected_exception) as exc:
                    self.assertEqual(exc, expected_exception)
                else:
                    self.fail(
                        "request() should raise an exception if still failing "
                        "after %d retries" % consumer.retries)

    def test_request_retry(self):
        """request() retries transient errors but not client errors."""
        # we should retry on general errors
        consumer = Consumer(None, 'testsecret')
        self._test_request_retry(consumer, Exception('generic exception'), 2)
        # we should retry on server errors
        consumer = Consumer(None, 'testsecret')
        self._test_request_retry(consumer, APIError(
            500, 'code', 'Internal Server Error'), 2)
        # we should retry on HTTP 429 errors
        consumer = Consumer(None, 'testsecret')
        self._test_request_retry(consumer, APIError(
            429, 'code', 'Too Many Requests'), 2)
        # we should NOT retry on other client errors
        consumer = Consumer(None, 'testsecret')
        api_error = APIError(400, 'code', 'Client Errors')
        try:
            self._test_request_retry(consumer, api_error, 1)
        except APIError:
            pass
        else:
            self.fail('request() should not retry on client errors')
        # test for number of exceptions raised > retries value
        consumer = Consumer(None, 'testsecret', retries=3)
        self._test_request_retry(consumer, APIError(
            500, 'code', 'Internal Server Error'), 3)

    def test_pause(self):
        """pause() stops the consumer's running flag."""
        consumer = Consumer(None, 'testsecret')
        consumer.pause()
        self.assertFalse(consumer.running)

    def test_max_batch_size(self):
        """Batches never exceed the 500KB request-size limit."""
        q = Queue()
        consumer = Consumer(
            q, 'testsecret', upload_size=100000, upload_interval=3)
        track = {
            'type': 'track',
            'event': 'python event',
            'userId': 'userId'
        }
        msg_size = len(json.dumps(track).encode())
        # number of messages in a maximum-size batch
        n_msgs = int(475000 / msg_size)

        def mock_post_fn(_, data, **kwargs):
            # Stand-in for the HTTP session: assert each payload stays
            # under the 500KB limit and report success.
            res = mock.Mock()
            res.status_code = 200
            self.assertTrue(len(data.encode()) < 500000,
                            'batch size (%d) exceeds 500KB limit'
                            % len(data.encode()))
            return res

        with mock.patch('analytics.request._session.post',
                        side_effect=mock_post_fn) as mock_post:
            consumer.start()
            for _ in range(0, n_msgs + 2):
                q.put(track)
            q.join()
            self.assertEqual(mock_post.call_count, 2)

    @classmethod
    def test_proxies(cls):
        """request() works when a proxy is configured (live request)."""
        consumer = Consumer(None, 'testsecret', proxies='203.243.63.16:80')
        track = {
            'type': 'track',
            'event': 'python event',
            'userId': 'userId'
        }
        consumer.request([track])
|