Detecting anomalous customer record lookups with statistical baselines

You are part of the Fraud and Risk Team at a major regional bank. The bank manages millions of sensitive customer records accessed daily by various employees, including customer service agents, financial advisors, and compliance officers. Due to the highly regulated nature of the financial services industry, the bank must ensure that all customer record lookups are legitimate and compliant with strict data privacy and security regulations.

You've been tasked with finding lookups that could be indicative of suspicious activity. Traditional static threshold-based monitoring often results in numerous false positives due to natural variations in lookup patterns (with fluctuations caused by market hours, financial product cycles, and customer segments), making it difficult to detect truly suspicious activity such as insider threats, unauthorized access, or compromised user accounts.

Prerequisites

Data required

Example data sources

Data Source Source Type Required Fields
Core banking/CRM audit logs core:app:audit _time, user, action, customer_id, result, role, src_ip
Auth logs iam:auth _time, user, auth_result, src_ip, role, method

Solution

To detect anomalous customer record lookups, we'll follow this process:

  1. Create a statistical baseline of normal user activity to account for natural fluctuations.
  2. Create a search that compares current user activity against the baseline to detect abnormalities.
  3. Perform a drill-down investigation that focuses on suspicious login patterns (like failed-then-successful logins) for high-risk users to confirm potential account compromise.
  4. Pivot to IP geolocation to identify the attack's origin, solidifying evidence of a compromised account.

Step 1: Create a statistical baseline of normal user activity

This search calculates the statistical mean (mu) and standard deviation (sigma) of hourly lookups for each user, segmented by both the hour of the day and the day of the week to account for seasonality in user behavior. The resulting behavioral profile for each user is then stored in a lookup file for future reference.

Run this search over a representative historical period to generate the user_lookup_baselines.csv file. The example below uses an 11-day window that ends 7 days ago, so the baseline does not overlap the detection window used in step 2; a longer window (for example, 30 days) captures more seasonality. Rerun or schedule the search periodically so the baseline remains an accurate, up-to-date picture of normal activity for your users.

We can use an SPL query like the one below to build the historical behavioral profile for each user and save it into the lookup file:

index=<your index> sourcetype=core:app:audit action=customer_lookup earliest=-18d@d latest=-7d@d
| eval hod=strftime(_time, "%H"), dow=strftime(_time, "%a")
| bin _time span=1h
| stats count as hourly_lookups BY user hod dow _time
| stats avg(hourly_lookups) AS user_mu stdev(hourly_lookups) AS user_sigma count AS data_points BY user hod dow
| where data_points >= 2
| eval user_sigma = if(user_sigma==0 OR isnull(user_sigma), 1, user_sigma)
| fields user hod dow user_mu user_sigma data_points
| outputlookup user_lookup_baselines.csv

Search explanation

The table below provides an explanation of what each part of this search achieves. You can adjust this query based on the specifics of your environment.

Splunk Search Explanation
index=<your index> sourcetype=core:app:audit action=customer_lookup earliest=-18d@d latest=-7d@d

Searches the specified index and sourcetype for events where the action is customer_lookup. It limits the search to a historical window starting 18 days ago up to 7 days ago, aligned to whole days.

| eval hod=strftime(_time, "%H"), dow=strftime(_time, "%a") Creates two new fields: hod (hour of day) and dow (day of week) from the event timestamp.
| bin _time span=1h Groups events into 1-hour time bins.
| stats count as hourly_lookups by user hod dow _time Counts lookup events per user, per hour of day, per day of week, and per time bin.
| stats avg(hourly_lookups) as user_mu stdev(hourly_lookups) as user_sigma count as data_points by user hod dow Calculates average and standard deviation for each user-hour-day combination.
| where data_points >= 2 Keeps only user/hour/day combinations with at least two historical data points, so a baseline is never built from a single observation.
| eval user_sigma = if(user_sigma==0 OR isnull(user_sigma), 1, user_sigma) Avoids division by zero by setting sigma to 1 if 0 or null.
| fields user hod dow user_mu user_sigma data_points Retains only relevant fields.
| outputlookup user_lookup_baselines.csv Saves to a lookup file for future use.
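
Before moving on, it can be helpful to confirm that the lookup file was written and contains sensible profiles. The following optional sanity check (a sketch, assuming the lookup file name used above) summarizes what was captured:

| inputlookup user_lookup_baselines.csv
| stats count AS baseline_rows dc(user) AS users_profiled avg(user_mu) AS avg_hourly_lookups max(user_mu) AS max_hourly_lookups

If users_profiled is much lower than the number of employees who regularly look up customer records, consider widening the historical window before relying on the baseline.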

Step 2: Create a search that compares current user activity against the baseline you created

Now that a baseline is defined for your users, you can use the search below to analyze recent activity, comparing each user's hourly lookup counts against the baseline and calculating a z-score for each hour. The z-score measures how many standard deviations the current count deviates from that user's average.
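
For example, if a user's baseline for Tuesdays at 14:00 is user_mu = 20 lookups with user_sigma = 5, then 60 lookups in that hour gives a z-score of (60 - 20) / 5 = 8, well above the threshold of 3 used in the search below.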

index=<your index> sourcetype=core:app:audit action=customer_lookup earliest=-7d@d
| fields _time user
| eval hod=strftime(_time, "%H"), dow=strftime(_time, "%a")
| bin _time span=1h
| stats count AS current_lookups BY user _time hod dow
| lookup user_lookup_baselines.csv user hod dow OUTPUT user_mu user_sigma
| where isnotnull(user_mu)
| eval user_sigma = if(user_sigma==0, 1, user_sigma)
| eval z_score = round( (current_lookups - user_mu) / user_sigma, 2)
| where z_score >= 3
| stats
sum(z_score) AS risk_score
count AS anomaly_count
avg(z_score) AS avg_z_score
max(z_score) AS max_z_score
BY user
| eval avg_z_score = round(avg_z_score, 1)
| sort - risk_score
| head 25

Search explanation

The table below provides an explanation of what each part of this search achieves. You can adjust this query based on the specifics of your environment.

Splunk Search Explanation
index=<your index> sourcetype=core:app:audit action=customer_lookup earliest=-7d@d Searches the specified index for events of source type core:app:audit where the action is customer_lookup, limited to the last 7 days starting at midnight 7 days ago.
| fields _time user Keeps only the _time and user fields from the events, discarding others to optimize processing.
| eval hod=strftime(_time, "%H"), dow=strftime(_time, "%a") Creates two new fields: hod (hour of day, 00-23) and dow (day of week abbreviation, e.g., Mon), extracted from the event timestamp _time.
| bin _time span=1h Buckets the _time field into 1-hour intervals, grouping events by hour.
| stats count AS current_lookups BY user _time hod dow Counts the number of events per user per hour (_time), hour of day (hod), and day of week (dow), naming this count current_lookups.
| lookup user_lookup_baselines.csv user hod dow OUTPUT user_mu user_sigma Enriches the results by looking up baseline statistics (user_mu = average lookups, user_sigma = standard deviation) for each user, hour of day, and day of week from the CSV file user_lookup_baselines.csv.
| where isnotnull(user_mu) Filters out records where no baseline data exists (e.g., user_mu is null).
| eval user_sigma = if(user_sigma==0, 1, user_sigma) Replaces any zero standard deviation (user_sigma) with 1 to avoid division by zero in subsequent calculations.
| eval z_score = round( (current_lookups - user_mu) / user_sigma, 2) Calculates the z-score for the current lookups, measuring how many standard deviations the current count deviates from the baseline average, rounded to two decimal places.
| where z_score >= 3 Filters to keep only records where the z-score is 3 or higher, indicating significant anomalies.
| stats sum(z_score) AS risk_score count AS anomaly_count avg(z_score) AS avg_z_score max(z_score) AS max_z_score BY user Aggregates anomaly statistics per user: total risk score (sum of z-scores), count of anomalies, average z-score, and maximum z-score.
| eval avg_z_score = round(avg_z_score, 1) Rounds the average z-score to one decimal place for readability.
| sort - risk_score Sorts the users in descending order by their risk score, so the highest risk users appear first.
| head 25 Limits the output to the top 25 users with the highest risk scores.

Upon review of the search results, we see an outlier: USER-0075 (Alice), with a risk score of 370.51, which is 200% more than the next highest risk score.

[Screenshot: search results table of the top users ranked by risk score, with USER-0075 at the top]

If we switch to a visual view of our results, we can see that USER-0075 (Alice) stands out above all the others.

[Screenshot: visualization of risk scores by user, with USER-0075 far above the rest]

This warrants further investigation to determine the underlying factors and understand the specific type of risk associated with this user.
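
Before pivoting to authentication data, you can also replay the comparison for a single user to see exactly which hours spiked. The sketch below simply reuses the step 2 logic, filtered to the flagged user (USER-0075 is taken from the results above):

index=<your index> sourcetype=core:app:audit action=customer_lookup user="USER-0075" earliest=-7d@d
| eval hod=strftime(_time, "%H"), dow=strftime(_time, "%a")
| bin _time span=1h
| stats count AS current_lookups BY user _time hod dow
| lookup user_lookup_baselines.csv user hod dow OUTPUT user_mu user_sigma
| where isnotnull(user_mu)
| eval user_sigma = if(user_sigma==0 OR isnull(user_sigma), 1, user_sigma)
| eval z_score = round((current_lookups - user_mu) / user_sigma, 2)
| sort - z_score
| table _time user current_lookups user_mu user_sigma z_score

The hours with the largest z-scores tell you when the anomalous lookups occurred, which helps scope the login investigation in the next step.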

Step 3: Perform a drill-down investigation

We'll start by investigating Alice’s login activity, focusing on instances where a successful login follows a failed attempt within a short time frame. We can use the following SPL query to do this:

index=<your index> sourcetype=iam:auth user="USER-0075" earliest=-4d@d latest=-2d@d
| where auth_result="failure" OR auth_result="success"
| eval event_time_epoch = _time
| transaction user src_ip startswith="auth_result=failure" endswith="auth_result=success" maxpause=5m mvlist=true
| where eventcount > 1
| eval time_interval_secs = round(duration)
| eval combined_events = mvzip(event_time_epoch, auth_result)
| mvexpand combined_events
| eval temp_fields = split(combined_events, ",")
| eval event_time_epoch = mvindex(temp_fields, 0)
| eval auth_result = mvindex(temp_fields, 1)
| eval event_time = strftime(event_time_epoch, "%Y-%m-%d %H:%M:%S")
| table event_time user src_ip auth_result time_interval_secs

Search explanation

The table below provides an explanation of what each part of this search achieves. You can adjust this query based on the specifics of your environment.

Splunk Search Explanation

index=<your index> sourcetype=iam:auth user="USER-0075" earliest=-4d@d latest=-2d@d

Searches the specified index for authentication events from user "USER-0075" within the time range starting 4 days ago at midnight up to 2 days ago at midnight.

| where auth_result="failure" OR auth_result="success"

Filters events to include only those where the authentication result was either "failure" or "success".

| eval event_time_epoch = _time

Creates a new field event_time_epoch duplicating the event timestamp _time for later use.

| transaction user src_ip startswith="auth_result=failure" endswith="auth_result=success" maxpause=5m mvlist=true

Groups events into transactions per user and source IP where each transaction starts with a failure event and ends with a success event, with a maximum allowed pause of 5 minutes between events. The mvlist=true option keeps the list of events in the transaction.

| where eventcount > 1

Filters to keep only transactions that contain more than one event (that is, at least one failure followed by a success).

| eval time_interval_secs = round(duration)

Calculates the duration of each transaction in seconds, rounded to the nearest whole number, and stores it in time_interval_secs.

| eval combined_events = mvzip(event_time_epoch, auth_result)

Combines the multivalue fields event_time_epoch and auth_result into a single multivalue field combined_events, pairing corresponding elements.

| mvexpand combined_events

Expands the multivalue field combined_events so that each pair becomes a separate event row.

| eval temp_fields = split(combined_events, ",")

Splits each combined_events string into a multivalue field temp_fields by the comma delimiter, separating the timestamp and auth result.

| eval event_time_epoch = mvindex(temp_fields, 0)

Extracts the first element (timestamp) from temp_fields and assigns it back to event_time_epoch.

| eval auth_result = mvindex(temp_fields, 1)

Extracts the second element (authentication result) from temp_fields and assigns it back to auth_result.

| eval event_time = strftime(event_time_epoch, "%Y-%m-%d %H:%M:%S")

Converts the epoch timestamp event_time_epoch into a human-readable date-time string event_time.

| table event_time user src_ip auth_result time_interval_secs

Displays the final output table with columns: event time, user, source IP, authentication result, and the time interval in seconds for the transaction.

Upon running the search, we get the following results:

[Screenshot: table of Alice's login events showing a failed login followed by a successful login from the same source IP]

The search reveals a failed login followed by a successful one in the span of one minute, right before the anomalous activity began.
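
As a side note, the transaction command can be resource intensive at high event volumes. If that is a concern, a streamstats-based variant (a sketch, not the search used above) can surface the same failed-then-successful pattern by comparing each successful login with the preceding event from the same user and source IP:

index=<your index> sourcetype=iam:auth user="USER-0075" earliest=-4d@d latest=-2d@d
| where auth_result="failure" OR auth_result="success"
| sort 0 _time
| streamstats current=f last(auth_result) AS prev_result last(_time) AS prev_time BY user src_ip
| where auth_result="success" AND prev_result="failure" AND (_time - prev_time) <= 300
| eval time_interval_secs = round(_time - prev_time)
| eval event_time = strftime(_time, "%Y-%m-%d %H:%M:%S")
| table event_time user src_ip auth_result time_interval_secs

Unlike the transaction version, this returns only the successful login of each pair, with the gap to the preceding failure captured in time_interval_secs.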

We have found a suspicious login pattern tied to USER-0075 (Alice). This pattern is indicative of a potentially compromised account. Now, let's review the IP addresses associated with the login events to confirm our suspicions.

Step 4: Pivot to IP geolocation

Let's investigate the IPs associated with USER-0075 by using the built-in SPL command iplocation to examine where in the world these logins are coming from:

index=<your index> sourcetype=iam:auth src_ip="120.80.80.120" earliest=-4d@d latest=-2d@d
| iplocation src_ip
| table _time user src_ip auth_result City Country Region

Search explanation

The table below provides an explanation of what each part of this search achieves. You can adjust this query based on the specifics of your environment.

Splunk Search Explanation
index=<your index> sourcetype=iam:auth src_ip="120.80.80.120" earliest=-4d@d latest=-2d@d Searches the specified index for authentication events from the source IP address "120.80.80.120" within the time range starting 4 days ago at midnight up to 2 days ago at midnight.
| iplocation src_ip Enriches the events by adding geographic location fields (such as City, Country, Region) based on the source IP address.
| table _time user src_ip auth_result City Country Region Selects and displays only the specified fields in the output table: event time, user, source IP, authentication result, and the geographic location fields City, Country, and Region.

Upon running the search, we get the following results:

[Screenshot: login events for source IP 120.80.80.120, geolocated to China]

These results seem suspicious to you: as a regional bank, you'd expect customer record lookup attempts to originate from within your own region, and certainly not from China. You can conclude that the account for USER-0075 (Alice) is indeed compromised. You know not only that an account is compromised, but also which account, when the activity started, and where the attack originated.
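
If you want to make this geographic check repeatable rather than a one-off pivot, you can filter successful logins that resolve outside your expected geography. The sketch below assumes, purely for illustration, that legitimate logins should originate in the United States; substitute your own expected countries:

index=<your index> sourcetype=iam:auth auth_result="success" earliest=-24h@h
| iplocation src_ip
| where isnotnull(Country) AND Country!="United States"
| stats count AS logins values(user) AS users earliest(_time) AS first_seen latest(_time) AS last_seen BY src_ip Country City
| convert ctime(first_seen) ctime(last_seen)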

Next steps

This use case provides a powerful foundation for insider threat detection. To further enhance this capability, consider the following:

  • Integrate with risk-based alerting (RBA): Instead of alerting directly, you can have this detection contribute to a user's risk score in Splunk Enterprise Security. This allows you to correlate this behavior with other suspicious activities (such as data loss prevention alerts, or unusual VPN access) to generate higher-fidelity alerts (see the sketch after this list).
  • Automate response: Use Splunk SOAR to automate response actions when a high-confidence alert triggers. This could include creating a ticket in a case management system, notifying the user's manager, or temporarily restricting access pending investigation.
  • Tune thresholds: The z_score >= 3 threshold is a good starting point. Monitor the alerts and adjust this value based on your organization's risk tolerance and the fidelity of the alerts to find the right balance.
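
For the RBA idea above, the usual pattern in Splunk Enterprise Security is to save the step 2 detection as a correlation search with a Risk Analysis adaptive response action. As a rough illustration of the risk fields involved (a sketch, not a drop-in configuration), you could append something like the following to the end of the step 2 search:

| eval risk_object=user, risk_object_type="user", risk_message="Anomalous volume of customer record lookups (risk score ".risk_score.")"
| table risk_object risk_object_type risk_score risk_message

The adaptive response action then turns each row into a risk event that can be correlated with other detections against the same user.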

Additionally, these resources might help you understand and implement this guidance:

To learn more about how to detect anomalous customer record access, contact your Splunk sales representative today. You can contact your account team through the Contact Us page.

Splunk OnDemand Services: Use these credit-based services for direct access to Splunk technical consultants with a variety of technical services from a pre-defined catalog. Most customers have OnDemand Services per their license support plan. Engage the ODS team at ondemand@splunk.com if you would like assistance.