Logstash ingestion of AWS billing Cost and Usage Reports

To get the Cost and Usage Reports (CUR) into Elasticsearch, use the Logstash pipeline below.
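The pipeline has three stages: an s3 input that pulls the CUR CSV files from the billing bucket, a csv filter that parses each line item and converts the cost columns to floats, and an aggregate filter that rolls up the blended cost per identity/LineItemId before the result is indexed into Elasticsearch.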

# Logstash configuration for an S3 -> Logstash -> Elasticsearch
# pipeline that ingests AWS Cost and Usage Report CSV files.


# input {
#   file {
#     id => "my_plugin_id"
#     path => "/home/brent/es/data/testinput.csv"
#     start_position => "beginning"
#     sincedb_clean_after => "0.0001"
#     mode => "read"
#     file_completed_action => "log"
#     file_completed_log_path => "test"
#   }
# }
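# The file input above is a commented-out alternative for local testing
# against a sample CSV before pointing Logstash at the real S3 bucket.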


input {
   s3 {
      access_key_id => "XXXXXX"
      secret_access_key => "XXXX"
      bucket => "s3logs-cetinich"
      # This bucket is in Sydney
      region => "ap-southeast-2"
      include_object_properties => true
      prefix => "cost/test/20201101-20201201/f6186f8d-eaf6-4396-96d1-580658ba5dc6/"
      # prefix => "cost/test/"
      type => "s3"
      delete => false
      # Skip the JSON manifest files delivered alongside the CSV data
      exclude_pattern => "\.json$"
      sincedb_path => "s3sincedb"
      # CUR data files are gzipped CSVs; the s3 input decompresses
      # .gz objects automatically before the codec sees them
      codec => "plain"
      watch_for_new_files => true
   }
}



filter {
   csv {
      separator => ","
      autodetect_column_names => true
      autogenerate_column_names => true
      # Convert the cost and usage columns from strings to floats
      convert => {
         "lineItem/BlendedCost" => "float"
         "lineItem/BlendedRate" => "float"
         "lineItem/UsageAmount" => "float"
         "lineItem/UnblendedCost" => "float"
         "lineItem/UnblendedRate" => "float"
         "pricing/publicOnDemandRate" => "float"
      }
   }

   if "" in [document_id] {
      uuid {
            target => "document_id"
      }
   }


   mutate {
      # replace => [ "index", "aws-billing-%{billing_year}.%{billing_month}" ]
      # Drop the raw CSV line once it has been parsed into fields
      remove_field => [ "message" ]
   }

   # For whatever reason there is no straightforward way to ensure all the
   # fields are kept from the original event, so this loops over the event
   # hash and re-injects them into the aggregation map.
   # NOTE: the aggregate filter only works correctly with a single pipeline
   # worker (pipeline.workers: 1), since its state lives in local memory.
   aggregate {
      task_id => "%{identity/LineItemId}"
      code => "
         event.to_hash.each do |key, value|
            if value.is_a?(Hash)
               map[key] ||= {}
               map[key].merge!(value)
            else
               map[key] = value
            end
         end
         # map['[event_data]'] = event.get('[event_data]')
         map['agg_blendedcost'] ||= 0
         map['agg_blendedcost'] += event.get('lineItem/BlendedCost')"
      timeout => 60
      push_map_as_event_on_timeout => true
   }

   # Drop all the original events and only keep the aggregates
   if ![agg_blendedcost] {
      drop {}
   }

}

output {
   # Print each event to stdout for debugging
   stdout { codec => rubydebug }

   elasticsearch {
      hosts => ["https://localhost:9200"]
      index => "cur"
      user => "admin"
      password => "admin"
      ilm_enabled => false
   }
}
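
Because the aggregate filter keeps its state in the memory of one worker, Logstash must run with a single pipeline worker, or events for the same identity/LineItemId can be split across workers and summed separately. A minimal invocation, assuming the configuration above is saved as cur.conf (a filename chosen here for illustration):

   # Run with a single pipeline worker so the aggregate filter
   # sees every event for a given line item
   bin/logstash -f cur.conf -w 1

Once the aggregates have been flushed (after the 60-second timeout), a quick search against the cur index confirms documents are arriving. This sketch reuses the host and admin credentials from the output block; -k skips certificate verification for the self-signed certificates local test clusters typically use:

   # Fetch one aggregated document from the cur index
   curl -k -u admin:admin "https://localhost:9200/cur/_search?size=1&pretty"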
