== 15 seconds
 Stdin                AWK               Stdout

                +---------------+
                |               |
                | $1 == "fps"   |
 fps:60    +--->+ { print $2 }  +----> 60
 mem:100000     |               |
 ...            |               |
                +---------------+
$ cat /tmp/input
memory:100000
fps:60
$
$
$ awk -F: '$1 == "fps" { print $2 }' < /tmp/input
60
Awk is a very simple programming language that follows the UNIX principle of processing text passed in on standard input and writing the result to standard output.
The stream is made of multiple lines, and each line contains multiple tokens. A field separator defines how lines are broken down into tokens.
In the common case, an awk program consists of a pattern and an action. Awk searches the input for the pattern, and for each line that matches it, executes the action.
Let's imagine that we are building an analytics solution with awk. Each event that we care about will be a line, and we'll have fields to represent an id and other data for that event.
If we want to write a report that extracts all the information about the FPS, our awk program's pattern will match the fps term, and the action will extract the value (60, because our game is perfect and always runs at the ideal frame rate :). Awk could also sum up those values and display an average.
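The pattern/action idea, including the averaging step mentioned above, can be sketched in Python. This is only an illustration of the concept and not part of Cobra; the function name `average_field` and the sample lines are made up for the example.

```python
from typing import Iterable, Optional


def average_field(lines: Iterable[str], key: str, sep: str = ":") -> Optional[float]:
    """Average the second field of every line whose first field equals `key`."""
    total = 0.0
    count = 0
    for line in lines:
        fields = line.rstrip("\n").split(sep)
        # Pattern: $1 == key (and the line actually has a second field)
        if len(fields) >= 2 and fields[0] == key:
            # Action: accumulate $2
            total += float(fields[1])
            count += 1
    # Like an awk END block: report once all input has been consumed
    return total / count if count else None


sample = ["memory:100000", "fps:60", "fps:30"]
print(average_field(sample, "fps"))  # prints 45.0
```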
 Publishers                           Broker                        Subscribers

                           +-------------------------------+
  channel                  |                               |  channel
{ id: fps_id,       -----> | SELECT data.fps from `fps_id` +---------> 60
  data: { fps: 60 } }      |                               |
  channel                  |                               |
{ id: mem_id,       -----> |                               |
  data: { mem: 100000 } }  |                               |
                           |                               |
                           +-------------------------------+
== 60 seconds
 Publishers                           Broker                        Subscribers

                           +-------------------------------+
                           |                               |
{ id: fps_id,       -----> | SELECT data.fps from `fps_id` +------> 60
  data: { fps: 60 } }      |                               |
                           |                               |
{ id: mem_id,       -----> | SELECT data.mem from `mem_id` +------> 100000
  data: { mem: 100000 } }  |                               |
                           |                               |
                           +-------------------------------+
== 15 seconds
Should:
Should not:
           Publisher                                     Broker
+-------------------------------+              +---------------------+
|                               |              |                     |
| { data: { fps: 30 },          |  handshake   |                     |
|   device:                     +------------->+                     |
|   { app_version: '3.25',      |              |                     |
|     game: 'wiso',             | handshake/ok |                     |
|     model: 'iPhone',          +<-------------+  Cobra              |
|     os_language: 'en',        |              |                     |
|     os_name: 'iOS',           |     auth     |  * Python           |
|     os_version: '10.3.3' },   +------------->+  * Redis PubSub     |
|   id: 'engine_fps_id',        |              |                     |
|   session: '4cb5989ba',       |   auth/ok    |                     |
|   timestamp: 1501190629268,   +<-------------+                     |
|   version: 1 }                |              |                     |
|                               |   publish    |                     |
|                               +------------->+                     |
+-------------------------------+              +---------------------+
              JSON                 WebSocket
                                   Transport
We open-sourced our C++ implementation
On GitHub (star it!)
Why?
$ ws --help
ws is a websocket tool
Usage: ws [OPTIONS] SUBCOMMAND

Options:
  -h,--help                   Print this help message and exit

Subcommands:
  send                        Send a file
  receive                     Receive a file
  transfer                    Broadcasting server
  connect                     Connect to a remote server
  chat                        Group chat
  echo_server                 Echo server
  broadcast_server            Broadcasting server
  ping                        Ping pong
  curl                        HTTP Client
  redis_publish               Redis publisher
  redis_subscribe             Redis subscriber
,,'6''-,. <====,.;;--. _`---===. """==__ //""@@-\===\@@@@ ""\\ |( @@@ |===| @@@ || \\ @@ |===| @@ // \\ @@ |===|@@@ // \\ |===| //___________\\|===| //_____,----""""""""""-----,_ """"---,__`\===`/ _________,---------,____ `, |==|| `\ \ |==| | Python 3 + Redis ) | |==| | _____ ______,--' ' |=| `----""" `"""""""" _,-' `=\ __,---"""-------------"""'' """"
Redis Pub/Sub is very easy to get going. We could also use Redis Streams.
Has asyncio Python clients
Simpler than the elephant in the room (Kafka)
I like the values expressed in the Redis manifesto
This is what I know :)
Python 3.5+ has built-in asynchronous IO. The async and await keywords make code very simple and readable.
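As a small illustration of how async and await read in practice, here is a sketch that runs two coroutines concurrently. The coroutine names and delays are invented for the example and have nothing to do with the Cobra code base; `asyncio.run` requires Python 3.7 or later.

```python
import asyncio


async def fetch_sample(name: str, delay: float) -> str:
    # `await` hands control back to the event loop while this coroutine waits
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> list:
    # Both coroutines run concurrently; gather returns results in argument order
    return await asyncio.gather(
        fetch_sample("fps", 0.02),
        fetch_sample("mem", 0.01),
    )


results = asyncio.run(main())
print(results)  # prints ['fps done', 'mem done']
```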
Very fast libuv-based event loop, uvloop. Claimed to be faster than node.js and comparable to Go.
Nice libraries: websockets, and fast JSON processing with ujson or rapidjson (Tencent).
mypy type checker, memory debugging tool.
262.5 KB new 262.5 KB total / 2994 new 2994 total memory blocks: ...
  File "/src/cobra/server/pipelined_publisher.py", line 35
    pipe.publish(appChannel, data)
  File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 145
    task = asyncio.ensure_future(attr(*args, **kw),
  File "/usr/local/lib/python3.7/site-packages/aioredis/commands/pubsub.py", line 14
    return self.execute(b'PUBLISH', channel, message)
  File "/usr/local/lib/python3.7/site-packages/aioredis/commands/__init__.py", line 50
    return self._pool_or_conn.execute(command, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 102
    self._pipeline.append((fut, cmd, args, kw))
Cobra +-------------+ +---------+ | | | redis-1 | engine_fps_id +-------+ | +---------+ | | hash() | | | | +---------+ engine_mem_id +--------------------------->+ redis-2 | | | | +---------+ | | | .... | | | +---------+ | +------------------->+ redis-3 | | | +---------+ +-------------+ .....
To be able to scale horizontally, we run multiple Redis instances, and we select the Redis instance to talk to by computing a hash of the channel name.
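A minimal sketch of that selection step follows. The slides do not say which hash function Cobra uses, so `zlib.crc32` is an assumption here, as are the function name `pick_redis_url` and the example URLs.

```python
import zlib
from typing import List


def pick_redis_url(channel: str, redis_urls: List[str]) -> str:
    """Map a channel name to one Redis instance.

    ASSUMPTION: crc32 stands in for whatever hash the real broker uses.
    It is stable across processes, unlike Python's builtin hash() on str.
    """
    index = zlib.crc32(channel.encode("utf8")) % len(redis_urls)
    return redis_urls[index]


# Hypothetical instance URLs, for illustration only
urls = ["redis://redis-1", "redis://redis-2", "redis://redis-3"]
for channel in ("engine_fps_id", "engine_mem_id"):
    print(channel, "->", pick_redis_url(channel, urls))
```

Every publisher and subscriber of a given channel has to compute the same index, which is why a deterministic hash matters more here than an even distribution.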
Broker Subscriber +-------------+ | | +---------->+ | +-------------+ | SELECT ... | | few messages| High frequency | +---->+ received | message | (subset | | | | of data) | +-------------+ +---------->+ | | | +-------------+
SELECT data.url,data.status FROM \`engine_net_request_id\` WHERE data.status != 200 AND data.url LIKE 'why'
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_one_too', 'data.status': 404}
SELECT data.url,data.duration_ms FROM \`engine_net_request_id\` WHERE data.url LIKE '.png'
{'data.url': 'https://big.cdn.provider/asset_1.png', 'data.duration_ms': 28}
{'data.url': 'https://big.cdn.provider/asset_2.png', 'data.duration_ms': 1008}
{'data.url': 'https://other.cdn.provider/asset_2.png', 'data.duration_ms': 512}
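To make the filtering idea concrete, here is a rough Python sketch of what a broker-side filter equivalent to the first query might do to each message. It is not the actual stream_sql.py implementation. The helper names are hypothetical, and treating LIKE as a substring match is an assumption inferred from the sample results above.

```python
from typing import Any, Dict, Optional


def get_path(message: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as 'data.url' into nested dicts."""
    value: Any = message
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def failed_requests(message: Dict[str, Any], needle: str) -> Optional[Dict[str, Any]]:
    """Rough equivalent of:
    SELECT data.url,data.status ... WHERE data.status != 200 AND data.url LIKE needle
    """
    status = get_path(message, "data.status")
    url = get_path(message, "data.url")
    # ASSUMPTION: LIKE behaves as a substring test
    if status is None or status == 200:
        return None
    if not isinstance(url, str) or needle not in url:
        return None
    # Only the selected fields are forwarded to the subscriber
    return {"data.url": url, "data.status": status}


msg = {"data": {"url": "https://why_are_we_still_making/this_request", "status": 404}}
print(failed_requests(msg, "why"))
```

Messages for which the function returns None are simply not forwarded, which is how a high-frequency channel turns into a handful of messages on the subscriber side.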
We used this to deprecate sending low-res assets to old devices: very few of them remained in use, and dropping those assets speeds up our deployment process and reduces our CDN costs.
redis1-1-gbjvt                1/1   Running   2   171d
redis2-1-8jvj7                1/1   Running   0   78d
redis3-1-s9hxj                1/1   Running   0   78d
redis4-1-v96gx                1/1   Running   2   166d
redis5-1-z6262                1/1   Running   0   78d
cobra-44-4xfbw                1/1   Running   0   12d
cobra-44-58vch                1/1   Running   0   12d
cobra-44-5m4ff                1/1   Running   0   12d
cobra-44-6vvk5                1/1   Running   0   12d
...
cobra-subscribers-30-7bpqh    1/1   Running   0   12d
cobra-subscribers-30-fd477    1/1   Running   0   12d
cobra-subscribers-30-h5ntn    1/1   Running   0   12d
cobra-subscribers-30-hlcd6    1/1   Running   0   12d
...
It is very interesting to notice the uptime of Redis, which is very easy to operate and super reliable.
Broker Subscribers +----------------------+ +-------------+ http | | ws | +--------> Sentry | +----->+ node.js | | Cobra | | +--------> Grafana | * Python3 | +-------------+ udp | * Redis PubSub | | | +-------------+ mysql | | ws | +--------> tableau | +----->+ Python | | | | +--------> "Terminal" | | +-------------+ | | | | +-------------+ | | ws | | websocket | +----->+ Web Browser |--------> Neo (custom dashboard) | +--+ | | +----------------------+ | +-------------+ | | +----------------+ | | | +-->+ Java, PHP, C++ | | | +----------------+
Flexible and rich ecosystem of subscribers or bots, used by various individuals or teams to solve different problems. Simple WebSocket API + JSON
Good property of a flexible system.
+-------------+-------------+----------------+---------------+
| Frame Rate  | iPad mini 2 | MacBookPro11,5 | iPhone 8 Plus |
+-------------+-------------+----------------+---------------+
| Samples     | 1           | 1              | 1             |
+-------------+-------------+----------------+---------------+
| > 60 fps    | 0.0 %       | 1.6 %          | 0.0 %         |
+-------------+-------------+----------------+---------------+
| 60 fps      | 69.5 %      | 98.1 %         | 100.0 %       |
+-------------+-------------+----------------+---------------+
| 30 fps      | 30.5 %      | 0.2 %          | 0.0 %         |
+-------------+-------------+----------------+---------------+
| 15 fps      | 0.0 %       | 0.0 %          | 0.0 %         |
+-------------+-------------+----------------+---------------+
| 10 fps      | 0.0 %       | 0.0 %          | 0.0 %         |
+-------------+-------------+----------------+---------------+
| < 8 fps     | 0.0 %       | 0.0 %          | 0.0 %         |
+-------------+-------------+----------------+---------------+
| Average FPS | 50.85       | 55.80          | 59.99         |
+-------------+-------------+----------------+---------------+
Tableau was used to let us know that our Crash Handler was not working on Android x86, as the crash rate looked bogus on this platform.
I will not go through this in the presentation
Client cobra command
$ cobra health
> {'action': 'auth/handshake', 'body': {'data': {'role': 'health_publisher_subscriber'}, 'method': 'role_secret'}, 'id': 1}
< {"action": "auth/handshake/ok", "body": {"data": {"nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==", "version": "0.0.24"}}, "id": 1}
> {'action': 'auth/authenticate', 'body': {'method': 'role_secret', 'credentials': {'hash': '/Ec2gh0AQpAbzq7vQTvuIw=='}}, 'id': 2}
< {"action": "auth/authenticate/ok", "body": {}, "id": 2}
> {'action': 'rtm/subscribe', 'body': {'subscription_id': 'ws_subscribe', 'channel': 'sms_republished_v1_neo', 'fast_forward': True, 'filter': ''}, 'id': 3}
> {"action": "rtm/subscribe/ok", "body": {"position": "1519190184:559034812775", "subscription_id": "ws_subscribe"}, "id": 3}
Received {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}], "position": "1519190184:568807785938"}}
System is healthy
$ cobra run --verbose
runServer {'host': '127.0.0.1', 'port': '8765', 'redisUrls': 'redis://localhost;redis://localhost', 'redisPassword': None, 'verbose': True, 'debugMemory': False}
node caad17054b494236aae5964743b22f7b
appkey EEEEEEEEEEEEEEEEFFFFFFFFFFFFFFFF
(open) connections 1
< {"action":"auth\/handshake","body":{"data":{"role":"health_publisher_subscriber"},"method":"role_secret"},"id":1}
> {'action': 'auth/handshake/ok', 'body': {'data': {'nonce': 'MTI0Njg4NTAyMjYxMzgxMzgzMg==', 'version': '0.0.24'}}, 'id': 1}
< {"action":"auth\/authenticate","body":{"method":"role_secret","credentials":{"hash":"\/Ec2gh0AQpAbzq7vQTvuIw=="}},"id":2}
server hash /Ec2gh0AQpAbzq7vQTvuIw==
client hash /Ec2gh0AQpAbzq7vQTvuIw==
> {'action': 'auth/authenticate/ok', 'body': {}, 'id': 2}
< {"action":"rtm\/subscribe","body":{"subscription_id":"ws_subscribe","channel":"sms_republished_v1_neo","fast_forward":true,"filter":""},"id":3}
> {'action': 'rtm/subscribe/ok', 'body': {'position': '1519190184:559034812775', 'subscription_id': 'ws_subscribe'}, 'id': 3}
< {"action": "rtm/publish", "body": {"channel": "sms_republished_v1_neo", "message": {"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}}}
> {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "95a18ef313bb45de83f6ddfd59a964a2"}}]}], "position": "1519190184:568807785938"}}
#messages 1 msg/s 1
< {"action": "rtm/unsubscribe", "body": {"subscription_id": "sms_republished_v1_neo"}}
> {"action": "rtm/unsubscribe/ok", "id": 3}
subscribe redis connection closed
(close) connections 0
{
  "action": "auth/handshake",
  "body": {
    "data": {
      "role": "health_publisher_subscriber"
    },
    "method": "role_secret"
  },
  "id": 1
}
Each pdu (protocol data unit) has an action and a body. The handshake method specifies a role name.
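As an illustration of that structure, here is a sketch that builds and serializes a handshake pdu with the standard json module. The helper name `make_pdu` is hypothetical and is not taken from the Cobra sources; the field values are the ones shown on the slide.

```python
import json
from typing import Any, Dict


def make_pdu(action: str, body: Dict[str, Any], pdu_id: int) -> str:
    """Serialize one pdu: an action, a body, and an id used to match the reply."""
    return json.dumps({"action": action, "body": body, "id": pdu_id})


handshake = make_pdu(
    "auth/handshake",
    {"data": {"role": "health_publisher_subscriber"}, "method": "role_secret"},
    1,
)
print(handshake)
```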
{
  "action": "auth/handshake/ok",
  "body": {
    "data": {
      "nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==",
      "version": "0.0.24"
    }
  },
  "id": 1
}
The server responds with a message whose action is auth/handshake/ok; its body carries a nonce.
{
  "action": "auth/authenticate",
  "body": {
    "method": "role_secret",
    "credentials": {
      "hash": "/Ec2gh0AQpAbzq7vQTvuIw=="
    }
  },
  "id": 2
}
A hash is computed using the nonce, and passed through the authenticate pdu (or message).
{
  "action": "auth/authenticate/ok",
  "body": {},
  "id": 2
}
serverHash = computeHash(secret, nonce)
server hash /Ec2gh0AQpAbzq7vQTvuIw==
client hash /Ec2gh0AQpAbzq7vQTvuIw==
When the server receives the client auth message, it does ...
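The server-side check can be sketched as follows. The slides only show `computeHash(secret, nonce)`, so the concrete algorithm used here (HMAC-MD5 of the nonce keyed with the role secret, then base64) is an assumption, chosen because the hashes on the slides decode to 16 bytes; the function names are hypothetical as well.

```python
import base64
import hashlib
import hmac


def compute_hash(secret: bytes, nonce: bytes) -> str:
    # ASSUMPTION: HMAC-MD5 keyed with the role secret, base64-encoded.
    # The real computeHash() may differ.
    digest = hmac.new(secret, nonce, hashlib.md5).digest()
    return base64.b64encode(digest).decode("ascii")


def is_authenticated(secret: bytes, nonce: bytes, client_hash: str) -> bool:
    # The server recomputes the hash from the secret it knows and the nonce
    # it handed out, then compares it with what the client sent.
    server_hash = compute_hash(secret, nonce)
    return hmac.compare_digest(server_hash, client_hash)


# Made-up secret and nonce, for illustration only
client_hash = compute_hash(b"role-secret", b"some-nonce")
print(is_authenticated(b"role-secret", b"some-nonce", client_hash))  # prints True
```

The secret itself never travels over the wire; only the nonce and the resulting hash do.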
{
  "action": "rtm/publish",
  "body": {
    "channel": "sms_republished_v1_neo",
    "message": {
      "data": {
        "info": "test_from_minix"
      },
      "device": {
        "foo": {
          "foo": true,
          "stuff": 3.0
        }
      },
      "id": "engine_test_id",
      "session": "4dacbafefc214578aa3f8c1d2975ec92",
      "timestamp": 1536270895637,
      "version": 1
    }
  }
}
The client can publish messages.
{
  "action": "rtm/subscribe",
  "body": {
    "channel": "sms_republished_v1_neo",
    "subscription_id": "ws_subscribe" /* optional */
  },
  "id": 3
}
{
  "action": "rtm/subscription/data",
  "body": {
    "subscription_id": "ws_subscribe",
    "messages": [
      {
        "position": "1519190184:547873030411",
        "subscription_id": "cobra_republisher",
        "messages": [
          {
            "device": {
              "game": "test",
              "android_id": "95a18ef313bb45de83f6ddfd59a964a2"
            }
          }
        ]
      }
    ],
    "position": "1519190184:568807785938"
  }
}
{
  "action": "rtm/unsubscribe",
  "body": {
    "subscription_id": "sms_republished_v1_neo"
  }
}
{
  "action": "rtm/unsubscribe/ok",
  "id": 3
}
(venv) cobra$ cobra ws_subscribe --url 'ws://foo.bar.com/v2?appkey=ZZZZZZZZZZZZZZZZ' \
    --stream_sql "select data.fps from \`engine_fps_id\`" --verbose
> {'action': 'auth/handshake', 'id': 0, 'body': {'data': {'role': '_sub'}, 'method': 'role_secret'}}
< {"action": "auth/handshake/ok", "id": 0, "body": {"data": {"nonce": "MTU1NTM4NDM5Mjg3Nzk0NjIyNzA=", "version": "0.0.177", "connection_id": "794e630d", "node": "cobra-subscribers-30-7bpqh"}}}
> {'action': 'auth/authenticate', 'id': 1, 'body': {'method': 'role_secret', 'credentials': {'hash': 'xeBLmfJjUjGmrfpsH9/ksg=='}}}
< {"action": "auth/authenticate/ok", "id": 1, "body": {}}
> {'action': 'rtm/subscribe', 'body': {'subscription_id': 'engine_payload_id', 'channel': 'engine_payload_id', 'fast_forward': True, 'filter': 'select data.fps from `engine_fps_id`'}, 'id': 3}
< {"action": "rtm/subscribe/ok", "id": 3, "body": {"position": "1519190184:559034812775", "subscription_id": "engine_payload_id", "redis_node": "redis5"}}
{'data.fps': 25.0}
#messages 1 msg/s 1
{'data.fps': 19.0}
{'data.fps': 19.0}
{'data.fps': 24.0}
{'data.fps': 28.0}
{'data.fps': 27.0}
{'data.fps': 23.0}
{'data.fps': 26.0}
{'data.fps': 2.0}
{'data.fps': 29.0}
{'data.fps': 7.0}
{'data.fps': 26.0}
http://tools.ietf.org/html/rfc6455#section-5.2

Base Framing Protocol

      0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-------+-+-------------+-------------------------------+
     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
     | |1|2|3|       |K|             |                               |
     +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
     |     Extended payload length continued, if payload len == 127  |
     + - - - - - - - - - - - - - - - +-------------------------------+
     |                               |Masking-key, if MASK set to 1  |
     +-------------------------------+-------------------------------+
     | Masking-key (continued)       |          Payload Data         |
     +-------------------------------- - - - - - - - - - - - - - - - +
     :                     Payload Data continued ...                :
     + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
     |                     Payload Data continued ...                |
     +---------------------------------------------------------------+
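To show how the header layout above translates into code, here is a small Python sketch that decodes the header of a single frame and unmasks its payload. It is an illustration only, not the parser used by Cobra or by the C++ library, and the names `FrameHeader`, `parse_frame_header` and `unmask` are made up for the example. The sample bytes are the masked "Hello" text frame from the examples in RFC 6455.

```python
import struct
from typing import NamedTuple, Optional


class FrameHeader(NamedTuple):
    fin: bool
    opcode: int
    masked: bool
    payload_len: int
    masking_key: Optional[bytes]
    header_size: int  # offset at which the payload starts


def parse_frame_header(buf: bytes) -> FrameHeader:
    if len(buf) < 2:
        raise ValueError("need at least 2 bytes")
    fin = bool(buf[0] & 0x80)      # FIN bit
    opcode = buf[0] & 0x0F         # low 4 bits of the first byte
    masked = bool(buf[1] & 0x80)   # MASK bit
    length = buf[1] & 0x7F         # 7-bit payload length
    offset = 2
    if length == 126:              # 16-bit extended length follows
        (length,) = struct.unpack_from("!H", buf, offset)
        offset += 2
    elif length == 127:            # 64-bit extended length follows
        (length,) = struct.unpack_from("!Q", buf, offset)
        offset += 8
    key = None
    if masked:
        key = buf[offset:offset + 4]
        if len(key) < 4:
            raise ValueError("truncated masking key")
        offset += 4
    return FrameHeader(fin, opcode, masked, length, key, offset)


def unmask(payload: bytes, key: bytes) -> bytes:
    # Each payload byte is XORed with the masking key, cycling every 4 bytes
    return bytes(b ^ key[i % 4] for i, b in enumerate(payload))


frame = bytes.fromhex("818537fa213d7f9f4d5158")
header = parse_frame_header(frame)
start = header.header_size
print(unmask(frame[start:start + header.payload_len], header.masking_key))  # prints b'Hello'
```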
< 2000 lines of code for the server
$ wc -l src/cobra/server/*.py src/cobra/common/*.py src/cobra/runner/commands/run.py
       0 src/cobra/server/__init__.py
     234 src/cobra/server/app.py
      64 src/cobra/server/apps_config.py
      17 src/cobra/server/connection_state.py
      56 src/cobra/server/pipelined_publisher.py
      41 src/cobra/server/pipelined_publishers.py
     477 src/cobra/server/protocol.py
      42 src/cobra/server/redis_connections.py
      92 src/cobra/server/redis_publisher.py
     129 src/cobra/server/stats.py
     207 src/cobra/server/stream_sql.py
      74 src/cobra/server/subscribe.py
       0 src/cobra/common/__init__.py
       8 src/cobra/common/algorithm.py
      13 src/cobra/common/auth_hash.py
       4 src/cobra/common/cobra_types.py
      27 src/cobra/common/memoize.py
     113 src/cobra/common/memory_debugger.py
      23 src/cobra/common/memory_usage.py
      24 src/cobra/common/task_cleanup.py
      22 src/cobra/common/throttle.py
      16 src/cobra/common/version.py
      75 src/cobra/runner/commands/run.py
    1758 total
import asyncio


class RedisPublisher(object):
    '''
    See https://redis.io/topics/mass-insert
    '''

    def __init__(self, host, port, password, verbose=False):
        self.host = host
        self.port = port
        self.password = password
        self.verbose = verbose
        self.reset()
        self.lock = asyncio.Lock()

    def reset(self):
        self.publishCount = 0

    def writeString(self, data: bytes):
        # Write one Redis protocol bulk string: $<length>\r\n<data>\r\n
        self.writer.write(b'$%d\r\n' % len(data))
        self.writer.write(data)
        self.writer.write(b'\r\n')
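    async def connect(self):
        async with self.lock:
            print(f'Opening connection to redis at {self.host}:{self.port}')
            self.reader, self.writer = await asyncio.open_connection(
                self.host, self.port)

            if self.password:
                # AUTH <password>, sent as an array of 2 bulk strings
                self.writer.write(b'*2\r\n')
                self.writeString(b'AUTH')

                password = self.password
                if not isinstance(password, bytes):
                    password = password.encode('utf8')

                self.writeString(password)

        # execute() takes self.lock itself and asyncio.Lock is not reentrant,
        # so the AUTH command is flushed only after the lock is released.
        if self.password:
            await self.execute()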
    def publish(self, channel, msg):
        self.publishCount += 1

        if not isinstance(channel, bytes):
            channel = channel.encode('utf8')

        if not isinstance(msg, bytes):
            msg = msg.encode('utf8')

        # PUBLISH <channel> <msg>, sent as an array of 3 bulk strings.
        # Nothing is flushed here; execute() drains the writer later.
        self.writer.write(b'*3\r\n')
        self.writeString(b'PUBLISH')
        self.writeString(channel)
        self.writeString(msg)
    async def execute(self):
        async with self.lock:
            await self.writer.drain()

            results = []

            # read until we get something
            for i in range(self.publishCount):
                line = await self.reader.readline()
                results.append(line)

                # FIXME: proper error handling !!!
                if self.verbose:
                    print(f'Received: {line.decode()!r}')

                if 'NOAUTH Authentication required' in line.decode():
                    raise ValueError('Authentication failed')

            self.reset()
            return results
import asyncio


class PipelinedPublisher():
    def __init__(self, redis):
        self.redis = redis
        self.queue = asyncio.Queue()

    async def run(self):
        while True:
            # Wake up every 100 ms and flush whatever was queued meanwhile
            await asyncio.sleep(0.1)

            N = self.queue.qsize()
            if N == 0:
                continue

            # Batch all pending publishes into a single Redis pipeline
            pipe = self.redis.pipeline()
            for i in range(N):
                item = await self.queue.get()
                appkey, channel, data = item
                appChannel = '{}::{}'.format(appkey, channel)
                pipe.publish(appChannel, data)
                self.queue.task_done()

            await pipe.execute()

    def enqueue(self, job):
        self.queue.put_nowait(job)
cobra-44-7bmcx 0/1 OOMKilled 17 23h
(it's like a chat system)
 Publishers           Message Broker            Subscribers

                +-----------------------+
                |                       |
 +----------->  | Cobra                 |  +-------------->
                |                       |
 +----------->  | * Python3             |  +-------------->
                | * Redis PubSub        |
 +----------->  |                       |  +-------------->
                |                       |
                +-----------------------+
The pubsub architecture has three components: publishers, a broker and subscribers. In our system a message leaves our games
awk '/^==/ { sum += $2} END { print sum/60 }' slides.html
== 15 seconds