await is not magic
RSS: 33.66 MB
#6 tasks
position 1571345125268-0 #messages 1 msg/s 1
position 1571345125818-10 #messages 2066 msg/s 2065
position 1571345126367-7 #messages 4049 msg/s 1983
position 1571345126367-8 #messages 4050 msg/s 1
position 1571345126367-9 #messages 4051 msg/s 1
RSS: 206.22 MB
#6 tasks
position 1571345126367-10 #messages 4052 msg/s 1
position 1571345126367-11 #messages 4053 msg/s 1
position 1571345126367-12 #messages 4054 msg/s 1
RSS: 530.15 MB
#6 tasks
position 1571345126367-13 #messages 4055 msg/s 1
RSS: 753.86 MB
#6 tasks
position 1571345126367-14 #messages 4056 msg/s 1
position 1571345126367-15 #messages 4057 msg/s 1
RSS: 1.1 GB
#6 tasks
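On a high-traffic subscription, I discovered that the --resume_from_last_position option triggers a big memory leak: in the run above, throughput collapses to 1 msg/s after about 4,000 messages while RSS grows from 33.66 MB to 1.1 GB. Without the option (run below) we receive a steady 2,000+ messages per second and RSS stays around 37 MB.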
RSS: 33.51 MB
#6 tasks
position 1571345317795-0 #messages 1 msg/s 1
position 1571345318386-6 #messages 2066 msg/s 2065
position 1571345318955-25 #messages 4151 msg/s 2085
position 1571345319542-0 #messages 6266 msg/s 2115
position 1571345320134-13 #messages 8442 msg/s 2176
RSS: 36.91 MB
#6 tasks
position 1571345320720-6 #messages 10586 msg/s 2144
position 1571345321208-41 #messages 12634 msg/s 2048
position 1571345321845-3 #messages 14806 msg/s 2172
position 1571345322393-30 #messages 16890 msg/s 2084
position 1571345323018-3 #messages 19066 msg/s 2176
RSS: 37.01 MB
#6 tasks
When using that option, the subscriber writes the position of the last message it read back into cobra, so that after a crash the subscription can resume without losing messages. It turns out that doing the write operation ... slows down the subscriber, which leads to a lot of messages being buffered, memory growing, and bots being killed by Kubernetes as they OOM. BTW, did you know that cobra can act as a key/value store?
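To see the buffering effect in isolation, here is a toy simulation, not cobra code. Everything in it is an assumption made for illustration: the `producer` stands in for the network feeding messages, `asyncio.sleep(WRITE_DELAY)` stands in for the position write, and the 5 ms delay is arbitrary. It only shows that awaiting a slow operation per message lets the queue pile up.

```python
import asyncio

WRITE_DELAY = 0.005  # assumed cost of one position write, in seconds


async def producer(queue: asyncio.Queue, count: int) -> None:
    # Messages keep arriving whether or not the consumer keeps up.
    for i in range(count):
        queue.put_nowait(i)
        await asyncio.sleep(0)  # yield to the event loop between messages


async def consumer(queue: asyncio.Queue, count: int, save_position: bool) -> int:
    # Returns the largest backlog seen right after taking a message.
    max_backlog = 0
    for _ in range(count):
        await queue.get()
        max_backlog = max(max_backlog, queue.qsize())
        if save_position:
            # Stand-in for an awaited write: the loop is parked here
            # and cannot take the next message until it completes.
            await asyncio.sleep(WRITE_DELAY)
    return max_backlog


async def run(save_position: bool, count: int = 100) -> int:
    queue: asyncio.Queue = asyncio.Queue()
    feeder = asyncio.create_task(producer(queue, count))
    backlog = await consumer(queue, count, save_position)
    await feeder
    return backlog


def simulate(save_position: bool, count: int = 100) -> int:
    return asyncio.run(run(save_position, count))


if __name__ == "__main__":
    print("max backlog without position write:", simulate(False))
    print("max backlog with awaited position write:", simulate(True))
```

With an unbounded queue, that backlog is exactly the memory that keeps growing.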
The fix (which still needs some polish) is to make that write operation happen concurrently in its own asyncio task. Otherwise the message loop awaits each write and cannot handle the next message until the IO completes, which has a bad consequence here. It is fixed in cobra 2.2.4. Ironically, Loris Cro was blogging about something similar recently.
diff --git a/cobras/client/connection.py b/cobras/client/connection.py
index e3544e1..330641a 100644
--- a/cobras/client/connection.py
+++ b/cobras/client/connection.py
@@ -233,7 +233,9 @@ class Connection(object):
 ret = await messageHandler.handleMsg(message, position)
 if resumeFromLastPositionId and ret == ActionFlow.SAVE_POSITION:
-    await self.write(resumeFromLastPositionId, position)
+    # for performance reason this work should happen in its own task
+    # FIXME error handling
+    asyncio.create_task(self.write(resumeFromLastPositionId, position))
 if ret == ActionFlow.STOP:
     break
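For the "FIXME error handling" part, one possible shape is sketched below. This is not what cobra does; `spawn_write`, `on_write_done`, `fake_write`, `background_tasks` and `failures` are hypothetical names made up for the example. It relies on two documented asyncio behaviours: the event loop only keeps weak references to tasks, so a reference should be saved somewhere, and an exception raised inside a fire-and-forget task is not raised in the caller, so a done callback has to look for it.

```python
import asyncio
import logging

# Strong references to in-flight writes, since the loop only keeps weak ones.
background_tasks: set = set()
# Collected write errors (hypothetical; a real client might retry instead).
failures: list = []


def on_write_done(task: asyncio.Task) -> None:
    # Runs when the task finishes: drop the reference and surface any error.
    background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        failures.append(exc)
        logging.error("position write failed: %r", exc)


def spawn_write(coro) -> asyncio.Task:
    # Schedule the write without awaiting it in the message loop.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(on_write_done)
    return task


async def fake_write(key: str, position: str, fail: bool = False) -> None:
    # Stand-in for the real write; the delay and failure are simulated.
    await asyncio.sleep(0.01)
    if fail:
        raise ConnectionError(f"could not save {position} for {key}")


async def demo() -> None:
    spawn_write(fake_write("sub", "1-0"))
    spawn_write(fake_write("sub", "1-1", fail=True))
    # Before shutting down, wait for whatever is still in flight.
    await asyncio.gather(*background_tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(demo())
    print("failed writes:", len(failures))
```

The set also gives a cheap way to see how many writes are pending at any time, which is handy when chasing the kind of growth shown in the logs above.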
There is still a small performance penalty when using that option, but the memory stays constant, and we can move on to another problem ... :)
The other problem is actually that we are back to square one: there's a new memory leak where the items in the connection.py queue are not going away.