Skip to main content
 
 
Splunk Lantern

Enriching data via real-time threat detection with KV Store lookups in Edge Processor

 

Many organizations struggle with efficiently identifying and responding to threats hiding within their networks. Traditional methods, which rely on real-time searches for malicious indicators, often lead to performance bottlenecks and delayed responses. Because of this, a more streamlined process is often needed - one that allows cross-referencing of event fields, such as IP addresses and hostnames, against repositories with known cybersecurity threats.

This article shows you how to focus on using threat intelligence to not only identify risks but also transform data before it reaches its destination. This effectively reduces system strain and expedites threat response, allowing for proactive protection in a digital landscape where staying one step ahead of malicious actors is critical.

Solution

Using event fields present in ingested data means that malicious activity can be preemptively identified and flagged. The data enrichment process can be performed within your environment by leveraging both on-premises and cloud-based Splunk products. Some key components of this solution are:

  • Splunk Edge Processor: Optimizes data collection and analysis by processing, filtering, and enriching data at its source, reducing bandwidth and storage needs. For more information, see About the Edge Processor solution
  • KV Store: Allows users to store, retrieve, and manipulate structured data within the Splunk platform, using a key-value mechanism for efficient data handling. For more information, see About the app key value store.
  • lookup*: SPL command used to enrich event data by matching and adding fields from external data sources, such as CSV files or databases. For more information, see Search reference: lookup.

Lookups are used in a variety of contexts outside of the case highlighted in this article. Beyond cybersecurity, they can help in tasks such as converting machine-generated code to human-readable text, translating error codes, resolving user IDs from names, etc. Lookups are a great solution in many scenarios where matching and enriching data from different sources is required.

To successfully implement this approach, you'll need to maintain a KV Store collection in Splunk Cloud Platform, where aggregated threat intelligence data is used to construct a list of known threats (for example, IP addresses or hostnames). After this collection has been created, it can then be synced and used alongside your Splunk Edge Processor instance to preemptively tag malicious data before routing it to its destination. By processing and flagging threats at the edge, you can significantly reduce the volume of data sent to central servers for analysis. This method not only accelerates the detection and mitigation of threats but also conserves network bandwidth and storage resources.

Prior to data ingest, a pipeline should be designed and applied to each Splunk Edge Processor instance to determine whether any of an incoming event’s fields show signs of malicious or adversarial intent. To do this, you'll need to extract relevant fields from the event data (such as source or destination IP addresses) and query the KV Store to check for similarities. The lookup command helps you do this as it’s designed to match field-value pairs in the source data to those persisted in the specified lookup dataset (the KV Store collection, in this case). If a match is found, indicating a potential threat, the pipeline can then append an additional field to the event, such as is_malicious, which flags it for further analysis or triggering targeted security workflows. If the tagged data is sent to a Splunk indexer, for instance, you could condition your search query on is_malicious=true rather than having to perform an intensive search against lists with millions of unwanted values.

As a result, the cooperation between Splunk Edge Processor and KV Store lookups not only enhances real-time threat detection, but also optimizes data processing by filtering and enriching data before it leaves your environment. This cooperation also facilitates the integration of threat intelligence feeds into the detection process, providing a more robust security infrastructure. See the diagram below for more detail.

How can I get this set up in my environment?

  • The following steps are based on the assumption that you have already established and configured KV Store lookups in your Splunk Cloud Platform environment. If you have not already done so, please be sure to follow the step-by-step instructions provided in the related documentation.
  • Similarly, if your KV Store collection hasn’t yet been populated with the threat intelligence data necessary to make this solution useful, consider using either the REST API or outputlookup command to do so. Generally speaking, Splunk engineers have found that the easiest approach involves converting your data to a list of comma-separated values (CSV format), then using outputlookup to write from a CSV file to a KV Store collection.

To start using Edge Processor to perform real-time threat detection, follow the steps provided below:

  1. Log into your provided tenant to be taken to the Data Management page.
  2. In the top right-hand corner, click the gear icon and click System connections from the drop-down menu.
  3. In the connection panel titled scpbridge, click the refresh icon ⟳.


Your Splunk Edge Processor tenant now has access to all previously-created lookup datasets in your Splunk Cloud Platform instance, and will continue to periodically download updates to these datasets as necessary. It’s important to note that this step will need to be repeated every time a new lookup dataset is created (but not when one is updated) and required by the Edge Processor.

First, create the Edge Processor:

  1. Initiate the creation of a new Edge Processor by clicking the Edge Processors tab on the left-hand side of the screen, then click the New Edge Processor button in the top right corner.
  2. In the designated input fields, enter a name and optional description for your instance. To further specify a default destination for unprocessed data, select the To a default destination tab and choose a destination from the dropdown menu.
  3. To enable receivers so that your Splunk instance is able to ingest data from specific inputs, select values from the section titled Receive data from these inputs.
  4. If you wish to use TLS to secure communications between your instance and its data sources, then do the following:
    1. In the Use TLS with these inputs section, select the inputs for which you want to use TLS encryption.
    2. Upload PEM files containing the appropriate certificates in the Server private key, Server certificate, and CA certificates input fields.

Next, install an instance locally:

  1. Under the Edge Processors tab, double-click the table entry for the Splunk Edge Processor you created previously.
  2. To locate and copy the installation commands, click the Actions dropdown menu in the top right corner, then the Manage instances option, and then Install/uninstall.
  3. On the machine in which the instance will be hosted, open the command-line interface, navigate to the desired target directory, and run the commands copied previously. This should create a splunk-edge/ folder in your working directory.
  4. To verify the instance was installed successfully, return to your tenant’s web UI, click the Manage instances area seen previously, and confirm that a new instance has been created with a Healthy status. This status might take a minute or two to update.

3. Create and apply a pipeline used to detect malicious activity

  1. Navigate to the Pipelines tab in the rightmost section of your tenant’s web UI and click on the New pipeline button in the top right corner.
  2. Define your pipeline’s partition bysourcetype, source, or host, depending on the fields present in your ingested data. Which you select doesn’t necessarily matter, but it’s important that the data being sent through your Edge Processor contains whichever partition you choose (for example,sourcetype=example). Otherwise, the pipeline you'll set up in the following steps will not be used to transform and route your data.


For the sake of illustration, this article focuses on constructing a pipeline tailored toward analyzing ingested Windows event 5156 data - a critical component in Windows security logging. This event data provides detailed information about network connections essential for monitoring network activity and identifying potential security threats, and is often used in enterprise security management for tracking the flow of data across the network, ensuring compliance with security policies, and detecting unusual or suspicious connections that could indicate a breach or cyberattack. For reference, the general format of this log is outlined below:

Log Name:      Security
Source:        Microsoft-Windows-Security-Auditing
Date:          [Date and Time]
Event ID:      5156
Task Category: Filtering Platform Connection
Level:         Information
Keywords:      Audit Success
User:          N/A
Computer:      [Computer Name]

Description:
The Windows Filtering Platform has permitted a connection.

Application Information:
    Process ID:             [Process ID]
    Application Name:       [Application Name]
    
Network Information:
    Direction:              [Inbound/Outbound]
    Source Address:         [Source IP]
    Source Port:            [Source Port]
    Destination Address:    [Destination IP]
    Destination Port:       [Destination Port]
    Protocol:               [Protocol Number]
 
 Filter Information:
    Filter Run-Time ID:     [Filter Run-Time ID]
    Layer Name:             [Layer Name]
    Layer Run-Time ID:      [Layer Run-Time ID]

In the context of identifying malicious activity, your pipeline will be concerned with the source and destination addresses provided. In order to extract this information, you can use regular expressions in conjunction with the rex command, as shown below:

$pipeline = from $source
    | rex field=_raw /Source Address:\s+(?P<src_ip>\d+.\d+.\d+.\d+)/
    | rex field=_raw /Destination Address:\s+(?P<dest_ip>\d+.\d+.\d+.\d+)/
    | into $destination

Now that you have the source and destination addresses stored in src_ip and dest_ip, respectively, you can query the KV Store collection that you set up previously to determine whether or not the event is of potentially malicious nature. For the purpose of this walkthrough, assume that the collection in place contains a simple mapping of IP addresses and hostnames, though the implementation on your end might differ. In other words, the KV Store should resemble the following:

ip_address hostname

179.235.83.156

laptop-39.jones.org

163.233.123.149

db-79.stone.net

59.62.137.192

email-81.reed.com

70.114.225.2

web-80.roberts-porter.com

90.134.10.125

web-59.nichols.com

20.136.249.67

laptop-70.armstrong.net

11.69.111.173

srv-33.thomas.info

217.177.67.42

lt-97.mr-brown.com

149.254.211.162

web-70.hall.com

165.241.43.213

ferguson-stephens.info

···

···

In order to properly cross-reference your extracted IP addresses against the ip_address column stored in the collection, you'll need to utilize the lookup command. The corresponding SPL2 should look like this:

import 'threat_intel_kv_col’ from /envs/splunk/[your-splunk-cloud-id]/lookups

$pipeline = from $source
    | rex field=_raw /Source Address:\s+(?P<src_ip>\d+.\d+.\d+.\d+)/
    | rex field=_raw /Destination Address:\s+(?P<dest_ip>\d+.\d+.\d+.\d+)/
    | lookup ‘threat_intel_kv_col’ ip_address as src_ip OUTPUT ip_address AS match_src
    | lookup ‘threat_intel_kv_col’ ip_address as dest_ip OUTPUT ip_address AS match_dest
    | into $destination

Each of the following components play a critical role in ensuring that your pipeline is able to query the KV Store as desired:

  • import: The import command is responsible for fetching the specified collection and pulling it into your current working environment. In this case, threat_intel_kv_col is the name of the collection that contains your threat intelligence data, and is referenced in the lookup commands that follow.
  • lookup: The lookup command is used to enrich the event data by cross-referencing it with external data sources - in this case, the imported collection. In the SPL2 shown above, the src_ip and dest_ip fields extracted previously are compared against the ip_address values stored in the collection. They output a non-null match_src or match_dest values if and only if a match is successfully identified.

Using the dummy data presented in the table above, suppose you have the following pipeline:

import 'threat_intel_kv_col’ from /envs/splunk/[id]/lookups

$pipeline = from $source
    | eval example_ip1=“179.235.83.156”, example_ip2=”0.0.0.0”
    | lookup 'threat_intel_kv_col’ ip_address as example_ip1
        OUTPUT ip_address AS match1
    | lookup 'threat_intel_kv_col’ ip_address as example_ip2
        OUTPUT ip_address AS match2
    | into $destination

In this case, the match1 and match2 fields assume values of 179.235.83.156 and null, respectively, because the value stored in example_ip1 exists in the collection whereas example_ip2 does not.

Now that you’ve identified whether the source and destination IPs are of a malicious nature, all that remains is to enrich the event’s data by appending an is_malicious field, as well as remove any unnecessary fields created along the way. The SPL2 should look like this:

import 'threat_intel_kv_col’ from /envs/splunk/[id]/lookups

$pipeline = from $source
    | rex field=_raw /Source Address:\s+(?P<src_ip>\d+.\d+.\d+.\d+)/
    | rex field=_raw /Destination Address:\s+(?P<dest_ip>\d+.\d+.\d+.\d+)/
    | lookup 'threat_intel_kv_col’ ip_address as src_ip OUTPUT ip_address AS match_src
    | lookup 'threat_intel_kv_col’ ip_address as dest_ip OUTPUT ip_address AS match_dest
    | eval is_malicious=isnotnull(match_src) OR isnotnull(match_dest)
    | fields - src_ip, dest_ip, match_src, match_dest
    | into $destination

The is_malicious field is appended to the event data as false only when both lookup commands are unable to find a match - in other words, when the outputted match_src and match_dest fields assume null values. Finally, before routing the resulting event to the specified destination, the fields command is used to remove the extracted and intermediate fields created previously. This is not a necessary step; however, it helps to eliminate redundant or unnecessary information, optimizing the event for further processing or analysis. This practice is useful in environments where data cleanliness and efficiency are important, as it ensures that only the most relevant and actionable information is retained.

Next steps

By integrating Splunk Edge Processor with Splunk's KV Store, you can effectively utilize lookups to cross-reference threat intelligence data, enhancing your ability to detect and respond to potential cybersecurity threats in a timely and efficient manner. This strategy not only accelerates the identification of potential security threats but also improves upon the efficiency of data processing in your environment.

Join the #edge-processor Slack channel for direct support with Splunk Edge Processor (request access: http://splk.it/slack). Then, review the additional resources below to help you better understand and implement this use case:

You should also review these additional use cases on Splunk Lantern: