k3redisutil

Redis utilities for easier client management and proxy support.

k3redisutil is a component of the pykit3 project, a Python 3 toolkit set.

Installation

pip install k3redisutil

Quick Start

import k3redisutil

# Get a per-process singleton redis client
client = k3redisutil.get_client(6379)
client.set('foo', 'bar')

# Using redis as a duplex cross-process channel
c = k3redisutil.RedisChannel(6379, '/foo', 'client')
s = k3redisutil.RedisChannel(6379, '/foo', 'server')

c.send_msg('hello from client')
print(s.recv_msg())  # 'hello from client'

# Using redis proxy client
cli = k3redisutil.RedisProxyClient([('127.0.0.1', 2222)])
cli.set('key', 'value', expire=1000)  # with TTL in msec
print(cli.get('key'))

API Reference

k3redisutil

For using redis more easily.

KeyNotFoundError

Bases: RedisProxyError

It is a subclass of redisutil.RedisProxyError. Raised when the key is not found (the redis proxy server returns a 404).

Source code in k3redisutil/redis_proxy_cli.py
class KeyNotFoundError(RedisProxyError):
    """
    It is a subclass of `redisutil.RedisProxyError`.
    Raise if key not found (redis proxy server return a `404`).
    """

    pass

RedisChannel

Bases: object

Sends message data through a channel. A channel is a list in redis; peer is either "client" or "server".

send is an rpush operation that appends an item to the end of a list. recv is an lpop operation that pops an item from the start of a list.
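The rpush/lpop pairing described above can be tried out without a redis server. The following is a minimal sketch under stated assumptions: `FakeRedis` and `SketchChannel` are hypothetical stand-ins written for this illustration (they are not part of k3redisutil), and the standard `json` module stands in for `k3utfjson`.

```python
import json
from collections import defaultdict, deque


class FakeRedis:
    """Hypothetical in-memory stand-in for the two list commands used here."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def rpush(self, name, value):
        # append to the tail of the list
        self.lists[name].append(value)

    def lpop(self, name):
        # pop from the head of the list, or return None when it is empty
        q = self.lists[name]
        return q.popleft() if q else None


class SketchChannel:
    """Hypothetical, simplified channel: one list per direction."""

    other_peer = {"client": "server", "server": "client"}

    def __init__(self, store, channel, peer):
        self.store = store
        self.send_list = channel + "/" + peer
        self.recv_list = channel + "/" + self.other_peer[peer]

    def send_msg(self, data):
        self.store.rpush(self.send_list, json.dumps(data))

    def recv_msg(self):
        raw = self.store.lpop(self.recv_list)
        return None if raw is None else json.loads(raw)


store = FakeRedis()
c = SketchChannel(store, "/foo", "client")
s = SketchChannel(store, "/foo", "server")

c.send_msg("first")
c.send_msg("second")
s.send_msg({"ack": True})

# The server reads the client's messages in FIFO order, then finds the list empty.
received_by_server = [s.recv_msg(), s.recv_msg(), s.recv_msg()]
# The client reads from the opposite list, so it only sees the server's message.
received_by_client = c.recv_msg()
```

Because each side pushes to its own list and pops from the other side's list, a peer never reads back its own messages.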

Source code in k3redisutil/redisutil.py
class RedisChannel(object):
    """
    send message `data` through `channel`.
    `channel` is a list in redis.
    `peer` is "client" or "server".

    send is a rpush operation that adds an item to the end of a list.
    recv is a lpop operation that pops an item from the start of a list.
    """

    other_peer = {
        "client": "server",
        "server": "client",
    }

    def __init__(self, ip_port, channel, peer, timeout=None):
        """
        `redisutil.RedisChannel(ip_port, channel, peer, timeout=None)`

        Create a redis list based channel for cross process communication.

        See [redis-list-command](https://redis.io/commands#list).

        Initializing this class does **NOT** create a socket connecting to redis or
        create any data in redis.

        A channel support duplex communication.
        Thus in redis it creates two lists for each channel for two way communication:
        `<channel>/client` and `<channel>/server`.

        Client side uses `<channel>/client` to send message.
        Server side uses `<channel>/server` to send message.

        A client should initialize `RedisChannel` with argument `peer` set to `client`.
        A server should initialize `RedisChannel` with argument `peer` set to `server`.
        :param ip_port: tuple of `(ip, port)` or a single int/long number as port.
        :param channel: specifies the name of the channel to create.
        A channel should be in URI form, starting with `/`, such as: `/asyncworker/image-process`
        `channel` can also be a tuple of string:
        `('asyncworker', 'image-process')` will be converted to `/asyncworker/image-process`.
        :param peer: specifies this instance is a client or a server.
        It can be `"client"` or `"server"`.
        :param timeout: the expire time of the channel, in second.
        If it is `None`, the channel exists until being cleaned.
        """
        assert peer in self.other_peer

        self.ip_port = normalize_ip_port(ip_port)
        self.rcl = get_client(self.ip_port)

        # convert ['a', 'b'] to '/a/b'
        if isinstance(channel, (list, tuple)):
            channel = "/" + "/".join(channel)

        self.channel = channel
        self.peer = peer.lower()
        self.send_list_name = "/".join([self.channel, self.peer])
        self.recv_list_name = "/".join([self.channel, self.other_peer[self.peer]])
        self.timeout = timeout

    def send_msg(self, data):
        """
        Send data. `data` is json-encoded in redis list.
        :param data: any data type that can be json encoded.
        :return: Nothing
        """
        j = k3utfjson.dump(data)
        self.rcl.rpush(self.send_list_name, j)

        if self.timeout is not None:
            self.rcl.expire(self.send_list_name, self.timeout)

    def recv_msg(self):
        """
        Receive one message.
        If there is no message in this channel, it returns `None`.
        :return: data that is loaded from json. Or `None` if there is no message in channel.
        """
        v = self.rcl.lpop(self.recv_list_name)
        if v is None:
            return None

        return k3utfjson.load(v)

    def brecv_msg(self, timeout=0):
        """
        Block to receive one message.
        If there is no message in this channel,
        then block for `timeout` seconds or until
        a value was pushed to the channel.

        :param timeout: seconds of the block time.
        If it is `0`, then block until a message is available.

        :return: data that is loaded from json. Or `None` if timeout.
        """
        v = self.rcl.blpop(self.recv_list_name, timeout=timeout)
        if v is None or len(v) != 2:
            return None

        # v is a tuple, (key, value)
        _, value = v

        return k3utfjson.load(value)

    def recv_last_msg(self):
        """
        Similar to `RedisChannel.recv_msg` except it returns only the last message it
        sees and removes all previous messages from this channel.
        :return: data that is loaded from json. Or `None` if there is no message in channel.
        """
        last = None
        while True:
            v = self.rcl.lpop(self.recv_list_name)
            if v is None:
                return k3utfjson.load(last)

            last = v

    def brecv_last_msg(self, timeout=0):
        """
        Similar to `RedisChannel.recv_last_msg` except it blocks for `timeout`
        seconds if the channel is empty.
        :param timeout: seconds of the block time.
        If it is `0`, then block until a message is available.
        :return: data that is loaded from json. Or `None` if timeout.
        """
        msg = self.brecv_msg(timeout=timeout)
        if msg is None:
            return None

        last = self.recv_last_msg() or msg

        return last

    def peek_msg(self):
        """
        Similar to `RedisChannel.recv_msg` except it does not remove the message it
        returned.
        :return: data that is loaded from json. Or `None` if there is no message in channel.
        """
        v = self.rcl.lindex(self.recv_list_name, 0)
        if v is None:
            return None

        return k3utfjson.load(v)

    def rpeek_msg(self):
        """
        Similar to `RedisChannel.peek_msg`
        except it gets message from the tail of the channel.
        :return: JSON decoded message. Or `None` if there is no message in channel.
        """
        v = self.rcl.lindex(self.recv_list_name, -1)
        if v is None:
            return None

        return k3utfjson.load(v)

    def list_channel(self, prefix):
        """
        List all channel names those start with `prefix`.
        :param prefix: specifies what channel to list. `prefix` must starts with '/', ends with '/'.
        :return: a list of channel names.
        """
        if isinstance(prefix, (list, tuple)):
            _prefix = "/" + "/".join(prefix) + "/"
        else:
            _prefix = prefix

        if not _prefix.startswith("/"):
            raise ValueError('prefix must starts with "/", but:' + repr(prefix))

        if _prefix.endswith("*"):
            raise ValueError('prefix must NOT ends with "*", but:' + repr(prefix))

        if not _prefix.endswith("/"):
            raise ValueError('prefix must ends with "/", but:' + repr(prefix))

        _prefix = _prefix + "*"
        channels = self.rcl.keys(_prefix)

        rst = []
        for c in channels:
            for k in self.other_peer:
                k = "/" + k
                if c.endswith(k.encode()):
                    c = c[: -len(k)]
                    break
            else:
                logger.info("not a channel: " + repr(c))
                continue

            if c not in rst:
                rst.append(c)

        return sorted(rst)

__init__(ip_port, channel, peer, timeout=None)

redisutil.RedisChannel(ip_port, channel, peer, timeout=None)

Create a redis list based channel for cross process communication.

See redis-list-command (https://redis.io/commands#list).

Initializing this class does NOT open a socket to redis or create any data in redis.

A channel supports duplex communication. For each channel, two lists are used in redis, one per direction: <channel>/client and <channel>/server.

The client side sends messages through <channel>/client. The server side sends messages through <channel>/server.

A client should initialize RedisChannel with peer set to client. A server should initialize RedisChannel with peer set to server.

:param ip_port: a tuple of (ip, port), or a single int as the port.

:param channel: the name of the channel to create. A channel should be in URI form, starting with /, such as /asyncworker/image-process. channel can also be a tuple of strings: ('asyncworker', 'image-process') is converted to /asyncworker/image-process.

:param peer: whether this instance is a client or a server. It can be "client" or "server".

:param timeout: the expiry time of the channel, in seconds. If it is None, the channel exists until it is cleaned up.
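The naming rule described above can be illustrated on its own. This is a small sketch, not the library's API: `channel_list_names` is a hypothetical helper that mirrors the normalization done in `__init__` and returns the two list names one side would use.

```python
OTHER_PEER = {"client": "server", "server": "client"}


def channel_list_names(channel, peer):
    """Return (send_list_name, recv_list_name) for one side of a channel."""
    if peer not in OTHER_PEER:
        raise ValueError("peer must be 'client' or 'server', got: " + repr(peer))

    # ('a', 'b') becomes '/a/b'
    if isinstance(channel, (list, tuple)):
        channel = "/" + "/".join(channel)

    send_name = "/".join([channel, peer])
    recv_name = "/".join([channel, OTHER_PEER[peer]])
    return send_name, recv_name


client_names = channel_list_names(("asyncworker", "image-process"), "client")
server_names = channel_list_names("/asyncworker/image-process", "server")
```

The tuple form and the URI form resolve to the same pair of lists, with the send and receive roles swapped between the two peers.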

Source code in k3redisutil/redisutil.py
def __init__(self, ip_port, channel, peer, timeout=None):
    """
    `redisutil.RedisChannel(ip_port, channel, peer, timeout=None)`

    Create a redis list based channel for cross process communication.

    See [redis-list-command](https://redis.io/commands#list).

    Initializing this class does **NOT** create a socket connecting to redis or
    create any data in redis.

    A channel support duplex communication.
    Thus in redis it creates two lists for each channel for two way communication:
    `<channel>/client` and `<channel>/server`.

    Client side uses `<channel>/client` to send message.
    Server side uses `<channel>/server` to send message.

    A client should initialize `RedisChannel` with argument `peer` set to `client`.
    A server should initialize `RedisChannel` with argument `peer` set to `server`.
    :param ip_port: tuple of `(ip, port)` or a single int/long number as port.
    :param channel: specifies the name of the channel to create.
    A channel should be in URI form, starting with `/`, such as: `/asyncworker/image-process`
    `channel` can also be a tuple of string:
    `('asyncworker', 'image-process')` will be converted to `/asyncworker/image-process`.
    :param peer: specifies this instance is a client or a server.
    It can be `"client"` or `"server"`.
    :param timeout: the expire time of the channel, in second.
    If it is `None`, the channel exists until being cleaned.
    """
    assert peer in self.other_peer

    self.ip_port = normalize_ip_port(ip_port)
    self.rcl = get_client(self.ip_port)

    # convert ['a', 'b'] to '/a/b'
    if isinstance(channel, (list, tuple)):
        channel = "/" + "/".join(channel)

    self.channel = channel
    self.peer = peer.lower()
    self.send_list_name = "/".join([self.channel, self.peer])
    self.recv_list_name = "/".join([self.channel, self.other_peer[self.peer]])
    self.timeout = timeout

brecv_last_msg(timeout=0)

Similar to RedisChannel.recv_last_msg, except that it blocks for timeout seconds if the channel is empty.

:param timeout: the blocking time in seconds. If it is 0, block until a message is available.

:return: data loaded from JSON, or None on timeout.

Source code in k3redisutil/redisutil.py
def brecv_last_msg(self, timeout=0):
    """
    Similar to `RedisChannel.recv_last_msg` except it blocks for `timeout`
    seconds if the channel is empty.
    :param timeout: seconds of the block time.
    If it is `0`, then block until a message is available.
    :return: data that is loaded from json. Or `None` if timeout.
    """
    msg = self.brecv_msg(timeout=timeout)
    if msg is None:
        return None

    last = self.recv_last_msg() or msg

    return last

brecv_msg(timeout=0)

Block to receive one message. If there is no message in this channel, block for timeout seconds or until a value is pushed to the channel.

:param timeout: the blocking time in seconds. If it is 0, block until a message is available.

:return: data loaded from JSON, or None on timeout.
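The timeout convention (0 means wait indefinitely, otherwise give up after the given number of seconds) can be sketched with the standard library alone. In this illustration `brecv` is a hypothetical function and `queue.Queue` stands in for the redis list; no redis call is involved.

```python
import json
import queue


def brecv(q, timeout=0):
    """Pop one JSON message from q, waiting up to `timeout` seconds.

    timeout=0 waits indefinitely, mirroring the convention described above.
    """
    try:
        raw = q.get(block=True, timeout=None if timeout == 0 else timeout)
    except queue.Empty:
        # nothing arrived within the timeout
        return None
    return json.loads(raw)


inbox = queue.Queue()
inbox.put(json.dumps({"job": 1}))

first = brecv(inbox, timeout=1)      # a message is already waiting
second = brecv(inbox, timeout=0.05)  # queue is empty, so this times out
```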

Source code in k3redisutil/redisutil.py
def brecv_msg(self, timeout=0):
    """
    Block to receive one message.
    If there is no message in this channel,
    then block for `timeout` seconds or until
    a value was pushed to the channel.

    :param timeout: seconds of the block time.
    If it is `0`, then block until a message is available.

    :return: data that is loaded from json. Or `None` if timeout.
    """
    v = self.rcl.blpop(self.recv_list_name, timeout=timeout)
    if v is None or len(v) != 2:
        return None

    # v is a tuple, (key, value)
    _, value = v

    return k3utfjson.load(value)

list_channel(prefix)

List all channel names that start with prefix.

:param prefix: specifies which channels to list. prefix must start with '/' and end with '/'.

:return: a sorted list of channel names.
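The prefix rules and the way per-peer list names collapse into channel names can be sketched without redis. The helper below is hypothetical: it takes a plain list of `str` keys instead of querying the server (the library itself works on the `bytes` keys returned by redis), and `str.startswith` stands in for the `prefix*` key pattern.

```python
PEER_SUFFIXES = ("/client", "/server")


def list_channel_names(keys, prefix):
    """Return sorted, de-duplicated channel names found under `prefix`."""
    if not prefix.startswith("/"):
        raise ValueError('prefix must start with "/", but: ' + repr(prefix))
    if not prefix.endswith("/"):
        raise ValueError('prefix must end with "/", but: ' + repr(prefix))

    names = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        for suffix in PEER_SUFFIXES:
            if key.endswith(suffix):
                # strip the peer suffix to recover the channel name
                names.add(key[: -len(suffix)])
                break
        # keys without a peer suffix are not channels and are skipped
    return sorted(names)


keys = ["/a/x/client", "/a/x/server", "/a/y/client", "/a/stray", "/b/z/server"]
channels = list_channel_names(keys, "/a/")
```

Both lists of a channel map to the same name, so `/a/x` appears once even though two keys exist for it.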

Source code in k3redisutil/redisutil.py
def list_channel(self, prefix):
    """
    List all channel names those start with `prefix`.
    :param prefix: specifies what channel to list. `prefix` must starts with '/', ends with '/'.
    :return: a list of channel names.
    """
    if isinstance(prefix, (list, tuple)):
        _prefix = "/" + "/".join(prefix) + "/"
    else:
        _prefix = prefix

    if not _prefix.startswith("/"):
        raise ValueError('prefix must starts with "/", but:' + repr(prefix))

    if _prefix.endswith("*"):
        raise ValueError('prefix must NOT ends with "*", but:' + repr(prefix))

    if not _prefix.endswith("/"):
        raise ValueError('prefix must ends with "/", but:' + repr(prefix))

    _prefix = _prefix + "*"
    channels = self.rcl.keys(_prefix)

    rst = []
    for c in channels:
        for k in self.other_peer:
            k = "/" + k
            if c.endswith(k.encode()):
                c = c[: -len(k)]
                break
        else:
            logger.info("not a channel: " + repr(c))
            continue

        if c not in rst:
            rst.append(c)

    return sorted(rst)

peek_msg()

Similar to RedisChannel.recv_msg, except that it does not remove the message it returns.

:return: data loaded from JSON, or None if there is no message in the channel.

Source code in k3redisutil/redisutil.py
def peek_msg(self):
    """
    Similar to `RedisChannel.recv_msg` except it does not remove the message it
    returned.
    :return: data that is loaded from json. Or `None` if there is no message in channel.
    """
    v = self.rcl.lindex(self.recv_list_name, 0)
    if v is None:
        return None

    return k3utfjson.load(v)

recv_last_msg()

Similar to RedisChannel.recv_msg, except that it returns only the last message it sees and removes all previous messages from this channel.

:return: data loaded from JSON, or None if there is no message in the channel.
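The "drain and keep the last" behaviour is useful when only the most recent state matters, such as progress updates. A minimal sketch, assuming a `collections.deque` in place of the redis list and the standard `json` module in place of `k3utfjson`; `recv_last` is a hypothetical name:

```python
import json
from collections import deque


def recv_last(q):
    """Drain q and return the decoded final item, or None if q was empty."""
    last = None
    while q:
        last = q.popleft()
    return None if last is None else json.loads(last)


updates = deque(
    json.dumps(v) for v in [{"progress": 10}, {"progress": 55}, {"progress": 90}]
)
latest = recv_last(updates)
remaining = len(updates)
```

After the call the queue is empty: earlier messages are discarded, not left for a later read.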

Source code in k3redisutil/redisutil.py
def recv_last_msg(self):
    """
    Similar to `RedisChannel.recv_msg` except it returns only the last message it
    sees and removes all previous messages from this channel.
    :return: data that is loaded from json. Or `None` if there is no message in channel.
    """
    last = None
    while True:
        v = self.rcl.lpop(self.recv_list_name)
        if v is None:
            return k3utfjson.load(last)

        last = v

recv_msg()

Receive one message. If there is no message in this channel, it returns None.

:return: data loaded from JSON, or None if there is no message in the channel.

Source code in k3redisutil/redisutil.py
def recv_msg(self):
    """
    Receive one message.
    If there is no message in this channel, it returns `None`.
    :return: data that is loaded from json. Or `None` if there is no message in channel.
    """
    v = self.rcl.lpop(self.recv_list_name)
    if v is None:
        return None

    return k3utfjson.load(v)

rpeek_msg()

Similar to RedisChannel.peek_msg, except that it gets the message from the tail of the channel.

:return: the JSON-decoded message, or None if there is no message in the channel.
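The difference between peeking at the head and at the tail comes down to the index used: 0 for the oldest message, -1 for the newest. A small sketch with a plain Python list standing in for the redis list; `peek` is a hypothetical helper, not a library function:

```python
import json

pending = [json.dumps(m) for m in ["oldest", "middle", "newest"]]


def peek(items, index):
    """Decode the item at `index` without removing it; None if the list is empty."""
    if not items:
        return None
    return json.loads(items[index])


head = peek(pending, 0)    # the message peek_msg would look at
tail = peek(pending, -1)   # the message rpeek_msg would look at
```

Neither call removes anything, so `pending` still holds all three messages afterwards.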

Source code in k3redisutil/redisutil.py
def rpeek_msg(self):
    """
    Similar to `RedisChannel.peek_msg`
    except it gets message from the tail of the channel.
    :return: JSON decoded message. Or `None` if there is no message in channel.
    """
    v = self.rcl.lindex(self.recv_list_name, -1)
    if v is None:
        return None

    return k3utfjson.load(v)

send_msg(data)

Send data. data is stored JSON-encoded in the redis list.

:param data: any data type that can be JSON-encoded.

:return: nothing

Source code in k3redisutil/redisutil.py
def send_msg(self, data):
    """
    Send data. `data` is json-encoded in redis list.
    :param data: any data type that can be json encoded.
    :return: Nothing
    """
    j = k3utfjson.dump(data)
    self.rcl.rpush(self.send_list_name, j)

    if self.timeout is not None:
        self.rcl.expire(self.send_list_name, self.timeout)

RedisProxyClient

Bases: object

Each supported method is described by a tuple of: redis operation, HTTP method, count of args, optional arg names.

### RedisProxyClient.delete

Delete the specified key. Do nothing if the key doesn’t exist.

arguments:

key: specifies the key to redis.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: None

### RedisProxyClient.get

Get the value at key. Raise a redisutil.KeyNotFoundError if the key doesn’t exist.

arguments:

key: specifies the key to redis.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: the value at key.

### RedisProxyClient.hdel

Delete the specified hashkey in the specified hashname. Raise a redisutil.KeyNotFoundError if it doesn’t exist.

arguments:

hashname: specifies the hash name to redis.

hashkey: specifies the hash key to redis.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: None

### RedisProxyClient.set

Set the value at key to val.

arguments:

key: specifies the key to redis.

val: specifies the value to redis.

expire: the expiry time of key, in msec. Defaults to None.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: nothing

### RedisProxyClient.hget

Return the value of hashkey within hashname. Raise a redisutil.KeyNotFoundError if it doesn’t exist.

arguments:

hashname: specifies the hash name to redis.

hashkey: specifies the hash key to redis.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: the value of hashkey within hashname.

### RedisProxyClient.hset

Set hashkey to val within hashname.

arguments:

hashname: specifies the hash name to redis.

hashkey: specifies the hash key to redis.

val: specifies the value to redis.

expire: the expiry time of hashkey within hashname, in msec. Defaults to None.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: nothing

### RedisProxyClient.hkeys

Return the list of keys within hashname. Raise a redisutil.KeyNotFoundError if the hashname doesn’t exist.

arguments:

hashname: specifies the hash name to redis.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: a list containing all the keys.

### RedisProxyClient.hvals

Return the list of values within hashname. Raise a redisutil.KeyNotFoundError if the hashname doesn’t exist.

arguments:

hashname: specifies the hash name to redis.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: a list containing all the values.

### RedisProxyClient.hgetall

Return a dict of the hash’s name/value pairs. Raise a redisutil.KeyNotFoundError if the hashname doesn’t exist.

arguments:

hashname: specifies the hash name to redis.

retry: the number of additional attempts to make if sending the request fails. By default, it is 0.

return: a dict of the hash’s name/value pairs.

Source code in k3redisutil/redis_proxy_cli.py
class RedisProxyClient(object):
    """
     redis operation, http method, count of args, optional args name

     ### RedisProxyClient.delete
     Delete the specified key.
     Do nothing if the key doesn’t exist.

     **arguments**:
     `key`:specifies the key to redis.
     `retry`:try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: None
     ###  RedisProxyClient.get
     Get the value with `key`. Raise a `redisutil.KeyNotFoundError` if the key doesn’t exist.

    `retry`:try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: the value at `key`.

     ### RedisProxyClient.hdel

     Delete the specified `hashkey` in the specified `hashname`.
     Raise a `redisutil.KeyNotFoundError` if it doesn’t exist.

     **arguments**:
     `hashname`:specifies the hash name to redis.

     `hashkey`:specifies the hash key to redis.

     `retry`:try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: None

     ###  RedisProxyClient.set

     Set the value at `key` to `val`.
     **arguments**:
     `key`: specifies the key to redis.
     `val`: specifies the value to redis.
     `expire`: the expire time on `key` in msec. Defaults to `None`.
     `retry`: try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: nothing

     ###  RedisProxyClient.hget

     Return the value of `hashkey` within the `hashname`.
     Raise a `redisutil.KeyNotFoundError` if it doesn’t exist.

     **arguments**:
     `hashname`: specifies the hash name to redis.
     `hashkey`: specifies the hash key to redis.
     `retry`: try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: the value of `hashkey` within the `hashname`.

     ###  RedisProxyClient.hset

     Set `hashkey` to `val` within `hashname`.

     **arguments**:

     `hashname`: specifies the hash name to redis.

     `hashkey`: specifies the hash key to redis.

     `val`: specifies the value to redis.

     `expire`: the expire time on `hashkey` within `hashname` in msec. Defaults to `None`.

     `retry`: try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: nothing

     ###  RedisProxyClient.hkeys

     Return the list of keys within `hashname`.
     Raise a `redisutil.KeyNotFoundError` if the `hashname` doesn’t exist.

     **arguments**:

     `hashname`: specifies the hash name to redis.

     `retry`: try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: a `list` contains all the keys.

     ###  RedisProxyClient.hvals

     Return the list of values within `hashname`.
     Raise a `redisutil.KeyNotFoundError` if the `hashname` doesn’t exist.

     **arguments**:

     `hashname`: specifies the hash name to redis.

     `retry`: try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: a `list` contains all the values.

     ###  RedisProxyClient.hgetall

     Return a dict of the hash’s name/value pairs.
     Raise a `redisutil.KeyNotFoundError` if the `hashname` doesn’t exist.

     **arguments**:

     `hashname`:specifies the hash name to redis.

     `retry`:try to send request for another N times while failed to send request.
     By default, it is `0`.

     **return**: a `dict` of the hash’s name/value pairs.
    """

    methods = {
        # get(key, retry=0)
        "get": ("get", "GET", 2, ()),
        # set(key, val, expire=None, retry=0)
        "set": ("set", "PUT", 4, ("expire",)),
        # hget(hashname, hashkey, retry=0)
        "hget": ("hget", "GET", 3, ()),
        # hset(hashname, hashkey, val, expire=None, retry=0)
        "hset": ("hset", "PUT", 5, ("expire",)),
        # hkeys(hashname, retry=0)
        "hkeys": ("hkeys", "GET", 2, ()),
        # hvals(hashname, retry=0)
        "hvals": ("hvals", "GET", 2, ()),
        # hgetall(hashname, retry=0)
        "hgetall": ("hgetall", "GET", 2, ()),
        # delete(key, retry=0)
        "delete": ("del", "DELETE", 2, ()),
        # hdel(hashname, key, retry=0)
        "hdel": ("hdel", "DELETE", 3, ()),
    }

    def __init__(self, hosts, proxy_hosts=None, nwr=None, ak_sk=None, timeout=None):
        """
        A client for redis proxy server.
        :param hosts: a `tuple` each element is a `(ip, port)`, retry with next addr while failed to use prev one.
        :param proxy_hosts: a `tuple` each element is `hosts`, as the backup hosts.
        :param nwr: `tuple` of `(n, w, r)`, count of replicas, count of write, count of read.
        If it is `None`, `config.rp_cli_nwr` is used.
        :param ak_sk: `tuple` of `(access_key, secret_key)` to sign the request.
        If it is `None`, `config.rp_cli_ak_sk` is used.
        :param timeout: timeout of connecting redis proxy server in seconds. Defaults to `None`.
        """
        self.hosts = hosts
        self.proxy_hosts = proxy_hosts

        if nwr is None:
            nwr = conf.rp_cli_nwr

        if ak_sk is None:
            ak_sk = conf.rp_cli_ak_sk

        self.n, self.w, self.r = nwr
        self.access_key, self.secret_key = ak_sk

        self.timeout = timeout or DEFAULT_TIMEOUT
        self.ver = "/redisproxy/v1"

        for mtd_name, mtd_info in self.methods.items():
            api_obj = SetAPI(self, mtd_info[0], mtd_info[1:])
            setattr(self, mtd_name, api_obj.api)

    def _sign_req(self, req):
        sign_payload = True if "body" in req else False
        signer = k3awssign.Signer(self.access_key, self.secret_key)
        sign_ctx = signer.add_auth(req, query_auth=True, sign_payload=sign_payload)
        logger.debug("signing details: {ctx}".format(ctx=sign_ctx))

    def _make_req_uri(self, params, qs):
        path = [self.ver]
        path.extend(params)

        qs_list = [
            "n={n}".format(n=self.n),
            "w={w}".format(w=self.w),
            "r={r}".format(r=self.r),
        ]
        for k, v in qs.items():
            if v is None:
                continue

            qs_list.append("{k}={v}".format(k=k, v=v))

        return "{p}?{qs}".format(p="/".join(path), qs="&".join(qs_list))

    def _req(self, req):
        if "headers" not in req:
            req["headers"] = {
                "host": "{ip}:{port}".format(ip=self.ip, port=self.port),
            }

        elif "host" not in req["headers"]:
            req["headers"]["host"] = "{ip}:{port}".format(ip=self.ip, port=self.port)

        body = req.get("body", None)
        if body is not None:
            req["headers"]["Content-Length"] = len(body)

        self._sign_req(req)

        cli = k3http.Client(self.ip, self.port, self.timeout)
        cli.send_request(req["uri"], method=req["verb"], headers=req["headers"])
        if body is not None:
            cli.send_body(req["body"])

        cli.read_response()
        res = cli.read_body(None)

        msg = "Status:{s} req:{req} res:{res} server:{ip}:{p}".format(
            s=cli.status, req=repr(req), res=repr(res), ip=self.ip, p=self.port
        )

        if cli.status == 404:
            raise KeyNotFoundError(msg)

        elif cli.status != 200:
            raise ServerResponseError(msg)

        return res

    @_proxy
    @_retry
    def _api(self, verb, path, body, qs):
        req = {
            "verb": verb,
            "uri": self._make_req_uri(path, qs),
        }

        if body is not None:
            req["body"] = body

        if verb == "GET":
            rst = self._req(req)
            return k3utfjson.load(rst)

        else:
            self._req(req)
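
The URI construction in `_make_req_uri` above can be exercised on its own to see the shape of a request. The sketch below re-creates that logic as a standalone function. The path segments `["set", "key"]` and the `(3, 2, 2)` value for `nwr` are assumptions made for illustration: the source shown here does not include `SetAPI`, which decides the real path segments, and the real defaults come from configuration.

```python
def make_req_uri(params, qs, nwr=(3, 2, 2), ver="/redisproxy/v1"):
    """Standalone re-creation of the URI logic; nwr default is an assumption."""
    n, w, r = nwr
    path = [ver] + list(params)

    qs_list = ["n={}".format(n), "w={}".format(w), "r={}".format(r)]
    for k, v in qs.items():
        # optional arguments left as None are omitted from the query string
        if v is None:
            continue
        qs_list.append("{}={}".format(k, v))

    return "{}?{}".format("/".join(path), "&".join(qs_list))


# Hypothetical path segments, chosen only to show the output format.
uri_with_ttl = make_req_uri(["set", "key"], {"expire": 1000})
uri_plain = make_req_uri(["get", "key"], {"expire": None})
```

Under these assumptions `uri_with_ttl` is `/redisproxy/v1/set/key?n=3&w=2&r=2&expire=1000`, while `uri_plain` carries only the `n`, `w` and `r` parameters.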

__init__(hosts, proxy_hosts=None, nwr=None, ak_sk=None, timeout=None)

A client for the redis proxy server.

:param hosts: a tuple in which each element is an (ip, port) pair; if one address fails, the next is tried.

:param proxy_hosts: a tuple in which each element is a hosts tuple, used as backup hosts.

:param nwr: a tuple of (n, w, r): count of replicas, count of writes, count of reads. If it is None, config.rp_cli_nwr is used.

:param ak_sk: a tuple of (access_key, secret_key) used to sign the request. If it is None, config.rp_cli_ak_sk is used.

:param timeout: timeout for connecting to the redis proxy server, in seconds. Defaults to None.

Source code in k3redisutil/redis_proxy_cli.py
def __init__(self, hosts, proxy_hosts=None, nwr=None, ak_sk=None, timeout=None):
    """
    A client for redis proxy server.
    :param hosts: a `tuple` whose elements are `(ip, port)` pairs; if one address fails, the next one is tried.
    :param proxy_hosts: a `tuple` whose elements are `hosts` tuples, used as backup hosts.
    :param nwr: a `tuple` of `(n, w, r)`: the replica count, the write count and the read count.
    If it is `None`, `config.rp_cli_nwr` is used.
    :param ak_sk: a `tuple` of `(access_key, secret_key)` used to sign the request.
    If it is `None`, `config.rp_cli_ak_sk` is used.
    :param timeout: timeout for connecting to the redis proxy server, in seconds. Defaults to `None`,
    in which case `DEFAULT_TIMEOUT` is used.
    """
    self.hosts = hosts
    self.proxy_hosts = proxy_hosts

    if nwr is None:
        nwr = conf.rp_cli_nwr

    if ak_sk is None:
        ak_sk = conf.rp_cli_ak_sk

    self.n, self.w, self.r = nwr
    self.access_key, self.secret_key = ak_sk

    self.timeout = timeout or DEFAULT_TIMEOUT
    self.ver = "/redisproxy/v1"

    for mtd_name, mtd_info in self.methods.items():
        api_obj = SetAPI(self, mtd_info[0], mtd_info[1:])
        setattr(self, mtd_name, api_obj.api)

RedisProxyError

Bases: Exception

The base class of other exceptions of RedisProxyClient. It is a subclass of Exception.

Source code in k3redisutil/redis_proxy_cli.py
class RedisProxyError(Exception):
    """
    The base class of other exceptions of `RedisProxyClient`.
    It is a subclass of `Exception`.
    """

    pass

SendRequestError

Bases: RedisProxyError

A subclass of RedisProxyError. Raised if sending a request to the redis proxy server fails.

Source code in k3redisutil/redis_proxy_cli.py
class SendRequestError(RedisProxyError):
    """
    A subclass of `RedisProxyError`.
    Raised if sending a request to the redis proxy server fails.
    """

    pass

ServerResponseError

Bases: RedisProxyError

A subclass of RedisProxyError. Raised if the redis proxy server returns an HTTP status other than 200 or 404.

Source code in k3redisutil/redis_proxy_cli.py
class ServerResponseError(RedisProxyError):
    """
    A subclass of `RedisProxyError`.
    Raised if the redis proxy server returns an HTTP status other than `200` or `404`.

    """

    pass
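
Because every error derives from RedisProxyError, a caller can handle a missing key separately and still catch everything else with one clause. The sketch below re-declares the hierarchy locally so it runs without the package installed; `check_status` and `describe` are hypothetical helpers that mirror the status checks in the client source (404 raises KeyNotFoundError, any other non-200 status raises ServerResponseError).

```python
class RedisProxyError(Exception):
    """Local copy of the base class, for illustration only."""


class KeyNotFoundError(RedisProxyError):
    pass


class ServerResponseError(RedisProxyError):
    pass


def check_status(status, msg=""):
    # Same mapping as the client source: 404 -> missing key,
    # any other non-200 status -> server error.
    if status == 404:
        raise KeyNotFoundError(msg)
    elif status != 200:
        raise ServerResponseError(msg)


def describe(status):
    try:
        check_status(status)
    except KeyNotFoundError:
        # Most specific handler first.
        return "missing"
    except RedisProxyError as e:
        # Catches every other error in the hierarchy.
        return "error: " + type(e).__name__
    return "ok"


print(describe(200), describe(404), describe(500))
# ok missing error: ServerResponseError
```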

get_client(ip_port)

Return a process-wise singleton redis client, which is an instance of redis.StrictRedis.

The returned redis client is shared across the entire process and is not re-created. It is also safe to use after a process forks and inherits open socket file descriptors, because clients are kept per process id.

:param ip_port: an int port number, to get or create a client connected to localhost, or a tuple of (ip, port).
:return: an instance of redis.StrictRedis.

Source code in k3redisutil/redisutil.py
def get_client(ip_port):
    """
    Return a process-wise singleton redis client, which is an instance of `redis.StrictRedis`.

    The returned redis client is shared across the entire process and is not
    re-created.
    It is also safe to use after a process forks and inherits open socket file descriptors.
    :param ip_port: an `int` port number, to get or create a client connected
    to localhost, or a tuple of `(ip, port)`.
    :return: an instance of `redis.StrictRedis`.
    """
    ip_port = normalize_ip_port(ip_port)

    pid = os.getpid()

    with _lock:
        o = _pid_client[ip_port]

        if pid not in o:
            o[pid] = redis.StrictRedis(*ip_port)

    return _pid_client[ip_port][pid]

wait_serve(ip_port, timeout=5)

Wait at most timeout seconds for redis to start serving requests. This is useful right after starting a redis server. If redis is still unreachable when the timeout expires, an exception is raised.

:param ip_port: a tuple of (ip, port), or a single int as the port.
:param timeout: the maximum waiting time, in seconds.
:return: nothing.

Source code in k3redisutil/redisutil.py
def wait_serve(ip_port, timeout=5):
    """
    Wait at most `timeout` seconds for redis to start serving requests.
    This is useful right after starting a redis server.
    :param ip_port: a tuple of `(ip, port)`, or a single `int` as the port.
    :param timeout: the maximum waiting time, in seconds.
    :return: nothing.
    """
    t = time.time() + timeout

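    rcl = get_client(ip_port)
    last_err = None

    while time.time() < t:
        try:
            rcl.hget("foo", "foo")
            logger.info("redis is ready: " + repr(ip_port))
            return

        except redis.ConnectionError as e:
            logger.info("can not connect to redis: " + repr(ip_port) + " " + repr(e))
            last_err = e
            time.sleep(0.1)
            continue
    else:
        logger.error("can not connect to redis: " + repr(ip_port))
        # A bare `raise` has no active exception here and would fail with
        # RuntimeError, so re-raise the last connection error explicitly.
        raise last_err or redis.ConnectionError("can not connect to redis: " + repr(ip_port))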
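
The same poll-until-deadline pattern can be sketched without a redis server. In this minimal example, `wait_until` and `flaky_probe` are hypothetical names, and the built-in `ConnectionError` stands in for `redis.ConnectionError`; it is an illustration of the technique, not the library's implementation.

```python
import time


def wait_until(probe, timeout=5, interval=0.1):
    """Call `probe` until it succeeds or `timeout` seconds have passed."""
    deadline = time.time() + timeout
    last_err = None
    while time.time() < deadline:
        try:
            probe()
            return
        except ConnectionError as e:
            # Not ready yet: remember the error and retry after a pause.
            last_err = e
            time.sleep(interval)
    # Deadline passed without a successful probe.
    raise last_err or ConnectionError("not ready within {}s".format(timeout))


attempts = []


def flaky_probe():
    # Hypothetical probe that fails twice, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("not up yet")


wait_until(flaky_probe, timeout=2, interval=0.01)
print(len(attempts))  # 3
```

Keeping the last error around means the caller sees the actual connection failure when the deadline passes.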

License

The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)