Using in-stream aggregation to manage event storms and reduce data volume
This article shows you how to use in-stream aggregation to manage event storms and reduce data volume. There are two different ways to do this:
- A beginner method that uses `stats` to reduce the number of raw events coming to the index while retaining important field information.
- An advanced method that builds on the beginner method, allowing you to clone the original events that would otherwise be lost through aggregation to another data store, or to create metrics from `stats`.
Why use stats in the stream?
You're probably familiar with running stats at search time; one of the most common Splunk platform operations is to pipe search results to stats to get aggregated results. This process takes a large set of events and extracts specific fields, usually grouped by other fields.
For example:
index=fw sourcetype="cisco:asa" | stats sum(rcvd_bytes) BY src_ip
You can do this easily at search time, so why would you want to do it in the stream before the data is indexed, especially when the aggregations are all you're typically concerned with? Does it make sense to ingest thousands or millions of lines of discrete events if your day-to-day work only consists of sum, average, and count operations?
By leveraging stats in the stream, you can build aggregations on large sets of data before they reach the index. This reduces the number of events arriving into the Splunk platform without losing significant data fidelity, while also reducing workload when retrieving those stats for your searches, reports, and dashboards.
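Conceptually, in-stream aggregation replaces many raw events with a few summary rows per group and time window. The following Python sketch (not Splunk code; the data and field names are hypothetical) illustrates the idea behind `stats sum(...) BY ...` applied over short windows:

```python
from collections import defaultdict

# Hypothetical raw firewall events: (epoch_seconds, src_ip, rcvd_bytes)
events = [
    (0, "10.0.0.1", 500), (2, "10.0.0.1", 700), (4, "10.0.0.2", 300),
    (11, "10.0.0.1", 900), (12, "10.0.0.1", 100), (14, "10.0.0.2", 250),
]

def aggregate(events, window=10):
    """Group events by (src_ip, window start) and sum bytes -- the in-stream
    equivalent of `stats sum(rcvd_bytes) BY src_ip` over short time windows."""
    buckets = defaultdict(lambda: {"count": 0, "sum_bytes": 0})
    for ts, src_ip, rcvd in events:
        key = (src_ip, ts - ts % window)   # align timestamp to window start
        buckets[key]["count"] += 1
        buckets[key]["sum_bytes"] += rcvd
    return dict(buckets)

summary = aggregate(events)
# Six raw events collapse into four summary rows, one per group and window.
```

Only the summary rows reach the index, so downstream searches scan far fewer events while the per-group totals stay intact.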
Data sources
Firewall logs are typically among the most verbose data sources in both security-centric and observability-centric environments. In the case of Cisco ASA, there are some especially noisy events like ASA-4-733104, which indicates SYN flood detection during DDoS attacks or other brute force attempts.
In most cases, these log events occur fairly infrequently. However, during attacks, large spikes in these events can bloat indexes and reduce search efficiency due to repeated duplicate events. The screenshot below shows that at search time, a SYN flood attack results in the same event code occurring very frequently during the flood or attack event. These events are largely the same, with the difference being the flood rate (in this example they happen to be static). What you likely care about is the total rate in aggregate and the destination IP.
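To see what those repeated events contain, here is an illustrative Python version of the field extraction that this article performs in SPL2; the sample log line and its values are hypothetical, with the message shape taken from the `rex` pattern used in the pipeline:

```python
import re

# Regex adapted from the SPL2 `rex` used in this article's pipeline.
PATTERN = re.compile(
    r"^(?P<timestamp>.*?)\s+(?P<device>\S+)\s+:\s+%ASA-4-733104:\s+"
    r"TCP\s+Intercept\s+SYN\s+flood\s+attack\s+detected\s+to\s+"
    r"(?P<target_ip>[\da-fA-F:.]+)/(?P<target_port>\d+)\s+"
    r"\((?P<real_ip>[\da-fA-F:.]+)/(?P<real_port>\d+)\)\.\s+"
    r"Average\s+rate\s+of\s+(?P<avg_rate>\d+)\s+SYNs/sec\s+"
    r"exceeded\s+the\s+threshold\s+of\s+(?P<threshold>\d+)"
)

# Hypothetical sample event.
line = ("2024-05-01 12:00:05 UTC fw01 : %ASA-4-733104: TCP Intercept SYN flood "
        "attack detected to 10.1.1.5/443 (192.0.2.10/443). "
        "Average rate of 512 SYNs/sec exceeded the threshold of 200")

fields = PATTERN.match(line).groupdict()
# fields now holds target_ip, target_port, real_ip, real_port, avg_rate, threshold
```

During a flood, thousands of lines differ only in `avg_rate`, which is exactly why summing that field per target is a better fit than storing every line.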
At significant scale, searching raw text logs for this data and running stats at search time wastes computing power, storage budget, and analyst time. All raw events have to be retrieved, search time fields have to be extracted, and statistics have to be calculated. When these searches run routinely, the inefficiency compounds.
Using stats in the stream, you can focus on this ASA event code and always aggregate those events. This will reduce the event flow generally, but will also automatically deduplicate, count, and summarize that data for quick and efficient analysis during attacks.
Processing data in motion allows you to apply data manipulation operations granularly, so start small and grow incrementally. When applying aggregations, identify the most consequential events first to realize the greatest benefit and prevent unnecessary complexity. This example focuses specifically on the ASA-4-733104 SYN flood event for aggregation, while continuing to send all other events directly to the index. You can build additional use cases over time as you adapt to this approach to data management.
Beginner method
This beginner method provides a simple way to reduce the number of raw events coming to the index while retaining important field information. However, there are some caveats to this method:
- Your searches and detections have to change. Because `stats` used in the stream results in data being shipped to new indexes with new sourcetypes, your detections and dashboards have to be updated to refer to this new data.
- The more cardinality, the less effective. In-stream aggregations capture relatively limited windows of data. When your data has a lot of variety in the grouped fields, each window contains many groups with only a few events apiece, resulting in less dense aggregations.
- Make sure you are comfortable with the loss of the original events. Sometimes keeping the original, unaggregated data is important. This beginner method results in the loss of the original events. If you need to store the original events, you can use the advanced method to route them to another location, like Amazon S3, for long-term storage. Coupled with federated search, these original events can be searched in place, in real time if needed.
- Consider using metrics, not events. The beginner method shows how to use a normal Splunk platform event index to store these aggregate results. But these results are now just metrics, and you can take advantage of Splunk platform metrics indexes or Splunk Observability Cloud to store and search them at even faster speeds and lower storage cost. The advanced method shows you how to turn `stats` results into metrics that you can search in this way.
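The cardinality caveat above can be quantified: the reduction you get from aggregation is roughly the number of raw events divided by the number of distinct group values per window. A minimal Python sketch with hypothetical values:

```python
def reduction_ratio(group_keys):
    """Raw events divided by aggregated rows: the compression you get
    from `stats ... BY <group fields>` within one window."""
    return len(group_keys) / len(set(group_keys))

# Low cardinality: 1,000 events spread across 4 distinct target IPs (hypothetical).
low = ["10.1.1.%d" % (i % 4) for i in range(1000)]

# High cardinality: 1,000 events spread across 800 distinct target/port pairs.
high = ["10.1.1.5:%d" % (i % 800) for i in range(1000)]

# Low cardinality collapses 250:1; high cardinality barely compresses at 1.25:1.
```

A SYN flood aimed at a handful of servers is the low-cardinality case, which is why it aggregates so well.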
- Start by creating a new pipeline. Both Splunk Edge Processor and Splunk Ingest Processor are able to run this logic. Choose whichever fits best in your architecture.
- Set up your pipeline with `cisco:asa` sourcetype data.
- Select `splunk_indexer` as the default destination.
- In the pipeline authoring canvas, remove all of the boilerplate code and start by importing the `route` command to send the SYN flood events to their own index.
import route from /splunk.ingest.commands
- Start a new pipeline and use `rex` to extract the ASA code from all events. This gives you an initial field to work with.
$pipeline = | from $source | rex field=_raw /%(?P<asa_code>ASA-[^:]+)/
- Use `route` to create a new path for the target events.
| route asa_code == "ASA-4-733104", [
- Use `rex` to extract all of the event fields.
| rex field=_raw /^(?P<timestamp>.*?)\s+(?P<device>\S+)\s+:\s+%ASA-4-733104:\s+TCP\s+Intercept\s+SYN\s+flood\s+attack\s+detected\s+to\s+(?P<target_ip>[\da-fA-F:.]+)\/(?P<target_port>\d+)\s+\((?P<real_ip>[\da-fA-F:.]+)\/(?P<real_port>\d+)\)\.\s+Average\s+rate\s+of\s+(?P<avg_rate>\d+)\s+SYNs\/sec\s+exceeded\s+the\s+threshold\s+of\s+(?P<threshold>\d+)/
- Convert the timestamp into unix time.
| eval _time = strptime(timestamp, "%Y-%m-%d %H:%M:%S %Z")
- Create the aggregations using `stats`. `count()` gives you the number of actual firewall events, and `sum(avg_rate)` gives you the aggregated SYN rate.
| stats count() AS syn_flood_event_count, sum(avg_rate) AS syn_rate BY target_ip, target_port, real_ip, real_port, span(_time, 10s)
This assumes relatively low cardinality of the grouped fields. High cardinality events will not benefit from `stats` as much as low cardinality events.
- (Optional) Add CIM-related fields by adding them directly to the `_raw` event. Perform this step if you want these events to be CIM-compliant so that Splunk Enterprise Security and other CIM-related detections see the aggregations in the same way they would see the raw events. You should follow typical "new sourcetype" steps to create search-time knowledge objects that map the aggregation output to something like the Intrusion Detection data model.
You should generally rely on search-time artifacts and knowledge objects to do these field mappings and extractions, but this example shows how you might add context to the `_raw` event to assist in those search-time extractions. By adding these fields to the `_raw` event, you lock in that data and make it part of the permanent record. Using search-time extractions is more flexible and adaptable if you need to make changes later.
| eval _raw = json_set(_raw, "dest_ip", real_ip, "dest_port", real_port, "protocol", "TCP", "signature", "SYN Flood Attack", "action", "blocked", "vendor", "Cisco", "product", "ASA", "category", "intrusion_detection", "dest", real_ip)
- The `stats` command creates a new `_raw` as well as top-level fields. You are only interested in `_raw` and the normal metadata fields.
| fields _raw, host, source
If you do not remove top level fields, you get indexed fields alongside raw fields, which produces duplicate data. You want either indexed fields or raw, not both.
- Use `eval` to set a proper index and sourcetype for the new stats-based output. Note that storing `stats` output in a new index and sourcetype is a best practice to avoid co-mingling raw and `stats` data.
| eval index="fw_stats", sourcetype="cisco:asa:dedup:4-733104"
- Use `into` to send the aggregated results into the Splunk platform destination. The closing bracket finalizes the `route` you started earlier.
| into $destination]
- A final `into` sends all other events to their default destination.
| into $destination;
Search results
Event volume (before)
The screenshot below shows the number of total events and total size per second of a SYN flood attack without any aggregation.
Event volume (after)
With the aggregation pipeline in place, there is no spike in event counts and no burst in event sizes on disk. The aggregated results summarize the information in the bursts.
Search results (aggregations)
The SYN flood events are now stored as JSON events with the field groupings and aggregate values. This results in far fewer events on disk while maintaining the same data fidelity.
Now, using `stats` at search time, you can aggregate these pre-computed aggregations to get your final counts and clearly see the SYN flood attack on a server, while scanning far fewer events to get the same result.
This screenshot shows the raw event search:
This screenshot shows the stats on aggregates:
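The search-time math on pre-computed aggregates works because sums compose: summing the per-window partial sums gives the same total as summing every raw event. A minimal Python illustration with hypothetical rates:

```python
# Hypothetical raw SYN-rate samples for one target during an attack window.
raw_rates = [100, 100, 100, 100, 150, 150, 150, 150]

# In-stream stats already produced per-window partial sums (4 samples/window).
window_sums = [sum(raw_rates[i:i + 4]) for i in range(0, len(raw_rates), 4)]

# Search-time stats over the aggregates scans 2 rows instead of 8 events,
# yet yields the identical total.
assert sum(window_sums) == sum(raw_rates)
total = sum(window_sums)
```

This is why additive aggregations like `count` and `sum` are safe to pre-compute in the stream, while non-additive ones (for example, distinct counts) need more care.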
Advanced method
You might want to use this method if:
- You're concerned about aggregations of events resulting in the loss of the original data. While the aggregations shown in the beginner method can address tactical use cases and help you get answers quickly and efficiently, they might not satisfy both technical and business requirements.
- Your use case is observability-focused. The beginner method sends aggregations to the Splunk platform as normal raw events. However, because the data in the events consists of just numbers and dimensions, they are good candidates to store as metrics instead of logs. If your use case is observability-focused, you can store these metrics in Splunk Observability Cloud. The native metrics indexes in the Splunk platform can also function as an optimized destination for this data.
You can make two changes to the pipeline shown in the beginner method to satisfy both of these needs. In the following copy of the SPL2 pipeline (previously shown in the beginner method), two placeholder sections have been added where this additional behavior can be inserted:
import route from /splunk.ingest.commands
$pipeline = | from $source
//extract the asa code from the asa events
| rex field=_raw /%(?P<asa_code>ASA-[^:]+)/
//remove the SYN flood codes from the main stream and process them separately
| route asa_code == "ASA-4-733104", [
<CLONE ORIGINAL EVENTS TO ANOTHER INDEX OR OBJECT STORE>
//extract the fields specific to this ASA event
| rex field=_raw /^(?P<timestamp>.*?)\s+(?P<device>\S+)\s+:\s+%ASA-4-733104:\s+TCP\s+Intercept\s+SYN\s+flood\s+attack\s+detected\s+to\s+(?P<target_ip>[\da-fA-F:.]+)\/(?P<target_port>\d+)\s+\((?P<real_ip>[\da-fA-F:.]+)\/(?P<real_port>\d+)\)\.\s+Average\s+rate\s+of\s+(?P<avg_rate>\d+)\s+SYNs\/sec\s+exceeded\s+the\s+threshold\s+of\s+(?P<threshold>\d+)/
//create a valid timestamp for grouping
| eval _time = strptime(timestamp, "%Y-%m-%d %H:%M:%S %Z")
//count the number of actual events from the firewall
//and aggregate the average rate to get a total for this grouping
| stats count() AS syn_flood_event_count, sum(avg_rate) AS syn_rate BY target_ip, target_port, real_ip, real_port, span(_time, 10s)
<CREATE METRICS FROM STATS>
]
//all other codes go to the default splunk destination without any modifications
| into $destination;
Cloning data to Amazon S3
The aggregations created in the pipeline help you to perform efficient storage and search of otherwise verbose datasets. You can reserve your most expensive, high-performance storage (like Dynamic Data Active Searchable for cloud environments or hot/warm buckets for on-premises environments) for storing data for near real-time searching and addressing your most critical use cases.
Use cases that might require the original events to be stored and potentially searched include:
- Investigations or searches that require searching data beyond the retention period of the index storing the aggregations
- Investigations or searches that require correlating on data that wasn't captured in the original aggregation
- Compliance or regulatory requirements that require stored or searchable events in their original format
When full fidelity (original) events are required, the general approach is to clone the stream so that those events get sent to a location that has a lower cost. These lower cost destinations often trade cost for lower performance or higher complexity.
Dynamic Data Active Archive (cloud) or simple cold storage (on-premises) could be considered for this purpose, but this method shows how to do this with Amazon S3. Amazon S3 can be searched in place with Federated Search for Amazon S3 (FSS3), has the benefit of being able to democratize these data sets with other systems and groups, and might have lower long-term storage costs.
Pipeline and workflow changes needed to clone data to Amazon S3
You'll need to have at least one Amazon S3 destination configured to perform this method. See Splunk Help for guidance if needed.
- Replace the placeholder starting with <CLONE ORIGINAL EVENTS TO ANOTHER INDEX OR OBJECT STORE> with the code block below. `thru` makes a clone of the data and sends it to a specific S3 destination. Replace the `$s3_destination` variable with a variable suitable for your environment. Note that this implementation of sending data to Amazon S3 is relatively simplistic and does not attempt to optimize the data being sent. In practice, the partitioning and structure of the data being stored in S3 should support the types of searches that will use this data set. Additional fields, values, and logic can be implemented within the `thru` block to make the data more usable or optimized.
| thru [
    | fields _raw, _time, index, source, sourcetype, host
    | into $s3_destination
]
- Assign an S3 destination to the `$s3_destination` variable in the user interface.
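Conceptually, `thru` behaves like a tee on the event stream: a copy goes to the side destination while the original continues down the pipeline to be aggregated. This minimal Python sketch (not SPL2; all names are hypothetical) mimics that fan-out:

```python
def thru(stream, side_sink):
    """Mimic SPL2 `thru`: send a copy of each event to a side destination
    while yielding the original event to the rest of the pipeline."""
    for event in stream:
        side_sink.append(event)   # clone to, e.g., an S3 writer (hypothetical)
        yield event               # original continues on to aggregation

s3_archive = []                   # stand-in for $s3_destination
events = [{"_raw": "syn flood 1"}, {"_raw": "syn flood 2"}]

# Every event reaches both the archive and the downstream consumer.
aggregated_count = sum(1 for _ in thru(events, s3_archive))
```

Because the clone happens before the `rex` and `stats` steps, the archived copy retains full fidelity.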
Structure, compression, and batch sizes can be controlled by the Amazon S3 destination configuration. After the data is there, it can be searched in-place with Federated Search for Amazon S3, Athena, or other tools.
If using FSS3, you need to create an AWS Glue table and the proper FSS3 configurations. After you've done that, you can use `sdselect` syntax to retrieve the original events and search them side-by-side with aggregated data using standard SPL or SPL2 search language.
Storing and searching aggregations as metrics
Using raw text searching on the aggregations from your pipeline is reasonable, but as the number of events and the cardinality increase, a better approach is to use native metrics storage and search, whether through Splunk Observability Cloud or a metrics index. This section focuses on native metrics.
Native metrics offer two key benefits:
- More efficient storage on disk using a metrics index rather than an events index. The data is stored in a columnar format rather than as raw text, and has a fixed license cost for ingestion purposes.
- Much faster searches using `mstats` rather than raw text searches.
There are some caveats to this process:
- At the time of writing, the `logs_to_metrics` command used in this process is only supported in Splunk Ingest Processor.
- Only one metricization rule can be created for each pipeline branch. To produce both the `syn_rate` counter and the `syn_flood_event_count` counter, you would need to use `thru`.
- Metrics cannot be included in DMA/CIM. Security detections and searches need to be modified to include metrics explicitly.
Pipeline and workflow changes needed to store and search aggregations as metrics
You'll need to have at least one metrics index configured in the Splunk platform to perform this process.
- Replace the placeholder starting with <CREATE METRICS FROM STATS> with the code block below. The `fields`, `eval`, and `into` commands have been replaced with `logs_to_metrics` and a new `eval` and `into`. Ensure that the specified index in the `eval` command matches a valid metrics index.
| logs_to_metrics name="syn_rate" metrictype="counter" value=syn_rate time=_time dimensions={"target_ip": target_ip, "target_port": target_port, "real_ip": real_ip, "real_port": real_port}
| eval index="cisco_asa_metrics", sourcetype="cisco:asa:metrics"
| into $metrics_destination
- Assign a valid Splunk indexer destination to the `$metrics_destination` variable.
After these changes have been made and applied, new ASA-4-733104 event aggregations will be found in the cisco_asa_metrics index. You can use metrics-related interfaces and commands to discover and search this data. The screenshot below shows these aggregations in search:
The aggregations can also be seen in the Analytics Workbench:
Next steps
Aggregations are a powerful tool for reducing noise and optimizing storage and search, but careful consideration must be given to the integration of these changes to your existing production workflows, knowledge objects, and search artifacts. Additional benefit and further optimizations can be found where the scale of your data is very high or specific performance is needed.
Data management pipelines open up new opportunities with a wide variety of options and implementation patterns. Make sure you review Splunk Validated Architectures, have proper training, and work with data stakeholders in your journey to implement these techniques.
In addition, these resources might help you understand and implement this guidance:

