Transcript
Hi, and welcome to Programming Percy. In this tutorial we will be looking at RabbitMQ Streams and how we can use them in Go. I recently published another tutorial about RabbitMQ and how you can use it in Go, and if you are not familiar with RabbitMQ, I really recommend you click the link up here and watch that first, because this tutorial expects you to understand the basics of RabbitMQ. I won't be covering the basics in detail; we will focus on the streaming aspects.

I have always relied on two message brokers: RabbitMQ and Kafka. The difference between them is that Kafka gives us an append-only log, which is great when we have to replay events, when we want to time travel in our applications, or simply when we want events that persist forever for auditability reasons. (Time traveling is when you can replay events from a certain point in time in your system.) Using two brokers has the downside of added complexity: you have two systems to take care of and two brokers to operate. What if we could use just one of them? I'll be honest with you: I stuck with Kafka for a long time, but RabbitMQ released a feature called Streams in 2021. It took me a long time to actually test it and use it in production, but I finally did, and I'm so glad I did, because it helps a lot to have just one system running, and streams really work well.

Streams are designed to help with the problems that a regular queue can have. Each event sent on a RabbitMQ stream is written to the file system; it is persisted there forever unless we specify otherwise (by default, it's forever). Each event that flows through the stream is indexed, and a timestamp is saved, which allows us to easily replay events and trigger the time
traveling and whatever else we need for auditability, etc.

Now, one other downside of regular queues in RabbitMQ that we can't forget to mention: if you have a fanout exchange, each consumer has to create its own queue. This can lead to a lot of queues being created for no particular reason. Streams allow multiple consumers on the same stream, which is amazing, and we get rid of a lot of boilerplate. What's even more amazing is that if you're already using RabbitMQ in your systems, converting to streams is almost seamless; it's really easy. They found a fantastic way to implement this without adding a bunch of complexity. So I guess we should dig right in.

In this video we will cover the following topics: what streams are and how they work, how to use streams in Go, what Stream Core and the Stream plugin are and their differences, sub-entries and compression, and a little about decoupling. So let's just go.

Before we begin hacking code, I want to quickly cover what streams are and how they work. Streams are available since RabbitMQ version 3.9, and the idea is simple: a stream can be considered an append-only log file. This is a giant log file, and we can only write new rows to it; we cannot remove events, we cannot edit them, we can only append new things to the log. As with a regular RabbitMQ queue, we have a producer who sends events to the stream. The stream persists them on the hard drive by writing them to a giant file, and when somebody tries to consume, RabbitMQ simply reads from this log file and sends the events back to the consumer. Each event gets an index and a timestamp, and these two are used to know which event is which and when it occurred, which allows us to later replay, or fetch only a
certain event, etc. And this is really great, because with a regular queue you consume the message, you acknowledge it, and then the message is gone forever; with a stream you can consume the message, but the message is still there for anybody else to consume. So that's the basic flow: a producer sends an event, the event is stored on disk, and we can later fetch it. These are the basics of RabbitMQ Stream Core. You can see up here I wrote "Core", because we have something called Stream Core: the basic capabilities that allow RabbitMQ to handle streams. Stream Core uses the regular AMQP 0.9.1 protocol, so it can be used with basically any RabbitMQ client out there, and a stream is kind of a queue, as you will notice very soon. Stream Core is almost seamless to use; you can just use it right away.

But streams have further capabilities: instead of a plain-text protocol, they allow us to use a binary protocol, which increases speed a lot. To use the binary protocol, which is AMQP 1.0, we have to use something called the Stream plugin. The Stream plugin works in exactly the same way from your perspective as a user: we have a producer, they send the events (now in binary format instead), the stream takes care of writing the files to disk, and the consumer can fetch them just the same way as before. But the Stream plugin is not enabled by default in RabbitMQ; this is something you have to enable on your RabbitMQ server. The Stream plugin also enables a bunch of other features, such as something called super streams, which are multiplexed streams: a stream with a lot of streams inside of it. So that's the difference between Stream Core and the Stream plugin: Stream Core is compatible with AMQP 0.9.1, which is supported by basically every RabbitMQ client out there; the Stream plugin offers extended capabilities and a better, faster protocol, but you need to enable it, and
you also need to use another SDK; we will cover that shortly. So why would you go through the trouble of using the Stream plugin when you can use Stream Core? Well, according to the RabbitMQ docs, Stream Core can handle hundreds of thousands of messages per second, which is a lot, but the Stream plugin can handle millions of messages per second. That's quite a benefit, and we always want to maximize. So let's start by learning about Stream Core, now that we know the concept of streams and how they work, and begin using RabbitMQ Stream Core in Go.

I made a small picture that illustrates how you create a stream, and it's actually really simple. You have a sad developer here who wants to create a queue; he initiates the queue creation, either through the SDK, a configuration file, or rabbitmqctl, and he gets a queue. Down here you have a happy developer who wants to do the same thing, and all he has to do to start using streams is add a header. We will be covering these headers; I just want to mention it quickly before we start: RabbitMQ allows you to upgrade a queue when creating it by adding `x-queue-type` and setting its value to `stream`. That's all we need to do to change a queue into a stream. You can't modify an existing queue into a stream; you need to create a new one.

I'm going to create a new module: `go mod init programmingpercy/streamingmq` (you can name it whatever you want). We won't really be using the module path, but once the module is initialized, let's make a folder called `producer` and a folder called `consumer`, and a main file in each of those: `touch producer/main.go` and `touch consumer/main.go`. So we have a simple structure: a producer folder and a consumer folder, each containing its main program. But before we start writing our code, we need to make sure that we have a Docker container running RabbitMQ, because we will need to communicate with it.
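The Docker invocation described next, as a sketch; the container name is the video's choice, and I'm assuming the `-management` image tag so the dashboard used later at localhost:15672 is available.

```shell
# Run RabbitMQ 3.9 in the background, exposing the AMQP port (5672)
# and the management UI port (15672).
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3.9-management
```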
Let's create the Docker container running RabbitMQ, since we need an instance to test against. I'm going to start it with `docker run`, with `-d` to run it in the background; I'm going to name it (you can see my old command here, but that's for another purpose); we're going to expose port 5672 and also the management port, which is 15672; and I'm going to use rabbitmq 3.9, which is the current latest version. Once we have that up and running, we can actually start coding. I'm going to open up my folder, and we're going to start with the producer. My editor doesn't look so great zoomed in this much, but it's so you can all see.

Let's create our main package and a main function. The first thing we need to do is connect to RabbitMQ, and we do that with `amqp.Dial`. I'm going to `Sprintf` a connection string: the protocol (`amqp`), then the user, the password, the host, and the virtual host. By default RabbitMQ has a `guest`/`guest` user that is always available, and we can use localhost; let's not forget port 5672, and we use the default virtual host. So we have a connection string that connects us as guest to localhost; nothing strange going on yet. Let's just panic whenever we have an error, because that's the simplest thing for now (don't do this in production). Let's create a channel, since we always need a channel to communicate through, and let's also check that error, to make sure we are actually connected.

The first thing we're going to do, to show you the difference between a queue and a stream, is create a regular queue. I won't be covering what a queue is, how to declare it, or the parameters we use to declare it,
because I covered that in my earlier video. If you're unfamiliar with it, or feel that I'm not explaining this part, it's because I kind of expect you to understand it before starting to work with streams. Let's create a queue and name it `events`; we're going to send events on it. I need to check which variables I need, so let's jump up here and apply a code action to import `fmt` and the amqp091 module, so our program actually knows what's going on. Whenever I import amqp, I like to add an alias so the module is a little easier to use. Now that we have syntax highlighting, we're going to set auto-delete false, exclusive false, and no-wait, let's set that to true, and then we can add an `amqp.Table` as a parameter. This table is really important, because we will be using it to set the headers that we need. Let's create an empty table for now, but remember it, and let's just check if we have any errors and panic if we do.

Now we have declared a queue called `events`; let's also publish some messages. I'm going to create a context, and let's publish 1,000 messages: a simple for loop that runs 1,000 times, incrementing each pass. No, compiler? Let's organize the imports; oh, my bad, it should be `context`. Before we loop, we need some data to send, so I'm going to scroll back up and create a very simple struct. Let's call it `Event`; it contains one field, `Name`. These are the events that we will be sending across the stream, so let's just have that up there for now. Inside our for loop, let's create a new event and set the name to "test"; I guess that's fine for now. When you're using RabbitMQ, it's really up to you to decide which data you want to send: you can send protobuf, you can send JSON, you can do whatever you want.
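The `Event` struct and the JSON serialization used for the message body can be sketched like this; the struct tag is my addition for a lowercase JSON field name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is the payload we send across the stream.
type Event struct {
	Name string `json:"name"`
}

// encodeEvent marshals an event to JSON, ready to be used as the
// Body of an amqp.Publishing.
func encodeEvent(e Event) ([]byte, error) {
	return json.Marshal(e)
}

func main() {
	data, err := encodeEvent(Event{Name: "test"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // {"name":"test"}
}
```

Swapping in protobuf later only means replacing `encodeEvent`; the publishing code stays the same.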
I do strongly recommend using protobuf, but to showcase the stream functionality we're going to use a simple `json.Marshal` to send our events, and then I'll leave it up to you to implement protobuf or whatever you need to serialize and deserialize your messages; that's up to your application. So let's import `encoding/json`. We have `data` unused, so let's go ahead and send it: we're going to publish this event with `PublishWithContext`. That is the new way of publishing in the newest amqp module; the regular `Publish` is actually deprecated, so you should always have a context when you send something. We're not going to use a named exchange; we'll send on the default exchange, with the routing key `events`, because that's what we named our queue. It's not mandatory and not immediate, and then we need to pass an `amqp.Publishing`, the message we're sending, with our marshaled data as the body.

One thing I also really like to do, and which you should probably do in a real application, is add a correlation ID. A correlation ID allows you to trace messages back to where they were sent from, and it is also delivered to the consumer, so the consumer can use it. If you have a long chain of events stretching from a user action into a lot of things happening, you can use the correlation ID to trace all those things. So let's create a new UUID; I usually use the Google module when I'm doing UUIDs, so we need to add that to the imports. My LSP is having some issues right now and won't offer a code action, so I'm just going to import it manually: `github.com/google/uuid`, I think it is; it couldn't resolve it yet.
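The video pulls in github.com/google/uuid for this; to keep the sketch free of external dependencies, here is a stdlib-only version-4 UUID generator that serves the same correlation-ID purpose.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newCorrelationID returns a random RFC 4122 version-4 UUID string.
// With the google/uuid module used in the video, this would simply
// be uuid.NewString().
func newCorrelationID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err) // crypto/rand failing is unrecoverable here
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}

func main() {
	fmt.Println(newCorrelationID())
}
```

The returned string goes into the `CorrelationId` field of `amqp.Publishing`.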
Let's run `go mod tidy` again, and here we go, great. So now we are adding a correlation ID to our messages. Let's not forget to check for errors when sending, just to make sure we're doing everything correctly, and we'll panic if there are any. Before we close the program, let's close the channel, to await all messages being sent; if you don't close the channel, you will actually see a varying number of messages being sent, because the program will exit before all the messages have gone out. Let's also print the queue name whenever we run the program. So we have a pretty simple program: we connect to RabbitMQ, we create a channel, we create the queue, and we publish our messages on it, and we're good to go.

Let's just make sure everything runs. I'm going to go inside my producer folder and run the program, and we can see "events" being printed, which means the program ran as expected. If you'd like to make sure everything works, you can visit port 15672, the admin dashboard for RabbitMQ, and log in using guest/guest. We can see that we have 1,000 messages ready on some queue; let's go inside Queues and we can see it here: a classic queue with 1,000 messages. Great, that's actually what we wanted.

Now, remember I said it was almost seamless to start using streams? Let's take a look at how we can upgrade this to a stream. Also note, as I said, that we cannot upgrade existing queues: we need to add the `x-queue-type` header, and it goes inside the `amqp.Table` when we declare the queue. It's the same if you create queues using rabbitmqctl or configuration files: whenever you create the queue, you add this header in the way specified by whatever tool you're using. So `x-queue-type` is going to have the value `stream`, and let's add a comma. This is all you need to do to turn a queue into a stream.
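That single header can be sketched as follows. In amqp091-go, `amqp.Table` is just a `map[string]interface{}`, so a plain map is used here to keep the example runnable without the dependency.

```go
package main

import "fmt"

// streamArgs returns the declare arguments that turn a queue into a
// stream. With amqp091-go these would be passed as the final args
// parameter of ch.QueueDeclare("events", ...).
func streamArgs() map[string]interface{} {
	return map[string]interface{}{
		"x-queue-type": "stream", // the only header needed
	}
}

func main() {
	fmt.Println(streamArgs())
}
```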
This is it; we're done. Let's go into the runner and run the producer again, and you're going to see an exception. Remember I said that you cannot upgrade a queue into a stream? That's the problem: we're trying to upgrade an existing queue called `events` into a stream. What you need to do is go into the admin dashboard, or use the command line, to remove the queue that already exists. I'm just going to do a `docker exec` into the rabbitmq container and run `rabbitmqctl delete_queue events`, so I'm deleting the events queue. Once that is deleted, let's rerun our producer, and boom, we get the queue name printed, or should I say the stream name, because if we open up the dashboard again, we can see that the type is now actually `stream`. Great, we have created a stream, and it has a thousand messages which we can re-read at any time. We're going to do that soon with a consumer.

But before we do that, I'd like to talk a little about retention. Right now we are persisting all the events to disk, and that's good, but what happens if we keep saving all events forever? Eventually we will run out of space, so we need to come up with some kind of retention plan: we probably want to store the events forever, but do we have the disk space to do it? This is something you can configure so you don't crash, but you need to consider your application's needs. When we talk about retention in streams, we're talking about time-based retention or size-based retention. Size-based retention is the ability to say: whenever our log is too big, say 5 GB, start retention; whenever the total size of the stream reaches that point, it will start applying its policy. The default policy is to delete the oldest segment. So what's a segment? Well, when the stream receives an event, it gets sent
from the stream to disk and written to a file, and these log files are called segments. We can configure how large our segments are. So let's say we want our stream to use at most one gigabyte of space, and each segment should be 100 megabytes: whenever our disk usage reaches one gigabyte, it will delete the first (oldest) 100-megabyte segment present on the stream. Now you might be thinking, "but I'm using a stream because I want to persist data", and you're correct, but you need to have a plan for what should happen when the disk space runs out: instead of the application crashing, the retention plan kicks in. That's the default behavior, deleting the oldest files, but there are other ways of handling this. One common way is uploading the old segment files to a backup storage, because you can always import the segments whenever you need them; you wouldn't have to store 25 years of streamed data on the same disk. You could have some kind of cold storage to load from, or another, archived stream, or whatever solution you find suitable. The point is that you can, and should, configure these things.

There are three different configurations we will look at. The first is `x-stream-max-segment-size-bytes`; that's a long word, what did I even say? This argument defines how large the segment log files can be: in our example of a 1 GB stream with 100 MB log files, the 100 MB is configured by this argument. You need to consider how much data you're willing to drop before proceeding with new data. The segment size always needs to be set for retention to apply: if you don't set this, RabbitMQ will never trigger the retention plan. We will look at this inside the code and inside the files on disk to see it actually apply; I think you will
understand it a lot better than me rambling about the actual retention. So we have `x-max-length-bytes`, which relates to the total size: we can tell RabbitMQ that whenever the stream is larger than, say, 1 GB, we want to activate retention, which means it will remove old segment files. We can also set `x-max-age`, which takes a duration and starts deleting segments older than that age. Important to understand: if we say our max age for events is 10 seconds, start sending events, and then after 5 minutes notice that the old events are not deleted even though they are older than 10 seconds, that's because this policy is only applied once we reach the segment size limit. Unless we ever reach the 100 megabytes that completes a segment, the policy will never trigger. So always set the segment size to an appropriate value for your system, and know that whenever the segment size limit triggers, it triggers both the length-bytes and the max-age policies.

Let's try this out; I think it's easier to see in practice. Let's go ahead and remove the old stream: I'm going to delete the events stream once again, because we're going to change the stream configuration, and you can never modify an existing stream this way; you need to remove it to change it. So always consider what your settings should be before you create a stream; otherwise you will have to apply other tactics to change them. In code, let's set the maximum segment size first. What's a good value? Let's set it to 30,000 bytes, which should be 0.03 megabytes; let me calculate; yes, that's correct. I'm setting the values really low just because I want to show you how it looks when these things happen. So let's set them low: let's set the length bytes to 150,000, which is 0.15 megabytes.
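The retention arguments from this section, gathered in one place as a sketch; the deliberately tiny values match the demo, and `x-max-age` is shown commented out since the demo uses size-based retention. As before, a plain map stands in for `amqp.Table`.

```go
package main

import "fmt"

// retentionArgs returns stream declare arguments with the demo's
// deliberately low retention limits. They go in the same amqp.Table
// that carries "x-queue-type": "stream" when declaring the stream.
func retentionArgs() map[string]interface{} {
	return map[string]interface{}{
		"x-queue-type":                    "stream",
		"x-stream-max-segment-size-bytes": 30000,  // each segment file: 0.03 MB
		"x-max-length-bytes":              150000, // total stream cap: 0.15 MB
		// "x-max-age": "10m", // time-based retention; like the others,
		// it is only enforced when a segment completes
	}
}

func main() {
	fmt.Println(retentionArgs())
}
```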
Oh, and a missing semicolon; my editor looks really bad when it's this zoomed in, sorry if a bunch of popups are showing. So what we're doing is: each segment file is allowed 0.03 MB, and the total stream size is capped, so whenever we overflow that cap, it will start deleting 0.03 MB segments. If you would like to use time-based retention, you would add `x-max-age` here, and it accepts a string format such as "10s" for ten seconds or "10h" for ten hours; it's all in the documentation, and I will put a link in the description. All right, we have created a retention policy, so we can now actually handle overflowing our storage space.

Let's run the producer: it printed "events", so everything worked as expected. I'm going to open a new terminal and jump inside the container with `docker exec -it rabbitmq /bin/bash`, so that we are working inside the RabbitMQ instance. Once we are inside, we're going to have a look at the segment files. We can look inside `/var/lib/rabbitmq/mnesia`; I have no idea why it's called mnesia, but sure. There's going to be a folder with an ID specific to your machine, so you can't use the same path as me; I just press tab. Inside this folder there's going to be a `stream` folder, and if we list it, you can see we have the events stream with an ID appended to it, because you can have multiple streams. All the files are stored in these folders, and if we list what's inside our stream's folder, we will see an index file and a segment file. You can actually see the size of the segment files: let's add flags to ls, `ls -lh`, which prints a human-readable size, 37 kilobytes, and this seems appropriate, because it's roughly what we configured. You may notice that it's
not exactly the same as what we configured, but that's because it's impossible for RabbitMQ to match it exactly; you will always land a little above or a little below, depending on the size of your payloads. So we have the segment files, and these are the actual storage for our events; looking at the sizes, we are close to the limit that we set. This segment file is not yet complete, but whenever it completes, it will start deleting the older segments. The first segment is always named with all zeros, so whenever we get new events now, we will probably overflow our size limit and the old segments will start being removed.

We can actually view one of these files, so let's move into the folder (so we don't have to type the path) and cat the segment file. Here you can see what a segment looks like: we have a routing key, an event ID, some binary data, our payload, and a little more binary data. You don't need to worry about the formatting inside the files; that's up to RabbitMQ, but it's good to understand that it stores each event as a row. Let's go ahead once again; I'm going to run `ls -la` so you can see all the files, and then rerun the producer so that we send more events, which will cause us to overflow the retention rule that we set. Let's jump back to our container and execute `ls` again, and you will now see all the old segments have actually been deleted. This is because we reached the maximum space that we allowed: our size limit was overflowed, and RabbitMQ handled it for us. This is really good to know: you need to really consider the configuration you apply, so you don't delete things that you don't want to delete.

We have a producer, and we have watched how streams work and how they write files on disk. Now we
need to consume them, of course. So let's jump back to the code and create a consumer, so we can start using the stream. I'm going to open up the consumer's main file and fill it out: package main, and we're going to do exactly the same thing as in the producer, so I'm just going to copy parts of the producer into the consumer. We can remove some of the imports; we don't need context here, only `fmt` and amqp. So, same as the producer: we connect to RabbitMQ and we set up a channel to communicate over.

Then there's one thing you need to do when you're working with streams: set up a quality of service. Go ahead and call `channel.Qos`, which sets up the quality of service, and you need to give it a prefetch count and a prefetch size. The prefetch is basically how many messages you will accept before sending an acknowledgement back to the server, so the server doesn't overwhelm you. Let's say that we want 50 messages, a prefetch size of zero, and for `global` let's set it to false. I won't be covering globals and such, because we did that in the earlier video, so I'm kind of assuming you have a grasp of what's going on. Let's see, I have a syntax error someplace: `if err`... oh sorry, `if err != nil`, of course; let's panic.

Right, so once we have that, we can start consuming our stream. One more thing you need to understand: you cannot use auto-acknowledgement when using streams. Auto-ack has to be false; if you set it to true, your application will crash. It will not allow that; it simply will not work with streams. So let's
go ahead and call `channel.Consume`. The stream name is `events`, and for the consumer name, what should we use? Let's do `events_consumer`; that's really imaginative. Auto-acknowledgement false, exclusive false, no-local false, we don't want to wait so no-wait false, and then again we have the arguments; in the consumer, we can leave the `amqp.Table` empty. So we are consuming the stream the same way we consume a regular queue. Let's just check if there are any errors, and then we want to loop forever and just read the messages. Let's add a print statement so we know we're starting to consume the stream, and then, since `Consume` returns a Go channel, we can range over it, which is handy: `for event := range stream`. Let's print some information about the data we're receiving. If you open up the delivery type here, you can see it's the same thing that we are sending; we have all the values we can use. So let's find our correlation ID, because we sent one so we know which event we're talking about; and the headers, which are going to be important for us to review, so let's print those too; and then whatever we have in the payload. The payload is in the `Body`, so let's print it as a simple string. In a real application you should unmarshal it using the same struct that you are sending; again, protobuf is really great for that. And then let's just close the channel when we exit the program, so we make sure it's cleaned up.

So: we connect to RabbitMQ, we start consuming the events, and we print them. Let's go to the consumer, run it, and... nothing happens.
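Before looking at why, here is the consumer assembled as a sketch. The amqp091-go calls are shown in comments so the sketch runs without a broker; the settings mirror the ones chosen above.

```go
package main

import "fmt"

// Stream consumer settings from this section. Streams require manual
// acknowledgement: autoAck must be false or Consume will fail.
const (
	prefetchCount = 50    // messages in flight before we must ack
	prefetchSize  = 0     // no byte-based limit
	autoAck       = false // mandatory for streams
)

func main() {
	fmt.Printf("qos: count=%d size=%d autoAck=%v\n", prefetchCount, prefetchSize, autoAck)
	// With a live channel (ch *amqp.Channel) this would be:
	//
	//   if err := ch.Qos(prefetchCount, prefetchSize, false); err != nil { panic(err) }
	//   msgs, err := ch.Consume("events", "events_consumer",
	//       autoAck, false, false, false, nil)
	//   if err != nil { panic(err) }
	//   for msg := range msgs {
	//       fmt.Println(msg.CorrelationId, msg.Headers, string(msg.Body))
	//       msg.Ack(false) // manual ack, or delivery stalls at the prefetch limit
	//   }
}
```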
Now you might be wondering why. Well, you can go ahead and run the producer again, and you will see a bunch of data being printed. The indexes start at 3,000 for me; I've run the producer a few more times, three runs at a thousand messages each, so my index starts from there. But you might be wondering why it didn't print messages when we simply ran the consumer. It's quite simple: when we print the headers, you can see something called `x-stream-offset`. The stream offset is the index of the event message, the unique identifier for that message, which RabbitMQ sets for us based on the sequence in which events are sent. By default, unless we specify something in the consumer, it uses a stream offset called `next`, and `next` kind of works like a regular queue: it waits for new messages to come in and prints them. This stream offset is really important, because it defines where in the stream we start listening from; it's the starting point from which we will be reading data, and there are a bunch of different stream offsets we can set.

Let's take a quick look at them. Let's go into our code and change the consumer; remember, everything is kind of an extension of queues, so we need to add this as a header. Let's set `x-stream-offset` and say that we want only the offsets starting from 3,000; since I've published more batches, we will get all messages after 3,000. So we can save the file, go back to the runner, clear the screen, and run the consumer again: you can see we are getting every offset above 3,000. "Why don't I see stream offset number one?", you might ask. It's because those segments were deleted; we overflowed, so we are seeing the first event that persisted after the overflow. What we can do then is ask for only the events after 3,100: let's clear the screen, jump back, set 3,100, and run the consumer again, and now you see we only get events with a stream offset
above 3,100 it's a sequence which is auto incrementing which each event received on the stream so that's how you use the offset to specify where to start but sometimes you don't know where to start so there's a bunch of other offsets you can use you can use last for instance last is going to print the last Shunk written to the segment file so not the whole segment file but the last written Shunk that rabbit mq made to the stream um so what if you want everything then you would use first first will return the first index found in the earliest segment file so it will take everything in the Stream as is so first this useful when you want to replay from the start for instance if you don't specify any extreme offset it will use next as we saw earlier next just kind of waits for new messages so we have next first and last but we also have the ability to specify time time based so let's say I only want messages that are 5 minutes ago so I specify 5m M for minutes and it's h for hours Etc y for years this will look at the current time of the rabbit mq server and return anything from the last five minutes uh so let's go ahead and run that and see we have nothing I made that maybe was it 10 minutes let's go back 10 minutes and ah there's my events and this is useful say that you know you patched a system and introduced a bug two days ago and you need to replay every event since that date you can kind of go back in time this way just replay it which is very very very handy if you're using an event driven system so one thing to understand is that whenever we set the extreme offset we set the starting point so if I say start reading events from 10 minutes ago that's the starting point there's no end after it have grabbed all the events from 10 minutes ago it will go back to next so it will wait for new events uh the way we have configured this so that's something you always need to remember you set the starting point with this so far we have been using the stream core and as 
As you have seen, Stream Core works off the bat with regular queues and the same SDK as the rest of RabbitMQ, because it speaks AMQP 0.9.1. But with Stream Core we are not fully leveraging what streams are capable of. Remember earlier I said hundreds of thousands of messages with Stream Core — with the stream plugin we will see millions of messages per second. One reason is that the stream plugin does not use the 0.9.1 protocol: it is compliant with it, but it uses a new binary protocol, which increases performance majorly.

To start using this we need to enable the plugin inside RabbitMQ and open up a new port — the stream port, which by default is 5552. Let's start by removing the RabbitMQ instance we have, because we're going to recreate it with the new port open and get a fresh start. So: docker container rm -f to delete the current running container. With RabbitMQ removed, we open a new container with the same flags as before, but adding port 5552 alongside the old regular port — I want to show you that the stream plugin and regular streams can be used together. When creating the container we also need to add an environment variable, RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS, to update the RabbitMQ configuration to advertise a host name. This is really important — if you don't do it, the stream plugin won't work. I don't fully understand the reason why; I've read the documentation and a few GitHub issues about it, and mostly I just find that you need it, something related to the new protocol. So let's add it: rabbitmq_stream advertised_host is going to be localhost (not loopback), and we use the same RabbitMQ version as before. Run that command.

Now we also need to enable the plugin, and enabling plugins in RabbitMQ is really easy. Let's docker exec into rabbitmq and use the rabbitmq-plugins command to enable rabbitmq_stream — you should see it print that rabbitmq_stream has been enabled. If you want the management UI there is another plugin, rabbitmq_stream_management; enable that and you will also be able to view the streams inside the RabbitMQ web user interface, which gets a new tab. If I refresh my website we can see it here — it wasn't present before, which confirms that streams are enabled.

Sadly, we won't be able to use the same amqp package we used before. The RabbitMQ team provides an official SDK for the stream plugin, so we're going to switch to that — it's not a third-party library; it's still maintained by the RabbitMQ team, which is great. To keep things tidy I'll create a new folder called producer-plugin, to indicate that this is a producer using the plugin, and inside it a main file with a main package. Then open a terminal and go get github.com/rabbitmq/rabbitmq-stream-go-client — it's on the official RabbitMQ GitHub. There are clients for many programming languages (that's why it's named rabbitmq-stream-go-client: there's a Java client and so on), so whatever language you use, you can probably find a maintained client there. Now we're going to do exactly the same thing we did before in the producer.
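The container recreation and plugin steps above, collected into one runnable sequence. This is a sketch: the image tag and container name are assumptions — use whatever version and name you were already running.

```shell
# Remove the old container for a fresh start.
docker container rm -f rabbitmq

# 5552 is the stream protocol port; 5672/15672 are the classic AMQP and
# management ports. The advertised_host argument is required so the
# stream plugin hands clients a hostname they can actually reach.
docker run -d --name rabbitmq \
  -p 5552:5552 -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
  rabbitmq:3-management

# Enable the stream plugin, plus the management-UI tab for streams.
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
```

After this, refreshing the management UI should show the new Streams tab.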
I'm going to jump to the old producer, copy it, and paste it into the new main file in producer-plugin. Then I'll remove a bunch of things: the connection, the queue declaration, and PublishWithContext — basically everything except the skeleton. We still have a main file, the same event structure as before, and the loop over a thousand messages. One more thing: since we reuse the event stream name all the time, I'm going to put it in a constant.

Let's start using the new plugin by connecting to the server, which works a little differently here. We get an environment and an error from stream.NewEnvironment, which accepts environment options. The new SDK likes chaining functions — most of the functions we'll use are chained together, so if you're not used to that, don't worry, it's not hard. stream.NewEnvironmentOptions returns an options value, and on it we set the host (localhost, same as before), then chain SetPort to 5552, SetUser to guest, and SetPassword to guest. I like this more than building a connection string with fmt — it's a bit more to type, but I think it's nicer in the end. Let me format this so it's a little clearer. So: we create a new environment, pass in new options, and set host, port, user, and password. Check for errors, and if the error isn't nil, just panic.

Now that we have an environment, we need to declare the stream. In the earlier producer we declared it with QueueDeclare; in the new SDK, remember that NewEnvironment returns an environment that is connected to RabbitMQ, and we use that environment to actually declare the stream: DeclareStream with the stream name, passing options with the same chained syntax — stream.NewStreamOptions. We want to set the segment size and the maximum size for the stream, exactly as we did before, and there are functions for that, which is great: instead of having to know a header name and set it by hand, the functions streamline what we need to know. It's more to write, but it's actually easier, I think — good job on them. SetMaxSegmentSizeBytes accepts a ByteCapacity (not a pointer), and ByteCapacity has an MB function which takes a value — this time we can't use something like 0.3, because it only accepts integer values. We also need the maximum length, so SetMaxLengthBytes, which likewise accepts a ByteCapacity. What should we use? The segment is 1 megabyte, so let's double it — we'll have two segment files for our stream. So we have declared the stream, set the segment size, and set the max length in bytes. I think this is nicer than the header approach — both work, but this is clearer; I like calling functions. Check for errors and panic if there are any.

Now we have the stream and we need a producer, so let's begin by creating producer options: stream.NewProducerOptions. Whenever it comes to creating options, you'll notice it's always the stream package: stream.NewEnvironmentOptions, stream.NewStreamOptions, stream.NewProducerOptions.
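Put together, the connection and stream declaration look roughly like this. It's a sketch, not a definitive implementation: the stream name constant is an assumption, the method names are from the rabbitmq-stream-go-client as I know it, and it needs a live broker on localhost:5552 with the default guest credentials to run.

```go
package main

import (
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

const eventStream = "events" // assumed stream name

func main() {
	// Connect to the stream plugin port; everything is configured
	// through chained option setters instead of a connection string.
	env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
		SetHost("localhost").
		SetPort(5552).
		SetUser("guest").
		SetPassword("guest"))
	if err != nil {
		panic(err)
	}

	// Declare the stream with a 1 MB segment size and a 2 MB cap,
	// so the stream keeps at most two segment files.
	err = env.DeclareStream(eventStream, stream.NewStreamOptions().
		SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
		SetMaxLengthBytes(stream.ByteCapacity{}.MB(2)))
	if err != nil {
		panic(err)
	}
}
```

DeclareStream is idempotent in the sense that redeclaring an existing stream with the same settings is fine, which is why both producer and consumer can call it safely.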
Okay, so we have producer options. There are a few things we can set, but the one we're interested in for now is the producer name — we'll cover why shortly — so let's set it to something simple like producer_simple. We also need to create the producer using these options: again, stream is the package for the entities, and the environment is where we actually apply the settings, so env.NewProducer with the stream name constant and the producer options. Check the error to make sure there are no typos. Now we have the same pieces as the old producer: there we connected and declared the stream; here we connect, declare the stream, and set up our producer. Time to start publishing messages.

Since we increased the capacity to megabytes instead of the tiny values we had before, I'm going to increase the message count to 6,000, just so we fill up the segments faster. Then we do the same thing as before: create the event, marshal it to a byte array, and pass that byte array as a message. Now, it's important that you use the right amqp module — we're not using 0.9.1; remember, the stream plugin does not speak 0.9.1, so you need the amqp package from the rabbitmq-stream-go-client. It has a function called NewMessage which accepts byte-array data, so pass in our data and we have a new AMQP message. NewMessage returns an AMQP 1.0 structure, and we can take a quick look at it: AMQP10 is the structure we send over the network, and its properties include a message ID, a user ID, who the message is addressed to, the correlation ID, and a lot of the other things we usually see in the AMQP 0.9.1 header. We want a correlation ID because we set one in the earlier version, so create a property — it's part of the amqp package now — amqp.MessageProperties, a structure, and in it set CorrelationID to uuid.NewString(). We're not using the properties yet, so apply them to the message: message.Properties = props.

Sending messages is really easy. On the producer you can see BatchSend, which accepts an array of stream messages, and Send, which accepts a single stream message. Maybe I should clarify: amqp.NewMessage returns an AMQP 1.0 structure, but that structure fulfills the stream message interface, so no worries there. Pass in the message, and if the error isn't nil, panic. That's how we send the message with its properties applied. At the end we do the same thing as before, except instead of closing the channel we close the producer when the program shuts down. We can remove the context — we no longer need it — and apply a code action to organize the imports. All the warnings are gone and everything looks good. So what are we doing again? We create the environment, connect to the server, declare the stream, create a producer, and send 6,000 messages. It is possible to batch-send, as mentioned — if you have an array of messages you can use BatchSend — but if you traverse into the SDK code you'll find that batching is used under the hood anyway, so I'm just going to use Send for now.

Our producer is done, so let's go back to the runner, jump into producer-plugin, and run the new and improved producer using the stream plugin. Let's see what happens — okay, no errors, no panics, which means our stream has data. We can confirm that in the UI under Queues: 6,000 messages inside our queue — or rather, our stream — and you can see our values applied: segments of one megabyte and a total size of two megabytes. To show you it's doing exactly the same thing as before, let's jump into the Docker container with docker exec -it rabbitmq bin/bash, go into /var/lib/rabbitmq/mnesia, into the rabbit directory, into stream, and into our stream's folder. Run ls and you see the same thing as before: a segment file and an index file. If we keep sending messages we get more segments — in our case we should only ever have two. One way to see the difference is to print the segment file. I don't know if you remember when we printed the segment files before, but we could actually read a few data points; now it's all binary except for our payload. Remember, the payload is up to us — if you want a binary payload inside the messages you'd use something like protobuf or another binary format — but the actual RabbitMQ event framing is binary. Under the hood, the stream plugin works exactly the same way as Stream Core. So at this point we are producing events to the stream plugin, and we can start consuming them.
One thing I want to show you for fun first: go into the runner and run the old consumer, without changing anything at all, and see what happens — we actually see the stream plugin messages. The consumer is configured to use the event stream from 10 minutes ago, and that's the stream we just created. The stream plugin and Stream Core share the same fundamental core; the plugin is only an extension, so they can work together. You can listen to that stream over regular AMQP 0.9.1, but it will use the old protocol to handle it, so you won't get all the fancy extras. We want to change that anyway — right now we have a great producer and a semi-great consumer, and we want two great entities — but it's important to understand that the underlying structure comes from Stream Core; even when you're using the plugin it's using the core, just with extended capabilities.

Let's create a new folder called consumer-plugin with a new main file inside. Actually, I'll copy everything from the plugin producer into consumer-plugin, then scroll down to where we declare the stream and delete everything from there — that's our starting point. We can remove the event structure for now since we won't be marshalling; basically I just wanted to steal the connection, since we're connecting to the same server and the same stream, so we keep the environment setup.

In the same way we replicated our regular producer with the plugin producer, we'll do the same with the consumer — it's just going to be a lot easier. Let's create consumer options, because that's where we set the stream offset, and since the offset is really important, it's good to know where it lives. So stream — remember, stream is the entities package — .NewConsumerOptions, and on it we set a name with SetConsumerName; let's call it consumer_1, something simple. Once we've set the consumer name we set the offset, and this is what I like about the new SDK: instead of adding headers, which always felt kind of hacky (that is basically what it's doing for us underneath), it feels much better to have a method called SetOffset — it's clearer. As you can see, SetOffset accepts an OffsetSpecification, so create one, and on the empty offset specification we apply an offset through its functions. You should be familiar with these by now, because they're the same options we toyed with in the core consumer: Last, Next, First, Offset, a string, Timestamp, even LastConsumed if you want. It's really easy to apply an offset with the new plugin because each function guides you. We want to grab everything in the stream, so let's use First.

Once we have the consumer options, we create the consumer the same way we did the producer — and remember, the consumer connects to the server, so that goes through the environment: entities live in the stream package, connections in the environment, so env.NewConsumer with the stream name (which we have in a constant now). The second parameter is a MessageHandler, and you might wonder what that is, so let's jump to it: MessageHandler is an alias for a function which accepts a ConsumerContext and an AMQP message — the same AMQP message our producer sends. Copy that function signature, jump back to our consumer, and create a function — let's call it messageHandler — accepting a ConsumerContext (an entity, so it's in the stream package) and the message from the amqp package. What do we do in our handler? The same thing as the previous consumer, where we just printed everything. The message is the same type we built in the producer, so it's familiar: we know there's a Properties field with the correlation ID set, so print that, and also print the data inside the message, which you get with message.GetData(). GetData returns the bytes of your payload, so in your business logic you would unmarshal the data into a struct here and do something with it. Pass the message handler in as the second parameter to NewConsumer, with the consumer options as the third — fix my typo so the editor knows what we're doing — and apply a code action to organize the imports.
What NewConsumer does is start a background process which listens on the stream, and for each message received it applies the message handler — we don't need to do anything more for now. Check for errors, close the consumer when we're done, and since we want the program to keep running for a while — at least during this tutorial — use a sleep and import the time package. The consumer isn't long: we create the environment, connect to the server, create consumer options where we declare the offset, create the consumer, start consuming, and then sleep for 5 seconds.

Let's try it out: move into consumer-plugin and go run main.go — oops, a few imports to take care of; code action, organize imports, save. And ta-da, we're seeing the messages: the correlation ID and the data printed from each one. Using the new SDK for the plugin — I like it a lot. Just for fun, let's grab only events after index 6,000: use the offset specification, apply the Offset function (which accepts an int64), and pass 6,000. Rerun, and we see two events, which is correct — we have 6,002 events, so that makes sense — and then the program exits. At this point we know how to publish and consume streams with both the core and the plugin.

There's one more thing I'd like to review — I'll try to keep it short — and that's sub-entries and compression. I don't know if you noticed, but we're sending each message one at a time. That's a truth with modification, because the SDK batches for you, but there's something called sub-entries I'd like to touch on. Sub-entry batching is a technique that lets us pack messages inside the same frame — that's what I'm trying to draw here: one frame is sent, but inside that frame we've made room for many events. This increases throughput, since instead of sending 100 requests to the server you send one request with 100 messages inside it, which reduces network traffic by a lot. Another great thing is that it enables compression, which further shrinks the network payload and can raise throughput even more — but it introduces some latency, and it's important to understand the trade-off. The latency comes from the batching itself, waiting to fill up the buffer (there's a default: if no messages arrive within a certain time span, it sends what it has anyway — but still, a small latency), and when we add compression there's also the cost of compressing and decompressing. So let's look at the pros and cons.

Enabling sub-entries is very easy. Go into the plugin producer, and on the producer options call SetSubEntrySize — a function which accepts an integer for how many messages you want in the same entry; I'll set it to 100 — and the SDK handles everything for you. When using sub-entries we can also set compression, which is the really great part: add SetCompression with stream.Compression{}.Gzip() — again, an entity. Those two lines let the producer sub-entry-batch the messages it sends and apply compression to them.

To make it clear what's going on, I want to show you the network traffic. First I'll comment out the changes we just made. In a new terminal, run sudo tcpdump on the Docker interface — if you're running Docker, your machine has a Docker interface you can sniff — listening on the port the RabbitMQ stream uses. With the sniffer running, I go into the producer and send the messages without sub-entries and without compression, then close the sniffer. Then I run the same command again, writing to a second capture file named rabbitmq-subentry.pcap, uncomment the sub-entry lines, and run the producer once more. Essentially I'm creating two pcap files of network traffic, to show you the difference between sub-entry-plus-compression and regular sends — one file per case. Stop the sniffer once it's done.

Now we can inspect the two files and compare. Run tcpdump -r on the regular pcap first (the one without sub-entries or compression), with -A so it prints the payloads. Scroll past the initial handshake mumbo-jumbo down to the actual payloads — regular traffic, regular traffic — okay, here it starts: you can see the RabbitMQ stream sending messages, with our events in clear plain text, and lengths like 4,000 bytes, 7,000 bytes, 7,000 again, a packet of 14,000 bytes, 24,000 bytes, and so on going over the network. Now do the same for the sub-entry pcap and scroll to the payloads: here's one, and the payload size is about 2,200 — in fact it's around 2,200 for almost all the packets, because each carries 100 messages at a time. If we drop -A it's a lot easier to see: basically all the packets are around 2,000 bytes. And I don't know if you noticed, but the packets are no longer readable — they're compressed — which is why they're so much smaller and will save your network some struggling. Just for fun, we can grep for the lengths and sum them: the sub-entry capture totals around 40,000 bytes, while the regular capture is several times that — basically five times the size, which is a lot.

So that's what's good to know about sub-entries and compression, but it comes with one major flaw — a big, big flaw. If you go into the producer code in the SDK and search for assignPublishingID, you'll find a comment: in the case of sub-entries, deduplication is disabled. We haven't talked about deduplication, so let's do that.
and over and over again and that's called duplication that's not good sometimes if say an customer registers at your website and you send out a rabbit mq message to all the consumers that he registered and one of the consumers is an email service which sends the customer an email but what if you keep sending that same message over and over and over um because some kind of bug so we can avoid that by two simple uh options so the one option is actually the producer name if you have a producer name we can trace the producer and which messages they have sent so always have a producer name the second option is actually if we go down to the message where we assign the property we can assign a publishing ID set publishing ID to prevent D duplication [Music] and you do message. set publishing ID and Publishing ID should be on auto incrementing identifier so your system needs to keep track of the event ID to know and assign the same ID to avoid the duplication in our case here we have a for Loop so it's really easy I know that index one is message one but in a real life application it's it's a little bit more Troublesome because you need some way of knowing okay this event has this ID this publishing ID I've already used that um apply that again or Etc but if you do this you have a producer name and you have a publishing ID if I rerun the producer these messages will actually not get accepted by rabbit mq but if we also have sub entries activated it kind of will get accepted so that's one of the major you have to consider which one you uh want the more the most so to kind of to test this we can actually do um Docker X it rabbit mq here so Docker exit rabbit mq delete the Q events I'm going to delete the events Q now it's gone so let's go ahead and run the producer and we should now see the queue being created and we have have 602 messages since we set the prod publishing ID we set the publishing ID to the same integer we don't have sub entries and we have a producer name if 
I go ahead and rerun, we should have 12,000 entries, but since we are now protecting ourselves against duplication we won't get them, because RabbitMQ already knows these messages have been sent before, so it doesn't add them. Sadly, we cannot combine this with subentries: if we enable subentries again, even if you set the publishing ID it doesn't matter. If you go ahead and run the producer again, you will in fact see 6,000 new messages have been added. So that's kind of sad; you have to weigh which one you want the most: deduplication, which I feel is kind of important, or the extra throughput from compression. At this point we have covered Stream Core and we have covered the Stream Plugin. Remember, the Stream Plugin is much more performant than Stream Core, but it requires a few extra steps. When you build a real-world application, I really do recommend you to keep your payload entities in a shared library that the publishers and consumers can import and use. In our example here we use an Event structure inside the same module, but I really recommend you to use something like Protobuf, so you can marshal and unmarshal your payloads really easily before sending them over RabbitMQ, and the consumers can unmarshal them just as easily. I can really recommend Protobuf; it's binary and very smooth to work with. In this tutorial we will not cover super streams, because super streams are not really implemented in the Go SDK yet, but you can follow the progress on GitHub; I put a link in the description. How badass doesn't "super stream" sound? It sounds amazing. All right guys, in this tutorial we have learned what streams are, how they work, how Stream Core versus the Stream Plugin works, and how to use both of them in Go. We have looked at subentries and compression, and we have talked a little bit about deduplication. All the code used in this video you can find on my GitHub; a link will be present in the description. I really do
hope you enjoyed this video; I really enjoyed making it. I do love it when you guys leave feedback and comments, so go ahead and do that, and if you haven't liked and subscribed to my channel, you should really do that; I appreciate it very much. Thank you guys for all the nice comments and all the bad comments; I appreciate it all, because it helps me evolve. Just reach out if there's anything I can help you with or if there's any topic you want covered. Thank you very much for watching, bye!
Video description
In this video, we will have a look at how to use RabbitMQ Streams in Go. We will learn how to use both the Stream Core and the Stream Plugin approach. The tutorial does not only cover basic usage but also the underlying functionality, for a better understanding of what is going on. After the video, you should be familiar with how to create streams, and how to consume and produce messages on them. We will also have a quick look at compression, subentries, and deduplication.

All the links mentioned in the video can be found here:
GitHub with Source code: https://github.com/percybolmer/rabbitmqstreams-go
SuperStream SDK Support: https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/149

00:06 Intro
03:38 Streams and how they work
05:00 Stream Core & Plugin
08:00 Stream Core Producer
21:48 How Streams Work & Retention
34:40 Consume Streams
46:10 Stream Plugin
01:08:00 Consuming from Stream Plugin
01:18:40 Subentries & Compression
01:27:00 Deduplication
01:31:00 Conclusion