WebSocket Event Bus & Processing Pipeline – Polymarket Trading Bot Deep Dive (Part 1)
Okay, so we are back with another video.
In this video I will go into a little
more detail about how I handle all the
events, that is, the event layer and the
processing layer, and I will also walk
through a small refactor.
I've been looking through the comments
here, and I specifically saw this one.
I think this is the one. It says: "Nice
architecture. Signal layer should
publish to the event bus, and the
strategy layer consumes all signal plus
position." This comment actually
inspired me to do a small refactor
compared to the previous video I did.
So you can see here it looks a little
bit different. The event bus is now the
more central part of the trading bot. We
still have the data sources, for example
the market channel WebSocket and the
user channel WebSocket from Polymarket,
going over to the pipeline adapters and
then to the processing unit. The event
bus sits in the middle as a central unit
that orchestrates all the different
events. For example, we have the signal
layer with all the different signals
that we use for our strategies. It can
subscribe to events from the event bus,
and it actually does that indirectly
through the strategy layer. You can see
how I have arrows going back and forth.
Take the position layer: it can
subscribe to the event bus, for example
to events from the user channel.
Whenever one of these events matches and
it updates our position state to say
that a trade has been filled, it can
then publish an event to the event bus,
and other parts of the trading bot can
subscribe to that event as well. So
let's say that our strategy relies on
market fills. Then it can subscribe to
that event from the position layer
through the event bus.
So that's how that works.
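A minimal sketch of the publish/subscribe flow described above, not the bot's actual code. The class names, the event type strings ("user.trade", "position.fill") and the payload fields are all assumptions made for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, DefaultDict, Dict, List


@dataclass
class Event:
    type: str  # hypothetical names such as "user.trade" or "position.fill"
    data: Dict[str, Any] = field(default_factory=dict)


Handler = Callable[[Event], None]


class EventBus:
    """Minimal synchronous publish/subscribe bus."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event: Event) -> None:
        # Iterate over a copy so handlers may subscribe while we publish.
        for handler in list(self._subscribers[event.type]):
            handler(event)


class PositionLayer:
    """Listens for user-channel trades and republishes them as fills."""

    def __init__(self, bus: EventBus) -> None:
        self.bus = bus
        self.filled = 0.0
        bus.subscribe("user.trade", self.on_trade)

    def on_trade(self, event: Event) -> None:
        # Update position state, then tell the rest of the bot about it.
        self.filled += event.data["size"]
        self.bus.publish(Event("position.fill", {"filled": self.filled}))


bus = EventBus()
positions = PositionLayer(bus)

seen: List[Event] = []
bus.subscribe("position.fill", seen.append)  # a strategy would subscribe here

bus.publish(Event("user.trade", {"size": 5.0}))
```

Here the strategy side never talks to the position layer directly. It only knows the event type it subscribed to, which is the decoupling the refactor is after.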
For the execution layer, I thought about
also doing it through the event bus, but
in the end I chose to keep it separate
because these are actions. So the
strategy layer still publishes these
action events over to the execution
layer. So thank you for that, cool B.
I saw this comment here as well, and I
actually just replied to it.
Mr. Flipstar, I think all your comments
are really great, and I just put a
comment here. So let me see here.
Yeah, so: position sync issue. Sometimes
the connection to the WebSocket drops.
So you mentioned losing the WebSocket
for two seconds: "I solve this by
auto-logging to Supabase so I can
recover state and restart." And yeah,
this is a good idea. But like I
mentioned in my comment, if I subscribe
to market events for just one market,
the 15-minute Bitcoin market, logging
the data to JSON for just 60 seconds is
around 20 megabytes, and I'll
demonstrate in this video how much it
actually is. So I'm just wondering, if
you're logging this to Supabase, are you
dropping all the data after the market
resolves, or are you actually keeping it
for a long time? It seems like you have
to clean up a lot, otherwise it could
potentially be very expensive, because
if you run this daily, 24 hours a day,
it's a lot of data.
All right, so let me go over to the more
interesting part, which is the code.
I've created this test file just to
demonstrate a few areas of the trading
bot. Specifically, what I will be
showing is the pipelines, the pipeline
adapters, the processing unit, and then
the event bus.
All the other stuff, such as the signal
layer, the position layer, and the
strategy layer, I will show in a later
video. So for now it's this area here
that I will be showing you. We start by
initializing the event bus. If you go
into the code for the event bus, you can
see that it has a publish method. This
method publishes an event to all the
subscribers. Again, a subscriber can for
example be the position layer, which
subscribes through the event bus to
events from the user channel. So that's
the method that handles that part. And
then we also have a subscribe method in
the event bus. So if the position layer
wants to subscribe to any event, or if
it's the signal layer that wants to
subscribe, it uses that. So we have a
publish method, to publish an event to
the network, so to speak, and a
subscribe method. So we start by
initializing the event bus here. And
then we have a replay mode, which is
something I forgot to mention in my
previous video. I'm going to demonstrate
how we can replay all the data that
we're collecting, so I'll be showing
that later. The next part is the
processing. The processing layer is over
here.
The processing layer has the
responsibility of taking the tagged
message and constructing the final
processed event that all our subscribers
can use. So we are kind of building the
bot out from the end: from the event
bus, then to the processing unit. You
can see here that the processing unit
has the event bus, of course, and then
it has processors. The processors are
specific to the different data sources.
For example, we can have a processor
specifically for the market channel
WebSocket, or one for BTCUSD. It then
processes the raw data that we have
here. So its responsibility is to turn
the raw data into data that we can
actually use to compute our signals, or
whatever else we need it for.
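The processing unit described above could be sketched roughly like this. It is an illustration under assumptions: the names TaggedMessage, ProcessedEvent, register_processor and process are hypothetical, and the unit takes a plain publish callable in place of the real event bus so that the example stays self-contained.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class TaggedMessage:
    source: str    # which data source produced the message
    msg_type: str  # e.g. "book" or "price_change"
    payload: Dict[str, Any]


@dataclass
class ProcessedEvent:
    source: str
    event_type: str
    data: Dict[str, Any]


Processor = Callable[[TaggedMessage], Optional[ProcessedEvent]]


class ProcessingUnit:
    """Routes tagged messages to the processor registered for them."""

    def __init__(self, publish: Callable[[ProcessedEvent], None]) -> None:
        self._publish = publish  # stand-in for event_bus.publish
        self._processors: Dict[Tuple[str, str], Processor] = {}

    def register_processor(
        self, source: str, msg_type: str, processor: Processor
    ) -> None:
        self._processors[(source, msg_type)] = processor

    def process(self, message: TaggedMessage) -> Optional[ProcessedEvent]:
        processor = self._processors.get((message.source, message.msg_type))
        if processor is None:
            return None  # no processor registered for this kind of message
        event = processor(message)
        if event is not None:
            self._publish(event)
        return event


def process_price_change(msg: TaggedMessage) -> ProcessedEvent:
    # Hypothetical processor: turn a string price into a float.
    return ProcessedEvent(
        msg.source, "price_change", {"price": float(msg.payload["price"])}
    )


published: List[ProcessedEvent] = []
unit = ProcessingUnit(publish=published.append)
unit.register_processor("polymarket.market", "price_change", process_price_change)

result = unit.process(
    TaggedMessage("polymarket.market", "price_change", {"price": "0.51"})
)
ignored = unit.process(TaggedMessage("polymarket.market", "unknown", {}))
```

Keying the registry on source plus message type is one possible design. It lets a second data source, such as a BTCUSD feed, register its own processors without touching the existing ones.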
So we have a method to register a
processor, and then we have a method to
process a message using that processor.
We also have a helper method here to
register the different processors. I can
open this up, and you can see how it
just takes the processing class and
registers the processors that I just
discussed. We have, for example, the
book message. We have a type here, which
is the book event from the Polymarket
WebSocket, and then process book. These
are just different methods to handle the
data. You can see what that looks like:
we do some validation of the data, and
then we return the processed event.
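As an illustration of the validate-then-return step, here is a hedged sketch of what a book processor might look like. The field names ("market", "bids", "asks", "price", "size", "timestamp") and the shape of the returned event are assumptions for this example and are not taken from the bot's code.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ProcessedEvent:
    market: str
    event_type: str
    timestamp_ms: int
    data: Dict[str, Any]


def process_book(raw: Dict[str, Any]) -> Optional[ProcessedEvent]:
    """Validate a raw book message and return a uniform event, or None."""
    try:
        market = raw["market"]
        timestamp_ms = int(raw["timestamp"])
        bids = [(float(lv["price"]), float(lv["size"])) for lv in raw["bids"]]
        asks = [(float(lv["price"]), float(lv["size"])) for lv in raw["asks"]]
    except (KeyError, TypeError, ValueError):
        return None  # malformed message: drop it instead of raising

    if not bids or not asks:
        return None  # a one-sided book has no best bid/ask pair

    best_bid = max(bids, key=lambda level: level[0])
    best_ask = min(asks, key=lambda level: level[0])
    return ProcessedEvent(
        market=market,
        event_type="book",
        timestamp_ms=timestamp_ms,
        data={
            "best_bid": best_bid[0],
            "best_bid_size": best_bid[1],
            "best_ask": best_ask[0],
            "best_ask_size": best_ask[1],
        },
    )


raw_book = {
    "event_type": "book",
    "market": "0xabc",  # placeholder market id
    "bids": [{"price": "0.48", "size": "30"}, {"price": "0.49", "size": "20"}],
    "asks": [{"price": "0.52", "size": "25"}, {"price": "0.53", "size": "60"}],
    "timestamp": "1700000000000",
}
event = process_book(raw_book)
broken = process_book({"event_type": "book", "market": "0xabc"})
```

Returning None for invalid input keeps bad messages away from the event bus. Raising or logging instead would be an equally reasonable choice.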
For example, it carries which market it
is, because we can be subscribed to many
different markets. So the processed
event also needs to know which market it
is and what kind of event type it is. We
are basically just making these events
uniform and doing some validation and
stuff like that. You can see how we have
the data: the best bid and ask price,
the sizes, and so on and so forth. Then
there is the timestamp, which we parse.
We could potentially have many different
kinds of timestamps coming in from
different events, all in different
formats, so we may want to make that
uniform in some way. So we register it
here, and then we have our Polymarket
WebSocket adapter. For this example,
I've implemented the market channel
WebSocket. That's the pipeline adapter
here. Its responsibility is to connect
to the data sources and then tag each
message with data that is relevant for
our strategies and for our trading bot.
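The tagging step of a pipeline adapter might look something like the sketch below. The network connection is left out on purpose, and the class name, the tag fields (source, msg_type, received_at) and the "event_type" key are assumptions made for illustration.

```python
import json
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class TaggedMessage:
    source: str         # which adapter the message came from
    msg_type: str       # taken from the raw payload, "unknown" if missing
    received_at: float  # local receive time in seconds since the epoch
    payload: Dict[str, Any]


class MarketChannelAdapter:
    """Tags raw text frames and hands them to a callback (no networking)."""

    SOURCE = "polymarket.market"  # hypothetical source label

    def __init__(self) -> None:
        self._callback: Optional[Callable[[TaggedMessage], None]] = None

    def set_message_callback(
        self, callback: Callable[[TaggedMessage], None]
    ) -> None:
        self._callback = callback

    def handle_raw(self, raw_text: str) -> None:
        decoded = json.loads(raw_text)
        # Accept either a single JSON object or a list of objects.
        payloads = decoded if isinstance(decoded, list) else [decoded]
        for payload in payloads:
            if not isinstance(payload, dict):
                continue
            tagged = TaggedMessage(
                source=self.SOURCE,
                msg_type=str(payload.get("event_type", "unknown")),
                received_at=time.time(),
                payload=payload,
            )
            if self._callback is not None:
                self._callback(tagged)


received: List[TaggedMessage] = []
adapter = MarketChannelAdapter()
adapter.set_message_callback(received.append)
adapter.handle_raw('{"event_type": "book", "market": "0xabc"}')
adapter.handle_raw('[{"event_type": "price_change"}, {"market": "0xabc"}]')
```

In a real adapter, handle_raw would be called from the WebSocket receive loop, and the callback would be the processing unit's process method.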
We start by defining the Polymarket
WebSocket adapter. This is where we do
all the connections. It doesn't have to
be a WebSocket. It could also be a
pipeline for, you know, the weather in
France, or the Bitcoin price, or data
from Reddit, maybe some sentiment
analysis we can feed into our trading
bot. This one just follows the
principles from the Polymarket
documentation. We have a connect method,
a disconnect method, and a subscribe
active markets method. This part is a
little bit hardcoded. The way it works
now, it just connects to the Bitcoin
15-minute markets automatically. I will
do some refactoring here so it's easier
to fine-tune, because maybe we don't
want that. Maybe we want to select a
specific market, or maybe we want the
Ethereum 15-minute markets, or maybe we
want to do something completely
different.
So I'm going to change that to some
extent, but for the sake of this example
I'll keep it and not touch it for a
while. So that's how that works. Then we
set a message callback on the adapter,
and you have the processing method on
the processing class over here. So this
is what actually happens: it connects to
the WebSocket, where we get the raw data
that you can see here. The data runs
through the pipeline adapter, which tags
it, so it becomes a tagged message where
we just attach some data. Then we have
the processing. The message callback
calls this method from the processing
unit, which constructs the processed
event. The processed event is now ready
for our event bus, to be transferred to
all the different areas of our bot that
subscribe to that event. So let me
demonstrate this so you can see for
yourself how it works. As I mentioned
just before, we have a method to record
data.
So I will try to change my screen here a
little bit so you can see. If I go to
the recordings here, you can see we have
a folder with different recordings. When
I start the bot, you should be able to
see a new file coming in. So let me run
it. You saw that we now have the file,
and it's empty. It will subscribe to the
WebSockets and subscribe to the series.
This is kind of hardcoded for now. This
specific series is the Bitcoin up and
down one. And you can see it found 96
active events. This is again an area
that I need to fix, because this Bitcoin
up and down series has a lot of markets
listed as active even though they're not
really active. So now it's going to
subscribe to 96 different events.
And that's not efficient. That's why it
takes such a long time to start, because
it subscribes to all of them. So I have
to fix this. But you'll see the data
that actually comes in. Now you can see
how it starts and how it fills up the
file. You can see here that it's
actually the same event. If you look at
all the slots here, it's basically the
same event. So it's connected to 96
different events, but we're only getting
data in from one of them. That's
something I need to fix. So we're just
getting all this data in, and you can
see how we're filling up this file.
Again, I will demonstrate how much data
this actually is. You can see it just
filling up and filling up. These are the
price updates. And you can see how
sometimes you get other events. There
is, for example, a last trade event,
which is an event for when a trade
actually gets filled.
And we have a book message as well. It's
difficult to see because it's going so
fast, but here we have a book message,
for example. We have all the raw data
here, but we also have the other data
that we tag it with. So now it's just
running. We don't have any strategies or
anything, we just get the data. It has
probably been running for maybe a
minute or so now, so let me stop it for
a while.
So now I've stopped it. Let me see if I
can open it up. Here we have it. So it
ran for 129 seconds and we got 10,577
events, which is about 81 and a half
events per second.
Most of these events are price change
events. And then it writes the recording
file that we have here. So this is the
file. We ran it for, what was it, 129
seconds, and it's 6.41 megabytes. So
let's say we have 3 megabytes per
minute, times 60, and then times 24.
That's 4.3 gigabytes per day. And if we
run it for 30 days, it fills up quickly.
So yeah, it's a lot of data.
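The back-of-the-envelope estimate above can be reproduced from the two measured numbers (6.41 megabytes in 129 seconds). The sketch assumes decimal units, 1 GB = 1000 MB, and a constant message rate all day, which is a simplification.

```python
# Measured in the demo run.
file_mb = 6.41
run_seconds = 129

# Extrapolate, assuming the rate stays constant.
mb_per_minute = file_mb / run_seconds * 60
gb_per_day = mb_per_minute * 60 * 24 / 1000  # decimal GB
gb_per_30_days = gb_per_day * 30

print(f"{mb_per_minute:.2f} MB per minute")
print(f"{gb_per_day:.2f} GB per day")
print(f"{gb_per_30_days:.1f} GB per 30 days")
```

Using the unrounded rate gives slightly under 3 MB per minute and about 4.3 GB per day, which matches the rounded figure in the walkthrough. This is for a single market.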
So let me now demonstrate the replay. We
have this recording, and the reason I
made this is so you can do a rerun,
because it collects all the data. It's
not only going to collect the WebSocket
data. If we have other signals, it
collects those too, because it's
collecting data from the event bus. So
if the signal layer produces an event
and publishes it to the event bus, it
will be recorded, and the same goes for
the position layer. So if I run it here,
let's say we take this recording. I'll
just pass it in for now, and you can see
how it runs. Now it runs much faster
because we don't have any limit. I
actually think I made it so it's 10
times as fast as before. So it just runs
through all the data again. And now it
has stopped. It replayed 10,577
messages, which is pretty fast. Again,
it's just for simulation, for things
like backtesting. It allows us to record
a lot of data and then run our
strategies on it again and again. That
was the idea behind it.
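A record-and-replay loop along these lines could be sketched as follows. This is an assumption-heavy illustration: the one-JSON-object-per-line file format, the "ts" field and the speed parameter are all hypothetical choices for the example.

```python
import json
import tempfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional


def record_event(path: Path, event: Dict[str, Any]) -> None:
    """Append one event to the recording as a single JSON line."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(event) + "\n")


def replay(
    path: Path,
    publish: Callable[[Dict[str, Any]], None],
    speed: float = 0.0,
) -> int:
    """Re-publish recorded events in order and return how many were sent.

    speed=0 replays as fast as possible. speed=10 keeps the original
    gaps between events but compresses them by a factor of 10.
    """
    count = 0
    previous_ts: Optional[float] = None
    with path.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            event = json.loads(line)
            ts = event.get("ts")
            if speed > 0 and ts is not None and previous_ts is not None:
                time.sleep(max(0.0, (ts - previous_ts) / speed))
            if ts is not None:
                previous_ts = ts
            publish(event)
            count += 1
    return count


with tempfile.TemporaryDirectory() as tmp:
    recording = Path(tmp) / "recording.jsonl"
    for i in range(3):
        record_event(
            recording, {"ts": 1000.0 + i, "type": "price_change", "seq": i}
        )

    replayed: List[Dict[str, Any]] = []
    total = replay(recording, replayed.append)  # stand-in for event_bus.publish
```

Because the replay feeds the same publish function the live pipeline uses, strategies subscribed to the bus cannot tell recorded data from live data, which is what makes the recording usable for backtesting.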
So yeah, that's it for this video. The
next step is to do a video about the
signal layer, the position layer, and
the strategy layer. Maybe I'll split
that up into multiple videos. We'll see
how it goes. I will make sure to make
that video as soon as possible.
So yeah, if you enjoyed this video,
leave a like, and subscribe if you want
to see more. Soon I'll be ready to
announce some news on this channel,
something a lot of people have been
requesting, which I will hopefully be
launching soon. I will let you know in a
separate video when that happens. So
until next time, have a good one, and
see you.
The video details a refactor of a trading bot's event handling architecture, inspired by a user comment. The updated design positions the event bus as a central orchestrator for events flowing from data sources through pipeline adapters to processing units. The speaker demonstrates the implementation of the event bus, processing unit, and a Polymarket WebSocket adapter. A key feature introduced is a replay mode, enabling the recording and subsequent replaying of market data for backtesting. The discussion also highlights the significant volume of data generated by logging market events, raising concerns about storage costs. Future videos are planned to cover the signal, position, and strategy layers of the bot.