Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / protocols / memcache.py
1 # -*- test-case-name: twisted.test.test_memcache -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Memcache client protocol. Memcached is a caching server, storing data in the
7 form of pairs key/value, and memcache is the protocol to talk with it.
8
9 To connect to a server, create a factory for L{MemCacheProtocol}::
10
11     from twisted.internet import reactor, protocol
12     from twisted.protocols.memcache import MemCacheProtocol, DEFAULT_PORT
13     d = protocol.ClientCreator(reactor, MemCacheProtocol
14         ).connectTCP("localhost", DEFAULT_PORT)
15     def doSomething(proto):
16         # Here you call the memcache operations
17         return proto.set("mykey", "a lot of data")
18     d.addCallback(doSomething)
19     reactor.run()
20
21 All the operations of the memcache protocol are present, but
22 L{MemCacheProtocol.set} and L{MemCacheProtocol.get} are the more important.
23
24 See U{http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt} for
25 more information about the protocol.
26 """
27
28 try:
29     from collections import deque
30 except ImportError:
31     class deque(list):
32         def popleft(self):
33             return self.pop(0)
34
35
36 from twisted.protocols.basic import LineReceiver
37 from twisted.protocols.policies import TimeoutMixin
38 from twisted.internet.defer import Deferred, fail, TimeoutError
39 from twisted.python import log
40
41
42
43 DEFAULT_PORT = 11211
44
45
46
47 class NoSuchCommand(Exception):
48     """
49     Exception raised when a non existent command is called.
50     """
51
52
53
54 class ClientError(Exception):
55     """
56     Error caused by an invalid client call.
57     """
58
59
60
61 class ServerError(Exception):
62     """
63     Problem happening on the server.
64     """
65
66
67
68 class Command(object):
69     """
70     Wrap a client action into an object, that holds the values used in the
71     protocol.
72
73     @ivar _deferred: the L{Deferred} object that will be fired when the result
74         arrives.
75     @type _deferred: L{Deferred}
76
77     @ivar command: name of the command sent to the server.
78     @type command: C{str}
79     """
80
81     def __init__(self, command, **kwargs):
82         """
83         Create a command.
84
85         @param command: the name of the command.
86         @type command: C{str}
87
88         @param kwargs: this values will be stored as attributes of the object
89             for future use
90         """
91         self.command = command
92         self._deferred = Deferred()
93         for k, v in kwargs.items():
94             setattr(self, k, v)
95
96
97     def success(self, value):
98         """
99         Shortcut method to fire the underlying deferred.
100         """
101         self._deferred.callback(value)
102
103
104     def fail(self, error):
105         """
106         Make the underlying deferred fails.
107         """
108         self._deferred.errback(error)
109
110
111
112 class MemCacheProtocol(LineReceiver, TimeoutMixin):
113     """
114     MemCache protocol: connect to a memcached server to store/retrieve values.
115
116     @ivar persistentTimeOut: the timeout period used to wait for a response.
117     @type persistentTimeOut: C{int}
118
119     @ivar _current: current list of requests waiting for an answer from the
120         server.
121     @type _current: C{deque} of L{Command}
122
123     @ivar _lenExpected: amount of data expected in raw mode, when reading for
124         a value.
125     @type _lenExpected: C{int}
126
127     @ivar _getBuffer: current buffer of data, used to store temporary data
128         when reading in raw mode.
129     @type _getBuffer: C{list}
130
131     @ivar _bufferLength: the total amount of bytes in C{_getBuffer}.
132     @type _bufferLength: C{int}
133
134     @ivar _disconnected: indicate if the connectionLost has been called or not.
135     @type _disconnected: C{bool}
136     """
137     MAX_KEY_LENGTH = 250
138     _disconnected = False
139
140     def __init__(self, timeOut=60):
141         """
142         Create the protocol.
143
144         @param timeOut: the timeout to wait before detecting that the
145             connection is dead and close it. It's expressed in seconds.
146         @type timeOut: C{int}
147         """
148         self._current = deque()
149         self._lenExpected = None
150         self._getBuffer = None
151         self._bufferLength = None
152         self.persistentTimeOut = self.timeOut = timeOut
153
154
155     def _cancelCommands(self, reason):
156         """
157         Cancel all the outstanding commands, making them fail with C{reason}.
158         """
159         while self._current:
160             cmd = self._current.popleft()
161             cmd.fail(reason)
162
163
164     def timeoutConnection(self):
165         """
166         Close the connection in case of timeout.
167         """
168         self._cancelCommands(TimeoutError("Connection timeout"))
169         self.transport.loseConnection()
170
171
172     def connectionLost(self, reason):
173         """
174         Cause any outstanding commands to fail.
175         """
176         self._disconnected = True
177         self._cancelCommands(reason)
178         LineReceiver.connectionLost(self, reason)
179
180
181     def sendLine(self, line):
182         """
183         Override sendLine to add a timeout to response.
184         """
185         if not self._current:
186             self.setTimeout(self.persistentTimeOut)
187         LineReceiver.sendLine(self, line)
188
189
190     def rawDataReceived(self, data):
191         """
192         Collect data for a get.
193         """
194         self.resetTimeout()
195         self._getBuffer.append(data)
196         self._bufferLength += len(data)
197         if self._bufferLength >= self._lenExpected + 2:
198             data = "".join(self._getBuffer)
199             buf = data[:self._lenExpected]
200             rem = data[self._lenExpected + 2:]
201             val = buf
202             self._lenExpected = None
203             self._getBuffer = None
204             self._bufferLength = None
205             cmd = self._current[0]
206             if cmd.multiple:
207                 flags, cas = cmd.values[cmd.currentKey]
208                 cmd.values[cmd.currentKey] = (flags, cas, val)
209             else:
210                 cmd.value = val
211             self.setLineMode(rem)
212
213
214     def cmd_STORED(self):
215         """
216         Manage a success response to a set operation.
217         """
218         self._current.popleft().success(True)
219
220
221     def cmd_NOT_STORED(self):
222         """
223         Manage a specific 'not stored' response to a set operation: this is not
224         an error, but some condition wasn't met.
225         """
226         self._current.popleft().success(False)
227
228
229     def cmd_END(self):
230         """
231         This the end token to a get or a stat operation.
232         """
233         cmd = self._current.popleft()
234         if cmd.command == "get":
235             if cmd.multiple:
236                 values = dict([(key, val[::2]) for key, val in
237                                cmd.values.iteritems()])
238                 cmd.success(values)
239             else:
240                 cmd.success((cmd.flags, cmd.value))
241         elif cmd.command == "gets":
242             if cmd.multiple:
243                 cmd.success(cmd.values)
244             else:
245                 cmd.success((cmd.flags, cmd.cas, cmd.value))
246         elif cmd.command == "stats":
247             cmd.success(cmd.values)
248
249
250     def cmd_NOT_FOUND(self):
251         """
252         Manage error response for incr/decr/delete.
253         """
254         self._current.popleft().success(False)
255
256
257     def cmd_VALUE(self, line):
258         """
259         Prepare the reading a value after a get.
260         """
261         cmd = self._current[0]
262         if cmd.command == "get":
263             key, flags, length = line.split()
264             cas = ""
265         else:
266             key, flags, length, cas = line.split()
267         self._lenExpected = int(length)
268         self._getBuffer = []
269         self._bufferLength = 0
270         if cmd.multiple:
271             if key not in cmd.keys:
272                 raise RuntimeError("Unexpected commands answer.")
273             cmd.currentKey = key
274             cmd.values[key] = [int(flags), cas]
275         else:
276             if cmd.key != key:
277                 raise RuntimeError("Unexpected commands answer.")
278             cmd.flags = int(flags)
279             cmd.cas = cas
280         self.setRawMode()
281
282
283     def cmd_STAT(self, line):
284         """
285         Reception of one stat line.
286         """
287         cmd = self._current[0]
288         key, val = line.split(" ", 1)
289         cmd.values[key] = val
290
291
292     def cmd_VERSION(self, versionData):
293         """
294         Read version token.
295         """
296         self._current.popleft().success(versionData)
297
298
299     def cmd_ERROR(self):
300         """
301         An non-existent command has been sent.
302         """
303         log.err("Non-existent command sent.")
304         cmd = self._current.popleft()
305         cmd.fail(NoSuchCommand())
306
307
308     def cmd_CLIENT_ERROR(self, errText):
309         """
310         An invalid input as been sent.
311         """
312         log.err("Invalid input: %s" % (errText,))
313         cmd = self._current.popleft()
314         cmd.fail(ClientError(errText))
315
316
317     def cmd_SERVER_ERROR(self, errText):
318         """
319         An error has happened server-side.
320         """
321         log.err("Server error: %s" % (errText,))
322         cmd = self._current.popleft()
323         cmd.fail(ServerError(errText))
324
325
326     def cmd_DELETED(self):
327         """
328         A delete command has completed successfully.
329         """
330         self._current.popleft().success(True)
331
332
333     def cmd_OK(self):
334         """
335         The last command has been completed.
336         """
337         self._current.popleft().success(True)
338
339
340     def cmd_EXISTS(self):
341         """
342         A C{checkAndSet} update has failed.
343         """
344         self._current.popleft().success(False)
345
346
347     def lineReceived(self, line):
348         """
349         Receive line commands from the server.
350         """
351         self.resetTimeout()
352         token = line.split(" ", 1)[0]
353         # First manage standard commands without space
354         cmd = getattr(self, "cmd_%s" % (token,), None)
355         if cmd is not None:
356             args = line.split(" ", 1)[1:]
357             if args:
358                 cmd(args[0])
359             else:
360                 cmd()
361         else:
362             # Then manage commands with space in it
363             line = line.replace(" ", "_")
364             cmd = getattr(self, "cmd_%s" % (line,), None)
365             if cmd is not None:
366                 cmd()
367             else:
368                 # Increment/Decrement response
369                 cmd = self._current.popleft()
370                 val = int(line)
371                 cmd.success(val)
372         if not self._current:
373             # No pending request, remove timeout
374             self.setTimeout(None)
375
376
377     def increment(self, key, val=1):
378         """
379         Increment the value of C{key} by given value (default to 1).
380         C{key} must be consistent with an int. Return the new value.
381
382         @param key: the key to modify.
383         @type key: C{str}
384
385         @param val: the value to increment.
386         @type val: C{int}
387
388         @return: a deferred with will be called back with the new value
389             associated with the key (after the increment).
390         @rtype: L{Deferred}
391         """
392         return self._incrdecr("incr", key, val)
393
394
395     def decrement(self, key, val=1):
396         """
397         Decrement the value of C{key} by given value (default to 1).
398         C{key} must be consistent with an int. Return the new value, coerced to
399         0 if negative.
400
401         @param key: the key to modify.
402         @type key: C{str}
403
404         @param val: the value to decrement.
405         @type val: C{int}
406
407         @return: a deferred with will be called back with the new value
408             associated with the key (after the decrement).
409         @rtype: L{Deferred}
410         """
411         return self._incrdecr("decr", key, val)
412
413
414     def _incrdecr(self, cmd, key, val):
415         """
416         Internal wrapper for incr/decr.
417         """
418         if self._disconnected:
419             return fail(RuntimeError("not connected"))
420         if not isinstance(key, str):
421             return fail(ClientError(
422                 "Invalid type for key: %s, expecting a string" % (type(key),)))
423         if len(key) > self.MAX_KEY_LENGTH:
424             return fail(ClientError("Key too long"))
425         fullcmd = "%s %s %d" % (cmd, key, int(val))
426         self.sendLine(fullcmd)
427         cmdObj = Command(cmd, key=key)
428         self._current.append(cmdObj)
429         return cmdObj._deferred
430
431
432     def replace(self, key, val, flags=0, expireTime=0):
433         """
434         Replace the given C{key}. It must already exist in the server.
435
436         @param key: the key to replace.
437         @type key: C{str}
438
439         @param val: the new value associated with the key.
440         @type val: C{str}
441
442         @param flags: the flags to store with the key.
443         @type flags: C{int}
444
445         @param expireTime: if different from 0, the relative time in seconds
446             when the key will be deleted from the store.
447         @type expireTime: C{int}
448
449         @return: a deferred that will fire with C{True} if the operation has
450             succeeded, and C{False} with the key didn't previously exist.
451         @rtype: L{Deferred}
452         """
453         return self._set("replace", key, val, flags, expireTime, "")
454
455
456     def add(self, key, val, flags=0, expireTime=0):
457         """
458         Add the given C{key}. It must not exist in the server.
459
460         @param key: the key to add.
461         @type key: C{str}
462
463         @param val: the value associated with the key.
464         @type val: C{str}
465
466         @param flags: the flags to store with the key.
467         @type flags: C{int}
468
469         @param expireTime: if different from 0, the relative time in seconds
470             when the key will be deleted from the store.
471         @type expireTime: C{int}
472
473         @return: a deferred that will fire with C{True} if the operation has
474             succeeded, and C{False} with the key already exists.
475         @rtype: L{Deferred}
476         """
477         return self._set("add", key, val, flags, expireTime, "")
478
479
480     def set(self, key, val, flags=0, expireTime=0):
481         """
482         Set the given C{key}.
483
484         @param key: the key to set.
485         @type key: C{str}
486
487         @param val: the value associated with the key.
488         @type val: C{str}
489
490         @param flags: the flags to store with the key.
491         @type flags: C{int}
492
493         @param expireTime: if different from 0, the relative time in seconds
494             when the key will be deleted from the store.
495         @type expireTime: C{int}
496
497         @return: a deferred that will fire with C{True} if the operation has
498             succeeded.
499         @rtype: L{Deferred}
500         """
501         return self._set("set", key, val, flags, expireTime, "")
502
503
504     def checkAndSet(self, key, val, cas, flags=0, expireTime=0):
505         """
506         Change the content of C{key} only if the C{cas} value matches the
507         current one associated with the key. Use this to store a value which
508         hasn't been modified since last time you fetched it.
509
510         @param key: The key to set.
511         @type key: C{str}
512
513         @param val: The value associated with the key.
514         @type val: C{str}
515
516         @param cas: Unique 64-bit value returned by previous call of C{get}.
517         @type cas: C{str}
518
519         @param flags: The flags to store with the key.
520         @type flags: C{int}
521
522         @param expireTime: If different from 0, the relative time in seconds
523             when the key will be deleted from the store.
524         @type expireTime: C{int}
525
526         @return: A deferred that will fire with C{True} if the operation has
527             succeeded, C{False} otherwise.
528         @rtype: L{Deferred}
529         """
530         return self._set("cas", key, val, flags, expireTime, cas)
531
532
533     def _set(self, cmd, key, val, flags, expireTime, cas):
534         """
535         Internal wrapper for setting values.
536         """
537         if self._disconnected:
538             return fail(RuntimeError("not connected"))
539         if not isinstance(key, str):
540             return fail(ClientError(
541                 "Invalid type for key: %s, expecting a string" % (type(key),)))
542         if len(key) > self.MAX_KEY_LENGTH:
543             return fail(ClientError("Key too long"))
544         if not isinstance(val, str):
545             return fail(ClientError(
546                 "Invalid type for value: %s, expecting a string" %
547                 (type(val),)))
548         if cas:
549             cas = " " + cas
550         length = len(val)
551         fullcmd = "%s %s %d %d %d%s" % (
552             cmd, key, flags, expireTime, length, cas)
553         self.sendLine(fullcmd)
554         self.sendLine(val)
555         cmdObj = Command(cmd, key=key, flags=flags, length=length)
556         self._current.append(cmdObj)
557         return cmdObj._deferred
558
559
560     def append(self, key, val):
561         """
562         Append given data to the value of an existing key.
563
564         @param key: The key to modify.
565         @type key: C{str}
566
567         @param val: The value to append to the current value associated with
568             the key.
569         @type val: C{str}
570
571         @return: A deferred that will fire with C{True} if the operation has
572             succeeded, C{False} otherwise.
573         @rtype: L{Deferred}
574         """
575         # Even if flags and expTime values are ignored, we have to pass them
576         return self._set("append", key, val, 0, 0, "")
577
578
579     def prepend(self, key, val):
580         """
581         Prepend given data to the value of an existing key.
582
583         @param key: The key to modify.
584         @type key: C{str}
585
586         @param val: The value to prepend to the current value associated with
587             the key.
588         @type val: C{str}
589
590         @return: A deferred that will fire with C{True} if the operation has
591             succeeded, C{False} otherwise.
592         @rtype: L{Deferred}
593         """
594         # Even if flags and expTime values are ignored, we have to pass them
595         return self._set("prepend", key, val, 0, 0, "")
596
597
598     def get(self, key, withIdentifier=False):
599         """
600         Get the given C{key}. It doesn't support multiple keys. If
601         C{withIdentifier} is set to C{True}, the command issued is a C{gets},
602         that will return the current identifier associated with the value. This
603         identifier has to be used when issuing C{checkAndSet} update later,
604         using the corresponding method.
605
606         @param key: The key to retrieve.
607         @type key: C{str}
608
609         @param withIdentifier: If set to C{True}, retrieve the current
610             identifier along with the value and the flags.
611         @type withIdentifier: C{bool}
612
613         @return: A deferred that will fire with the tuple (flags, value) if
614             C{withIdentifier} is C{False}, or (flags, cas identifier, value)
615             if C{True}.  If the server indicates there is no value
616             associated with C{key}, the returned value will be C{None} and
617             the returned flags will be C{0}.
618         @rtype: L{Deferred}
619         """
620         return self._get([key], withIdentifier, False)
621
622
623     def getMultiple(self, keys, withIdentifier=False):
624         """
625         Get the given list of C{keys}.  If C{withIdentifier} is set to C{True},
626         the command issued is a C{gets}, that will return the identifiers
627         associated with each values. This identifier has to be used when
628         issuing C{checkAndSet} update later, using the corresponding method.
629
630         @param keys: The keys to retrieve.
631         @type keys: C{list} of C{str}
632
633         @param withIdentifier: If set to C{True}, retrieve the identifiers
634             along with the values and the flags.
635         @type withIdentifier: C{bool}
636
637         @return: A deferred that will fire with a dictionary with the elements
638             of C{keys} as keys and the tuples (flags, value) as values if
639             C{withIdentifier} is C{False}, or (flags, cas identifier, value) if
640             C{True}.  If the server indicates there is no value associated with
641             C{key}, the returned values will be C{None} and the returned flags
642             will be C{0}.
643         @rtype: L{Deferred}
644
645         @since: 9.0
646         """
647         return self._get(keys, withIdentifier, True)
648
649     def _get(self, keys, withIdentifier, multiple):
650         """
651         Helper method for C{get} and C{getMultiple}.
652         """
653         if self._disconnected:
654             return fail(RuntimeError("not connected"))
655         for key in keys:
656             if not isinstance(key, str):
657                 return fail(ClientError(
658                     "Invalid type for key: %s, expecting a string" % (type(key),)))
659             if len(key) > self.MAX_KEY_LENGTH:
660                 return fail(ClientError("Key too long"))
661         if withIdentifier:
662             cmd = "gets"
663         else:
664             cmd = "get"
665         fullcmd = "%s %s" % (cmd, " ".join(keys))
666         self.sendLine(fullcmd)
667         if multiple:
668             values = dict([(key, (0, "", None)) for key in keys])
669             cmdObj = Command(cmd, keys=keys, values=values, multiple=True)
670         else:
671             cmdObj = Command(cmd, key=keys[0], value=None, flags=0, cas="",
672                              multiple=False)
673         self._current.append(cmdObj)
674         return cmdObj._deferred
675
676     def stats(self, arg=None):
677         """
678         Get some stats from the server. It will be available as a dict.
679
680         @param arg: An optional additional string which will be sent along
681             with the I{stats} command.  The interpretation of this value by
682             the server is left undefined by the memcache protocol
683             specification.
684         @type arg: L{NoneType} or L{str}
685
686         @return: a deferred that will fire with a C{dict} of the available
687             statistics.
688         @rtype: L{Deferred}
689         """
690         if arg:
691             cmd = "stats " + arg
692         else:
693             cmd = "stats"
694         if self._disconnected:
695             return fail(RuntimeError("not connected"))
696         self.sendLine(cmd)
697         cmdObj = Command("stats", values={})
698         self._current.append(cmdObj)
699         return cmdObj._deferred
700
701
702     def version(self):
703         """
704         Get the version of the server.
705
706         @return: a deferred that will fire with the string value of the
707             version.
708         @rtype: L{Deferred}
709         """
710         if self._disconnected:
711             return fail(RuntimeError("not connected"))
712         self.sendLine("version")
713         cmdObj = Command("version")
714         self._current.append(cmdObj)
715         return cmdObj._deferred
716
717
718     def delete(self, key):
719         """
720         Delete an existing C{key}.
721
722         @param key: the key to delete.
723         @type key: C{str}
724
725         @return: a deferred that will be called back with C{True} if the key
726             was successfully deleted, or C{False} if not.
727         @rtype: L{Deferred}
728         """
729         if self._disconnected:
730             return fail(RuntimeError("not connected"))
731         if not isinstance(key, str):
732             return fail(ClientError(
733                 "Invalid type for key: %s, expecting a string" % (type(key),)))
734         self.sendLine("delete %s" % key)
735         cmdObj = Command("delete", key=key)
736         self._current.append(cmdObj)
737         return cmdObj._deferred
738
739
740     def flushAll(self):
741         """
742         Flush all cached values.
743
744         @return: a deferred that will be called back with C{True} when the
745             operation has succeeded.
746         @rtype: L{Deferred}
747         """
748         if self._disconnected:
749             return fail(RuntimeError("not connected"))
750         self.sendLine("flush_all")
751         cmdObj = Command("flush_all")
752         self._current.append(cmdObj)
753         return cmdObj._deferred
754
755
756
757 __all__ = ["MemCacheProtocol", "DEFAULT_PORT", "NoSuchCommand", "ClientError",
758            "ServerError"]