Improving data pipeline processing in Splunk Enterprise
Modifying how splunkd processes data through the props.conf and transforms.conf files is not simple. However, applying EVAL logic to and performing regex extractions on pipeline data allows you to change the value of a field to provide more meaningful information, extract interesting nested fields into top-level fields, and redact certain information from the data. You want to learn how to apply these Splunk transforms during event parsing on an indexer or heavy forwarder.
Altering data ingestion rules is risky. Only advanced Splunk users should do so, and they should always develop changes on a laptop or other non-production instance first. When using the configuration samples shown here, you might need to change parameters and values according to your configuration. In addition, follow these guidelines to help ensure you implement safe transforms:
- Use Visual Studio Code with the Splunk extension to manage configs.
- Use the reload URL http://localhost:8000/en-US/debug/refresh instead of restarting splunkd after making changes.
- Use _index_earliest and _index_latest to view recently ingested data.
- Use a [copy_to_meta] transform to debug, as sketched after this list.
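The [copy_to_meta] debug transform is not defined in the examples that follow, so here is only a minimal sketch of the idea, assuming you simply want to copy the raw event into a temporary indexed field (called debug_raw_copy here, a made-up name) so you can inspect what the pipeline saw:
props.conf:
# hypothetical wiring: attach the debug transform to the sourcetype under test
[<your_sourcetype_under_test>]
TRANSFORMS-zz-debug = copy_to_meta
transforms.conf:
# copy the raw event into an indexed field for inspection with tstats or field summaries
[copy_to_meta]
INGEST_EVAL = debug_raw_copy:=_raw
Remove this transform when you are done debugging, because it adds a full copy of every event to the index-time fields.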
Determine license usage
The license_usage.log file is verbose; however, due to squashing of host and source values, you might not be able to determine the total license usage from specific hosts. To report license usage by specific host or source values, you can use INGEST_EVAL to compute the string length of each event and write it to an indexed field.
props.conf:
[default]
TRANSFORMS-z-last_transform = add_raw_length_to_meta_field
transforms.conf:
[add_raw_length_to_meta_field]
INGEST_EVAL = event_length=len(_raw)
Then run the following search to perform license calculation:
| tstats sum(event_length) AS total_ingestion WHERE index=* host=<hosts_to_measure> _index_earliest=-30d@d _index_latest=-1d@d BY host _time span=1d@d | xyseries _time host total_ingestion
You can split the results by other fields, such as sourcetype or source, as needed.
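For example, a variant of the same search that splits the ingested volume by sourcetype instead of host might look like the following; adjust the time range and filters to your environment:
| tstats sum(event_length) AS total_ingestion WHERE index=* _index_earliest=-30d@d _index_latest=-1d@d BY sourcetype _time span=1d@d | xyseries _time sourcetype total_ingestion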
Estimate data requirements
Before onboarding a new data source, you might need additional context to ensure sufficient compute and storage is available. You can use INGEST_EVAL and CLONE_SOURCETYPE to emit metrics events that describe the incoming data. Each metric event consumes up to 150 bytes of license.
props.conf:
# data comes in on this sourcetype
[v:orig:data]
# this configuration is universal, and can be reused
TRANSFORMS-enable_estimate_mode_drop_orig = v_estimation_set_metrics, v_estimation_create_metrics, v_estimation_drop_orig
# metrics of metadata are created on this sourcetype
[v:estimate:pipeline]
TRANSFORMS-set_metric_name = v_estimation_metric_info
transforms.conf:
# clone original data, transform it into metrics events
[v_estimation_create_metrics]
REGEX = (.*)
CLONE_SOURCETYPE = v:estimate:pipeline
# create metadata about the event, preserve original attributes
# these fields become metric dimensions!
[v_estimation_set_metrics]
INGEST_EVAL = orig_host=host, orig_source=source, orig_sourcetype=sourcetype, orig_index=index
# we do not need to keep the original data, only the metadata, so drop it
[v_estimation_drop_orig]
INGEST_EVAL = queue="nullQueue"
# format event into a metric, route it to appropriate metrics index
[v_estimation_metric_info]
INGEST_EVAL = index="<name_of_your_metrics_index_to_write_to>", metric_name="estimation_mode", _value=len(_raw)
Then run the following search to see the estimation:
|mstats prestats=t max(_value) avg(_value) WHERE index=<name_of_your_metrics_index_to_write_to> AND metric_name="estimation_mode" BY orig_sourcetype span=5m |timechart max(_value) avg(_value) BY orig_sourcetype
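To turn the estimate into an expected daily ingestion volume, a follow-up search along these lines sums the estimated bytes per day; the metrics index name is a placeholder:
| mstats sum(_value) AS estimated_bytes WHERE index=<name_of_your_metrics_index_to_write_to> AND metric_name="estimation_mode" BY orig_sourcetype span=1d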
Selective routing to other destinations
Sometimes data needs to be shared with other destinations, such as other Splunk Enterprise deployments or third-party systems. Use cases can require all or only a subset of the data to be shared. You can use INGEST_EVAL to control which output group data is routed to.
props.conf:
# data comes in on this sourcetype
[v:interesting:data]
TRANSFORMS-example_data_route = v_sample_route_buttercup_bu
transforms.conf:
# if event data host is from buttercup1 OR buttercup2
# route data to Splunk Cloud Platform, otherwise send it to on-prem indexers
[v_sample_route_buttercup_bu]
INGEST_EVAL = _TCP_ROUTING=if(match(host, "buttercup[12]"), "splunkcloud_indexers", "splunk_onprem_indexers")
outputs.conf:
[tcpout]
defaultGroup = splunk_onprem_indexers
# this output group routes data to Splunk Cloud Platform
[tcpout:splunkcloud_indexers]
server = inputs.buttercup.splunkcloud.com:9997
# this output group keeps data on-prem
[tcpout:splunk_onprem_indexers]
server = 10.10.10.10:9997
Manage conflicting time formats
Any well-curated Splunk Enterprise instance uses the sourcetype to accurately identify the event timestamp format. However, collisions occasionally occur when a single source type contains conflicting timestamp formats. INGEST_EVAL offers a new approach to solving this problem using the strptime() function.
Example Events:
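These lines are hypothetical, made up to illustrate the three timestamp formats the transform below handles (%c, %H:%M:%S %y-%m-%d, and %Y-%m-%d %H:%M:%S):
Mon Mar  1 12:34:56 2021 action=login status=200 user=alice
12:34:57 21-03-01 action=purchase status=200 user=bob
2021-03-01 12:34:58 action=refund status=500 user=carol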
props.conf:
[demultiplexed_datetime_formats]
DATETIME_CONFIG = CURRENT
TRANSFORMS-extract_date = demultiplex_datetime
transforms.conf:
[demultiplex_datetime]
# add fall-through case to set custom date or route “unknown” data to special quarantine index
INGEST_EVAL= _time=case(isnotnull(strptime(_raw, "%c")), strptime(_raw, "%c"), isnotnull(strptime(_raw, "%H:%M:%S %y-%m-%d")),strptime(_raw, "%H:%M:%S %y-%m-%d"), isnotnull(strptime(_raw, "%Y-%m-%d %H:%M:%S")), strptime(_raw, "%Y-%m-%d %H:%M:%S"))
This example initially sets the time of the event to the current time. After this, a transform tries to replace that time by testing the known time formats with a case statement and picking the first one that matches. This is not very computationally efficient because it can invoke strptime multiple times, but you get the answer in a single invocation of INGEST_EVAL.
Extract the time and date from the file name
Sometimes, in edge cases, the date is captured as part of the file name and only the time is logged in the event. Previously, you would need to use a custom datetime.xml and hope for the best. With INGEST_EVAL, you can tackle this problem more elegantly.
props.conf:
[compound_date_time]
DATETIME_CONFIG = CURRENT
TRANSFORMS-get-date = construct_compound_date
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\n\r]+)
transforms.conf:
# pop the date out of source with a regex replace, append the time from the start of _raw, and parse the result into _time
[construct_compound_date]
INGEST_EVAL=_time=strptime(replace(source,".*/(20\d\d\-\d\d\-\d\d)\.log","\1").substr(_raw,1,10),"%Y-%m-%d%H:%M:%S")
The regex replace pops the date out of the source, appends the first 10 characters from _raw, and then runs the result through strptime and assigns it to _time. If the eval fails to run, _time is not updated and the previously set CURRENT time remains.
Sample events
Consider a web server that generates thousands of events per second. You only care about errors and the ratio of errors to OK (200) responses. You want to sample the OK events and keep high resolution for errors.
props.conf:
# data comes in on this sourcetype
[v:orig:data]
TRANSFORMS-sample_200_data = v_sample_200_data
transforms.conf:
# look for events with status code 200 AND a random number not equal to zero
# if both are true, drop the event
# otherwise keep it, which retains roughly one 200 event out of 100
[v_sample_200_data]
INGEST_EVAL = queue=if(match(_raw, "status=200") AND (random()%100)!=0, "nullQueue", "indexQueue")
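To sanity-check the sampling ratio after ingestion, a simple search along these lines compares the retained 200 events against the errors; the index name is a placeholder, and it assumes the status field is extracted at search time:
index=<your_index> sourcetype=v:orig:data | stats count BY status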
Drop fields from INDEXED_CSV
Both INDEXED_CSV and INDEXED_JSON are useful, but they create indexed fields for every column or element. This can inflate your TSIDX size and increase disk usage. Sometimes you need only a subset of these fields for fast searching but want the rest to remain available via schema on the fly.
props.conf:
[reduced_columns]
DATETIME_CONFIG = CURRENT
INDEXED_EXTRACTIONS = CSV
TRANSFORMS-drop_fields = drop_useless_fields
EXTRACT-removed-columns = [^,]+,[^,]+,[^,]+,(?<random_nonsense>[^,]+),(?<long_payload>[^,]+)
transforms.conf:
[drop_useless_fields]
# note the := syntax: it overwrites the existing indexed field, and assigning null() removes it
INGEST_EVAL = repeated_field:=null(), random_nonsense:=null(), long_payload:=null()
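Even though their indexed copies are dropped, the columns remain usable at search time through the EXTRACT-removed-columns stanza above. For example, with the index name as a placeholder:
index=<your_index> sourcetype=reduced_columns | stats count BY random_nonsense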
Export and import data
Sometimes you would like to bulk export data from an existing Splunk Enterprise index and reingest it on your laptop for development. This pattern allows you to run a search that extracts data from an installation via CSV export and then import it again via a specific source type. This is achieved by creating a "protocol" for encoding via search and decoding via transforms. Note that this does not reparse the data or carry any indexed fields across.
props.conf:
[import_data]
DATETIME_CONFIG = CURRENT
TRANSFORMS-extract-metadata = drop_header, extract_metadata_copy_to_meta, reassign_meta_to_metadata, remove_metadata_from_raw
# Splunk encodes quotes for CSV output; we need to undo this
SEDCMD-strip_double_quotes= s/""/"/g
transforms.conf:
# the first row of a Splunk CSV export is a header row containing the literal value "_raw"; drop it
[drop_header]
INGEST_EVAL = queue=if(_raw="\"_raw\"","nullQueue", queue)
[extract_metadata_copy_to_meta]
# we use REGEX to pop out the values for index, host, source, and sourcetype, and then write them to temporary fields in _meta. We assume that % is not found in these metadata values, to keep the REGEX simple
# alternatively, this can be done using INGEST_EVAL and the split() function
SOURCE_KEY=_raw
WRITE_META = true
REGEX = ^"\d+(?:\.\d+)?%%%([^%]+)%%%([^%]+)%%%([^%]+)%%%([^%]+)%%%
FORMAT = my_index::"$1" my_host::"$2" my_source::"$3" my_sourcetype::"$4"
[reassign_meta_to_metadata]
# copy the temporary user defined fields into the primary metadata locations and then delete the temporary fields
INGEST_EVAL = host:=my_host, source:=my_source, index:=my_index, sourcetype:=my_sourcetype, my_host:=null(), my_source:=null(), my_index:=null(), my_sourcetype:=null()
[remove_metadata_from_raw]
# extract the _raw field from the protocol and write back to _raw
INGEST_EVAL = _raw=replace(_raw, "^[^%]+%%%(?:[^%]+)%%%(?:[^%]+)%%%(?:[^%]+)%%%(?:[^%]+)%%%(.*)\"","\1")
Run the following search:
index=* |eval _raw=_time."%%%".index."%%%".host."%%%".source."%%%".sourcetype."%%%"._raw |table _raw
The protocol uses %%% as a separator and orders the data as _time, index, host, source, sourcetype, and then _raw. It assumes that the % character is only found in _raw, which keeps the REGEX statement simple.
Extract a REGEX indexed field
By default, Splunk Enterprise ingests data with its universal indexing algorithm, which is a general-purpose tokenization process based around major and minor breakers. However, some log data is consistently written as attribute-value pairs, and in this case you can use REGEX transforms with REPEAT_MATCH = true to implement something similar to INDEXED_CSV and INDEXED_JSON, but for logs. You disable major breakers and write REGEX expressions that find attribute-value pairs in the forms a="b", a=b, and a='b', and write out a::b to _meta to create an indexed field with the name a and value b. Each attribute-value pair can be converted via a REGEX transform to an indexed field. Lots of log files follow this pattern, including splunkd's metrics.log.
props.conf:
# this sourcetype is an example for how we can use REPEAT_MATCH and regex to automatically extract fields from log files
[indexed_log]
TIME_FORMAT = %Y-%m-%d %H:%M:%S
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\n\r]+)
TRANSFORMS-extract_indexed_fields= regex_extract_doubled_quoted_av_pairs, regex_extract_single_quoted_av_pairs, regex_extract_unquoted_av_pairs
transforms.conf:
# this regex finds double quoted attribute value pairs, ie the form a="b", and appends them to _meta
[regex_extract_doubled_quoted_av_pairs]
SOURCE_KEY = _raw
REGEX = \s([a-zA-Z][a-zA-Z0-9_-]+)="([^"]+)"
REPEAT_MATCH = true
FORMAT = $1::"$2"
WRITE_META = true
[regex_extract_unquoted_av_pairs]
# this regex finds unquoted attribute value pairs, ie the form a=b, and appends them to _meta
SOURCE_KEY = _raw
REGEX = \s([a-zA-Z][a-zA-Z0-9_-]+)=([^\s"',]+)
REPEAT_MATCH = true
FORMAT = $1::"$2"
WRITE_META = true
[regex_extract_single_quoted_av_pairs]
# this regex finds single quoted attribute value pairs, ie the form a='b', and appends them to _meta
SOURCE_KEY = _raw
REGEX = \s([a-zA-Z0-9_-]+)='([^']+)'
REPEAT_MATCH = true
FORMAT = $1::"$2"
WRITE_META = true
With the fields automatically converted into indexed fields via REGEX, you can run computations on your log file entirely with tstats, providing high-speed results.
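As a hypothetical illustration (the index name and field names are placeholders, not values from the configuration above), a tstats search over the automatically indexed attribute-value pairs could look like this:
| tstats count avg(<numeric_field>) WHERE index=<your_index> sourcetype=indexed_log BY <attribute_field> _time span=5m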
Be aware that overly precise numeric values bloat the size of the TSIDX files due to high cardinality. When dealing with high-precision metrics, metrics indexes are superior because they store numbers as numbers.
Conduct complex and selective encryption routing
You might need to obfuscate data before it is stored in Splunk Enterprise while, in some scenarios, still allowing the obfuscation to be reversed. General reporting can occur on obfuscated low-security datasets, while a select few users are granted access to the high-security dataset needed to perform the reversal. This also allows for different retention periods, for example where reversal is possible for the first 30 days, after which the reversal key is removed. This can be useful in compliance and regulatory use cases, such as in the financial and health industries, or for GDPR. Using INGEST_EVAL and CLONE_SOURCETYPE makes this possible.
props.conf:
# data comes in on this sourcetype
[v:email:data:orig]
TRANSFORMS-clone_data = v_hash_make_clone, v_hash_make_mask
# map reference data is created here
[v:email:data:reference_map]
TRANSFORMS-make_map_reference = v_hash_make_map_reference
transforms.conf:
# this clones the event for future processing as reference map event
[v_hash_make_clone]
REGEX = (.*)
CLONE_SOURCETYPE = v:email:data:reference_map
# this re-writes raw to replace email with sha256 hash for “low security” index
[v_hash_make_mask]
INGEST_EVAL = email_hash=sha256(replace(_raw, "^(.*)email=(\S+)(.*)$", "\2")), _raw=replace(_raw, "^(.*email)=(\S+)(.*)$", "\1=".email_hash."\3")
# this transform routes data, and emits reference map _raw for “high security” index
[v_hash_make_map_reference]
INGEST_EVAL = index="secure", queue=if(match(_raw, "email="), "indexQueue", "nullQueue"), email_hash=sha256(replace(_raw, "^(.*)email=(\S+)(.*)$", "\2")), _raw="hash=\"".email_hash."\" email=\"".replace(_raw, "^(.*)email=(\S+)(.*)$", "\2")."\""
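To reverse the obfuscation for an authorized user, the hash in the low-security events can be joined back to the reference map in the secure index. The following is only a rough sketch: it assumes the low-security data lands in an index called low_security (a placeholder) and extracts the hash values from both datasets at search time:
index=low_security sourcetype=v:email:data:orig | rex field=_raw "email=(?<hash>\S+)" | join type=left hash [search index=secure sourcetype=v:email:data:reference_map | rex field=_raw "hash=\"(?<hash>[^\"]+)\" email=\"(?<email>[^\"]+)\"" | table hash email] | table _time email _raw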
Additional resources
The content in this article comes from a .conf20 talk, one of the thousands of Splunk resources available to help users succeed.