
== 15 seconds

1 / 59

Hello world

  • Benjamin Sergeant (bsergean@gmail.com)


2 / 59

== 15 seconds

Agenda

  1. Architecture
  2. Client metrics publishing
  3. Message broker Cobra
  4. Bots and data sinks
3 / 59

Using AWK (stream programming) for analytics

Stdin                 AWK           Stdout
              +-----------------+
              |                 |
              |  $1 == "fps"    |
fps:60     +->+  { print $2 }   +----> 60
mem:100000 |  |                 |
...        |  |                 |
              +-----------------+
$ cat /tmp/input
memory:100000
fps:60
$
$
$ awk -F: '$1 == "fps" { print $2 }' < /tmp/input
60
4 / 59

Awk is a very simple programming language that follows the UNIX principle of processing text read from standard input and writing results to standard output.

The stream is made of multiple lines, and each line contains multiple tokens. A field separator defines how lines are broken down into tokens.

In the common case, an awk program consists of a pattern and an action. Awk searches the input for the pattern, and for each line that matches it executes the action.

Let's imagine that we are building an analytics solution with awk. Each event that we care about will be a line, and we'll have fields to represent an id and other data for that event.

If we want to write a report that extracts all the information about the FPS, our awk program's pattern will match the fps term, and the action will extract the value (60, because our game is perfect and always runs at the ideal frame rate :). Awk could also sum up those values and display an average.
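For instance (not on the slide), a one-liner in the same spirit averages the fps samples:

$ awk -F: '$1 == "fps" { sum += $2; n++ } END { print sum / n }' < /tmp/input
60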

Using PubSub for analytics

One subscriber

Publishers                          Broker                          Subscribers
                         +-------------------------------+
     channel             |                               |  channel
{ id: fps_id,      ----->| SELECT data.fps from `fps_id` +---------> 60
  data: { fps: 60 } }    |                               |
     channel             |                               |
{ id: mem_id,      ----->|                               |
  data: { mem: 100000 } }|                               |
                         |                               |
                         +-------------------------------+
5 / 59

== 60 seconds

PubSub for analytics

N subscribers

Publishers                          Broker                          Subscribers
                         +-------------------------------+
                         |                               |
{ id: fps_id,      ----->| SELECT data.fps from `fps_id` +------> 60
  data: { fps: 60 } }    |                               |
                         |                               |
{ id: mem_id,      ----->| SELECT data.mem from `mem_id` +------> 100000
  data: { mem: 100000 } }|                               |
                         |                               |
                         +-------------------------------+
6 / 59

== 15 seconds

Requirements

  • Should:

    • Real Time (fast)
    • Work on Mobile
    • Flexible, extensible
    • Easy to write subscribers
  • Should not:

    • Guarantee message delivery (no explicit ack or retry support)
    • Keep history
    • Fill up hard drives
7 / 59

Messaging protocol

Publishers

8 / 59

Publisher <-> Broker

Publisher Broker
+-------------------------------+ +---------------------+
| | | |
| { data: { fps: 30 }, | handshake | |
| device: +------------->+ |
| { app_version: '3.25', | | |
| game: 'wiso', | handshake/ok | |
| model: 'iPhone', +<-------------+ Cobra |
| os_language: 'en', | | |
| os_name: 'iOS', | auth | * Python |
| os_version: '10.3.3' }, +------------->+ * Redis PubSub |
| id: 'engine_fps_id', | | |
| session: '4cb5989ba', | auth/ok | |
| timestamp: 1501190629268, +<-------------+ |
| version: 1 } | | |
| | publish | |
| +------------->+ |
+-------------------------------+ +---------------------+
JSON
WebSocket Transport
9 / 59

WebSocket

  1. TCP based
    • Persistent connection
    • Bi-directional (C <-> S)
    • Full duplex
  2. Compatible with HTTP(S)
  3. Inter-operability (browser)
  4. zlib compression
  5. Heart-beat
  6. Large message segmentation
10 / 59

IXWebSocket

  • We open-sourced our C++ implementation

  • On GitHub (star it!)

  • Why?

    • No boost dependency (for a small install size)
    • Asynchronous and interruptible/cancellable
    • Auto-reconnection logic
11 / 59

ws

$ ws --help
ws is a websocket tool
Usage: ws [OPTIONS] SUBCOMMAND
Options:
-h,--help Print this help message and exit
Subcommands:
send Send a file
receive Receive a file
transfer Broadcasting server
connect Connect to a remote server
chat Group chat
echo_server Echo server
broadcast_server Broadcasting server
ping Ping pong
curl HTTP Client
redis_publish Redis publisher
redis_subscribe Redis subscriber
12 / 59

Messaging protocol

Publishers

Broker

13 / 59

Features

PubSub

Also

  1. Authentication
  2. StreamSQL
  3. Websocket for the transport
  4. JSON serialization
14 / 59

Cobra

,,'6''-,.
<====,.;;--.
_`---===. """==__
//""@@-\===\@@@@ ""\\
|( @@@ |===| @@@ ||
\\ @@ |===| @@ //
\\ @@ |===|@@@ //
\\ |===| //
___________\\|===| //_____,----""""""""""-----,_
""""---,__`\===`/ _________,---------,____ `,
|==|| `\ \
|==| | Python 3 + Redis ) |
|==| | _____ ______,--' '
|=| `----""" `"""""""" _,-'
`=\ __,---"""-------------"""''
""""
15 / 59

Why Redis ?

  • Redis Pub/Sub is very easy to get going with. We could also use Redis Streams.

  • Has asyncio Python clients

  • Simpler than the elephant in the room (Kafka)

  • I like the values expressed in the Redis manifesto

16 / 59

Why Python ?

  • This is what I know :)

  • Python 3.5+ has built-in asynchronous IO. The async and await keywords make code very simple and readable

  • Very fast libuv-based event loop, uvloop, claimed to be faster than node.js and comparable to Go (see the sketch below)

  • Nice libraries: websockets, and fast JSON processing with ujson or rapidjson (Tencent)

  • mypy type checker, memory debugging tools (tracemalloc)

17 / 59
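As an aside (not from the original slides), a minimal sketch of opting into uvloop while keeping the standard asyncio API; the echo server below is made up for illustration:

import asyncio

import uvloop  # third-party: pip install uvloop

async def echo(reader, writer):
    # trivial TCP echo handler; async/await keeps it readable
    data = await reader.read(1024)
    writer.write(data)
    await writer.drain()
    writer.close()

async def main():
    server = await asyncio.start_server(echo, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

uvloop.install()   # swap in the libuv-based event loop
asyncio.run(main())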

Memory optimizations (thanks to tracemalloc module)

A simple asyncio PUBLISH-only client (see the code in the appendix)

262.5 KB new 262.5 KB total / 2994 new 2994 total memory blocks:
...
File "/src/cobra/server/pipelined_publisher.py", line 35
pipe.publish(appChannel, data)
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 145
task = asyncio.ensure_future(attr(*args, **kw),
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/pubsub.py", line 14
return self.execute(b'PUBLISH', channel, message)
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/__init__.py", line 50
return self._pool_or_conn.execute(command, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 102
self._pipeline.append((fut, cmd, args, kw))
18 / 59
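For reference, a minimal sketch (an assumption, not the exact instrumentation used) of producing such a report with the tracemalloc module:

import tracemalloc

tracemalloc.start(25)  # keep up to 25 frames of traceback per allocation

# ... run the publisher workload here ...

snapshot = tracemalloc.take_snapshot()
for stat in snapshot.statistics('traceback')[:5]:
    print(f'{stat.size / 1024:.1f} KB / {stat.count} memory blocks:')
    for line in stat.traceback.format():
        print(line)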

Pipelined redis publishing

Sacrifice some latency for throughput.

  1. Incoming published messages are queued.
  2. They are then published in batches every 100ms, using a redis pipeline, to minimize RTT.
19 / 59

Channel Sharding

Cobra
+-------------+ +---------+
| | | redis-1 |
engine_fps_id +-------+ | +---------+
| | hash() |
| | | +---------+
engine_mem_id +--------------------------->+ redis-2 |
| | | +---------+
| | |
.... | | | +---------+
| +------------------->+ redis-3 |
| | +---------+
+-------------+
.....
20 / 59

To be able to scale horizontally we run multiple redis instances, and we select a redis instance to talk to by computing a hash on the channel name.
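A minimal sketch of that idea (the hash function and instance list below are illustrative, not Cobra's exact code):

import hashlib

REDIS_URLS = ['redis://redis1', 'redis://redis2', 'redis://redis3']  # hypothetical list

def pick_redis_url(channel: str) -> str:
    # hash the channel name and map it onto one of the redis instances
    digest = hashlib.sha1(channel.encode('utf8')).hexdigest()
    return REDIS_URLS[int(digest, 16) % len(REDIS_URLS)]

print(pick_redis_url('engine_fps_id'))  # a given channel always lands on the same instance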

SQL like language to process messages

  • Filter data on the broker
Broker Subscriber
+-------------+
| |
+---------->+ | +-------------+
| SELECT ... | | few messages|
High frequency | +---->+ received |
message | (subset | | |
| of data) | +-------------+
+---------->+ |
| |
+-------------+
21 / 59
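Conceptually (this is not Cobra's stream_sql module, just an illustration of filtering on the broker), the broker evaluates the subscriber's expression against each message and only forwards the matching subset:

def get_field(msg: dict, path: str):
    # 'data.status' -> msg['data']['status']
    value = msg
    for key in path.split('.'):
        value = value[key]
    return value

def matches(msg: dict) -> bool:
    # equivalent of: ... WHERE data.status != 200
    return get_field(msg, 'data.status') != 200

messages = [{'data': {'status': 200}}, {'data': {'status': 404}}]
print([m for m in messages if matches(m)])  # only the 404 message is forwarded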

Example Stream SQL expressions

HTTP Errors

SELECT data.url,data.status FROM `engine_net_request_id`
WHERE data.status != 200 AND
      data.url LIKE 'why'

{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_one_too', 'data.status': 404}

CDN requests

SELECT data.url,data.duration_ms FROM `engine_net_request_id`
WHERE data.url LIKE '.png'

{'data.url': 'https://big.cdn.provider/asset_1.png', 'data.duration_ms': 28}
{'data.url': 'https://big.cdn.provider/asset_2.png', 'data.duration_ms': 1008}
{'data.url': 'https://other.cdn.provider/asset_2.png', 'data.duration_ms': 512}
22 / 59

We used this to deprecate sending low-res assets to old devices: very few such devices remained in use, and dropping those assets sped up our deployment process and reduced our CDN costs.

Deployed with OpenShift

  • 60 python nodes (1CPU + 4G) dedicated to publishers
  • 10 python nodes (1CPU + 4G) dedicated to subscribers
  • 5 redis nodes (512M ?)
redis1-1-gbjvt 1/1 Running 2 171d
redis2-1-8jvj7 1/1 Running 0 78d
redis3-1-s9hxj 1/1 Running 0 78d
redis4-1-v96gx 1/1 Running 2 166d
redis5-1-z6262 1/1 Running 0 78d
cobra-44-4xfbw 1/1 Running 0 12d
cobra-44-58vch 1/1 Running 0 12d
cobra-44-5m4ff 1/1 Running 0 12d
cobra-44-6vvk5 1/1 Running 0 12d
...
cobra-subscribers-30-7bpqh 1/1 Running 0 12d
cobra-subscribers-30-fd477 1/1 Running 0 12d
cobra-subscribers-30-h5ntn 1/1 Running 0 12d
cobra-subscribers-30-hlcd6 1/1 Running 0 12d
...
23 / 59

It is interesting to note the uptime of the Redis pods: Redis is very easy to operate and super reliable.

Publishers

Broker

Subscribers

24 / 59

Subscribers ecosystem

Broker Subscribers
+----------------------+ +-------------+ http
| | ws | +--------> Sentry
| +----->+ node.js |
| Cobra | | +--------> Grafana
| * Python3 | +-------------+ udp
| * Redis PubSub |
| | +-------------+ mysql
| | ws | +--------> tableau
| +----->+ Python |
| | | +--------> "Terminal"
| | +-------------+
| |
| | +-------------+
| | ws | | websocket
| +----->+ Web Browser |--------> Neo (custom dashboard)
| +--+ | |
+----------------------+ | +-------------+
|
| +----------------+
| | |
+-->+ Java, PHP, C++ |
| |
+----------------+
25 / 59

A flexible and rich ecosystem of subscribers, or bots, used by various individuals and teams to solve different problems, all built on a simple WebSocket API plus JSON.

That is a good property of a flexible system.

Command line tools

  • Great for one-off custom things, or fast experimentation.
  • Lowest barrier to entry (good for programmers)
  • Does not scale well
+-------------+-------------+----------------+---------------+
| Frame Rate | iPad mini 2 | MacBookPro11,5 | iPhone 8 Plus |
+-------------+-------------+----------------+---------------+
| Samples | 1 | 1 | 1 |
+-------------+-------------+----------------+---------------+
| > 60 fps | 0.0 % | 1.6 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| 60 fps | 69.5 % | 98.1 % | 100.0 % |
+-------------+-------------+----------------+---------------+
| 30 fps | 30.5 % | 0.2 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| 15 fps | 0.0 % | 0.0 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| 10 fps | 0.0 % | 0.0 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| < 8 fps | 0.0 % | 0.0 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| Average FPS | 50.85 | 55.80 | 59.99 |
+-------------+-------------+----------------+---------------+
26 / 59

Neo

  • Real time dashboard
  • Search/Filter with StreamSQL

[screenshot: Neo real-time dashboard]

27 / 59

Neo

  • Recording of user sessions
  • Charting system memory, Lua memory and FPS, annotated with user actions
    • Scenes or documents visited

[screenshot: Neo session recording charts]

28 / 59

Grafana

  • Charting a signal
  • Trends

[screenshot: Grafana chart]

29 / 59

Grafana

  • zlib compression -> traffic reduction

[screenshot: Grafana traffic chart]

30 / 59

Sentry

  • Great to provide extra source code context, such as a stacktrace.
    • Soft crashes
    • Critical errors

[screenshot: Sentry error report]

31 / 59

Tableau

  • Data exploration

32 / 59

Tableau was used to let us know that our Crash Handler was not working on Android x86, as the crash rate looked bogus on this platform.

Conclusion

  • Python3 is fine for writing networking servers
    • 400,000 concurrent connections
    • 100 billion events per month
  • WebSocket is a ubiquitous communication protocol
  • PubSub is very fast and a good fit when fire-and-forget is enough
  • You can write an end-to-end pipeline yourself and get insights that answer your own developers' questions
33 / 59
34 / 59
35 / 59

Appendix

36 / 59

I will not go through this in the presentation

Messaging protocol

37 / 59

Health check (messages sent / received)

Client cobra command

  1. Handshake + authenticate
  2. Subscribe to a channel
  3. Publish a message
  4. Make sure that message is received
$ cobra health
> {'action': 'auth/handshake', 'body': {'data': {'role': 'health_publisher_subscriber'}, 'method': 'role_secret'}, 'id': 1}
< {"action": "auth/handshake/ok", "body": {"data": {"nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==", "version": "0.0.24"}}, "id": 1}
> {'action': 'auth/authenticate', 'body': {'method': 'role_secret', 'credentials': {'hash': '/Ec2gh0AQpAbzq7vQTvuIw=='}}, 'id': 2}
< {"action": "auth/authenticate/ok", "body": {}, "id": 2}
> {'action': 'rtm/subscribe', 'body': {'subscription_id': 'ws_subscribe', 'channel': 'sms_republished_v1_neo', 'fast_forward': True, 'filter': ''}, 'id': 3}
> {"action": "rtm/subscribe/ok", "body": {"position": "1519190184:559034812775", "subscription_id": "ws_subscribe"}, "id": 3}
Received {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}], "position": "1519190184:568807785938"}}
System is healthy
38 / 59

Server health check traffic (messages sent / received)

$ cobra run --verbose
runServer {'host': '127.0.0.1', 'port': '8765', 'redisUrls': 'redis://localhost;redis://localhost', 'redisPassword': None, 'verbose': True, 'debugMemory': False}
node caad17054b494236aae5964743b22f7b
appkey EEEEEEEEEEEEEEEEFFFFFFFFFFFFFFFF
(open) connections 1
< {"action":"auth\/handshake","body":{"data":{"role":"health_publisher_subscriber"},"method":"role_secret"},"id":1}
> {'action': 'auth/handshake/ok', 'body': {'data': {'nonce': 'MTI0Njg4NTAyMjYxMzgxMzgzMg==', 'version': '0.0.24'}}, 'id': 1}
< {"action":"auth\/authenticate","body":{"method":"role_secret","credentials":{"hash":"\/Ec2gh0AQpAbzq7vQTvuIw=="}},"id":2}
server hash /Ec2gh0AQpAbzq7vQTvuIw==
client hash /Ec2gh0AQpAbzq7vQTvuIw==
> {'action': 'auth/authenticate/ok', 'body': {}, 'id': 2}
< {"action":"rtm\/subscribe","body":{"subscription_id":"ws_subscribe","channel":"sms_republished_v1_neo","fast_forward":true,"filter":""},"id":3}
> {'action': 'rtm/subscribe/ok', 'body': {'position': '1519190184:559034812775', 'subscription_id': 'ws_subscribe'}, 'id': 3}
< {"action": "rtm/publish", "body": {"channel": "sms_republished_v1_neo", "message": {"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}}}
> {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "95a18ef313bb45de83f6ddfd59a964a2"}}]}], "position": "1519190184:568807785938"}}
#messages 1 msg/s 1
< {"action": "rtm/unsubscribe", "body": {"subscription_id": "sms_republished_v1_neo"}}
> {"action": "rtm/unsubscribe/ok", "id": 3}
subscribe redis connection closed
(close) connections 0
39 / 59

Handshake action (C -> S)

{
    "action": "auth/handshake",
    "body": {
        "data": {
            "role": "health_publisher_subscriber"
        },
        "method": "role_secret"
    },
    "id": 1
}
40 / 59

Each PDU (protocol data unit) has an action and a body. The handshake method specifies a role name.
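As an illustration (hypothetical URL and appkey, using the websockets library mentioned on the "Why Python?" slide), sending this handshake PDU from Python could look like this:

import asyncio
import json

import websockets  # pip install websockets

async def handshake():
    url = 'ws://localhost:8765/v2?appkey=XXXXXXXX'  # placeholder endpoint
    async with websockets.connect(url) as ws:
        pdu = {'action': 'auth/handshake',
               'body': {'data': {'role': 'health_publisher_subscriber'},
                        'method': 'role_secret'},
               'id': 1}
        await ws.send(json.dumps(pdu))
        print(json.loads(await ws.recv()))  # expect an auth/handshake/ok PDU

asyncio.run(handshake())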

Handshake action (S -> C)

{
    "action": "auth/handshake/ok",
    "body": {
        "data": {
            "nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==",
            "version": "0.0.24"
        }
    },
    "id": 1
}
41 / 59

The server responds with a PDU whose action ends in /ok, or /error if an error happens (invalid syntax, etc.). Here the client receives a nonce from the server, which it will use for authentication.

Authenticate action (C -> S)

{
    "action": "auth/authenticate",
    "body": {
        "method": "role_secret",
        "credentials": {
            "hash": "/Ec2gh0AQpAbzq7vQTvuIw=="
        }
    },
    "id": 2
}
42 / 59

A hash is computed from the nonce and the role secret, and passed in the authenticate PDU (or message).

Authenticate action (S -> C)

{
    "action": "auth/authenticate/ok",
    "body": {},
    "id": 2
}
  1. Lookup secret for role
  2. Compute hash: serverHash = computeHash(secret, nonce)
  3. Compare with the one the client supplied
server hash /Ec2gh0AQpAbzq7vQTvuIw==
client hash /Ec2gh0AQpAbzq7vQTvuIw==
43 / 59

When the server receives the client's authenticate message, it performs the steps listed above and compares the two hashes.
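The exact hash construction is not shown here; as one plausible sketch (an assumption, not necessarily Cobra's scheme), an HMAC-MD5 of the nonce keyed with the role secret yields a 16-byte, base64-encoded value of the same shape as the hashes above:

import base64
import hashlib
import hmac

def compute_hash(secret: bytes, nonce: bytes) -> str:
    # illustrative only: HMAC-MD5 of the nonce, keyed with the role secret
    digest = hmac.new(secret, nonce, hashlib.md5).digest()
    return base64.b64encode(digest).decode('ascii')

# the server recomputes the hash and compares it with the value sent by the client
server_hash = compute_hash(b'role-secret-goes-here', b'MTI0Njg4NTAyMjYxMzgxMzgzMg==')
print(server_hash)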

Publish action (C -> S)

{
    "action": "rtm/publish",
    "body": {
        "channel": "sms_republished_v1_neo",
        "message": {
            "data": {
                "info": "test_from_minix"
            },
            "device": {
                "foo": {
                    "foo": true,
                    "stuff": 3.0
                }
            },
            "id": "engine_test_id",
            "session": "4dacbafefc214578aa3f8c1d2975ec92",
            "timestamp": 1536270895637,
            "version": 1
        }
    }
}
44 / 59

The client can publish messages.

Subscribe action (C -> S)

  • Filter (StreamSQL) optional
{
    "action": "rtm/subscribe",
    "body": {
        "channel": "sms_republished_v1_neo",
        "subscription_id": "ws_subscribe" /* optional */
    },
    "id": 3
}
45 / 59

Subscription data (S -> C)

{
    "action": "rtm/subscription/data",
    "body": {
        "subscription_id": "ws_subscribe",
        "messages": [
            {
                "position": "1519190184:547873030411",
                "subscription_id": "cobra_republisher",
                "messages": [
                    {
                        "device": {
                            "game": "test",
                            "android_id": "95a18ef313bb45de83f6ddfd59a964a2"
                        }
                    }
                ]
            }
        ],
        "position": "1519190184:568807785938"
    }
}
46 / 59

Unsubscription data (C -> S)

{
    "action": "rtm/unsubscribe",
    "body": {
        "subscription_id": "sms_republished_v1_neo"
    }
}
47 / 59

Unsubscription data (S -> C)

{
    "action": "rtm/unsubscribe/ok",
    "id": 3
}
48 / 59

Command line tools

(venv) cobra$ cobra ws_subscribe --url 'ws://foo.bar.com/v2?appkey=ZZZZZZZZZZZZZZZZ' \
--stream_sql "select data.fps from \`engine_fps_id\`" --verbose
> {'action': 'auth/handshake', 'id': 0, 'body': {'data': {'role': '_sub'}, 'method': 'role_secret'}}
< {"action": "auth/handshake/ok", "id": 0, "body": {"data": {"nonce": "MTU1NTM4NDM5Mjg3Nzk0NjIyNzA=", "version": "0.0.177", "connection_id": "794e630d", "node": "cobra-subscribers-30-7bpqh"}}}
> {'action': 'auth/authenticate', 'id': 1, 'body': {'method': 'role_secret', 'credentials': {'hash': 'xeBLmfJjUjGmrfpsH9/ksg=='}}}
< {"action": "auth/authenticate/ok", "id": 1, "body": {}}
> {'action': 'rtm/subscribe', 'body': {'subscription_id': 'engine_payload_id', 'channel': 'engine_payload_id', 'fast_forward': True, 'filter': 'select data.fps from `engine_fps_id`'}, 'id': 3}
< {"action": "rtm/subscribe/ok", "id": 3, "body": {"position": "1519190184:559034812775", "subscription_id": "engine_payload_id", "redis_node": "redis5"}}
{'data.fps': 25.0}
#messages 1 msg/s 1
{'data.fps': 19.0}
{'data.fps': 19.0}
{'data.fps': 24.0}
{'data.fps': 28.0}
{'data.fps': 27.0}
{'data.fps': 23.0}
{'data.fps': 26.0}
{'data.fps': 2.0}
{'data.fps': 29.0}
{'data.fps': 7.0}
{'data.fps': 26.0}
49 / 59

Message framing with WebSocket

  • Multiple opcodes (PING, PONG, CONTINUATION, TEXT, BINARY)
http://tools.ietf.org/html/rfc6455#section-5.2 Base Framing Protocol
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
50 / 59
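A small sketch (not IXWebSocket's actual code) of decoding the first two header bytes according to this layout:

def parse_frame_header(header: bytes):
    # header holds the first two bytes of a WebSocket frame (RFC 6455, section 5.2)
    b0, b1 = header[0], header[1]
    fin = bool(b0 & 0x80)
    opcode = b0 & 0x0F       # 0x1 TEXT, 0x2 BINARY, 0x9 PING, 0xA PONG, 0x0 CONTINUATION
    masked = bool(b1 & 0x80)
    payload_len = b1 & 0x7F  # 126 or 127 means an extended length field follows
    return fin, opcode, masked, payload_len

print(parse_frame_header(bytes([0x81, 0x85])))  # (True, 1, True, 5): a masked 5-byte TEXT frame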

Complexity of the code base

< 2000 lines of code for the server

$ wc -l src/cobra/server/*.py src/cobra/common/*.py src/cobra/runner/commands/run.py
0 src/cobra/server/__init__.py
234 src/cobra/server/app.py
64 src/cobra/server/apps_config.py
17 src/cobra/server/connection_state.py
56 src/cobra/server/pipelined_publisher.py
41 src/cobra/server/pipelined_publishers.py
477 src/cobra/server/protocol.py
42 src/cobra/server/redis_connections.py
92 src/cobra/server/redis_publisher.py
129 src/cobra/server/stats.py
207 src/cobra/server/stream_sql.py
74 src/cobra/server/subscribe.py
0 src/cobra/common/__init__.py
8 src/cobra/common/algorithm.py
13 src/cobra/common/auth_hash.py
4 src/cobra/common/cobra_types.py
27 src/cobra/common/memoize.py
113 src/cobra/common/memory_debugger.py
23 src/cobra/common/memory_usage.py
24 src/cobra/common/task_cleanup.py
22 src/cobra/common/throttle.py
16 src/cobra/common/version.py
75 src/cobra/runner/commands/run.py
1758 total
51 / 59

"Plugins" / "Republishing"

  • Help with high-throughput channels
  • Can republish messages from a high-traffic channel to lower-traffic, more specialized ones
    • We use it for our http metric id.
52 / 59

A simple redis client (1/4)

import asyncio


class RedisPublisher(object):
    '''
    See https://redis.io/topics/mass-insert
    '''

    def __init__(self, host, port, password, verbose=False):
        self.host = host
        self.port = port
        self.password = password
        self.verbose = verbose
        self.reset()
        self.lock = asyncio.Lock()

    def reset(self):
        self.publishCount = 0

    def writeString(self, data: bytes):
        self.writer.write(b'$%d\r\n' % len(data))
        self.writer.write(data)
        self.writer.write(b'\r\n')
53 / 59

A simple redis client (2/4)

    async def connect(self):
        async with self.lock:
            print(f'Opening connection to redis at {self.host}:{self.port}')
            self.reader, self.writer = await asyncio.open_connection(
                self.host, self.port)

        if self.password:
            self.writer.write(b'*2\r\n')
            self.writeString(b'AUTH')

            password = self.password
            if not isinstance(password, bytes):
                password = password.encode('utf8')
            self.writeString(password)

            await self.execute()
54 / 59

A simple redis client (3/4)

    def publish(self, channel, msg):
        self.publishCount += 1

        if not isinstance(channel, bytes):
            channel = channel.encode('utf8')
        if not isinstance(msg, bytes):
            msg = msg.encode('utf8')

        self.writer.write(b'*3\r\n')
        self.writeString(b'PUBLISH')
        self.writeString(channel)
        self.writeString(msg)
55 / 59

A simple redis client (4/4)

    async def execute(self):
        async with self.lock:
            await self.writer.drain()

            results = []

            # read until we get something
            for i in range(self.publishCount):
                line = await self.reader.readline()
                results.append(line)

                # FIXME: proper error handling !!!
                if self.verbose:
                    print(f'Received: {line.decode()!r}')

                if 'NOAUTH Authentication required' in line.decode():
                    raise ValueError('Authentication failed')

            self.reset()
            return results
56 / 59

Pipelined redis publishing

class PipelinedPublisher():
    def __init__(self, redis):
        self.redis = redis
        self.queue = asyncio.Queue()

    async def run(self):
        while True:
            await asyncio.sleep(0.1)

            N = self.queue.qsize()
            if N == 0:
                continue

            pipe = self.redis.pipeline()

            for i in range(N):
                item = await self.queue.get()
                appkey, channel, data = item
                appChannel = '{}::{}'.format(appkey, channel)
                pipe.publish(appChannel, data)
                self.queue.task_done()

            await pipe.execute()

    def enqueue(self, job):
        self.queue.put_nowait(job)
57 / 59

Memory optimizations

  • Memory usage was problematic early on.
  • We ended up writing a very simple redis client, which just does batched publishes with a pipeline.
  • And we threw more hardware at it, CPU + memory (easy with 'the cloud' + containers).
cobra-44-7bmcx 0/1 OOMKilled 17 23h
58 / 59

Architecture

  1. Publishers
  2. Message broker
  3. Subscribers

(it's like a chat system)

Publishers Message Broker Subscribers
+-----------------------+
| |
+-----------> | Cobra | +-------------->
| |
+-----------> | * Python3 | +-------------->
| * Redis PubSub |
+-----------> | | +-------------->
| |
+-----------------------+
59 / 59

The pubsub architecture has three components: publishers, a message broker, and subscribers. In our system a message leaves our games (the publishers), goes through the broker, and is delivered to the subscribers.

awk '/^==/ { sum += $2} END { print sum/60 }' slides.html
