1 # -*- test-case-name: twisted.test.test_memcache -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Memcache client protocol. Memcached is a caching server, storing data in the
7 form of pairs key/value, and memcache is the protocol to talk with it.
9 To connect to a server, create a factory for L{MemCacheProtocol}::
11 from twisted.internet import reactor, protocol
12 from twisted.protocols.memcache import MemCacheProtocol, DEFAULT_PORT
13 d = protocol.ClientCreator(reactor, MemCacheProtocol
14 ).connectTCP("localhost", DEFAULT_PORT)
15 def doSomething(proto):
16 # Here you call the memcache operations
17 return proto.set("mykey", "a lot of data")
18 d.addCallback(doSomething)
21 All the operations of the memcache protocol are present, but
22 L{MemCacheProtocol.set} and L{MemCacheProtocol.get} are the more important.
24 See U{http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt} for
25 more information about the protocol.
29 from collections import deque
36 from twisted.protocols.basic import LineReceiver
37 from twisted.protocols.policies import TimeoutMixin
38 from twisted.internet.defer import Deferred, fail, TimeoutError
39 from twisted.python import log
47 class NoSuchCommand(Exception):
49 Exception raised when a non existent command is called.
54 class ClientError(Exception):
56 Error caused by an invalid client call.
61 class ServerError(Exception):
63 Problem happening on the server.
68 class Command(object):
70 Wrap a client action into an object, that holds the values used in the
73 @ivar _deferred: the L{Deferred} object that will be fired when the result
75 @type _deferred: L{Deferred}
77 @ivar command: name of the command sent to the server.
81 def __init__(self, command, **kwargs):
85 @param command: the name of the command.
88 @param kwargs: this values will be stored as attributes of the object
91 self.command = command
92 self._deferred = Deferred()
93 for k, v in kwargs.items():
97 def success(self, value):
99 Shortcut method to fire the underlying deferred.
101 self._deferred.callback(value)
104 def fail(self, error):
106 Make the underlying deferred fails.
108 self._deferred.errback(error)
112 class MemCacheProtocol(LineReceiver, TimeoutMixin):
114 MemCache protocol: connect to a memcached server to store/retrieve values.
116 @ivar persistentTimeOut: the timeout period used to wait for a response.
117 @type persistentTimeOut: C{int}
119 @ivar _current: current list of requests waiting for an answer from the
121 @type _current: C{deque} of L{Command}
123 @ivar _lenExpected: amount of data expected in raw mode, when reading for
125 @type _lenExpected: C{int}
127 @ivar _getBuffer: current buffer of data, used to store temporary data
128 when reading in raw mode.
129 @type _getBuffer: C{list}
131 @ivar _bufferLength: the total amount of bytes in C{_getBuffer}.
132 @type _bufferLength: C{int}
134 @ivar _disconnected: indicate if the connectionLost has been called or not.
135 @type _disconnected: C{bool}
138 _disconnected = False
140 def __init__(self, timeOut=60):
144 @param timeOut: the timeout to wait before detecting that the
145 connection is dead and close it. It's expressed in seconds.
146 @type timeOut: C{int}
148 self._current = deque()
149 self._lenExpected = None
150 self._getBuffer = None
151 self._bufferLength = None
152 self.persistentTimeOut = self.timeOut = timeOut
155 def _cancelCommands(self, reason):
157 Cancel all the outstanding commands, making them fail with C{reason}.
160 cmd = self._current.popleft()
164 def timeoutConnection(self):
166 Close the connection in case of timeout.
168 self._cancelCommands(TimeoutError("Connection timeout"))
169 self.transport.loseConnection()
172 def connectionLost(self, reason):
174 Cause any outstanding commands to fail.
176 self._disconnected = True
177 self._cancelCommands(reason)
178 LineReceiver.connectionLost(self, reason)
181 def sendLine(self, line):
183 Override sendLine to add a timeout to response.
185 if not self._current:
186 self.setTimeout(self.persistentTimeOut)
187 LineReceiver.sendLine(self, line)
190 def rawDataReceived(self, data):
192 Collect data for a get.
195 self._getBuffer.append(data)
196 self._bufferLength += len(data)
197 if self._bufferLength >= self._lenExpected + 2:
198 data = "".join(self._getBuffer)
199 buf = data[:self._lenExpected]
200 rem = data[self._lenExpected + 2:]
202 self._lenExpected = None
203 self._getBuffer = None
204 self._bufferLength = None
205 cmd = self._current[0]
207 flags, cas = cmd.values[cmd.currentKey]
208 cmd.values[cmd.currentKey] = (flags, cas, val)
211 self.setLineMode(rem)
214 def cmd_STORED(self):
216 Manage a success response to a set operation.
218 self._current.popleft().success(True)
221 def cmd_NOT_STORED(self):
223 Manage a specific 'not stored' response to a set operation: this is not
224 an error, but some condition wasn't met.
226 self._current.popleft().success(False)
231 This the end token to a get or a stat operation.
233 cmd = self._current.popleft()
234 if cmd.command == "get":
236 values = dict([(key, val[::2]) for key, val in
237 cmd.values.iteritems()])
240 cmd.success((cmd.flags, cmd.value))
241 elif cmd.command == "gets":
243 cmd.success(cmd.values)
245 cmd.success((cmd.flags, cmd.cas, cmd.value))
246 elif cmd.command == "stats":
247 cmd.success(cmd.values)
250 def cmd_NOT_FOUND(self):
252 Manage error response for incr/decr/delete.
254 self._current.popleft().success(False)
257 def cmd_VALUE(self, line):
259 Prepare the reading a value after a get.
261 cmd = self._current[0]
262 if cmd.command == "get":
263 key, flags, length = line.split()
266 key, flags, length, cas = line.split()
267 self._lenExpected = int(length)
269 self._bufferLength = 0
271 if key not in cmd.keys:
272 raise RuntimeError("Unexpected commands answer.")
274 cmd.values[key] = [int(flags), cas]
277 raise RuntimeError("Unexpected commands answer.")
278 cmd.flags = int(flags)
283 def cmd_STAT(self, line):
285 Reception of one stat line.
287 cmd = self._current[0]
288 key, val = line.split(" ", 1)
289 cmd.values[key] = val
292 def cmd_VERSION(self, versionData):
296 self._current.popleft().success(versionData)
301 An non-existent command has been sent.
303 log.err("Non-existent command sent.")
304 cmd = self._current.popleft()
305 cmd.fail(NoSuchCommand())
308 def cmd_CLIENT_ERROR(self, errText):
310 An invalid input as been sent.
312 log.err("Invalid input: %s" % (errText,))
313 cmd = self._current.popleft()
314 cmd.fail(ClientError(errText))
317 def cmd_SERVER_ERROR(self, errText):
319 An error has happened server-side.
321 log.err("Server error: %s" % (errText,))
322 cmd = self._current.popleft()
323 cmd.fail(ServerError(errText))
326 def cmd_DELETED(self):
328 A delete command has completed successfully.
330 self._current.popleft().success(True)
335 The last command has been completed.
337 self._current.popleft().success(True)
340 def cmd_EXISTS(self):
342 A C{checkAndSet} update has failed.
344 self._current.popleft().success(False)
347 def lineReceived(self, line):
349 Receive line commands from the server.
352 token = line.split(" ", 1)[0]
353 # First manage standard commands without space
354 cmd = getattr(self, "cmd_%s" % (token,), None)
356 args = line.split(" ", 1)[1:]
362 # Then manage commands with space in it
363 line = line.replace(" ", "_")
364 cmd = getattr(self, "cmd_%s" % (line,), None)
368 # Increment/Decrement response
369 cmd = self._current.popleft()
372 if not self._current:
373 # No pending request, remove timeout
374 self.setTimeout(None)
377 def increment(self, key, val=1):
379 Increment the value of C{key} by given value (default to 1).
380 C{key} must be consistent with an int. Return the new value.
382 @param key: the key to modify.
385 @param val: the value to increment.
388 @return: a deferred with will be called back with the new value
389 associated with the key (after the increment).
392 return self._incrdecr("incr", key, val)
395 def decrement(self, key, val=1):
397 Decrement the value of C{key} by given value (default to 1).
398 C{key} must be consistent with an int. Return the new value, coerced to
401 @param key: the key to modify.
404 @param val: the value to decrement.
407 @return: a deferred with will be called back with the new value
408 associated with the key (after the decrement).
411 return self._incrdecr("decr", key, val)
414 def _incrdecr(self, cmd, key, val):
416 Internal wrapper for incr/decr.
418 if self._disconnected:
419 return fail(RuntimeError("not connected"))
420 if not isinstance(key, str):
421 return fail(ClientError(
422 "Invalid type for key: %s, expecting a string" % (type(key),)))
423 if len(key) > self.MAX_KEY_LENGTH:
424 return fail(ClientError("Key too long"))
425 fullcmd = "%s %s %d" % (cmd, key, int(val))
426 self.sendLine(fullcmd)
427 cmdObj = Command(cmd, key=key)
428 self._current.append(cmdObj)
429 return cmdObj._deferred
432 def replace(self, key, val, flags=0, expireTime=0):
434 Replace the given C{key}. It must already exist in the server.
436 @param key: the key to replace.
439 @param val: the new value associated with the key.
442 @param flags: the flags to store with the key.
445 @param expireTime: if different from 0, the relative time in seconds
446 when the key will be deleted from the store.
447 @type expireTime: C{int}
449 @return: a deferred that will fire with C{True} if the operation has
450 succeeded, and C{False} with the key didn't previously exist.
453 return self._set("replace", key, val, flags, expireTime, "")
456 def add(self, key, val, flags=0, expireTime=0):
458 Add the given C{key}. It must not exist in the server.
460 @param key: the key to add.
463 @param val: the value associated with the key.
466 @param flags: the flags to store with the key.
469 @param expireTime: if different from 0, the relative time in seconds
470 when the key will be deleted from the store.
471 @type expireTime: C{int}
473 @return: a deferred that will fire with C{True} if the operation has
474 succeeded, and C{False} with the key already exists.
477 return self._set("add", key, val, flags, expireTime, "")
480 def set(self, key, val, flags=0, expireTime=0):
482 Set the given C{key}.
484 @param key: the key to set.
487 @param val: the value associated with the key.
490 @param flags: the flags to store with the key.
493 @param expireTime: if different from 0, the relative time in seconds
494 when the key will be deleted from the store.
495 @type expireTime: C{int}
497 @return: a deferred that will fire with C{True} if the operation has
501 return self._set("set", key, val, flags, expireTime, "")
504 def checkAndSet(self, key, val, cas, flags=0, expireTime=0):
506 Change the content of C{key} only if the C{cas} value matches the
507 current one associated with the key. Use this to store a value which
508 hasn't been modified since last time you fetched it.
510 @param key: The key to set.
513 @param val: The value associated with the key.
516 @param cas: Unique 64-bit value returned by previous call of C{get}.
519 @param flags: The flags to store with the key.
522 @param expireTime: If different from 0, the relative time in seconds
523 when the key will be deleted from the store.
524 @type expireTime: C{int}
526 @return: A deferred that will fire with C{True} if the operation has
527 succeeded, C{False} otherwise.
530 return self._set("cas", key, val, flags, expireTime, cas)
533 def _set(self, cmd, key, val, flags, expireTime, cas):
535 Internal wrapper for setting values.
537 if self._disconnected:
538 return fail(RuntimeError("not connected"))
539 if not isinstance(key, str):
540 return fail(ClientError(
541 "Invalid type for key: %s, expecting a string" % (type(key),)))
542 if len(key) > self.MAX_KEY_LENGTH:
543 return fail(ClientError("Key too long"))
544 if not isinstance(val, str):
545 return fail(ClientError(
546 "Invalid type for value: %s, expecting a string" %
551 fullcmd = "%s %s %d %d %d%s" % (
552 cmd, key, flags, expireTime, length, cas)
553 self.sendLine(fullcmd)
555 cmdObj = Command(cmd, key=key, flags=flags, length=length)
556 self._current.append(cmdObj)
557 return cmdObj._deferred
560 def append(self, key, val):
562 Append given data to the value of an existing key.
564 @param key: The key to modify.
567 @param val: The value to append to the current value associated with
571 @return: A deferred that will fire with C{True} if the operation has
572 succeeded, C{False} otherwise.
575 # Even if flags and expTime values are ignored, we have to pass them
576 return self._set("append", key, val, 0, 0, "")
579 def prepend(self, key, val):
581 Prepend given data to the value of an existing key.
583 @param key: The key to modify.
586 @param val: The value to prepend to the current value associated with
590 @return: A deferred that will fire with C{True} if the operation has
591 succeeded, C{False} otherwise.
594 # Even if flags and expTime values are ignored, we have to pass them
595 return self._set("prepend", key, val, 0, 0, "")
598 def get(self, key, withIdentifier=False):
600 Get the given C{key}. It doesn't support multiple keys. If
601 C{withIdentifier} is set to C{True}, the command issued is a C{gets},
602 that will return the current identifier associated with the value. This
603 identifier has to be used when issuing C{checkAndSet} update later,
604 using the corresponding method.
606 @param key: The key to retrieve.
609 @param withIdentifier: If set to C{True}, retrieve the current
610 identifier along with the value and the flags.
611 @type withIdentifier: C{bool}
613 @return: A deferred that will fire with the tuple (flags, value) if
614 C{withIdentifier} is C{False}, or (flags, cas identifier, value)
615 if C{True}. If the server indicates there is no value
616 associated with C{key}, the returned value will be C{None} and
617 the returned flags will be C{0}.
620 return self._get([key], withIdentifier, False)
623 def getMultiple(self, keys, withIdentifier=False):
625 Get the given list of C{keys}. If C{withIdentifier} is set to C{True},
626 the command issued is a C{gets}, that will return the identifiers
627 associated with each values. This identifier has to be used when
628 issuing C{checkAndSet} update later, using the corresponding method.
630 @param keys: The keys to retrieve.
631 @type keys: C{list} of C{str}
633 @param withIdentifier: If set to C{True}, retrieve the identifiers
634 along with the values and the flags.
635 @type withIdentifier: C{bool}
637 @return: A deferred that will fire with a dictionary with the elements
638 of C{keys} as keys and the tuples (flags, value) as values if
639 C{withIdentifier} is C{False}, or (flags, cas identifier, value) if
640 C{True}. If the server indicates there is no value associated with
641 C{key}, the returned values will be C{None} and the returned flags
647 return self._get(keys, withIdentifier, True)
649 def _get(self, keys, withIdentifier, multiple):
651 Helper method for C{get} and C{getMultiple}.
653 if self._disconnected:
654 return fail(RuntimeError("not connected"))
656 if not isinstance(key, str):
657 return fail(ClientError(
658 "Invalid type for key: %s, expecting a string" % (type(key),)))
659 if len(key) > self.MAX_KEY_LENGTH:
660 return fail(ClientError("Key too long"))
665 fullcmd = "%s %s" % (cmd, " ".join(keys))
666 self.sendLine(fullcmd)
668 values = dict([(key, (0, "", None)) for key in keys])
669 cmdObj = Command(cmd, keys=keys, values=values, multiple=True)
671 cmdObj = Command(cmd, key=keys[0], value=None, flags=0, cas="",
673 self._current.append(cmdObj)
674 return cmdObj._deferred
676 def stats(self, arg=None):
678 Get some stats from the server. It will be available as a dict.
680 @param arg: An optional additional string which will be sent along
681 with the I{stats} command. The interpretation of this value by
682 the server is left undefined by the memcache protocol
684 @type arg: L{NoneType} or L{str}
686 @return: a deferred that will fire with a C{dict} of the available
694 if self._disconnected:
695 return fail(RuntimeError("not connected"))
697 cmdObj = Command("stats", values={})
698 self._current.append(cmdObj)
699 return cmdObj._deferred
704 Get the version of the server.
706 @return: a deferred that will fire with the string value of the
710 if self._disconnected:
711 return fail(RuntimeError("not connected"))
712 self.sendLine("version")
713 cmdObj = Command("version")
714 self._current.append(cmdObj)
715 return cmdObj._deferred
718 def delete(self, key):
720 Delete an existing C{key}.
722 @param key: the key to delete.
725 @return: a deferred that will be called back with C{True} if the key
726 was successfully deleted, or C{False} if not.
729 if self._disconnected:
730 return fail(RuntimeError("not connected"))
731 if not isinstance(key, str):
732 return fail(ClientError(
733 "Invalid type for key: %s, expecting a string" % (type(key),)))
734 self.sendLine("delete %s" % key)
735 cmdObj = Command("delete", key=key)
736 self._current.append(cmdObj)
737 return cmdObj._deferred
742 Flush all cached values.
744 @return: a deferred that will be called back with C{True} when the
745 operation has succeeded.
748 if self._disconnected:
749 return fail(RuntimeError("not connected"))
750 self.sendLine("flush_all")
751 cmdObj = Command("flush_all")
752 self._current.append(cmdObj)
753 return cmdObj._deferred
757 __all__ = ["MemCacheProtocol", "DEFAULT_PORT", "NoSuchCommand", "ClientError",