1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Test the memcache client protocol.
8 from twisted.internet.error import ConnectionDone
10 from twisted.protocols.memcache import MemCacheProtocol, NoSuchCommand
11 from twisted.protocols.memcache import ClientError, ServerError
13 from twisted.trial.unittest import TestCase
14 from twisted.test.proto_helpers import StringTransportWithDisconnection
15 from twisted.internet.task import Clock
16 from twisted.internet.defer import Deferred, gatherResults, TimeoutError
17 from twisted.internet.defer import DeferredList
23 Setup and tests for basic invocation of L{MemCacheProtocol} commands.
def _test(self, d, send, recv, result):
    """
    Helper test method to test the resulting C{Deferred} of a
    L{MemCacheProtocol} command.

    This base version is only a placeholder: it unconditionally raises, so
    a concrete test case has to supply its own C{_test}.

    @param d: the resulting deferred from the memcache command.
    @param send: the expected data to be sent.
    @param recv: the data to simulate as reception.
    @param result: the expected result.

    @raise NotImplementedError: always.
    """
    raise NotImplementedError()
36 L{MemCacheProtocol.get} returns a L{Deferred} which is called back with
37 the value and the flag associated with the given key if the server
38 returns a successful result.
40 return self._test(self.proto.get("foo"), "get foo\r\n",
41 "VALUE foo 0 3\r\nbar\r\nEND\r\n", (0, "bar"))
44 def test_emptyGet(self):
46 Test getting a non-available key: it succeeds but returns C{None} as
47 value and C{0} as flag.
49 return self._test(self.proto.get("foo"), "get foo\r\n",
53 def test_getMultiple(self):
55 L{MemCacheProtocol.getMultiple} returns a L{Deferred} which is called
56 back with a dictionary of flag, value for each given key.
58 return self._test(self.proto.getMultiple(['foo', 'cow']),
60 "VALUE foo 0 3\r\nbar\r\nVALUE cow 0 7\r\nchicken\r\nEND\r\n",
61 {'cow': (0, 'chicken'), 'foo': (0, 'bar')})
64 def test_getMultipleWithEmpty(self):
66 When L{MemCacheProtocol.getMultiple} is called with non-available keys,
67 the corresponding tuples are (0, None).
69 return self._test(self.proto.getMultiple(['foo', 'cow']),
71 "VALUE cow 1 3\r\nbar\r\nEND\r\n",
72 {'cow': (1, 'bar'), 'foo': (0, None)})
77 L{MemCacheProtocol.set} returns a L{Deferred} which is called back with
78 C{True} when the operation succeeds.
80 return self._test(self.proto.set("foo", "bar"),
81 "set foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
86 L{MemCacheProtocol.add} returns a L{Deferred} which is called back with
87 C{True} when the operation succeeds.
89 return self._test(self.proto.add("foo", "bar"),
90 "add foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
def test_replace(self):
    """
    L{MemCacheProtocol.replace} returns a L{Deferred} which is called back
    with C{True} when the operation succeeds.
    """
    # Issue the command first, then hand the Deferred to the shared helper
    # together with the expected wire data and the simulated reply.
    command = self.proto.replace("foo", "bar")
    return self._test(
        command, "replace foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
def test_errorAdd(self):
    """
    Test an erroneous add: if a L{MemCacheProtocol.add} is called but the
    key already exists on the server, it returns a B{NOT STORED} answer,
    which calls back the resulting L{Deferred} with C{False}.
    """
    command = self.proto.add("foo", "bar")
    return self._test(
        command, "add foo 0 0 3\r\nbar\r\n", "NOT STORED\r\n", False)
def test_errorReplace(self):
    """
    Test an erroneous replace: if a L{MemCacheProtocol.replace} is called
    but the key doesn't exist on the server, it returns a B{NOT STORED}
    answer, which calls back the resulting L{Deferred} with C{False}.
    """
    command = self.proto.replace("foo", "bar")
    return self._test(
        command, "replace foo 0 0 3\r\nbar\r\n", "NOT STORED\r\n", False)
122 def test_delete(self):
124 L{MemCacheProtocol.delete} returns a L{Deferred} which is called back
125 with C{True} when the server notifies a success.
127 return self._test(self.proto.delete("bar"), "delete bar\r\n",
def test_errorDelete(self):
    """
    Test an error during a delete: if the key doesn't exist on the server,
    it returns a B{NOT FOUND} answer which calls back the resulting
    L{Deferred} with C{False}.
    """
    command = self.proto.delete("bar")
    return self._test(command, "delete bar\r\n", "NOT FOUND\r\n", False)
141 def test_increment(self):
143 Test incrementing a variable: L{MemCacheProtocol.increment} returns a
144 L{Deferred} which is called back with the incremented value of the
147 return self._test(self.proto.increment("foo"), "incr foo 1\r\n",
151 def test_decrement(self):
153 Test decrementing a variable: L{MemCacheProtocol.decrement} returns a
154 L{Deferred} which is called back with the decremented value of the
158 self.proto.decrement("foo"), "decr foo 1\r\n", "5\r\n", 5)
161 def test_incrementVal(self):
163 L{MemCacheProtocol.increment} takes an optional argument C{value} which
164 replaces the default value of 1 when specified.
166 return self._test(self.proto.increment("foo", 8), "incr foo 8\r\n",
170 def test_decrementVal(self):
172 L{MemCacheProtocol.decrement} takes an optional argument C{value} which
173 replaces the default value of 1 when specified.
175 return self._test(self.proto.decrement("foo", 3), "decr foo 3\r\n",
def test_stats(self):
    """
    Test retrieving server statistics via the L{MemCacheProtocol.stats}
    command: it parses the data sent by the server and calls back the
    resulting L{Deferred} with a dictionary of the received statistics.
    """
    # Two STAT lines terminated by END are expected to become two dict
    # entries.
    reply = "STAT foo bar\r\nSTAT egg spam\r\nEND\r\n"
    expected = {"foo": "bar", "egg": "spam"}
    return self._test(self.proto.stats(), "stats\r\n", reply, expected)
def test_statsWithArgument(self):
    """
    L{MemCacheProtocol.stats} takes an optional C{str} argument which,
    if specified, is sent along with the I{STAT} command.  The I{STAT}
    responses from the server are parsed as key/value pairs and returned
    as a C{dict} (as in the case where the argument is not specified).
    """
    reply = "STAT foo bar\r\nSTAT egg spam\r\nEND\r\n"
    expected = {"foo": "bar", "egg": "spam"}
    return self._test(
        self.proto.stats("blah"), "stats blah\r\n", reply, expected)
def test_version(self):
    """
    Test version retrieval via the L{MemCacheProtocol.version} command: it
    returns a L{Deferred} which is called back with the version sent by the
    server.
    """
    command = self.proto.version()
    return self._test(command, "version\r\n", "VERSION 1.1\r\n", "1.1")
212 def test_flushAll(self):
214 L{MemCacheProtocol.flushAll} returns a L{Deferred} which is called back
215 with C{True} if the server acknowledges success.
217 return self._test(self.proto.flushAll(), "flush_all\r\n",
222 class MemCacheTestCase(CommandMixin, TestCase):
224 Test client protocol class L{MemCacheProtocol}.
229 Create a memcache client, connect it to a string protocol, and make it
230 use a deterministic clock.
232 self.proto = MemCacheProtocol()
234 self.proto.callLater = self.clock.callLater
235 self.transport = StringTransportWithDisconnection()
236 self.transport.protocol = self.proto
237 self.proto.makeConnection(self.transport)
240 def _test(self, d, send, recv, result):
242 Implementation of C{_test} which checks that the command sends C{send}
243 data, and that upon reception of C{recv} the result is C{result}.
245 @param d: the resulting deferred from the memcache command.
248 @param send: the expected data to be sent.
251 @param recv: the data to simulate as reception.
254 @param result: the expected result.
258 self.assertEqual(res, result)
259 self.assertEqual(self.transport.value(), send)
261 self.proto.dataReceived(recv)
265 def test_invalidGetResponse(self):
267 If the value returned doesn't match the expected key of the current
268 C{get} command, an error is raised in L{MemCacheProtocol.dataReceived}.
270 self.proto.get("foo")
272 self.assertRaises(RuntimeError,
273 self.proto.dataReceived,
274 "VALUE bar 0 %s\r\n%s\r\nEND\r\n" % (len(s), s))
277 def test_invalidMultipleGetResponse(self):
279 If the value returned doesn't match one of the expected keys of the
280 current multiple C{get} command, an error is raised in
281 L{MemCacheProtocol.dataReceived}.
283 self.proto.getMultiple(["foo", "bar"])
285 self.assertRaises(RuntimeError,
286 self.proto.dataReceived,
287 "VALUE egg 0 %s\r\n%s\r\nEND\r\n" % (len(s), s))
290 def test_timeOut(self):
292 Test the timeout on outgoing requests: when timeout is detected, all
293 current commands fail with a L{TimeoutError}, and the connection is
296 d1 = self.proto.get("foo")
297 d2 = self.proto.get("bar")
299 self.proto.connectionLost = d3.callback
301 self.clock.advance(self.proto.persistentTimeOut)
302 self.assertFailure(d1, TimeoutError)
303 self.assertFailure(d2, TimeoutError)
304 def checkMessage(error):
305 self.assertEqual(str(error), "Connection timeout")
306 d1.addCallback(checkMessage)
307 return gatherResults([d1, d2, d3])
310 def test_timeoutRemoved(self):
312 When a request gets a response, no pending timeout call remains around.
314 d = self.proto.get("foo")
316 self.clock.advance(self.proto.persistentTimeOut - 1)
317 self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
320 self.assertEqual(result, (0, "bar"))
321 self.assertEqual(len(self.clock.calls), 0)
326 def test_timeOutRaw(self):
328 Test the timeout when raw mode was started: the timeout is not reset
329 until all the data has been received, so we can have a L{TimeoutError}
330 when waiting for raw data.
332 d1 = self.proto.get("foo")
334 self.proto.connectionLost = d2.callback
336 self.proto.dataReceived("VALUE foo 0 10\r\n12345")
337 self.clock.advance(self.proto.persistentTimeOut)
338 self.assertFailure(d1, TimeoutError)
339 return gatherResults([d1, d2])
342 def test_timeOutStat(self):
344 Test the timeout when stat command has started: the timeout is not
345 reset until the final B{END} is received.
347 d1 = self.proto.stats()
349 self.proto.connectionLost = d2.callback
351 self.proto.dataReceived("STAT foo bar\r\n")
352 self.clock.advance(self.proto.persistentTimeOut)
353 self.assertFailure(d1, TimeoutError)
354 return gatherResults([d1, d2])
357 def test_timeoutPipelining(self):
359 When two requests are sent, a timeout call remains around for the
360 second request, and its timeout time is correct.
362 d1 = self.proto.get("foo")
363 d2 = self.proto.get("bar")
365 self.proto.connectionLost = d3.callback
367 self.clock.advance(self.proto.persistentTimeOut - 1)
368 self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n")
371 self.assertEqual(result, (0, "bar"))
372 self.assertEqual(len(self.clock.calls), 1)
373 for i in range(self.proto.persistentTimeOut):
374 self.clock.advance(1)
375 return self.assertFailure(d2, TimeoutError).addCallback(checkTime)
376 def checkTime(ignored):
377 # Check that the timeout happened C{self.proto.persistentTimeOut}
378 # after the last response
380 self.clock.seconds(), 2 * self.proto.persistentTimeOut - 1)
381 d1.addCallback(check)
385 def test_timeoutNotReset(self):
387 Check that the timeout is not reset for every command, but keeps the
388 timeout from the first command without response.
390 d1 = self.proto.get("foo")
392 self.proto.connectionLost = d3.callback
394 self.clock.advance(self.proto.persistentTimeOut - 1)
395 d2 = self.proto.get("bar")
396 self.clock.advance(1)
397 self.assertFailure(d1, TimeoutError)
398 self.assertFailure(d2, TimeoutError)
399 return gatherResults([d1, d2, d3])
def test_timeoutCleanDeferreds(self):
    """
    C{timeoutConnection} cleans the list of commands that it fires with
    C{TimeoutError}: C{connectionLost} doesn't try to fire them again, but
    sets the disconnected state so that future commands fail with a
    C{RuntimeError}.
    """
    # First command: left unanswered until the persistent timeout elapses.
    timedOut = self.proto.get("foo")
    self.clock.advance(self.proto.persistentTimeOut)
    self.assertFailure(timedOut, TimeoutError)

    # Second command: issued after the timeout has fired.
    rejected = self.proto.get("bar")
    self.assertFailure(rejected, RuntimeError)
    return gatherResults([timedOut, rejected])
def test_connectionLost(self):
    """
    When disconnection occurs while commands are still outstanding, the
    L{Deferred}s of those commands fail with L{ConnectionDone}.
    """
    pending = [self.proto.get("foo"), self.proto.get("bar")]
    self.transport.loseConnection()

    def checkFailures(results):
        # Each entry is a (success, value) pair; on failure the value is
        # the failure object, which must wrap ConnectionDone.
        for success, result in results:
            self.assertFalse(success)
            result.trap(ConnectionDone)

    # consumeErrors keeps the individual failures from being reported as
    # unhandled once they have been collected here.
    done = DeferredList(pending, consumeErrors=True)
    return done.addCallback(checkFailures)
def test_tooLongKey(self):
    """
    An error is raised when trying to use a too long key: the called
    command returns a L{Deferred} which fails with a L{ClientError}.
    """
    tooLong = "a" * 500
    failures = [
        self.assertFailure(self.proto.set(tooLong, "bar"), ClientError),
        self.assertFailure(self.proto.increment(tooLong), ClientError),
        self.assertFailure(self.proto.get(tooLong), ClientError),
        self.assertFailure(self.proto.append(tooLong, "bar"), ClientError),
        self.assertFailure(self.proto.prepend(tooLong, "bar"), ClientError),
        # One over-long key among otherwise valid keys is enough to fail.
        self.assertFailure(
            self.proto.getMultiple(["foo", tooLong]), ClientError),
    ]
    return gatherResults(failures)
450 def test_invalidCommand(self):
452 When an unknown command is sent directly (not through public API), the
453 server answers with an B{ERROR} token, and the command fails with
456 d = self.proto._set("egg", "foo", "bar", 0, 0, "")
457 self.assertEqual(self.transport.value(), "egg foo 0 0 3\r\nbar\r\n")
458 self.assertFailure(d, NoSuchCommand)
459 self.proto.dataReceived("ERROR\r\n")
463 def test_clientError(self):
465 Test the L{ClientError} error: when the server sends a B{CLIENT_ERROR}
466 token, the originating command fails with L{ClientError}, and the error
467 contains the text sent by the server.
470 d = self.proto.set("foo", a)
471 self.assertEqual(self.transport.value(),
472 "set foo 0 0 8\r\neggspamm\r\n")
473 self.assertFailure(d, ClientError)
475 self.assertEqual(str(err), "We don't like egg and spam")
477 self.proto.dataReceived("CLIENT_ERROR We don't like egg and spam\r\n")
481 def test_serverError(self):
483 Test the L{ServerError} error: when the server sends a B{SERVER_ERROR}
484 token, the originating command fails with L{ServerError}, and the error
485 contains the text sent by the server.
488 d = self.proto.set("foo", a)
489 self.assertEqual(self.transport.value(),
490 "set foo 0 0 8\r\neggspamm\r\n")
491 self.assertFailure(d, ServerError)
493 self.assertEqual(str(err), "zomg")
495 self.proto.dataReceived("SERVER_ERROR zomg\r\n")
def test_unicodeKey(self):
    """
    Using a non-string key as argument to commands raises an error.
    """
    # Both unicode keys and an integer key are exercised.
    failures = [
        self.assertFailure(self.proto.set(u"foo", "bar"), ClientError),
        self.assertFailure(self.proto.increment(u"egg"), ClientError),
        self.assertFailure(self.proto.get(1), ClientError),
        self.assertFailure(self.proto.delete(u"bar"), ClientError),
        self.assertFailure(self.proto.append(u"foo", "bar"), ClientError),
        self.assertFailure(self.proto.prepend(u"foo", "bar"), ClientError),
        self.assertFailure(
            self.proto.getMultiple(["egg", 1]), ClientError),
    ]
    return gatherResults(failures)
def test_unicodeValue(self):
    """
    Using a non-string value raises an error.
    """
    command = self.proto.set("foo", u"bar")
    return self.assertFailure(command, ClientError)
521 def test_pipelining(self):
523 Multiple requests can be sent successively to the server, and the
524 protocol orders the responses correctly and dispatches them to the
525 corresponding client command.
527 d1 = self.proto.get("foo")
528 d1.addCallback(self.assertEqual, (0, "bar"))
529 d2 = self.proto.set("bar", "spamspamspam")
530 d2.addCallback(self.assertEqual, True)
531 d3 = self.proto.get("egg")
532 d3.addCallback(self.assertEqual, (0, "spam"))
533 self.assertEqual(self.transport.value(),
534 "get foo\r\nset bar 0 0 12\r\nspamspamspam\r\nget egg\r\n")
535 self.proto.dataReceived("VALUE foo 0 3\r\nbar\r\nEND\r\n"
537 "VALUE egg 0 4\r\nspam\r\nEND\r\n")
538 return gatherResults([d1, d2, d3])
541 def test_getInChunks(self):
543 If the value retrieved by a C{get} arrives in chunks, the protocol
544 is able to reconstruct it and to produce the correct value.
546 d = self.proto.get("foo")
547 d.addCallback(self.assertEqual, (0, "0123456789"))
548 self.assertEqual(self.transport.value(), "get foo\r\n")
549 self.proto.dataReceived("VALUE foo 0 10\r\n0123456")
550 self.proto.dataReceived("789")
551 self.proto.dataReceived("\r\nEND")
552 self.proto.dataReceived("\r\n")
def test_append(self):
    """
    L{MemCacheProtocol.append} behaves like a L{MemCacheProtocol.set}
    method: it returns a L{Deferred} which is called back with C{True} when
    the operation succeeds.
    """
    command = self.proto.append("foo", "bar")
    return self._test(
        command, "append foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
def test_prepend(self):
    """
    L{MemCacheProtocol.prepend} behaves like a L{MemCacheProtocol.set}
    method: it returns a L{Deferred} which is called back with C{True} when
    the operation succeeds.
    """
    command = self.proto.prepend("foo", "bar")
    return self._test(
        command, "prepend foo 0 0 3\r\nbar\r\n", "STORED\r\n", True)
578 L{MemCacheProtocol.get} handles an additional cas result when
579 C{withIdentifier} is C{True} and forward it in the resulting
582 return self._test(self.proto.get("foo", True), "gets foo\r\n",
583 "VALUE foo 0 3 1234\r\nbar\r\nEND\r\n", (0, "1234", "bar"))
def test_emptyGets(self):
    """
    Test getting a non-available key with gets: it succeeds but returns
    C{None} as value, C{0} as flag and an empty cas value.
    """
    # Passing True as second argument makes the command go out as "gets".
    command = self.proto.get("foo", True)
    return self._test(command, "gets foo\r\n", "END\r\n", (0, "", None))
595 def test_getsMultiple(self):
597 L{MemCacheProtocol.getMultiple} handles an additional cas field in the
598 returned tuples if C{withIdentifier} is C{True}.
600 return self._test(self.proto.getMultiple(["foo", "bar"], True),
602 "VALUE foo 0 3 1234\r\negg\r\nVALUE bar 0 4 2345\r\nspam\r\nEND\r\n",
603 {'bar': (0, '2345', 'spam'), 'foo': (0, '1234', 'egg')})
606 def test_getsMultipleWithEmpty(self):
608 When getting a non-available key with L{MemCacheProtocol.getMultiple}
609 when C{withIdentifier} is C{True}, the other keys are retrieved
610 correctly, and the non-available key gets a tuple of C{0} as flag,
611 C{None} as value, and an empty cas value.
613 return self._test(self.proto.getMultiple(["foo", "bar"], True),
615 "VALUE foo 0 3 1234\r\negg\r\nEND\r\n",
616 {'bar': (0, '', None), 'foo': (0, '1234', 'egg')})
def test_checkAndSet(self):
    """
    L{MemCacheProtocol.checkAndSet} passes an additional cas identifier
    that the server handles to check if the data has to be updated.
    """
    command = self.proto.checkAndSet("foo", "bar", cas="1234")
    return self._test(
        command, "cas foo 0 0 3 1234\r\nbar\r\n", "STORED\r\n", True)
def test_casUnknowKey(self):
    """
    When L{MemCacheProtocol.checkAndSet} response is C{EXISTS}, the
    resulting L{Deferred} fires with C{False}.
    """
    command = self.proto.checkAndSet("foo", "bar", cas="1234")
    return self._test(
        command, "cas foo 0 0 3 1234\r\nbar\r\n", "EXISTS\r\n", False)
638 class CommandFailureTests(CommandMixin, TestCase):
640 Tests for correct failure of commands on a disconnected
646 Create a disconnected memcache client, using a deterministic clock.
648 self.proto = MemCacheProtocol()
650 self.proto.callLater = self.clock.callLater
651 self.transport = StringTransportWithDisconnection()
652 self.transport.protocol = self.proto
653 self.proto.makeConnection(self.transport)
654 self.transport.loseConnection()
def _test(self, d, send, recv, result):
    """
    Implementation of C{_test} which checks that the command fails with
    C{RuntimeError} because the transport is disconnected.  All the
    parameters except C{d} are ignored.

    @param d: the resulting deferred from the memcache command.
    @param send: ignored.
    @param recv: ignored.
    @param result: ignored.

    @return: the value returned by C{assertFailure} for C{d}.
    """
    failed = self.assertFailure(d, RuntimeError)
    return failed