Imported Upstream version 8.0.586
[platform/upstream/vim.git] / src / testdir / test_channel.py
1 #!/usr/bin/python
2 #
3 # Server that will accept connections from a Vim channel.
4 # Used by test_channel.vim.
5 #
6 # This requires Python 2.6 or later.
7
8 from __future__ import print_function
9 import json
10 import socket
11 import sys
12 import time
13 import threading
14
15 try:
16     # Python 3
17     import socketserver
18 except ImportError:
19     # Python 2
20     import SocketServer as socketserver
21
22 class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
23
24     def handle(self):
25         print("=== socket opened ===")
26         while True:
27             try:
28                 received = self.request.recv(4096).decode('utf-8')
29             except socket.error:
30                 print("=== socket error ===")
31                 break
32             except IOError:
33                 print("=== socket closed ===")
34                 break
35             if received == '':
36                 print("=== socket closed ===")
37                 break
38             print("received: {0}".format(received))
39
40             # We may receive two messages at once. Take the part up to the
41             # newline, which should be after the matching "]".
42             todo = received
43             while todo != '':
44                 splitidx = todo.find('\n')
45                 if splitidx < 0:
46                      used = todo
47                      todo = ''
48                 else:
49                      used = todo[:splitidx]
50                      todo = todo[splitidx + 1:]
51                 if used != received:
52                     print("using: {0}".format(used))
53
54                 try:
55                     decoded = json.loads(used)
56                 except ValueError:
57                     print("json decoding failed")
58                     decoded = [-1, '']
59
60                 # Send a response if the sequence number is positive.
61                 if decoded[0] >= 0:
62                     if decoded[1] == 'hello!':
63                         # simply send back a string
64                         response = "got it"
65                     elif decoded[1] == 'malformed1':
66                         cmd = '["ex",":"]wrong!["ex","smi"]'
67                         print("sending: {0}".format(cmd))
68                         self.request.sendall(cmd.encode('utf-8'))
69                         response = "ok"
70                         # Need to wait for Vim to give up, otherwise it
71                         # sometimes fails on OS X.
72                         time.sleep(0.2)
73                     elif decoded[1] == 'malformed2':
74                         cmd = '"unterminated string'
75                         print("sending: {0}".format(cmd))
76                         self.request.sendall(cmd.encode('utf-8'))
77                         response = "ok"
78                         # Need to wait for Vim to give up, otherwise the double
79                         # quote in the "ok" response terminates the string.
80                         time.sleep(0.2)
81                     elif decoded[1] == 'malformed3':
82                         cmd = '["ex","missing ]"'
83                         print("sending: {0}".format(cmd))
84                         self.request.sendall(cmd.encode('utf-8'))
85                         response = "ok"
86                         # Need to wait for Vim to give up, otherwise the ]
87                         # in the "ok" response terminates the list.
88                         time.sleep(0.2)
89                     elif decoded[1] == 'split':
90                         cmd = '["ex","let '
91                         print("sending: {0}".format(cmd))
92                         self.request.sendall(cmd.encode('utf-8'))
93                         time.sleep(0.01)
94                         cmd = 'g:split = 123"]'
95                         print("sending: {0}".format(cmd))
96                         self.request.sendall(cmd.encode('utf-8'))
97                         response = "ok"
98                     elif decoded[1].startswith("echo "):
99                         # send back the argument
100                         response = decoded[1][5:]
101                     elif decoded[1] == 'make change':
102                         # Send two ex commands at the same time, before
103                         # replying to the request.
104                         cmd = '["ex","call append(\\"$\\",\\"added1\\")"]'
105                         cmd += '["ex","call append(\\"$\\",\\"added2\\")"]'
106                         print("sending: {0}".format(cmd))
107                         self.request.sendall(cmd.encode('utf-8'))
108                         response = "ok"
109                     elif decoded[1] == 'bad command':
110                         cmd = '["ex","foo bar"]'
111                         print("sending: {0}".format(cmd))
112                         self.request.sendall(cmd.encode('utf-8'))
113                         response = "ok"
114                     elif decoded[1] == 'do normal':
115                         # Send a normal command.
116                         cmd = '["normal","G$s more\u001b"]'
117                         print("sending: {0}".format(cmd))
118                         self.request.sendall(cmd.encode('utf-8'))
119                         response = "ok"
120                     elif decoded[1] == 'eval-works':
121                         # Send an eval request.  We ignore the response.
122                         cmd = '["expr","\\"foo\\" . 123", -1]'
123                         print("sending: {0}".format(cmd))
124                         self.request.sendall(cmd.encode('utf-8'))
125                         response = "ok"
126                     elif decoded[1] == 'eval-special':
127                         # Send an eval request.  We ignore the response.
128                         cmd = '["expr","\\"foo\x7f\x10\x01bar\\"", -2]'
129                         print("sending: {0}".format(cmd))
130                         self.request.sendall(cmd.encode('utf-8'))
131                         response = "ok"
132                     elif decoded[1] == 'eval-getline':
133                         # Send an eval request.  We ignore the response.
134                         cmd = '["expr","getline(3)", -3]'
135                         print("sending: {0}".format(cmd))
136                         self.request.sendall(cmd.encode('utf-8'))
137                         response = "ok"
138                     elif decoded[1] == 'eval-fails':
139                         # Send an eval request that will fail.
140                         cmd = '["expr","xxx", -4]'
141                         print("sending: {0}".format(cmd))
142                         self.request.sendall(cmd.encode('utf-8'))
143                         response = "ok"
144                     elif decoded[1] == 'eval-error':
145                         # Send an eval request that works but the result can't
146                         # be encoded.
147                         cmd = '["expr","function(\\"tr\\")", -5]'
148                         print("sending: {0}".format(cmd))
149                         self.request.sendall(cmd.encode('utf-8'))
150                         response = "ok"
151                     elif decoded[1] == 'eval-bad':
152                         # Send an eval request missing the third argument.
153                         cmd = '["expr","xxx"]'
154                         print("sending: {0}".format(cmd))
155                         self.request.sendall(cmd.encode('utf-8'))
156                         response = "ok"
157                     elif decoded[1] == 'an expr':
158                         # Send an expr request.
159                         cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]'
160                         print("sending: {0}".format(cmd))
161                         self.request.sendall(cmd.encode('utf-8'))
162                         response = "ok"
163                     elif decoded[1] == 'call-func':
164                         cmd = '["call","MyFunction",[1,2,3], 0]'
165                         print("sending: {0}".format(cmd))
166                         self.request.sendall(cmd.encode('utf-8'))
167                         response = "ok"
168                     elif decoded[1] == 'redraw':
169                         cmd = '["redraw",""]'
170                         print("sending: {0}".format(cmd))
171                         self.request.sendall(cmd.encode('utf-8'))
172                         response = "ok"
173                     elif decoded[1] == 'redraw!':
174                         cmd = '["redraw","force"]'
175                         print("sending: {0}".format(cmd))
176                         self.request.sendall(cmd.encode('utf-8'))
177                         response = "ok"
178                     elif decoded[1] == 'empty-request':
179                         cmd = '[]'
180                         print("sending: {0}".format(cmd))
181                         self.request.sendall(cmd.encode('utf-8'))
182                         response = "ok"
183                     elif decoded[1] == 'eval-result':
184                         # Send back the last received eval result.
185                         response = last_eval
186                     elif decoded[1] == 'call me':
187                         cmd = '[0,"we called you"]'
188                         print("sending: {0}".format(cmd))
189                         self.request.sendall(cmd.encode('utf-8'))
190                         response = "ok"
191                     elif decoded[1] == 'call me again':
192                         cmd = '[0,"we did call you"]'
193                         print("sending: {0}".format(cmd))
194                         self.request.sendall(cmd.encode('utf-8'))
195                         response = ""
196                     elif decoded[1] == 'send zero':
197                         cmd = '[0,"zero index"]'
198                         print("sending: {0}".format(cmd))
199                         self.request.sendall(cmd.encode('utf-8'))
200                         response = "sent zero"
201                     elif decoded[1] == 'close me':
202                         print("closing")
203                         self.request.close()
204                         response = ""
205                     elif decoded[1] == 'wait a bit':
206                         time.sleep(0.2)
207                         response = "waited"
208                     elif decoded[1] == '!quit!':
209                         # we're done
210                         self.server.shutdown()
211                         return
212                     elif decoded[1] == '!crash!':
213                         # Crash!
214                         42 / 0
215                     else:
216                         response = "what?"
217
218                     if response == "":
219                         print("no response")
220                     else:
221                         encoded = json.dumps([decoded[0], response])
222                         print("sending: {0}".format(encoded))
223                         self.request.sendall(encoded.encode('utf-8'))
224
225                 # Negative numbers are used for "eval" responses.
226                 elif decoded[0] < 0:
227                     last_eval = decoded
228
229 class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
230     pass
231
232 def writePortInFile(port):
233     # Write the port number in Xportnr, so that the test knows it.
234     f = open("Xportnr", "w")
235     f.write("{0}".format(port))
236     f.close()
237
238 if __name__ == "__main__":
239     HOST, PORT = "localhost", 0
240
241     # Wait half a second before opening the port to test waittime in ch_open().
242     # We do want to get the port number, get that first.  We cannot open the
243     # socket, guess a port is free.
244     if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
245         PORT = 13684
246         writePortInFile(PORT)
247
248         print("Wait for it...")
249         time.sleep(0.5)
250
251     server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
252     ip, port = server.server_address
253
254     # Start a thread with the server.  That thread will then start a new thread
255     # for each connection.
256     server_thread = threading.Thread(target=server.serve_forever)
257     server_thread.start()
258
259     writePortInFile(port)
260
261     print("Listening on port {0}".format(port))
262
263     # Main thread terminates, but the server continues running
264     # until server.shutdown() is called.
265     try:
266         while server_thread.isAlive(): 
267             server_thread.join(1)
268     except (KeyboardInterrupt, SystemExit):
269         server.shutdown()