设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 重新 试卷 文件
当前位置: 首页 > 服务器 > 系统 > 正文

如何用 Shell 轻松搞定 Linux 命令审计

发布时间:2022-10-05 11:11 所属栏目:52 来源:互联网
导读:首先,当谈到 Linux 的操作审计需求时,大多数我们希望的是还原线上服务器被人为(误)操作时执行的命令行,以及它关联的上下文。这个需求场景其实跟通用的业务日志采集一致,简单一点可以直接通过 history 将内容发给 syslog,复杂一点的采用 auditd 或 ebp
  首先,当谈到 Linux 的操作审计需求时,大多数我们希望的是还原线上服务器被人为(误)操作时执行的命令行,以及它关联的上下文。这个需求场景其实跟通用的业务日志采集一致,简单一点可以直接通过 history 将内容发给 syslog,复杂一点的采用 auditd 或 ebpf 在内核层面上捕获行为。
 
  不过本文不打算对上述的方案做原理解释,仅仅站在一个运维小白的角度来完成日常 80%(80%的数据来源?我也不知道,大概是二八原则)的操作审计 。既然文章标题是用 Shell 来完成, 由此可见今天的主题跟 Bash 脱不了关系了。
 
  一句话概括今天的主题:利用定制 Bash 源增加日志审计功能,并将用户操作发给 rsyslog 聚合,最后在 elasticsearch 做日志存储和查询。
 
  Linux 部分
  准备一些必要的工具
  rsyslog:  一个Linux上自带并兼容 syslog 语法的日志处理服务
  jq: 一个在 shell 下处理 json 数据的小工具
  logger: 一个可以往 syslog 输入日志的工具
  这些小工具除 jq 外,大多操作系统发行版都自带,如果没有的话也可以直接用操作系统内置的包管理工具安装。
  ash.audit.sh,并将其拷贝到 /etc/profile.d/ 目录下
  复制
  if [ "${SHELL##*/}" != "bash" ]; then
    return
  fi
  if [ "${AUDIT_READY}" = "yes" ]; then
      return
  fi
  declare -rx HISTFILE="$HOME/.bash_history
  declare -rx HISTSIZE=500000
  declare -rx HISTFILESIZE=500000
  declare -rx HISTCONTROL=""
  declare -rx HISTIGNORE=""
  declare -rx HISTCMD
  declare -rx AUDIT_READY="yes"
  shopt -s histappend
  shopt -s cmdhist
  shopt -s histverify
  if shopt -q login_shell && [ -t 0 ]; then
    stty -ixon
  fi
  if groups | grep -q root; then
    declare -x TMOUT=86400
    # chattr +a "$HISTFILE"
  fi
  declare -a LOGIN_INFO=( $(who -mu | awk '{print $1,$2,$6}') )
  declare -rx AUDIT_LOGINUSER="${LOGIN_INFO[0]}"
  declare -rx AUDIT_LOGINPID="${LOGIN_INFO[2]}"
  declare -rx AUDIT_USER="$USER"
  declare -rx AUDIT_PID="$$"
  declare -rx AUDIT_TTY="${LOGIN_INFO[1]}"
  declare -rx AUDIT_SSH="$([ -n "$SSH_CONNECTION" ] && echo "$SSH_CONNECTION" | awk '{print $1":"$2"->"$3":"$4}')"
  declare -rx AUDIT_STR="$AUDIT_LOGINUSER  $AUDIT_LOGINPID  $AUDIT_TTY  $AUDIT_SSH"
  declare -rx AUDIT_TAG=$(echo -n $AUDIT_STR | sha1sum |cut -c1-12)
  declare -x AUDIT_LASTHISTLINE=""
  set +o functrace
  shopt -s extglob
  function AUDIT_DEBUG() {
    if [ -z "$AUDIT_LASTHISTLINE" ]; then
      local AUDIT_CMD="$(fc -l -1 -1)"
      AUDIT_LASTHISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
    else
      AUDIT_LASTHISTLINE="$AUDIT_HISTLINE"
    fi
    local AUDIT_CMD="$(history 1)"
    AUDIT_HISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
    if [ "${AUDIT_HISTLINE:-0}" -ne "${AUDIT_LASTHISTLINE:-0}" ] || [ "${AUDIT_HISTLINE:-0}" -eq "1" ]; then
      MESSAGE=$(jq -c -n \
       --arg pwd "$PWD" \
       --arg cmd "${AUDIT_CMD##*( )?(+([0-9])?(\*)+( ))}" \
       --arg user "$AUDIT_LOGINUSER" \
       --arg become "$AUDIT_USER" \
       --arg pid "$AUDIT_PID" \
       --arg info "${AUDIT_STR}" \
       '{cmd: $cmd, user: $user, become: $become, pid: $pid, pwd: $pwd, info: $info}')
      logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE"
    fi
  }
  function AUDIT_EXIT() {
    local AUDIT_STATUS="$?"
    if [ -n "$AUDIT_TTY" ]; then
      MESSAGE_CLOSED=$(jq -c -n \
          --arg action "session closed" \
          --arg user "$AUDIT_LOGINUSER" \
          --arg become "$AUDIT_USER" \
          --arg pid "$AUDIT_PID" \
          --arg info "${AUDIT_STR}" \
          '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
      logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_CLOSED"
    fi
    exit "$AUDIT_STATUS"
  }
  declare -frx +t AUDIT_DEBUG
  declare -frx +t AUDIT_EXIT
  if [ -n "$AUDIT_TTY" ]; then
    MESSAGE_OPENED=$(jq -c -n \
        --arg action "session opened" \
        --arg user "$AUDIT_LOGINUSER" \
        --arg become "$AUDIT_USER" \
        --arg pid "$AUDIT_PID" \
        --arg info "${AUDIT_STR}" \
        '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
    logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_OPENED"
  fi
  declare -rx PROMPT_COMMAND="[ -n "$AUDIT_DONE" ] && echo ''; AUDIT_DONE=; trap 'AUDIT_DEBUG && AUDIT_DONE=1; trap DEBUG' DEBUG
  declare -rx BASH_COMMAND
  declare -rx SHELLOPT
  trap AUDIT_EXIT EXIT
  1.
  2.
  3.
  4.
  5.
  6.
  7.
  8.
  9.
  10.
  11.
  12.
  13.
  14.
  15.
  16.
  17.
  18.
  19.
  20.
  21.
  22.
  23.
  24.
  25.
  26.
  27.
  28.
  29.
  30.
  31.
  32.
  33.
  34.
  35.
  36.
  37.
  38.
  39.
  40.
  41.
  42.
  43.
  44.
  45.
  46.
  47.
  48.
  49.
  50.
  51.
  52.
  53.
  54.
  55.
  56.
  57.
  58.
  59.
  60.
  61.
  62.
  63.
  64.
  65.
  66.
  67.
  68.
  69.
  70.
  71.
  72.
  73.
  74.
  75.
  76.
  77.
  78.
  79.
  80.
  81.
  82.
  83.
  84.
  85.
  86.
  简单说明下这个脚本,大致就是定义了 shell 的历史条目、登录超时时间、以及审计日志的格式和发送。
 
  配置rsyslog 客户端,本地创建一个 /etc/rsyslog.d/40-audit.conf 文件,用于将本地local6级别的系统日志发送远端的rsyslog服务集中处理
  复制
  $RepeatedMsgReduction off
  local6.info @<>:514
  & stop
  1.
  2.
  3.
  配置完成后,别忘了重启下 rsyslog 服务!
  数据部分
  数据部分顾名思义,用于接收并处理客户端发来的操作系统日志。这里我们用到了 rsyslog 和 elasticsearch 两个服务了。
 
  准备rsyslog-elasticsearch
  要让 rsyslog 将日志发送给 elastichsearch,我们就必须安装它的 es 模块
 
  复制
  # Ubuntu  
  apt-get install -y rsyslog-elasticsearch rsyslog-mmjsonparse  
  #CentOS
  yum install rsyslog-elasticsearch rsyslog-mmjsonparse
  1.
  2.
  3.
  4.
  准备 ElasticSearch 服务
  为了简单部署,本文直接用 docker 快速拉起一个 ES 服务
 
  复制
  docker run -d --name elasticsearch  -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.3.1
  1.
  配置 rsyslog 服务端,创建一个文件 /etc/rsyslog.d/40-audit-server.conf,用于定义日志的写入策略。
  复制
  $RepeatedMsgReduction off
  $ModLoad imudp
  $UDPServerRun 514
  module(load="mmjsonparse")          # for parsing CEE-enhanced syslog messages
  module(load="omelasticsearch")      # for outputting to Elasticsearch
  #try to parse a structured log
  # this is for index names to be like: rsyslog-YYYY.MM.DD
  template(name="rsyslog-index" type="string" string="bashaudit-%$YEAR%.%$MONTH%.%$DAY%")
  # this is for formatting our syslog in JSON with @timestamp
  template(name="json-syslog" type="list") {
      constant(value="{")
        constant(value="\"@timestamp\":\"")     property(name="timegenerated" dateFormat="rfc3339" date.inUTC="on")
        constant(value="\",\"host\":\"")        property(name="fromhost-ip")
        constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")
        constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")
        constant(value="\",\"program\":\"")     property(name="programname")
        constant(value="\",\"tag\":\"")         property(name="syslogtag" format="json")
        constant(value="\",")                   property(name="$!all-json" position.from="2")
      # closing brace is in all-json
  }
  if ($syslogfacility-text == 'local6' and $syslogseverity-text == 'info') then {
   action(type="mmjsonparse")
   action(type="omelasticsearch" template="json-syslog" searchIndex="rsyslog-index" dynSearchIndex="on" server="<your_elasticsearch_address>" serverport="<your_elasticsearch_port>")
          # action(type="omfile" file="/var/log/bashaudit.log")
          stop
  }
  1.
  2.
  3.
  4.
  5.
  6.
  7.
  8.
  9.
  10.
  11.
  12.
  13.
  14.
  15.
  16.
  17.
  18.
  19.
  20.
  21.
  22.
  23.
  24.
  25.
  26.
  这里采用了 rsyslog 的两个 module 来处理收集的日志
 
  mmjsonparse用于 json 格式化日志
  omelasticsearch用于配置 ElastichSearch
  配置完成重启 rsyslog 服务
  查询部分
  审计日志的查询我们可以使用 Kibana 或者自己根据 ElasticSearch API 进行二次开发。这里我们以 Kibana 举例。
 
  复制
  cat << EOF   > ./kibana.yml
  server.port: 15601
  elasticsearch.hosts: ["http://<your_elasticsearch_address>:<your_elasticsearch_port>"]
  i18n.locale: "zh-CN"
  EOF
  docker run -d --ulimit nofile=1000000:1000000 --net host --name elasticsearch-audit -v ./kibana.yml:/usr/share/kibana/config/kibana.yml  --restart always docker.elastic.co/kibana/kibana-oss:7.3.1
  1.
  2.
  3.
  4.
  5.
  6.
  本地访问http://localhost:15601进入 kibana 配置创建一个名为 bashaudit 的索引模式
 
 
 
  之后,我们就能进入 Discover 中查询审计日志了,包含了基本Shell执行时间、来源用户、执行目录等数据。
 
 
 
  再进一步,我们也可以通过调用 API 的方式对审计日志做一些额外的二次开发,例如:
 
  对线上服务器热点用户统计
  对线上服务器做热点操作统计
  对线上危险Shell 操作做告警
 
 
  
 
 
  总结
  本文讲述了采用定制 Bash 的方式,在用户登录初始化 Shell 的方式将其后续的命令行操作发送给 rsyslog 服务进行处理,并将格式化后的日志存储在 ElasticSearch 中方便辅助系统管理者在线上故障定位时使用,也可以依此对 Linux命令行审计做可视化的二次开发。
 
  不过本文基于定制 Bash 的方式仍然具备很多局限性,例如:
 
   不能审计 ShellScript 内的执行逻辑;
   存在用其他 shell 绕过审计,如 zsh 等;
  可以看到要想审计到更详细的内容,光在 Bash(表面功夫)上实现并不能满足,读者可以尝试使用snoopy 对 Shell 脚本内部做跟踪审计。

(编辑:ASP站长网)

    网友评论
    推荐文章
      热点阅读