name: first --- class: center, middle # Hello world - Benjamin Sergeant (bsergean@gmail.com) ![Alt text](mz_engine.png) ??? Been working on Graphics a lot, at MZ I've worked on the Game Engine, on fast asset delivery, the game chat and monitoring a lot. ??? == 15 seconds --- # Agenda 1. Architecture 2. Clients metrics publishing 3. Message broker Cobra 4. Bots and data sinks --- # Using AWK (stream programming) for analytics ``` Stdin AWK Stdout +---------------+ | | | $1 == "fps" | fps:60 +--->+ { print $2 } +----> 60 mem:100000 | | ... | | +---------------+ ``` ```shell $ cat /tmp/input memory:100000 fps:60 $ $ $ awk -F: '$1 == "fps" { print $2 }' < /tmp/input 60 ``` ??? Awk is a very simple programming language following the UNIX principles of processing text passed in on standard-in, and writing it to standard out. The stream is defined by multiple lines, and each lines contains multiple tokens. A field separator defines how lines are broken down into tokens. In the common case, an awk program consists of a pattern, and of an action. Awk will search the input for a pattern, and for each line that match the pattern it will execute an action. Let's imagine that we are building an analytic solution with awk. Each event that we care about will be a line, and we'll have fields to represent an id and other data for that event. If we want to write a report that extracts all the information about the FPS, our awk program pattern will match the fps term, and the action will extract the value (60 because our game is perfect and always run at the ideal frame rate :). Something that awk could do is sum up those values and display an average. --- # Using PubSub for analytics ## One subscriber ``` Publishers Broker Subscribers +-------------------------------+ channel | | channel { id: fps_id, -----> | SELECT data.fps from `fps_id` +---------> 60 data: { fps: 60 } } | | channel | | { id: mem_id, -----> | | data: { mem: 100000 } } | | | | +-------------------------------+ ``` ??? == 60 seconds --- # PubSub for analytics ## N subscribers ``` Publishers Broker Subscribers +-------------------------------+ | | { id: fps_id, -----> | SELECT data.fps from `fps_id` +------> 60 data: { fps: 60 } } | | | | { id: mem_id, -----> | SELECT data.mem from `mem_id` +------> 100000 data: { mem: 100000 } } | | | | +-------------------------------+ ``` ??? == 15 seconds --- # Requirements - Should: - Real Time (fast) - Work on Mobile - Flexible, extensible - Easy to write subscribers - Should *not*: - Guaranteed message delivery (no explicit ack or retry support) - No history - Filling hard drives ??? --- .left-column[ ## Messaging protocol ## Publishers ] --- # Publisher <-> Broker ``` Publisher Broker +-------------------------------+ +---------------------+ | | | | | { data: { fps: 30 }, | handshake | | | device: +------------->+ | | { app_version: '3.25', | | | | game: 'wiso', | handshake/ok | | | model: 'iPhone', +<-------------+ Cobra | | os_language: 'en', | | | | os_name: 'iOS', | auth | * Python | | os_version: '10.3.3' }, +------------->+ * Redis PubSub | | id: 'engine_fps_id', | | | | session: '4cb5989ba', | auth/ok | | | timestamp: 1501190629268, +<-------------+ | | version: 1 } | | | | | publish | | | +------------->+ | +-------------------------------+ +---------------------+ JSON WebSocket Transport ``` --- # WebSocket 1. TCP based - Persistent connection - Bi-directional (C <-> S) - Full duplex 2. Compatible with HTTP(S) 3. Inter-operability (browser) 4. zlib compression 5. Heart-beat 6. Large message segmentation --- # IXWebSocket - We open-sourced our C++ implementation - On [GitHub](https://github.com/machinezone/IXWebSocket) (star it !) - Why ? - No boost dependency (for small Install size) - Asynchronous and Interruptible/Cancellable - Auto-reconnection logic --- # ws ``` $ ws --help ws is a websocket tool Usage: ws [OPTIONS] SUBCOMMAND Options: -h,--help Print this help message and exit Subcommands: send Send a file receive Receive a file transfer Broadcasting server connect Connect to a remote server chat Group chat echo_server Echo server broadcast_server Broadcasting server ping Ping pong curl HTTP Client redis_publish Redis publisher redis_subscribe Redis subscriber ``` --- .left-column[ ## Messaging protocol ## Publishers ## Broker ] --- # Features ## PubSub ## Also 1. Authentication 2. StreamSQL 3. Websocket for the transport 4. JSON serialization --- # Cobra ``` ,,'6''-,. <====,.;;--. _`---===. """==__ //""@@-\===\@@@@ ""\\ |( @@@ |===| @@@ || \\ @@ |===| @@ // \\ @@ |===|@@@ // \\ |===| // ___________\\|===| //_____,----""""""""""-----,_ """"---,__`\===`/ _________,---------,____ `, |==|| `\ \ |==| | Python 3 + Redis ) | |==| | _____ ______,--' ' |=| `----""" `"""""""" _,-' `=\ __,---"""-------------"""'' """" ``` --- # Why Redis ? - Redis Pub/Sub very easy to get going. We could also use Redis Streams. - Has asyncio python clients - Simpler than the elephant in the room (Kafka) - I like the values expressed in the [Redis manifesto](http://antirez.com/post/redis-manifesto.html) --- # Why Python ? - This is what I know :) - Python 3.5+ has builtin asynchronous IO. async and await keywords makes code very simple and readable - Very fast libuv based event loop, [uvloop](https://uvloop.readthedocs.io/). Claimed to be faster than node.js and comparable to Go. - Nice libraries [websockets](http://mypy-lang.org/), fast json processing with UJson or rapidjson (Tencent). - [mypy](http://mypy-lang.org/) type checker, memory debugging tool. --- # Memory optimizations (thanks to tracemalloc module) ## A simple asyncio PUBLISH only client (see code in the annex) ``` 262.5 KB new 262.5 KB total / 2994 new 2994 total memory blocks: ... File "/src/cobra/server/pipelined_publisher.py", line 35 pipe.publish(appChannel, data) File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 145 task = asyncio.ensure_future(attr(*args, **kw), File "/usr/local/lib/python3.7/site-packages/aioredis/commands/pubsub.py", line 14 return self.execute(b'PUBLISH', channel, message) File "/usr/local/lib/python3.7/site-packages/aioredis/commands/__init__.py", line 50 return self._pool_or_conn.execute(command, *args, **kwargs) File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 102 self._pipeline.append((fut, cmd, args, kw)) ``` --- # Pipelined redis publishing ## Sacrifice some latency for throughput. 1. Incoming published messages are queued. 2. Then published in a batch mode every 100ms, using a redis pipe, to minimize RTT. --- # Channel Sharding - Split traffic to multiple redis nodes - Using [Consistent hashing](https://techspot.zzzeek.org/2012/07/07/the-absolutely-simplest-consistent-hashing-example/) ``` Cobra +-------------+ +---------+ | | | redis-1 | engine_fps_id +-------+ | +---------+ | | hash() | | | | +---------+ engine_mem_id +--------------------------->+ redis-2 | | | | +---------+ | | | .... | | | +---------+ | +------------------->+ redis-3 | | | +---------+ +-------------+ ..... ``` ??? To be able to scale horizontally we run multiple redis instances, and we select a redis instance to talk to by computing a hash on the channel name. --- # SQL like language to process messages - Filter data on the broker ``` Broker Subscriber +-------------+ | | +---------->+ | +-------------+ | SELECT ... | | few messages| High frequency | +---->+ received | message | (subset | | | | of data) | +-------------+ +---------->+ | | | +-------------+ ``` --- # Example Stream SQL expressions ## HTTP Errors ``` SELECT data.url,data.status FROM \`engine_net_request_id\` WHERE data.status != 200 AND data.url LIKE 'why' {'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404} {'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404} {'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404} {'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404} {'data.url': 'https://why_are_we_still_making/this_one_too', 'data.status': 404} ``` ## CDN requests ``` SELECT data.url,data.duration_ms FROM \`engine_net_request_id\` WHERE data.url LIKE '.png' {'data.url': 'https://big.cdn.provider/asset_1.png', 'data.duration_ms': 28}} {'data.url': 'https://big.cdn.provider/asset_2.png', 'data.duration_ms': 1008}} {'data.url': 'https://other.cdn.provider/asset_2.png', 'data.duration_ms': 512}} ``` ??? We used this to deprecate sending low res assets to old devices, as very few of them remained in use, and it would speed up our deployment process, and reduce our CDNs costs to do so. --- # Deployed with OpenShift - 60 python nodes (1CPU + 4G) dedicated to publishers - 10 python nodes (1CPU + 4G) dedicated to subscribers - 5 redis nodes (512M ?) ``` redis1-1-gbjvt 1/1 Running 2 171d redis2-1-8jvj7 1/1 Running 0 78d redis3-1-s9hxj 1/1 Running 0 78d redis4-1-v96gx 1/1 Running 2 166d redis5-1-z6262 1/1 Running 0 78d ``` ``` cobra-44-4xfbw 1/1 Running 0 12d cobra-44-58vch 1/1 Running 0 12d cobra-44-5m4ff 1/1 Running 0 12d cobra-44-6vvk5 1/1 Running 0 12d ... cobra-subscribers-30-7bpqh 1/1 Running 0 12d cobra-subscribers-30-fd477 1/1 Running 0 12d cobra-subscribers-30-h5ntn 1/1 Running 0 12d cobra-subscribers-30-hlcd6 1/1 Running 0 12d ... ``` ??? Very interesting to notice the uptime of Redis, which is very easy to operate and super reliable. --- .left-column[ ## Publishers ## Broker ## Subscribers ] --- # Subscribers ecosystem ``` Broker Subscribers +----------------------+ +-------------+ http | | ws | +--------> Sentry | +----->+ node.js | | Cobra | | +--------> Grafana | * Python3 | +-------------+ udp | * Redis PubSub | | | +-------------+ mysql | | ws | +--------> tableau | +----->+ Python | | | | +--------> "Terminal" | | +-------------+ | | | | +-------------+ | | ws | | websocket | +----->+ Web Browser |--------> Neo (custom dashboard) | +--+ | | +----------------------+ | +-------------+ | | +----------------+ | | | +-->+ Java, PHP, C++ | | | +----------------+ ``` ??? Flexible and rich ecosystem of subscribers or bots, used by various individuals or teams to solve different problems. Simple WebSocket API + JSON Good property of a flexible system. --- # Command line tools - Great for one-off custom things, or fast experimentation. - Lowest barrier to entry (good for programmers) - Does not scale well ``` +-------------+-------------+----------------+---------------+ | Frame Rate | iPad mini 2 | MacBookPro11,5 | iPhone 8 Plus | +-------------+-------------+----------------+---------------+ | Samples | 1 | 1 | 1 | +-------------+-------------+----------------+---------------+ | > 60 fps | 0.0 % | 1.6 % | 0.0 % | +-------------+-------------+----------------+---------------+ | 60 fps | 69.5 % | 98.1 % | 100.0 % | +-------------+-------------+----------------+---------------+ | 30 fps | 30.5 % | 0.2 % | 0.0 % | +-------------+-------------+----------------+---------------+ | 15 fps | 0.0 % | 0.0 % | 0.0 % | +-------------+-------------+----------------+---------------+ | 10 fps | 0.0 % | 0.0 % | 0.0 % | +-------------+-------------+----------------+---------------+ | < 8 fps | 0.0 % | 0.0 % | 0.0 % | +-------------+-------------+----------------+---------------+ | Average FPS | 50.85 | 55.80 | 59.99 | +-------------+-------------+----------------+---------------+ ``` --- # Neo - Real time dashboard - Search/Filter with StreamSQL ![Alt text](neo_map.png) --- # Neo - Recording of user sessions - Charting system memory, lua memory FPS, annotated by user actions - Scenes or documents visited ![Alt text](neo_session.png) --- # Grafana - Charting a signal - Trends ![Alt text](grafana_critical_logs.png) --- # Grafana - zlib compression -> traffic reduction ![Alt text](grafana_zlib.png) --- # Sentry - Great to provide extra source code context, such as a stacktrace. - Soft crashes - Critical errors ![Alt text](sentry.png) --- # Tableau - Data exploration ![](tableau.png) ??? Tableau was used to let us know that our Crash Handler was not working on Android x86, as the crash rate looked bogus on this platform. --- # Conclusion - Python3 is fine for writing networking servers - 400,000 concurrent connections - 100 billions events per month - Websocket is an ubiquitous communication protocol - PubSub is very fast and well fitted if fire and forget is enough - You can write an end to end pipeline yourself and get insights to answers your own developers questions --- name: tenyears --- name: last --- class: center, middle # Appendix ??? I will not go through this in the presentation --- class: center, middle # Messaging protocol --- # Health check (messages sent / received) Client cobra command 1. Handshake + authenticate 2. Subscribe to a channel 3. Publish a message 4. Make sure that message is received ``` $ cobra health > {'action': 'auth/handshake', 'body': {'data': {'role': 'health_publisher_subscriber'}, 'method': 'role_secret'}, 'id': 1} < {"action": "auth/handshake/ok", "body": {"data": {"nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==", "version": "0.0.24"}}, "id": 1} > {'action': 'auth/authenticate', 'body': {'method': 'role_secret', 'credentials': {'hash': '/Ec2gh0AQpAbzq7vQTvuIw=='}}, 'id': 2} < {"action": "auth/authenticate/ok", "body": {}, "id": 2} > {'action': 'rtm/subscribe', 'body': {'subscription_id': 'ws_subscribe', 'channel': 'sms_republished_v1_neo', 'fast_forward': True, 'filter': ''}, 'id': 3} > {"action": "rtm/subscribe/ok", "body": {"position": "1519190184:559034812775", "subscription_id": "ws_subscribe"}, "id": 3} Received {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}], "position": "1519190184:568807785938"}} System is healthy ``` --- # Server health check traffic (messages sent / received) ``` $ cobra run --verbose runServer {'host': '127.0.0.1', 'port': '8765', 'redisUrls': 'redis://localhost;redis://localhost', 'redisPassword': None, 'verbose': True, 'debugMemory': False} node caad17054b494236aae5964743b22f7b appkey EEEEEEEEEEEEEEEEFFFFFFFFFFFFFFFF (open) connections 1 < {"action":"auth\/handshake","body":{"data":{"role":"health_publisher_subscriber"},"method":"role_secret"},"id":1} > {'action': 'auth/handshake/ok', 'body': {'data': {'nonce': 'MTI0Njg4NTAyMjYxMzgxMzgzMg==', 'version': '0.0.24'}}, 'id': 1} < {"action":"auth\/authenticate","body":{"method":"role_secret","credentials":{"hash":"\/Ec2gh0AQpAbzq7vQTvuIw=="}},"id":2} server hash /Ec2gh0AQpAbzq7vQTvuIw== client hash /Ec2gh0AQpAbzq7vQTvuIw== > {'action': 'auth/authenticate/ok', 'body': {}, 'id': 2} < {"action":"rtm\/subscribe","body":{"subscription_id":"ws_subscribe","channel":"sms_republished_v1_neo","fast_forward":true,"filter":""},"id":3} > {'action': 'rtm/subscribe/ok', 'body': {'position': '1519190184:559034812775', 'subscription_id': 'ws_subscribe'}, 'id': 3} < {"action": "rtm/publish", "body": {"channel": "sms_republished_v1_neo", "message": {"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}}} > {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "95a18ef313bb45de83f6ddfd59a964a2"}}]}], "position": "1519190184:568807785938"}} #messages 1 msg/s 1 < {"action": "rtm/unsubscribe", "body": {"subscription_id": "sms_republished_v1_neo"}} > {"action": "rtm/unsubscribe/ok", "id": 3} subscribe redis connection closed (close) connections 0 ``` --- # Handshake action (C -> S) ```json { "action": "auth/handshake", "body": { "data": { "role": "health_publisher_subscriber" }, "method": "role_secret" }, "id": 1 } ``` ??? Each pdu (protocol data unit) has an action and a body. The handshake method specify a role name. --- # Handshake action (S -> C) ```json { "action": "auth/handshake/ok", "body": { "data": { "nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==", "version": "0.0.24" } }, "id": 1 } ``` ??? The server respond with a method whose action is
slash ok, or slash error if an error happens (invalid syntax etc...). Here the client receive a nonce from the server which it will use for authentication. --- # Authenticate action (C -> S) ```json { "action": "auth/authenticate", "body": { "method": "role_secret", "credentials": { "hash": "/Ec2gh0AQpAbzq7vQTvuIw==" } }, "id": 2 } ``` ??? A hash is computed using the nonce, and passed through the authenticate pdu (or message) --- # Authenticate action (S -> C) ```json { "action": "auth/authenticate/ok", "body": {}, "id": 2 } ``` 1. Lookup secret for role 2. Compute hash: `serverHash = computeHash(secret, nonce)` 3. Compare with the one the client supplied ``` server hash /Ec2gh0AQpAbzq7vQTvuIw== client hash /Ec2gh0AQpAbzq7vQTvuIw== ``` ??? When the server receive the client auth message, it does ... --- # Publish action (C -> S) ```json { "action": "rtm/publish", "body": { "channel": "sms_republished_v1_neo", "message": { "data": { "info": "test_from_minix" }, "device": { "foo": { "foo": true, "stuff": 3.0 } }, "id": "engine_test_id", "session": "4dacbafefc214578aa3f8c1d2975ec92", "timestamp": 1536270895637, "version": 1 } } } ``` ??? The client can publish messages. --- # Subscribe action (C -> S) - Filter (StreamSQL) optional ```json { "action": "rtm/subscribe", "body": { "channel": "sms_republished_v1_neo", "subscription_id": "ws_subscribe" /* optional */ }, "id": 3 } ``` --- # Subscription data (S -> C) ```json { "action": "rtm/subscription/data", "body": { "subscription_id": "ws_subscribe", "messages": [ { "position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [ { "device": { "game": "test", "android_id": "95a18ef313bb45de83f6ddfd59a964a2" } } ] } ], "position": "1519190184:568807785938" } } ``` --- # Unsubscription data (C -> S) ```json { "action": "rtm/unsubscribe", "body": { "subscription_id": "sms_republished_v1_neo" } } ``` --- # Unsubscription data (S -> C) ```json { "action": "rtm/unsubscribe/ok", "id": 3 } ``` --- # Command line tools ``` (venv) cobra$ cobra ws_subscribe --url 'ws://foo.bar.com/v2?appkey=ZZZZZZZZZZZZZZZZ' \ --stream_sql "select data.fps from \`engine_fps_id\`" --verbose > {'action': 'auth/handshake', 'id': 0, 'body': {'data': {'role': '_sub'}, 'method': 'role_secret'}} < {"action": "auth/handshake/ok", "id": 0, "body": {"data": {"nonce": "MTU1NTM4NDM5Mjg3Nzk0NjIyNzA=", "version": "0.0.177", "connection_id": "794e630d", "node": "cobra-subscribers-30-7bpqh"}}} > {'action': 'auth/authenticate', 'id': 1, 'body': {'method': 'role_secret', 'credentials': {'hash': 'xeBLmfJjUjGmrfpsH9/ksg=='}}} < {"action": "auth/authenticate/ok", "id": 1, "body": {}} > {'action': 'rtm/subscribe', 'body': {'subscription_id': 'engine_payload_id', 'channel': 'engine_payload_id', 'fast_forward': True, 'filter': 'select data.fps from `engine_fps_id`'}, 'id': 3} < {"action": "rtm/subscribe/ok", "id": 3, "body": {"position": "1519190184:559034812775", "subscription_id": "engine_payload_id", "redis_node": "redis5"}} {'data.fps': 25.0} #messages 1 msg/s 1 {'data.fps': 19.0} {'data.fps': 19.0} {'data.fps': 24.0} {'data.fps': 28.0} {'data.fps': 27.0} {'data.fps': 23.0} {'data.fps': 26.0} {'data.fps': 2.0} {'data.fps': 29.0} {'data.fps': 7.0} {'data.fps': 26.0} ``` --- # Message framing with WebSocket - Multiple opcodes (PING, PONG, CONTINUATION, TEXT, BINARY) ``` http://tools.ietf.org/html/rfc6455#section-5.2 Base Framing Protocol 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-------+-+-------------+-------------------------------+ |F|R|R|R| opcode|M| Payload len | Extended payload length | |I|S|S|S| (4) |A| (7) | (16/64) | |N|V|V|V| |S| | (if payload len==126/127) | | |1|2|3| |K| | | +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | Extended payload length continued, if payload len == 127 | + - - - - - - - - - - - - - - - +-------------------------------+ | |Masking-key, if MASK set to 1 | +-------------------------------+-------------------------------+ | Masking-key (continued) | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+ ``` --- # Complexity of the code base < 2000 lines of code for the server ``` $ wc -l src/cobra/server/*.py src/cobra/common/*.py src/cobra/runner/commands/run.py 0 src/cobra/server/__init__.py 234 src/cobra/server/app.py 64 src/cobra/server/apps_config.py 17 src/cobra/server/connection_state.py 56 src/cobra/server/pipelined_publisher.py 41 src/cobra/server/pipelined_publishers.py 477 src/cobra/server/protocol.py 42 src/cobra/server/redis_connections.py 92 src/cobra/server/redis_publisher.py 129 src/cobra/server/stats.py 207 src/cobra/server/stream_sql.py 74 src/cobra/server/subscribe.py 0 src/cobra/common/__init__.py 8 src/cobra/common/algorithm.py 13 src/cobra/common/auth_hash.py 4 src/cobra/common/cobra_types.py 27 src/cobra/common/memoize.py 113 src/cobra/common/memory_debugger.py 23 src/cobra/common/memory_usage.py 24 src/cobra/common/task_cleanup.py 22 src/cobra/common/throttle.py 16 src/cobra/common/version.py 75 src/cobra/runner/commands/run.py 1758 total ``` --- # "Plugins" / "Republishing" - Help with high-throughput channels - Can republish message from a high-traffic channel to lower traffic ones, more specialized - We use it for our http metric id. --- # A simple redis client (1/4) ``` import asyncio class RedisPublisher(object): ''' See https://redis.io/topics/mass-insert ''' def __init__(self, host, port, password, verbose=False): self.host = host self.port = port self.password = password self.verbose = verbose self.reset() self.lock = asyncio.Lock() def reset(self): self.publishCount = 0 def writeString(self, data: bytes): self.writer.write(b'$%d\r\n' % len(data)) self.writer.write(data) self.writer.write(b'\r\n') ``` --- # A simple redis client (2/4) ``` async def connect(self): async with self.lock: print(f'Opening connection to redis at {self.host}:{self.port}') self.reader, self.writer = await asyncio.open_connection( self.host, self.port) if self.password: self.writer.write(b'*2\r\n') self.writeString(b'AUTH') password = self.password if not isinstance(password, bytes): password = password.encode('utf8') self.writeString(password) await self.execute() ``` --- # A simple redis client (3/4) ``` def publish(self, channel, msg): self.publishCount += 1 if not isinstance(channel, bytes): channel = channel.encode('utf8') if not isinstance(msg, bytes): msg = msg.encode('utf8') self.writer.write(b'*3\r\n') self.writeString(b'PUBLISH') self.writeString(channel) self.writeString(msg) ``` --- # A simple redis client (4/4) ``` async def execute(self): async with self.lock: await self.writer.drain() results = [] # read until we get something for i in range(self.publishCount): line = await self.reader.readline() results.append(line) # FIXME: proper error handling !!! if self.verbose: print(f'Received: {line.decode()!r}') if 'NOAUTH Authentication required' in line.decode(): raise ValueError('Authentication failed') self.reset() return results ``` --- # Pipelined redis publishing ``` class PipelinedPublisher(): def __init__(self, redis): self.redis = redis self.queue = asyncio.Queue() async def run(self): while True: await asyncio.sleep(0.1) N = self.queue.qsize() if N == 0: continue pipe = self.redis.pipeline() for i in range(N): item = await self.queue.get() appkey, channel, data = item appChannel = '{}::{}'.format(appkey, channel) pipe.publish(appChannel, data) self.queue.task_done() await pipe.execute() def enqueue(self, job): self.queue.put_nowait(job) ``` --- # Memory optimizations - Memory usage problematic early on. - We ended up writing a very simple redis client, which is just doing batch publish with a pipeline. - And we threw more hardware at it, CPU + memory (easy with 'the cloud' + containers). ``` cobra-44-7bmcx 0/1 OOMKilled 17 23h ``` --- # Architecture 1. Publishers 2. Message broker 3. Subscribers (it's like a chat system) ``` Publishers Message Broker Subscribers +-----------------------+ | | +-----------> | Cobra | +--------------> | | +-----------> | * Python3 | +--------------> | * Redis PubSub | +-----------> | | +--------------> | | +-----------------------+ ``` ??? The pubsub architecture has 3 components, publishers, broker and subscribers. In our system a message leaves our games awk '/^==/ { sum += $2} END { print sum/60 }' slides.html