In today’s digital landscape, security is paramount for any organization or individual. As part of maintaining a secure environment, monitoring and analyzing authentication logs is essential. One widely used protocol for remote server management is SSH (Secure Shell), which generates authentication logs containing valuable information about user login attempts, successful logins, failed attempts, and more.
Logstash, a data processing pipeline tool, can parse these logs into structured fields that are far easier to search and analyze.
Before diving into parsing SSH authentication logs with Logstash, it’s crucial to understand the structure of these logs. SSH authentication logs typically include details such as the timestamp of the event, the username attempting to log in, the IP address of the client, the authentication method used (e.g., password, public key), and the outcome of the authentication attempt (success or failure).
To begin parsing SSH authentication logs with Logstash, ensure Logstash is installed and configured on your system. Logstash configuration involves defining input, filter, and output plugins to process and route log data effectively.
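As a rough sketch of how the three sections fit together, a minimal pipeline might look like the following. The log path is an assumption (Debian and Ubuntu typically write SSH events to /var/log/auth.log, while RHEL-based systems use /var/log/secure), and the stdout output is only there for inspecting events while testing.

```conf
input {
  file {
    # Assumed path; adjust for your distribution
    path => "/var/log/auth.log"
    start_position => "beginning"
  }
}

filter {
  # grok, kv, mutate, and date filters go here
}

output {
  # Print each parsed event to the console for inspection
  stdout { codec => rubydebug }
}
```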
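To parse SSH authentication logs, we need to define custom grok patterns in Logstash that match the log format. The example filter configuration below parses SSH authentication events along with related PAM, sudo, su, systemd-logind, and account-management events from the same log: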
# Split the syslog header (timestamp, host, process, PID) from the message body
grok {
  match => {
    "message" => "%{SYSLOGTIMESTAMP:timestamp1} %{DATA:hostname1} (\[)?%{DATA:processname}(\])?(\[%{DATA:processpid}\])?: %{GREEDYDATA:message}"
  }
  overwrite => [ "message" ]
}
# PAM messages: parse the key=value data and copy the remote host to siem_sourceip
if [message] =~ /^pam_/ {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{PAM}" }
    add_field => { "parser_result" => "pam_success" }
  }
  if [pam_kvdata] {
    # Give empty values a "-" placeholder before the kv filter runs
    mutate {
      gsub => [ "pam_kvdata", "= ", '=- ' ]
    }
    kv {
      source => "pam_kvdata"
      prefix => "pam_"
    }
    mutate {
      remove_field => [ "pam_kvdata" ]
    }
  }
  if [pam_rhost] {
    mutate { add_field => { "siem_sourceip" => "%{pam_rhost}" } }
  }
}
# sshd messages: try each pattern in turn; only the matching grok adds its fields
if [processname] == "sshd" {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{SSH_AUTHFAIL_WRONGCREDS}" }
    add_field => { "ssh_authresult" => "fail" }
    add_field => { "ssh_failreason" => "wrong_credentials" }
    add_field => { "parser_result" => "sshd_success" }
  }
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{SSH_AUTHFAIL_WRONGUSER}" }
    add_field => { "ssh_authresult" => "fail" }
    add_field => { "ssh_failreason" => "unknown_user" }
    add_field => { "parser_result" => "sshd_success" }
  }
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{SSH_AUTH_SUCCESS}" }
    add_field => { "ssh_authresult" => "success" }
    add_field => { "parser_result" => "sshd_success" }
  }
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{SSH_DISCONNECT}" }
    add_field => { "ssh_authresult" => "ssh_disconnect" }
    add_field => { "parser_result" => "sshd_success" }
  }
}
if [processname] == "sudo" {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{SUDO}" }
    add_field => { "parser_result" => "sudo_success" }
  }
}
if [processname] == "su" {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{SU}" }
    add_field => { "parser_result" => "su_success" }
  }
}
if [processname] == "systemd-logind" {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{SYSTEMD_LOGIND}" }
    add_field => { "parser_result" => "systemd_logind_success" }
  }
}
if [processname] in [ "useradd", "groupadd" ] {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{ACCOUNT_ADD}" }
    add_field => { "account_action" => "create" }
    add_field => { "parser_result" => "new_account_success" }
  }
  if [account_kvdata] {
    mutate {
      gsub => [ "account_kvdata", ", ", '|' ]
    }
    kv {
      source => "account_kvdata"
      prefix => "account_"
      field_split => "|"
    }
    mutate {
      remove_field => [ "account_kvdata" ]
    }
  }
}
if [processname] == "usermod" {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{USERMOD}" }
    add_field => { "parser_result" => "usermod_success" }
  }
}
if [processname] == "userdel" {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{USERDEL}" }
    add_field => { "parser_result" => "userdel_success" }
  }
  mutate {
    gsub => [ "account_action", "remove", "delete" ]
  }
}
if [processname] == "groupdel" {
  grok {
    patterns_dir => ["/etc/logstash/patterns.d"]
    match => { "message" => "%{GROUPDEL}" }
    add_field => { "account_action" => "delete" }
    add_field => { "parser_result" => "groupdel_success" }
  }
}
# Several grok filters run per event and most will not match, so drop the failure tag
mutate {
  remove_tag => [ "_grokparsefailure" ]
}
# Assign a static event code based on the parsed result
if [parser_result] {
  if [parser_result] == "new_account_success" {
    mutate { add_field => { "linuxeventcode" => "1000" } }
  }
  if [parser_result] == "userdel_success" {
    mutate { add_field => { "linuxeventcode" => "1001" } }
  }
}
if [ssh_authresult] {
  if [ssh_authresult] == "fail" {
    mutate { add_field => { "linuxeventcode" => "1002" } }
  }
  if [ssh_authresult] == "success" {
    mutate { add_field => { "linuxeventcode" => "1003" } }
  }
  if [ssh_authresult] == "ssh_disconnect" {
    mutate { add_field => { "linuxeventcode" => "1004" } }
  }
}
if [account_action] {
  if [account_action] == "lock" {
    mutate { add_field => { "linuxeventcode" => "1005" } }
  }
  if [account_action] == "unlock" {
    mutate { add_field => { "linuxeventcode" => "1006" } }
  }
  if [account_action] == "password changed" {
    mutate { add_field => { "linuxeventcode" => "1007" } }
  }
}
date {
  # Syslog pads single-digit days with a space, so accept both forms
  match => [ "timestamp1", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  timezone => "Asia/Kolkata"
  locale => "en"
  target => "@timestamp"
}
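One way to try the filter without touching live logs is to feed it a single hand-written line through the generator input. The sketch below is a test harness built on assumptions rather than part of the original configuration: the sample line, hostname, and IP address (taken from the 192.0.2.0/24 documentation range) are made up for illustration.

```conf
input {
  generator {
    # Illustrative syslog line, not taken from a real system
    lines => [ "Mar 10 14:22:01 web01 sshd[2345]: Failed password for invalid user admin from 192.0.2.10 port 51234 ssh2" ]
    count => 1
  }
}

filter {
  # Paste the filter configuration shown above here
}

output {
  # Print the parsed event so the extracted fields can be checked by eye
  stdout { codec => rubydebug }
}
```

If the pattern files load correctly, the printed event should carry ssh_authresult set to fail and ssh_failreason set to unknown_user, since the line has the "invalid user" form.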
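The configuration above references the following .grok pattern files, which go in the /etc/logstash/patterns.d/ directory. Note that the PAM pattern used by the first conditional is not defined in any of them and has to be supplied separately.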
systemd.grok
SYSTEMD_LOGIND_NEW New session %{INT:systemd_login_sessionid} of user %{USER:systemd_login_user}\.
SYSTEMD_LOGIND_REMOVED Removed session %{INT:systemd_login_sessionid}\.
SYSTEMD_LOGIND_OTHERS %{GREEDYDATA}
SYSTEMD_LOGIND %{SYSTEMD_LOGIND_NEW}|%{SYSTEMD_LOGIND_REMOVED}|%{SYSTEMD_LOGIND_OTHERS}
sudo.grok
SUDO \s*%{USERNAME:sudo_user}\s*:\s*TTY=%{DATA:sudo_tty}\s*;\s*PWD=%{DATA:sudo_pwd}\s*;\s*USER=%{DATA:sudo_targetuser}\s*;\s*COMMAND=%{GREEDYDATA:sudo_command}
SU \+\s+%{DATA:su_tty}\s+%{USER:su_user}:%{USER:su_targetuser}
user-management.grok
USERMOD_CHANGE (?<account_action>change) user '%{USER:account_name}' %{WORD:account_attribute}( from '%{DATA:account_from}' to '%{DATA:account_to}')?
USERMOD_GROUP_MEMBERSHIP (?<account_action>add|delete) '%{USER:account_name}' (?:to|from) %{DATA:account_grouptype} '%{DATA:account_groupname}'
USERMOD_LOCKUNLOCK (?<account_action>lock|unlock) user '%{USER:account_name}' %{GREEDYDATA:greedydetails}
USERMOD %{USERMOD_CHANGE}|%{USERMOD_GROUP_MEMBERSHIP}|%{USERMOD_LOCKUNLOCK}
NEW_ACCOUNT new %{DATA:account_type}:\s+%{GREEDYDATA:account_kvdata}
ACCOUNT_ADD %{NEW_ACCOUNT}|%{USERMOD_GROUP_MEMBERSHIP}
USERDEL_USER (?<account_action>delete) user '%{USER:account_name}'
USERDEL_GROUP (?<account_action>remove)d %{DATA:account_grouptype} '%{DATA:account_groupname}' owned by '%{DATA:account_groupowner}'
USERDEL_GROUP_DELUSER (?<account_action>delete) '%{USER:account_name}' from %{DATA:account_grouptype} '%{DATA:account_groupname}'
USERDEL %{USERDEL_USER}|%{USERDEL_GROUP}|%{USERDEL_GROUP_DELUSER}
GROUPDEL %{DATA:account_type} '%{USER:account_name}' removed( from %{GREEDYDATA:account_repository})?
sshd.grok
SSH_AUTHFAIL_WRONGUSER Failed %{WORD:ssh_authmethod} for invalid user %{USERNAME:siem_sshuser} from %{IP:siem_sourceip} port %{NUMBER:siem_sshport} (?<ssh_protocol>\w+\d+)
SSH_AUTHFAIL_WRONGCREDS Failed %{WORD:ssh_authmethod} for %{USERNAME:siem_sshuser} from %{IP:siem_sourceip} port %{NUMBER:siem_sshport} (?<ssh_protocol>\w+\d+)
SSH_AUTH_SUCCESS Accepted %{WORD:ssh_authmethod} for %{USERNAME:siem_sshuser} from %{IP:siem_sourceip} port %{NUMBER:siem_sshport} (?<ssh_protocol>\w+\d+)(?:: %{WORD:ssh_pubkey_type} %{GREEDYDATA:ssh_pubkey_fingerprint})?
SSH_DISCONNECT Received disconnect from %{IP:siem_sourceip} port %{INT:siem_sshport}.*?:\s+%{GREEDYDATA:ssh_disconnect_reason}
Each line in these files defines a named grok pattern: a name followed by the regular expression it expands to. Because the grok filters above set patterns_dir to /etc/logstash/patterns.d, Logstash loads the patterns in that directory and can resolve references such as %{SSH_AUTH_SUCCESS} when extracting fields from a log message.
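For a one-off pattern, a separate file is not strictly required: the grok filter also accepts inline definitions through its pattern_definitions option. The sketch below is an illustrative alternative, not what the configuration above does, and the pattern name SSH_PROTO is hypothetical.

```conf
grok {
  # Hypothetical inline pattern, visible to this filter only
  pattern_definitions => {
    "SSH_PROTO" => "\w+\d+"
  }
  match => { "message" => "Accepted %{WORD:ssh_authmethod} for %{USERNAME:siem_sshuser} from %{IP:siem_sourceip} port %{NUMBER:siem_sshport} %{SSH_PROTO:ssh_protocol}" }
}
```

A shared patterns directory remains the better fit when several filters reuse the same patterns, as they do here.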
The configuration above assigns a static event code, stored in the linuxeventcode field, to each of the following events:
S. No. | Event Name | Event Code |
---|---|---|
1 | New Account Created | 1000 |
2 | User Deleted | 1001 |
3 | Failed Login Attempt | 1002 |
4 | Successful Login Attempt | 1003 |
5 | SSH Disconnect | 1004 |
6 | User Locked | 1005 |
7 | User Unlocked | 1006 |
8 | Password Changed | 1007 |
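Because the codes are stored as plain string fields, later pipeline stages can branch on them. As a small sketch, the following conditional tags failed logins; the tag name ssh_failed_login is a hypothetical choice for illustration.

```conf
filter {
  # add_field stores the code as a string, so compare against "1002"
  if [linuxeventcode] == "1002" {
    mutate { add_tag => [ "ssh_failed_login" ] }
  }
}
```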
By utilizing Logstash for parsing SSH authentication logs, organizations can efficiently monitor and analyze login activities, identify suspicious behavior, and strengthen their security posture. Custom Logstash configurations allow for flexible parsing of various log formats, enabling tailored log analysis workflows suited to specific security requirements.
With Logstash’s powerful capabilities, managing SSH authentication logs becomes an integral part of an effective security monitoring strategy.