
Examining data definition language operations in GCP CloudSQL

Identifying data definition language (DDL) operations helps you track users who modify the database schema.

If you haven't already configured your Splunk platform deployment to ingest GCP Cloud SQL logs from Postgres, MySQL, and SQL Server databases, review Monitoring Google Cloud SQL first.

Data required

Google Cloud SQL audit logs

How Splunk software can help with this use case

Use the following general base search to scope your results to the right source type, source, and, if desired, a specific database.

index="gcp-auditlogs" 
    sourcetype="google:gcp:pubsub:platform"
    source="projects/project_id/subscriptions/cloudsql_auditlogs
_sub_id" 
    "data.resource.labels.database_id"="gcp-appdcloudplatfo-nprd
-id:db_name"

PostgreSQL

Set the value of the "log_statement" flag to ddl, mod, or all to capture DDL operation statements.

  • log_statement enumeration
    • none | ddl | mod | all
    • Set to mod to log all data definition language (DDL) statements, plus data-modifying statements such as INSERT, UPDATE, DELETE, and TRUNCATE.
    • The default is none.

For more information, see Error reporting and logging, log_statement (enum).
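For example, assuming an instance named my-postgres-instance (a placeholder), you can set the flag with the gcloud CLI. Note that --database-flags replaces all flags currently set on the instance, so include any others you want to keep:

    # my-postgres-instance is a placeholder; --database-flags overwrites the
    # instance's existing flags, so list every flag you want to keep.
    gcloud sql instances patch my-postgres-instance \
        --database-flags=log_statement=ddl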

<base_search>
| spath data.textPayload 
| rename data.textPayload as textPayload 
| rex field=textPayload "db=(?<db_name>[^,]+),user=(?<db_user>[^,]+),host=(?<db_host>\S+)" 
| rex field=textPayload "duration:\s(?<duration>\d+\.\d+)\sms" 
| rex field=textPayload "statement:\s(?<sql_statement>.*)" 
| rex field=sql_statement "^(?<sql_operation>\w+)" 
| where match(sql_operation, "(?i)^(CREATE|ALTER|DROP|TRUNCATE|RENAME)")
| table _time, db_name, db_user, db_host, duration, sql_statement, sql_operation, textPayload
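For reference, the rex extractions above expect textPayload values shaped like the following line (all values illustrative):

    db=appdb,user=app_admin,host=10.128.0.7 LOG: statement: ALTER TABLE users ADD COLUMN email TEXT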

MySQL

This search parses the data.textPayload field of audit logs to identify the client IP, user, and operation run in the database.

<base_search>
| spath data.textPayload
| rex field=data.textPayload "^(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z)\s+(?P<user>\w+)\[\w+\]\s+@\s+\[(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]\d+\s+\d+\s+(?P<operation_type>\w+)\s+(?P<sql_command>.*)$"
| table _time, user, ip, operation_type, sql_command, data.textPayload

The output of this search would look like the following table:

| _time | user | ip | operation_type | sql_command | data.textPayload |
|-------|------|----|----------------|-------------|------------------|
| 2025-08-13 15:24:00.597 | root | 10.128.0.50 | Query | CREATE DATABASE test | 2025-08-13T15:24:00.597672Z root[root] @ [10.128.0.50]492834 979970843 Query CREATE DATABASE test |

Using these results, we can write an SPL statement that matches DDL operation keywords in the sql_command field.

<base_search>
    | spath data.textPayload
    | rex field=data.textPayload "^(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z)\s+(?P<user>\w+)\[\w+\]\s+@\s+\[(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]\d+\s+\d+\s+(?P<operation_type>\w+)\s+(?P<sql_command>.*)$"
    | where match(sql_command, "(?i)^(CREATE|ALTER|DROP|TRUNCATE|RENAME)\s")
    | table _time, user, ip, operation_type, sql_command, data.textPayload

Any other SQL commands of interest can be added to the list of matched keywords. In the regex, the (?i) flag makes matching case-insensitive, and requiring a whitespace character after the keyword prevents partial matches (for example, matching CREATE inside CREATED).
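For example, to also flag permission changes (an illustrative extension, not part of the search above), extend the keyword list in the match function:

    | where match(sql_command, "(?i)^(CREATE|ALTER|DROP|TRUNCATE|RENAME|GRANT|REVOKE)\s")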

Now that you have a search to identify relevant events, you can also explore how to create an alert to automate the process of generating a ServiceNow ticket when a DDL operation is performed on the database.


Use the base SPL search to find the correct events, the rex command to parse the text payload into fields, and finally the match function to identify DDL operations.


In the Trigger Actions section of the alert, we can configure the ServiceNow Event Integration custom action to create a ticket in the external system with information about the event. Notice how fields from the event are referenced in the Additional Info input.

SQL Server

Unlike PostgreSQL, GCP CloudSQL SQL Server does not support native DDL logging through database flags. Instead, you use database-level DDL triggers to capture schema changes and Splunk DB Connect to ingest the audit data. This enables you to capture:

  • DDL operation type (CREATE_TABLE, ALTER_TABLE, DROP_TABLE, etc.)
  • Object name and schema
  • Full SQL statement text
  • Username, hostname, timestamp
  • Database name
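For context, the DDL trigger created below reads these values from the XML document returned by SQL Server's EVENTDATA() function, which for a CREATE TABLE event looks roughly like this (abridged, with illustrative values):

    <EVENT_INSTANCE>
      <EventType>CREATE_TABLE</EventType>
      <PostTime>2025-08-13T15:24:00.597</PostTime>
      <LoginName>sqlserver</LoginName>
      <SchemaName>dbo</SchemaName>
      <ObjectName>TestTable</ObjectName>
      <ObjectType>TABLE</ObjectType>
      <TSQLCommand>
        <CommandText>CREATE TABLE dbo.TestTable (id INT, name NVARCHAR(100));</CommandText>
      </TSQLCommand>
    </EVENT_INSTANCE>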

Follow these steps:

  1. Connect to your SQL Server instance.

  2. Run these commands in Cloud SQL Studio:

    GRANT ALTER ON DATABASE::master TO [sqlserver];
    GRANT ALTER ANY DATABASE DDL TRIGGER TO [sqlserver];
    
  3. Create the audit database.
    CREATE DATABASE AuditDB;
    
  4. Create the audit table in AuditDB.
    CREATE TABLE dbo.DdlEvents (
        Id BIGINT IDENTITY PRIMARY KEY,
        EventType SYSNAME,
        ObjectType SYSNAME,
        SchemaName SYSNAME,
        ObjectName SYSNAME,
        Tsql NVARCHAR(MAX),
        LoginName SYSNAME,
        HostName NVARCHAR(128),
        DatabaseName SYSNAME,
        PostTimeUtc DATETIME2,
        EventXml XML
    );
    
  5. Create the DDL trigger (also in AuditDB; a database-level DDL trigger fires only for DDL run in the database where it is created).
    CREATE TRIGGER trg_audit_ddl ON DATABASE
    FOR CREATE_TABLE, ALTER_TABLE, DROP_TABLE,
        CREATE_PROCEDURE, ALTER_PROCEDURE, DROP_PROCEDURE,
        CREATE_VIEW, ALTER_VIEW, DROP_VIEW,
        CREATE_FUNCTION, ALTER_FUNCTION, DROP_FUNCTION
    AS
    BEGIN
        SET NOCOUNT ON;
        DECLARE @x XML = EVENTDATA();
        INSERT dbo.DdlEvents (EventType, ObjectType, SchemaName, ObjectName, Tsql,
                              LoginName, HostName, DatabaseName, PostTimeUtc, EventXml)
        SELECT
            @x.value('(/EVENT_INSTANCE/EventType)[1]', 'sysname'),
            @x.value('(/EVENT_INSTANCE/ObjectType)[1]', 'sysname'),
            @x.value('(/EVENT_INSTANCE/SchemaName)[1]', 'sysname'),
            @x.value('(/EVENT_INSTANCE/ObjectName)[1]', 'sysname'),
            @x.value('(/EVENT_INSTANCE/TSQLCommand/CommandText)[1]', 'nvarchar(max)'),
            @x.value('(/EVENT_INSTANCE/LoginName)[1]', 'sysname'),
            HOST_NAME(),
            DB_NAME(),
            SYSUTCDATETIME(),
            @x;
    END;
    
  6. Test the trigger.
    CREATE TABLE dbo.TestTable (id INT, name NVARCHAR(100));
    ALTER TABLE dbo.TestTable ADD email NVARCHAR(255);
    DROP TABLE dbo.TestTable;
    
  7. Check the audit table. You should see three rows showing the CREATE, ALTER, and DROP operations with full SQL text.
    SELECT 
        Id,
        EventType,
        ObjectType,
        ObjectName,
        Tsql,
        LoginName,
        PostTimeUtc
    FROM dbo.DdlEvents
    ORDER BY PostTimeUtc DESC;
    

Next, you need to set up the Cloud SQL Proxy, which is required for Splunk DB Connect to reach your SQL Server instance via private IP.

  1. Install Cloud SQL Proxy on your Splunk server.
    1. SSH into your Splunk server and run:
      # Download Cloud SQL Proxy
      curl -o cloud-sql-proxy https://storage.googleapis.com/cloud-sql-connectors/cloud-sql-proxy/v2.8.0/cloud-sql-proxy.linux.amd64
      # Make it executable
      chmod +x cloud-sql-proxy
      # Move to system path
      sudo mv cloud-sql-proxy /usr/local/bin/
      
  2. Start Cloud SQL Proxy. Replace the placeholder with your actual instance connection name (PROJECT_ID:REGION:INSTANCE_NAME).
    /usr/local/bin/cloud-sql-proxy PROJECT_ID:REGION:INSTANCE_NAME --address=0.0.0.0 --port=1433 --private-ip &
    
    # You should see "The proxy has started successfully and is ready for new connections!"
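Optionally, to keep the proxy running across reboots, one approach is a systemd unit. The following is a minimal sketch that assumes the binary lives in /usr/local/bin; adjust the connection name and paths for your environment:

    # /etc/systemd/system/cloud-sql-proxy.service (illustrative unit file)
    [Unit]
    Description=Cloud SQL Proxy for Splunk DB Connect
    After=network.target

    [Service]
    ExecStart=/usr/local/bin/cloud-sql-proxy PROJECT_ID:REGION:INSTANCE_NAME --address=0.0.0.0 --port=1433 --private-ip
    Restart=always

    [Install]
    WantedBy=multi-user.target

    # Enable and start it with:
    #   sudo systemctl daemon-reload
    #   sudo systemctl enable --now cloud-sql-proxy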
    

Now you need to configure Splunk DB Connect. Leave the proxy running, or use the persistent setup sketched above.

  1. Go to Configuration -> Connections -> New Connection.
    1. Select MS-SQL Server Using MS Generic Driver as your connection type.
    2. Enter the host and port number of your SQL Server instance.
    3. Select the AuditDB database you created earlier as the Default Database.
    4. Add Connection Properties as key-value pairs:
      1. User: <your username>
      2. Password: <your password>
  2. Go to Data Lab -> Inputs -> New Input.
  3. Select Table and use the connection you just created.
  4. Set the properties:
    1. Mode: Event
    2. Type: Rising
    3. Rising Column: Id (a sample rising-column query appears after these steps)
    4. Checkpoint Value: 0
  5. Complete the setup.
    1. Make sure the input is enabled and that you provide an index and source type.
  6. Wait 5 to 10 minutes, then run this SPL to confirm events are arriving:
    <base_search>
    | table _time, EventType, DatabaseName, ObjectName, LoginName, Tsql

    Once events appear, normalize the field names and extract the DDL operation:
    <base_search>
    | rename ObjectName AS object_name, LoginName AS db_user, Tsql AS sql_statement, EventType AS event_type
    | rex field=event_type "(?<sql_operation>CREATE|ALTER|DROP)_"
    | search sql_operation=*
    | table _time, DatabaseName, SchemaName, object_name, db_user, sql_operation, sql_statement
    | sort - _time
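For reference, if you define the input with a custom query instead of the table picker, a rising-column query along these lines keeps ingestion incremental (a sketch; DB Connect substitutes the stored checkpoint value for the ? placeholder):

    SELECT Id, EventType, ObjectType, SchemaName, ObjectName,
           Tsql, LoginName, HostName, DatabaseName, PostTimeUtc
    FROM dbo.DdlEvents
    WHERE Id > ?
    ORDER BY Id ASC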
    

Next steps

Now that you have learned how to identify DDL operations in your GCP CloudSQL database, check out the other use cases available for this service: