# await is not magic <img src="magic.jpg" /> ``` RSS: 33.66 MB #6 tasks position 1571345125268-0 #messages 1 msg/s 1 position 1571345125818-10 #messages 2066 msg/s 2065 position 1571345126367-7 #messages 4049 msg/s 1983 position 1571345126367-8 #messages 4050 msg/s 1 position 1571345126367-9 #messages 4051 msg/s 1 RSS: 206.22 MB #6 tasks position 1571345126367-10 #messages 4052 msg/s 1 position 1571345126367-11 #messages 4053 msg/s 1 position 1571345126367-12 #messages 4054 msg/s 1 RSS: 530.15 MB #6 tasks position 1571345126367-13 #messages 4055 msg/s 1 RSS: 753.86 MB #6 tasks position 1571345126367-14 #messages 4056 msg/s 1 position 1571345126367-15 #messages 4057 msg/s 1 RSS: 1.1 GB #6 tasks ``` On a high traffic subscription, I discovered that the --resume_from_last_position option triggered a big memory leak. Without it we can receive a decent amount of messages per second. ``` RSS: 33.51 MB #6 tasks position 1571345317795-0 #messages 1 msg/s 1 position 1571345318386-6 #messages 2066 msg/s 2065 position 1571345318955-25 #messages 4151 msg/s 2085 position 1571345319542-0 #messages 6266 msg/s 2115 position 1571345320134-13 #messages 8442 msg/s 2176 RSS: 36.91 MB #6 tasks position 1571345320720-6 #messages 10586 msg/s 2144 position 1571345321208-41 #messages 12634 msg/s 2048 position 1571345321845-3 #messages 14806 msg/s 2172 position 1571345322393-30 #messages 16890 msg/s 2084 position 1571345323018-3 #messages 19066 msg/s 2176 RSS: 37.01 MB #6 tasks ``` When using that option, the subscriber will write back into cobra the position of the last message that got read, so that in case of a crash the subscription can resume and not lose messages. It turns out that doing the write operation however ... slows down the subscriber, which leads to a lot of messages being buffered, memory increased and bots being killed by kubernete as they OOM. BTW, did you know that cobra can act as a Key/Value store ? The fix (that still need some polish) is to make that write operation work happen concurrently in an asyncio task. Otherwise the IO operation will block other tasks, which has a bad consequence here. It is fixed in cobra 2.2.4. Ironically Loris Cro was [bloging](https://redislabs.com/blog/async-await-programming-basics-python-examples/) about something similar recently. ``` diff --git a/cobras/client/connection.py b/cobras/client/connection.py index e3544e1..330641a 100644 --- a/cobras/client/connection.py +++ b/cobras/client/connection.py @@ -233,7 +233,9 @@ class Connection(object): ret = await messageHandler.handleMsg(message, position) if resumeFromLastPositionId and ret == ActionFlow.SAVE_POSITION: - await self.write(resumeFromLastPositionId, position) + # for performance reason this work should happen in its own task + # FIXME error handling + asyncio.create_task(self.write(resumeFromLastPositionId, position)) if ret == ActionFlow.STOP: break ``` There is still a small performance penalty when using that option, but the memory stays constant, and we can move on to another problem ... :) The other problem is actually that we are back to square one : there's a new memory leak where the items in the connection.py queue are not going away.