Syslog parse-olasa syslog-ng + syslog-ng relay + blackesk (elastic/syslog-ng/kibana) kornyezetben

Fórumok

Sziasztok!

A kovetkezore keresek megoldast.

Syslog-ng-vel gyujtok logokat, amiket egy firewall box-on levo relay modba konfiguralt syslog-ng-vel tovabbitok egy tavoli syslog-ng szerverbe, ami egy blackesk alapu elasticsearch/syslog-ng/kibana kombo resze.

Szeretnem, ha kibana-ban jol lehetne filterezni a forras host-rol kuldott logokat (ami persze nem csak sima syslog, hanem container logok is) de mire eler a log a kibana-ba, mar be van csomagolva egy MESSAGE mezobe, parse-olatlanul.

Amit szeretnek elerni, hogy a forras host-rol kuldott (es kesobb mar MESSAGE mezobe csomogolt) logokban talalhato key value parok alapjan is lehessen konnyen filterezni kibanaban.

Ehhez gondolom valami parse-olas kellene valahol syslog oldalon ebben a lancolatban, ill. esetleg ingest pipeline-okat is kene konfiguralni elastic oldalon, de nem latom tisztán, mit hol kene reszelni, hogy jo legyen.

Barmilyen tippet szivesen fogadok!

Hozzászólások

Valamennyire tudom formazni a logokat syslog-ng-vel, a destination beallitasoknal, ill. tudom sima vagy json template segitsegevel alakitva tovabbkukdeni, de valahogy egyik se jo.

Az elastic ingest pipeline beallitasaival mar valamennyire tudtam is adatokat kibanyaszni, de ez a tobbszoros beagyazas, ami vegul a komplett syslog sort parese-olatlanul belehajitja a MESSAGE mezobe, mindig arra fut ki, hogy valami illegális karaktert talal es ezert a MESSAGE mezot nem tudja feldolgozni. Barmelyik masik mezot viszont igen.

Esetleg fluentd?

Tehát valami így nézne ki a folyamat:
(logforrás) ----> (syslog-ng) ----> (syslog-ng -> fulentd-> elk)

Fluentd-nél a logforrás lehet fájl is, alapból tud generálni olyan kimenetet ami neked kell "out_elasticsearch".
Fluentd lehet használni ilyen "szövegfeldolgozónak", ebben jobb mint a syslog-ng. Olvass utána akkor megérted, itt mire gondolok, ez is ingyenes:

https://www.fluentd.org/

Egy projekt kapcsán kellett volna hasonlót csinálnom, de félbemaradt :)

 

Ha a forrás gépen továbbküldéskor json-ba alakítanád az összes KV párt, és a relay-en parse-olás nélkül továbbküldenéd a célgepre, akkor ott lehet, hogy az elastic/kibana már tudna vele mit kezdeni.

A pipeline végén milyen formátumban várják az adatokat?

Szia! Ezt probaltam, de sajnos nem segit.

A problema targya leginkabb az, hogy az ESK (blackesk) megkoveteli, hogy legyen egy @timestamp object hozzaadva, amikor betolom a cuccot az elasticseach-be. Es innentol mindent, ami jott, azt betesz egy MESSAGE field-be, viszont parse-olas hianyaban nem vesz tudomast a MESSAGE-ben kuldott object-ekrol, igy azok csak sima string-kent kerulnek be es ezert nem tudok rajuk object alapon szurni. 

Ajánlom, cdbb kommentjét.
Kérdés, hogy kintről az elastic-ba küldöd be a logokat közvetlenül http destination-ön keresztül, vagy a syslog-ng 514-es porti listenerén keresztül?
Mert ha utóbbi, akkor ott a syslog-ng nem vár json bemenetet, és így nem is fogja megfelelően parse-olni.

Tekintve, hogy kereshetővé szeretnéd tenni a KV párjaidat, így vagy az lenne a megoldás, hogy a relay-ről küldöd be egy http desten keresztül az elasticba közvetlenül, vagy a célgépeden a blackesk syslog konfigját átírod, és nyitsz egy újabb porton egy listenert, aminek a bemenetjét json-ra parse-oltatod, és akkor az elastic-ba be kellene kerüljön a cucc (ha a megfelelő logpath-on összekötöd őket), mert ahogy látom, az @timestamp-et automatikusan beleteszi beküldéskor a template miatt.

Ekorul kapirgalok en is.

A syslog-ng 514-es porti listenerén keresztül kuldom be, kulonben az authentikacioval is meg kene kuzdeni es azert van par kornyezet, ahonnan a logok jonnek, igy ez okozna egy kis fejfajast, ha nem is lehetetlen.

Ami itt meg aggaszt kicsit, az a kovetekezo: The JSON parser currently supports only integer, double and string values when interpreting JSON structures. 

Namost a bejovo adat azert eleg vegyes, kezdjuk ott, hogy tartalmaz egy forras oldali timestamp-et, ami a fenti harombol egyik kategoriab se passzol igazan. Ezt kene majd valahogy pl ingest pipeline segitsegevel kiasni es visszakonvertalni timestamp formatumra?

Ill. azt nem tudom meg, hogy ha nem JSON-ban tolom vegig a cuccot a forrastol a relay-en keresztul, akkor a KV pair-okat, hogyan kene be/ki csomagolni, hogy hatekony legyen?

JSON eseteben pl: --pair <key>=\"value\"

Bovebben: destination d_net { tcp("<destination_ip>" port(514) log_fifo_size(1000) template("$(format_json --key ISODATE --key MESSAGE --key FILE_NAME --pair <key>=\"<value>\" )\n\n")); };

Hát, szükség esetén a JSON mehet a végeszközökről a relayedre, és akkor elég csak ott bekonfigolni a http feletti továbbítást.
Kérdés még, hogy az elastic mennyire szigorú a felmutatott certekkel kapcsolatban.
Ha nem érzel magadban erőt, akkor a blackesk-ből kimentheted a syslog által használt certet/kulcsot.

A blackesk konfigja alapján a timestamp egy egyszerű stringérték lesz, mert az ISODATE, amivel a syslog feltölti az értékét, az egy egyszerű string (ISO 8601 timestamp), semmi több.

A forrás oldalon szerintem elég annyi, hogy:

template("$(format-json --scope rfc3164 --scope nv-pairs --key ISODATE @timestamp=${ISODATE})\n")

Ez elvileg a legtöbb syslog-ng makrót beleteszi. Ha előtte kv-parsert is ráengedsz a logok beolvasása után akkor azokat is.

Fogadó oldalon pedig elég egy json parser betétele a logpath-ba, ha a blackesk-oldali syslog-nak küldöd. Ha elastic-ba közvetlenül, akkor az sem kell.

Az erdekesseg kedveert, ha json formatumban vegig kergetem a forrastol, a relay-en keresztul az ESK syslog-jaig (az egyszeruseg kedveert minden lepesben ugyanazokkal a destination beallitasokkal, de meg json parser nelkul a source vagy log szekcioban), akkor az elastic-ban igy nez ki a MESSAGE field tartalma. (kicsit tordeltem az olvashatosag kedveert)

 

<13>1 2022-09-19T11:06:07+02:00 192.168.0.123 465 - - - 

{
	"SOURCE":"s_network","PROGRAM":"465","PRIORITY":"notice",
	
	"MESSAGE":"<13>1 2022-09-19T09:06:07+00:00 x600-device - - - - 
	
	{
		\"SOURCE\":\"s_system\",\"PRIORITY\":\"notice\",\"MESSAGE\":\"[2022-09-19T09:06:07.229Z] [ warning] [vmsvc] RecordRoutingInfo: Unable to collect IPv6 routing table.\",\"ISODATE\":\"2022-09-19T09:06:07+00:00\",\"HOST_IP\":\"192.168.0.123\",\"HOST_FROM\":\"x600-device\",\"HOST\":\"x600-device\",\"FILE_NAME\":\"/var/log/vmware-vmsvc-root.log\",\"FACILITY\":\"user\",\"DATE\":\"Sep 19 09:06:07\",\"@timestamp\":\"2022-09-19T09:06:07+00:00\"
	}",

"LEGACY_MSGHDR":"465 ","ISODATE":"2022-09-19T11:06:07+02:00","HOST_FROM":"192.168.0.123","HOST":"192.168.0.123","FACILITY":"user","ENV":"13","DATE":"Sep 19 11:06:07","@timestamp":"2022-09-19T11:06:07+02:00"

}

 

Destination beallitasok:

destination d_syslog_tcp {
    syslog("<syslog-_server_ip>" transport("tcp") port(514) template("$(format_json --scope rfc3164 --scope nv-pairs --key ISODATE @timestamp=${ISODATE} --key FILE_NAME --pair ENV=\"`ENV`\" )\n\n")  );
};

 

Viszont ingest pipeline-nal nem tudok kiasni ebbol a mezobol egyelore semmit, mert azt mondja, hogy 

{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "illegal_argument_exception",
            "reason": "Illegal character inside unquoted field at 59"
          }
        ],
        "type": "illegal_argument_exception",
        "reason": "Illegal character inside unquoted field at 59"
      }
    }
  ]
}

szerintem az idézőjelek escape-elés (vagy annak hiánya) okozhatja.

Én ez fogadó oldalról (QRadar) láttam má rtöbbször, hogy szenvednek a json formátummal.

(Volt ahol egyátalán nem kellett, volt ahol dulpán, vagy triplán volt 'jó)

Valahol ott lesz a gond, hogy ez egy json-ba rakkott json-ba rakott json. Es amikor berakjuk az egyiket a masikba, akkor berakott json mar csak egy string-nek latszik es abbol kene kiasni az objektumokat. Es aztan ezt az egeszet ujra betesszuk egy json egy mezojebe es igy a ketszer betett cuccbol kell kiasni, ami megnehezebb.

Igazabol ami a legnagyobb problemat okozza itt, az a header resz, amit hozzacsap a MESSAGE elejere, de label nelkul:

 

	"MESSAGE":

        "<13>1 2022-09-19T09:06:07+00:00 x600-device - - - -     <--- EZ ITT NEM KENE
	
	{
		\"SOURCE\":\"s_system\",
		\"PRIORITY\":\"notice\",
		\"MESSAGE\":\"[2022-09-19T09:06:07.229Z] [ warning] [vmsvc] RecordRoutingInfo: Unable to collect IPv6 routing table.\",      <--- LOG ENTRY A FORRAS GEPROL
		\"ISODATE\":\"2022-09-19T09:06:07+00:00\",
		\"HOST_IP\":\"192.168.0.123\",
		\"HOST_FROM\":\"x600-device\",
		\"HOST\":\"x600-device\",
		\"FILE_NAME\":\"/var/log/vmware-vmsvc-root.log\",
		\"FACILITY\":\"user\",
		\"DATE\":\"Sep 19 09:06:07\",
		\"@timestamp\":\"2022-09-19T09:06:07+00:00\"
	}",

 

Ezen kivul mar minden key-value formaban van a MESSAGE mezoben, tehat ki lehetne asni onnan, szepen darabonkent. Igazabol a valodi MESSAGE mezo tartalmat  (ami tulajdonkeppen a forras geprol jovo log item) mar nem is akarom parse-olni, csak ne akadjon fel attol, hogy van esetleg benne kettospont, idezojel vagy vesszo.

Jelen pillantaban azon nem tudok atjutni, hogy a fennt jelolt header tartalmaz kettospontot, ami egyben a szeparalo karakter a key-value parok eseteben. Ha az ott nem lenne - es igazabol szamomra teljesen felesleges infokat tartalmaz - akkor mar kb jok is lennenk. 

Opció 1 (csinálunk egy parse-olást):

parser p_json_input { json-parser(); };

source s_json_input {
	syslog(port(10514));
};

log {
  source(s_json_input);
  parser(p_json_input);
  destination(d_es);
  flags(flow-control);
};

 

 

Opció 2 (nem parse-olunk, de cserébe felveszünk egy új destination-t):

source s_json_input {
	syslog(port(10514));
};

destination d_es_json_input {
    elasticsearch-http(
        url("https://es01:9200/_bulk")
        index("syslog-ng_${YEAR}.${MONTH}.${DAY}")
		template("${MSG}\n") 
		type("")
        workers(4)
        batch-lines(4)
        timeout(10)
		persist-name(elasticsearch-syslogng-json)
		user("`elasticUser`")
		password("`elasticPass`")
        tls(
            ca-file("/etc/syslog-ng/certs/ca/ca.crt")
            cert-file("/etc/syslog-ng/certs/syslog01/syslog01.crt")
            key-file("/etc/syslog-ng/certs/syslog01/syslog01.key")
            peer-verify(yes)
        )		
    );
};

log {
  source(s_json_input);
  destination(d_es_json_input);
  flags(flow-control);
};
Szerkesztve: 2022. 09. 18., v – 09:49

a syslog-ng fel tudja parsolni a key-value párokat (is)...

Ezzel aztán már azt csinálsz, amit csak szeretnél.

Én pl egy mysql adatbázisba töltöm:

http://zrubi.hu/2017/siem-at-home/

 

Én a helyedben egyből json-ba tenném, obból már bármi könnyedén adatbázist/keresést/akaármit (is) csinál.

Ráadásul így olyan mezőket tehetsz, be amiket a cél alkalmazásod 'szeret'...

Szia!

Kérdéses hogy a hostok és relayek hogyan küldik be a logot, a központi szerveren  ahogy nézem a blackesk syslog confignál a 122 line a funky: https://github.com/amitn322/blackesk/blob/master/syslog-ng/conf/syslog-…

Én a helyedben megpróbálnám átírni a scope-ot, esetleg írni type-ot a 123-asba ha az Elastic>7

https://www.syslog-ng.com/technical-documents/doc/syslog-ng-open-source…

https://www.syslog-ng.com/technical-documents/doc/syslog-ng-open-source…

https://www.syslog-ng.com/technical-documents/doc/syslog-ng-open-source…

Ha lenne időm ezeket nyálaznám át de így csak beküldöm...

syslogben mar van egy syslog nevu protocol driver, hasznald azt az adatok atkuldesehez:

destination d_centrallog { syslog("192.168.30.184" port(5141)); };

a fogadas meg ilyen:

source s_net_syslog { syslog(transport(tcp) port(5141) max-connections(200) keep-alive(yes)); };

ezzel sok mindent lehet kezelni. pl: a tobbsoros (php/java callstack dump) syslog uzenetek is jol atmennek, egyben. 

A vegtelen ciklus is vegeter egyszer, csak kelloen eros hardver kell hozza!

2019-ben csináltam ilyen log collection dolgot, akkoriban működött persze azóta lehet kell gyúrni rajta valamit.

syslog 01-json-template.conf:

template(name="json-template"
  type="list") {
    constant(value="{")
      constant(value="\"@timestamp\":\"")     property(name="timereported" dateFormat="rfc3339")
      constant(value="\",\"@version\":\"1")
      constant(value="\",\"message\":\"")     property(name="msg" format="json")
      constant(value="\",\"sysloghost\":\"")  property(name="hostname")
      constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")
      constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")
      constant(value="\",\"programname\":\"") property(name="programname")
      constant(value="\",\"procid\":\"")      property(name="procid")
    constant(value="\"}\n")
}

logstash (output amit akarsz):
 

input {
  udp {
    host => "syslog_service_hostname_here"
    port => 10514
    codec => "json"
    type => "rsyslog"
  }
}