
Improving data pipeline processing in Splunk Enterprise

 

Modifying splunkd behavior through the props.conf and transforms.conf files is not simple. However, you know that applying EVAL logic and regex extractions to pipeline data allows you to change the value of a field to provide more meaningful information, extract interesting nested fields into top-level fields, and redact certain information from the data. You want to learn how to apply these Splunk transforms during event parsing on an indexer or heavy forwarder.

Altering data ingestion rules is risky, and only advanced Splunk users should do so. Always develop and test changes locally (for example, on a laptop) before deploying them. When using the configuration samples shown here, you might need to change parameters and values according to your configuration. In addition, follow these guidelines to help ensure you implement safe transforms:

  • Use Visual Studio Code with the Splunk extension to manage configs.
  • Use the reload URL http://localhost:8000/en-US/debug/refresh instead of restarting splunkd after making changes.
  • Use _index_earliest and _index_latest to view recently ingested data.
  • Use the [copy_to_meta] transform to debug, as sketched below.
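
For reference, here is a minimal sketch of a debugging transform in the spirit of [copy_to_meta]. It snapshots pipeline values into indexed fields so you can inspect them with tstats after ingestion; the debug_* field names and the test source type name are placeholders, and you should attach it to test data only.

transforms.conf:
# copy pipeline state into indexed debug fields for inspection after ingestion
[copy_to_meta]
INGEST_EVAL = debug_queue:=queue, debug_event_length:=len(_raw), debug_source:=source

props.conf:
# attach the debug transform to a test sourcetype (name is a placeholder)
[my_test_sourcetype]
TRANSFORMS-zz-debug = copy_to_meta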

Determine license usage

The license_usage.log file is verbose. However, due to squashing of host and source values, you might not be able to determine the total license usage from specific hosts. To report license usage for specific host or source values, you can use INGEST_EVAL to compute the string length of each event and write it to an indexed field.

props.conf:
[default]
TRANSFORMS-z-last_transform = add_raw_length_to_meta_field

transforms.conf:
[add_raw_length_to_meta_field]
INGEST_EVAL = event_length=len(_raw)

Then run the following search to perform license calculation:

| tstats sum(event_length) AS total_ingestion WHERE index=* host=<hosts_to_measure> _index_earliest=-30d@d _index_latest=-1d@d BY host _time span=1d@d 
| xyseries _time host total_ingestion 

You can split the results by other fields, such as sourcetype or source, as needed.
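
For example, the same search split by sourcetype instead of host might look like this (the host filter and time ranges remain placeholders to adapt):

| tstats sum(event_length) AS total_ingestion WHERE index=* host=<hosts_to_measure> _index_earliest=-30d@d _index_latest=-1d@d BY sourcetype _time span=1d@d 
| xyseries _time sourcetype total_ingestion 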

Estimate data requirements

Before a net new data source is onboarded, additional context might be required to ensure that sufficient compute and storage are available. You can use INGEST_EVAL and CLONE_SOURCETYPE functionality to emit metric events that describe the incoming data.

Each metric event consumes up to 150 bytes of license.

props.conf:
# data comes in on this sourcetype
[v:orig:data]
# this configuration is universal, and can be reused
TRANSFORMS-enable_estimate_mode_drop_orig = v_estimation_set_metrics, v_estimation_create_metrics, v_estimation_drop_orig

# metrics of metadata are created on this sourcetype
[v:estimate:pipeline]
TRANSFORMS-set_metric_name = v_estimation_metric_info

transforms.conf:
# clone original data, transform it into metrics events
[v_estimation_create_metrics]
REGEX = (.*)
CLONE_SOURCETYPE = v:estimate:pipeline

# create metadata about the event, preserve original attributes
# these fields become metric dimensions!
[v_estimation_set_metrics]
INGEST_EVAL = orig_host=host, orig_source=source, orig_sourcetype=sourcetype, orig_index=index

# we do not need to keep the original data, we only want the metadata, so let’s drop it
[v_estimation_drop_orig]
INGEST_EVAL = queue="nullQueue"

# format event into a metric, route it to appropriate metrics index
[v_estimation_metric_info]
INGEST_EVAL = index="<name_of_your_metrics_index_to_write_to>", metric_name="estimation_mode", _value=len(_raw)

Then run the following search to see the estimation:

|mstats prestats=t max(_value) avg(_value) 
     WHERE index=data_metrics AND metric_name="estimation_mode" 
     BY orig_sourcetype span=5m 
|timechart max(_value) avg(_value) BY orig_sourcetype
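
If a daily volume figure is more useful than the raw metric values, a follow-on search along these lines converts the summed event lengths into gigabytes per day (the data_metrics index name matches the example search above):

| mstats sum(_value) AS total_bytes WHERE index=data_metrics AND metric_name="estimation_mode" BY orig_sourcetype span=1d 
| eval total_gb=round(total_bytes/1024/1024/1024, 2) 
| xyseries _time orig_sourcetype total_gb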

Selective routing to other destinations

Sometimes data needs to be shared with other destinations, such as another Splunk Enterprise deployment or third-party systems. Use cases can require all or only a subset of the data to be shared. You can use INGEST_EVAL to control data routing groups.

props.conf:
# data comes in on this sourcetype
[v:interesting:data]
TRANSFORMS-example_data_route = v_sample_route_buttercup_bu

transforms.conf:
# if event data host is from buttercup1 OR buttercup2
# route data to Splunk Cloud Platform, otherwise send it to on-prem indexers
[v_sample_route_buttercup_bu]
INGEST_EVAL = _TCP_ROUTING=if(match(host, "buttercup[12]"), "splunkcloud_indexers", "splunk_onprem_indexers")

outputs.conf:
[tcpout]
defaultGroup = splunk_onprem_indexers

# this output group routes data to Splunk Cloud Platform
[tcpout:splunkcloud_indexers]
server = inputs.buttercup.splunkcloud.com:9997

# this output group keeps data on-prem
[tcpout:splunk_onprem_indexers]
server = 10.10.10.10:9997
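
If the destination is a third-party system rather than another set of Splunk indexers, a sketch of the classic REGEX/DEST_KEY routing approach (rather than INGEST_EVAL) can send a subset of events to a syslog output group. The host pattern, group name, and server address below are placeholders, and the transform would be added to the [v:interesting:data] stanza in props.conf as an additional TRANSFORMS class.

transforms.conf:
# route events whose host matches buttercup3 (a hypothetical host) to a third-party syslog destination
[v_sample_route_syslog]
SOURCE_KEY = MetaData:Host
REGEX = buttercup3
DEST_KEY = _SYSLOG_ROUTING
FORMAT = third_party_syslog

outputs.conf:
# this output group forwards the routed events to a third-party receiver (address is a placeholder)
[syslog:third_party_syslog]
server = siem.example.com:514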

Manage conflicting time formats

Any well-curated Splunk Enterprise instance uses source types to accurately identify the timestamp format of events. However, collisions occasionally occur within a single source type where there are conflicting timestamp formats. INGEST_EVAL offers a new approach that uses the strptime() function to solve this problem.

Example events (screenshot): a single source type containing events with several conflicting timestamp formats.

props.conf:
[demultiplexed_datetime_formats]
DATETIME_CONFIG = CURRENT
TRANSFORMS-extract_date = demultiplex_datetime

transforms.conf:
[demultiplex_datetime]
# add a fall-through case to set a custom date or route "unknown" data to a special quarantine index
INGEST_EVAL = _time=case(isnotnull(strptime(_raw, "%c")), strptime(_raw, "%c"), isnotnull(strptime(_raw, "%H:%M:%S %y-%m-%d")), strptime(_raw, "%H:%M:%S %y-%m-%d"), isnotnull(strptime(_raw, "%Y-%m-%d %H:%M:%S")), strptime(_raw, "%Y-%m-%d %H:%M:%S"))


This example initially sets the time of the event to be the current time. After this, you use a transform to try to replace that time by testing the known time formats using a case statement and picking the first that matches. This is not very computationally efficient as it invokes strptime multiple times, but you are able to get the answer in a single invocation of INGEST_EVAL.
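
The comment in the transform above mentions a fall-through case. One way to sketch the quarantine variant, assuming an index named quarantine already exists, is an additional transform appended to the TRANSFORMS-extract_date list that re-routes any event none of the known formats can parse:

transforms.conf:
# hypothetical fall-through: if no known timestamp format parses, send the event to a quarantine index
[demultiplex_datetime_quarantine]
INGEST_EVAL = index=if(isnull(strptime(_raw, "%c")) AND isnull(strptime(_raw, "%H:%M:%S %y-%m-%d")) AND isnull(strptime(_raw, "%Y-%m-%d %H:%M:%S")), "quarantine", index)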

Extract the time and date from the file name

Sometimes, in edge cases, the date is captured as part of the file name and only the time is logged in the event. Previously, you would need to use a custom datetime.xml and hope for the best. With INGEST_EVAL, you can tackle this problem more elegantly.

props.conf:
[compound_date_time]
DATETIME_CONFIG = CURRENT
TRANSFORMS-get-date = construct_compound_date
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\n\r]+)

transforms.conf:
# pop the date out of the source path with a regex replace, append the first 10 characters of _raw, parse the result with strptime, and assign it to _time
[construct_compound_date]
INGEST_EVAL=_time=strptime(replace(source,".*/(20\d\d\-\d\d\-\d\d)\.log","\1").substr(_raw,0,10),"%Y-%m-%d%H:%M:%S")

The regex replace pops the date out of the source, appends the first 10 characters from _raw, runs the result through strptime, and assigns it to _time. If the eval fails to run, _time is not updated and the previously set CURRENT time remains.
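
To verify that the compound timestamp is being parsed rather than falling back to the CURRENT time, a quick search over the example source type can compare event time with index time; a consistently large gap suggests the eval is failing:

sourcetype=compound_date_time 
| eval index_lag=_indextime-_time 
| timechart avg(index_lag) max(index_lag)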

Sample events

Consider a web server that generates thousands of events per second. You only care about errors and the ratio of errors to OK. You want to sample the OK events and provide high resolution for errors.

props.conf:
# data comes in on this sourcetype
[v:orig:data]
TRANSFORMS-sample_200_data = v_sample_200_data

transforms.conf:
# will look for events with status code 200 AND random number not equal to zero
# if true, drop the data
# if false, example will keep *roughly* one event out of 100
[v_sample_200_data]
INGEST_EVAL = queue=if(match(_raw, "status=200") AND (random()%100)!=0, "nullQueue", "indexQueue")
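
Because only about one in one hundred status=200 events survives the sampling, searches over the sampled data need to scale those counts back up. A sketch, where the index name is a placeholder:

index=<your_web_index> sourcetype="v:orig:data" 
| eval status_class=if(match(_raw, "status=200"), "ok_sampled", "error") 
| stats count BY status_class 
| eval estimated_count=if(status_class="ok_sampled", count*100, count)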

Drop fields from INDEXED_CSV

Both INDEXED_CSV and INDEXED_JSON are useful, but they create indexed fields for every column or element. This can inflate your TSIDX size and increase disk usage. Sometimes you need only a subset of these fields for fast search but want the remaining fields available via schema on the fly.

props.conf:
[reduced_columns]
DATETIME_CONFIG = CURRENT
INDEXED_EXTRACTIONS = CSV
TRANSFORMS-drop_fields = drop_useless_fields
EXTRACT-removed-columns = [^,]+,[^,]+,[^,]+,(?<random_nonsense>[^,]+),(?<long_payload>[^,]+)

transforms.conf:
[drop_useless_fields]
# note the := syntax, which overwrites the existing indexed field; assigning null() removes it
INGEST_EVAL = repeated_field:=null(), random_nonsense:=null(), long_payload:=null()
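
The dropped columns no longer consume TSIDX space, but the EXTRACT setting in props.conf still surfaces them at search time, for example:

sourcetype=reduced_columns 
| table random_nonsense long_payload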

Export and import data

Sometimes you would like to bulk export data from an existing Splunk Enterprise index and reingest it on your laptop for development. This pattern allows you to run a search that extracts data from an install via CSV export and import it again via a specific source type. This is achieved by creating a “protocol” for encoding via search, and then decoding via transforms. Note that this does not reparse data or carry any indexed fields across.

props.conf:
[import_data]
DATETIME_CONFIG = CURRENT
TRANSFORMS-extract-metadata = drop_header, extract_metadata_copy_to_meta, reassign_meta_to_metadata, remove_metadata_from_raw
# Splunk encodes quotes for CSV output, we need to undo this
SEDCMD-strip_double_quotes= s/""/"/g

transforms.conf:
# the first line of a Splunk CSV export is a header row containing the quoted column name "_raw". We want to drop it
[drop_header]
INGEST_EVAL = queue=if(_raw="\"_raw\"","nullQueue", queue)

[extract_metadata_copy_to_meta]
# we use REGEX to pop out the values for index, host, source, and sourcetype, then write them to temporary variables in _meta. We assume that % is not found in the primary keys to optimize the REGEX
# alternatively, this can be done using INGEST_EVAL and split() function!
SOURCE_KEY=_raw
WRITE_META = true
REGEX = ^"\d+(?:\.\d+)?%%%([^%]+)%%%([^%]+)%%%([^%]+)%%%([^%]+)%%%
FORMAT = my_index::"$1" my_host::"$2" my_source::"$3" my_sourcetype::"$4"

[reassign_meta_to_metadata]
# copy the temporary user defined fields into the primary metadata locations and then delete the temporary fields
INGEST_EVAL = host:=my_host, source:=my_source, index:=my_index, sourcetype:=my_sourcetype, my_host:=null(), my_source:=null(), my_index:=null(), my_sourcetype:=null()

[remove_metadata_from_raw]
# extract the _raw field from the protocol and write back to _raw
INGEST_EVAL = _raw=replace(_raw, "^[^%]+%%%(?:[^%]+)%%%(?:[^%]+)%%%(?:[^%]+)%%%(?:[^%]+)%%%(.*)\"","\1")

Run the following search on the source instance to export the data:

index=* 
|eval _raw=_time."%%%".index."%%%".host."%%%".source."%%%".sourcetype."%%%"._raw 
|table _raw

The protocol uses %%% as a separator and orders the data as _time, index, host, source, sourcetype, and then _raw. It assumes that the % character is only found in _raw, to optimize the REGEX statement.
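
To complete the round trip, one way to load the exported CSV on the development machine is the Splunk CLI oneshot command; the file path and index name here are placeholders:

./splunk add oneshot /tmp/exported_events.csv -sourcetype import_data -index main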

Extract a REGEX indexed field

By default, Splunk Enterprise ingests data with its universal indexing algorithm, which is a general-purpose tokenization process based around major and minor breakers. However, some log data is consistently written as attribute-value pairs, and in this instance you can use REGEX transforms with REPEAT_MATCH = true to implement something similar to INDEXED_CSV and INDEXED_JSON, but for logs. You disable major breakers and write REGEX expressions that find attribute-value pairs in the forms a="b", a=b, and a='b', then write out a::b into _meta to create an indexed field with the name a and value b. Each of the attribute-value pairs can be converted via a REGEX transform to indexed fields. Lots of log files follow this pattern, including splunkd's metrics.log.

props.conf:
# this sourcetype is an example for how we can use REPEAT_MATCH and regex to automatically extract fields from log files
[indexed_log]
TIME_FORMAT = %Y-%m-%d %H:%M:%S
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\n\r]+)
TRANSFORMS-extract_indexed_fields= regex_extract_doubled_quoted_av_pairs, regex_extract_single_quoted_av_pairs, regex_extract_unquoted_av_pairs

transforms.conf:
# this regex finds double-quoted attribute value pairs, i.e. the form a="b", and appends them to _meta
[regex_extract_doubled_quoted_av_pairs]
SOURCE_KEY = _raw
REGEX = \s([a-zA-Z][a-zA-Z0-9_-]+)="([^"]+)"
REPEAT_MATCH = true
FORMAT = $1::"$2"
WRITE_META = true

[regex_extract_unquoted_av_pairs]
# this regex finds unquoted attribute value pairs, i.e. the form a=b, and appends them to _meta
SOURCE_KEY = _raw
REGEX = \s([a-zA-Z][a-zA-Z0-9_-]+)=([^\s"',]+)
REPEAT_MATCH = true
FORMAT = $1::"$2"
WRITE_META = true

[regex_extract_single_quoted_av_pairs]
# this regex finds single-quoted attribute value pairs, i.e. the form a='b', and appends them to _meta
SOURCE_KEY = _raw
REGEX = \s([a-zA-Z0-9_-]+)='([^']+)'
REPEAT_MATCH = true
FORMAT = $1::"$2"
WRITE_META = true

With the fields automatically converted into indexed fields via REGEX, you can do computation on the log file entirely with tstats, which provides high-speed results.
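
For example, if splunkd's metrics.log were ingested with the indexed_log source type above, a tstats search over the automatically indexed fields might look like the following; the group, name, and kb fields are assumptions about what is present in the data:

| tstats count avg(kb) AS avg_kb WHERE sourcetype=indexed_log BY group name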

Be aware that overly precise numeric values will bloat the size of the TSIDX file due to high cardinality. When dealing with high-precision metrics, metrics indexes are superior because they store numbers as numbers.

Conduct complex and selective encryption routing

You might need to obfuscate data prior to storage in Splunk Enterprise, but in some scenarios still allow the obfuscation to be reversed. General reporting can occur on the obfuscated, low-security dataset, while a select few can be granted access to a high-security dataset to perform the reversal. This also allows for different retention periods: for example, reversal is possible for the first 30 days, after which the reference data needed for reversal is removed. This can be useful in compliance and regulatory use cases, such as in the financial and health industries, and for GDPR. Using INGEST_EVAL and CLONE_SOURCETYPE makes this possible.

props.conf:
# data comes in on this sourcetype
[v:email:data:orig]
TRANSFORMS-clone_data = v_hash_make_clone, v_hash_make_mask

# map reference data is created here
[v:email:data:reference_map]
TRANSFORMS-make_map_reference = v_hash_make_map_reference

transforms.conf:
# this clones the event for future processing as reference map event
[v_hash_make_clone]
REGEX = (.*)
CLONE_SOURCETYPE = v:email:data:reference_map

# this rewrites _raw to replace the email address with a sha256 hash for the "low security" index
[v_hash_make_mask]
INGEST_EVAL = email_hash=sha256(replace(_raw, "^(.*)email=(\S+)(.*)$", "\2")), _raw=replace(_raw, "^(.*email)=(\S+)(.*)$", "\1=".email_hash."\3")

# this transform routes data, and emits reference map _raw for “high security” index
[v_hash_make_map_reference]
INGEST_EVAL = index=secure, queue=if(match(_raw, "email="), "indexQueue", "nullQueue"), email_hash=sha256(replace(_raw, "^(.*)email=(\S+)(.*)$", "\2")), _raw="hash=\"".email_hash."\" email=\"".replace(_raw, "^(.*)email=(\S+)(.*)$", "\2")."\""
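
Because INGEST_EVAL writes its results as indexed fields, both datasets should carry an email_hash value, so authorized users can join the masked events against the reference map in the secure index to recover the original addresses. A sketch, where the low-security index name is a placeholder:

index=<low_security_index> sourcetype="v:email:data:orig" 
| join type=left email_hash 
    [ search index=secure sourcetype="v:email:data:reference_map" 
    | rex field=_raw "email=\"(?<email>[^\"]+)\"" 
    | table email_hash email ] 
| table _time email_hash email _raw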

Additional resources

The content in this article comes from a .conf20 talk, one of the thousands of Splunk resources available to help users succeed.