Flume client interaction with sources, and load-balancing and failover mechanisms in Flume
Flume has a fully plugin-based architecture.
--The client (NettyAvroRpcClient or ThriftRpcClient) operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from.
--The client creates an RpcClient object with the host and port of the target Flume agent, and then uses that RpcClient to send events to the agent.
--The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume's data flow over RPC.
--Flume currently supports Avro, log4j, syslog, and HTTP POST (with a JSON body) as ways to transfer data from an external source. Additionally, there is an ExecSource that can consume the output of a local process as input to Flume.
--As of Flume 1.4.0, Avro is the default RPC protocol.
Steps to connect an application that uses the Client SDK API to a Flume agent, and to set up the source, sink and channel:
1)In the user's data-generating application, write a class that uses the Client SDK API, starting from the driver method public static void main(String[] args). In it, initialize the client with the hostname and port on which the Flume source listens. These must match the values given for the agent's source in the flume.conf file. The client then sends events from the local process, and they are collected at the agent's source.
Some important classes to remember are RpcClient, RpcClientFactory, Event and EventBuilder. Client properties such as client.type, hosts and the port number are passed to the factory as properties.
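As a minimal sketch of step 1, the client settings can be collected in a java.util.Properties object. The class name DefaultClientProps and the alias h1 are made up for illustration, and localhost:42424 is taken from the source configuration in step 2. The block uses only the JDK so it runs without the Flume jars. The SDK calls that would follow are shown as comments and assume flume-ng-sdk is on the classpath.

```java
import java.util.Properties;

// Hypothetical helper class; only the property keys and values come from the Flume SDK.
final class DefaultClientProps {

    // Builds the settings for a single-host client using the default (Avro) RPC protocol.
    static Properties build(String host, int port) {
        Properties props = new Properties();
        props.setProperty("client.type", "default");
        props.setProperty("hosts", "h1");                  // one alias; the name is arbitrary
        props.setProperty("hosts.h1", host + ":" + port);  // must match the source's bind and port
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost", 42424);
        // With flume-ng-sdk on the classpath, the next steps would look roughly like:
        //   RpcClient client = RpcClientFactory.getInstance(props);
        //   client.append(EventBuilder.withBody("hello", StandardCharsets.UTF_8));
        //   client.close();
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

For a single host, RpcClientFactory.getDefaultInstance(hostname, port) is a shorter route to the same kind of client.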
2)Set the Flume agent's source type to avro (or to thrift if you are using the Thrift client), and bind it to exactly the hostname and port you initialized the client with in step 1.
agentname1.sources.processA_avrosource.type = avro
agentname1.sources.processA_avrosource.bind = localhost
agentname1.sources.processA_avrosource.port = 42424
3)When a user's data-generating application uses the Flume Client SDK API to transfer data into a Flume Thrift source, it needs a few classes: Event, EventBuilder, RpcClient, SecureRpcClientFactory and RpcClientConfigurationConstants.
4)Only the Thrift source and sink support Kerberos-based authentication. In the client class, set the kerberos property to true, specify client-principal, client-keytab and server-principal, and obtain the client through SecureRpcClientFactory. client-principal and client-keytab are the credentials the connecting side (the client application, or a Thrift sink) uses to authenticate to the Kerberos KDC. server-principal is the principal of the Thrift source being connected to in secure mode.
5)For the failover mechanism to work with Avro, client.type must be set to "default_failover", and the hosts must be listed as space-separated aliases, each with its hostname and port number:
client.type = default_failover
hosts = nm1 nm2 nm3
hosts.nm1 = host1.infovision.com:41414
hosts.nm2 = host2.infovision.com:41414
hosts.nm3 = host3.infovision.com:41414
At present the failover RPC client (FailoverRpcClient) does not support Thrift.
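The failover client settings from step 5 could be assembled in Java roughly as follows. This is a sketch: FailoverClientProps is a hypothetical helper, the aliases nm1, nm2, ... follow the naming used in this step, and the property keys are client.type, hosts and hosts.<alias> as the SDK expects them. Only the JDK is needed to run it. Handing the result to RpcClientFactory.getInstance(props) is left as a comment because that call needs flume-ng-sdk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: turns a list of "host:port" strings into failover client settings.
final class FailoverClientProps {

    static Properties build(List<String> hostPorts) {
        Properties props = new Properties();
        props.setProperty("client.type", "default_failover");
        List<String> aliases = new ArrayList<>();
        for (int i = 0; i < hostPorts.size(); i++) {
            String alias = "nm" + (i + 1);              // nm1, nm2, ...
            aliases.add(alias);
            props.setProperty("hosts." + alias, hostPorts.get(i));
        }
        // The hosts property is a space-separated list of aliases, in failover order.
        props.setProperty("hosts", String.join(" ", aliases));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(Arrays.asList(
                "host1.infovision.com:41414",
                "host2.infovision.com:41414",
                "host3.infovision.com:41414"));
        // With flume-ng-sdk available: RpcClient client = RpcClientFactory.getInstance(props);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```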
6)When there is more load to distribute among hosts for better performance, use the LoadBalancingRpcClient class. client.type must be set to "default_loadbalance", the hosts are listed the same way as for failover, and host-selector is set to round_robin or random:
client.type = default_loadbalance
hosts = nm1 nm2 nm3
hosts.nm1 = host1.infovision.com:41414
hosts.nm2 = host2.infovision.com:41414
hosts.nm3 = host3.infovision.com:41414
host-selector = round_robin
The load-balancing RPC client currently does not support Thrift.
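For the load-balancing client in step 6, the only differences from the failover settings are the client.type value and the extra host-selector key. The sketch below shows this with a hypothetical helper named LoadBalanceClientProps. It does not validate the selector value and uses only the JDK.

```java
import java.util.Properties;

// Hypothetical helper: builds load-balancing client settings for the given hosts.
final class LoadBalanceClientProps {

    static Properties build(String selector, String... hostPorts) {
        Properties props = new Properties();
        props.setProperty("client.type", "default_loadbalance");
        StringBuilder aliases = new StringBuilder();
        for (int i = 0; i < hostPorts.length; i++) {
            String alias = "nm" + (i + 1);
            if (i > 0) {
                aliases.append(' ');                    // aliases are space-separated
            }
            aliases.append(alias);
            props.setProperty("hosts." + alias, hostPorts[i]);
        }
        props.setProperty("hosts", aliases.toString());
        props.setProperty("host-selector", selector);   // round_robin or random
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("round_robin",
                "host1.infovision.com:41414",
                "host2.infovision.com:41414",
                "host3.infovision.com:41414");
        // With flume-ng-sdk available: RpcClient client = RpcClientFactory.getInstance(props);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```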
7)Failover sink processor: maintains a prioritized list of sinks, guaranteeing that as long as one is available, events will be processed (delivered).
When using the failover mechanism with sinks, set the following properties in the configuration file:
agentname1.sinkgroups = sgroups
agentname1.sinkgroups.sgroups.sinks = k1 k2
agentname1.sinkgroups.sgroups.processor.type = failover
agentname1.sinkgroups.sgroups.processor.priority.k1 = 5
agentname1.sinkgroups.sgroups.processor.priority.k2 = 10
agentname1.sinkgroups.sgroups.processor.maxpenalty = 10000
A sink with a larger priority value is tried first, so k2 is preferred over k1 here. maxpenalty is the maximum backoff period, in milliseconds, for a failed sink.
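To make the priority rule concrete, here is a small simulation of the selection logic, not Flume's own implementation. It assumes the rule that the available sink with the largest priority value wins. FailoverPickSketch and its method names are invented for the example, and the priorities are the ones from the configuration above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Illustrative model only: picks the live sink with the highest priority value.
final class FailoverPickSketch {

    // Priorities from the sink group configuration in this step.
    static Map<String, Integer> demoPriorities() {
        Map<String, Integer> priorities = new HashMap<>();
        priorities.put("k1", 5);
        priorities.put("k2", 10);
        return priorities;
    }

    // Returns the preferred sink that is not currently marked as failed, if any.
    static Optional<String> pick(Map<String, Integer> priorities, Set<String> failed) {
        return priorities.entrySet().stream()
                .filter(e -> !failed.contains(e.getKey()))
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        Map<String, Integer> priorities = demoPriorities();
        // All sinks healthy: k2 (priority 10) is chosen.
        System.out.println(pick(priorities, Collections.<String>emptySet()).orElse("none"));
        // k2 has failed: the processor falls back to k1.
        System.out.println(pick(priorities, Collections.singleton("k2")).orElse("none"));
    }
}
```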
8)Load balancing sink processor: provides the ability to load-balance flow over multiple sinks.
When using the load-balancing mechanism with sinks, set the following properties in the configuration file:
agentname1.sinkgroups = sgroups
agentname1.sinkgroups.sgroups.sinks = k1 k2
agentname1.sinkgroups.sgroups.processor.type = load_balance
agentname1.sinkgroups.sgroups.processor.backoff = true
agentname1.sinkgroups.sgroups.processor.selector = random
agentname1.sinkgroups.sgroups.processor.selector.maxTimeOut = 30000
If backoff is enabled, the sink processor blacklists sinks that fail, removing them from selection for a given timeout. selector may be random or round_robin. selector.maxTimeOut is used by backoff selectors to limit exponential backoff (in milliseconds).
Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.
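The interaction of round-robin selection, backoff and maxTimeOut can be sketched as a small stand-alone model. This is not Flume's AbstractSinkSelector API. RoundRobinSketch, its methods, and the 1000 ms starting delay that doubles on each failure are all assumptions made for illustration; only the idea of capping the backoff at maxTimeOut comes from the text.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of a round-robin selector with capped exponential backoff.
final class RoundRobinSketch {
    private final List<String> sinks;
    private final long maxTimeOutMs;
    private final Map<String, Long> blockedUntil = new HashMap<>();
    private final Map<String, Integer> failures = new HashMap<>();
    private int next = 0;

    RoundRobinSketch(List<String> sinks, long maxTimeOutMs) {
        this.sinks = sinks;
        this.maxTimeOutMs = maxTimeOutMs;
    }

    // Returns the next sink that is not blacklisted at time nowMs, or null if all are.
    String pick(long nowMs) {
        for (int i = 0; i < sinks.size(); i++) {
            String candidate = sinks.get(next);
            next = (next + 1) % sinks.size();
            if (blockedUntil.getOrDefault(candidate, 0L) <= nowMs) {
                return candidate;
            }
        }
        return null;
    }

    // Blacklists a failed sink; the delay doubles per failure (assumed) up to maxTimeOutMs.
    void reportFailure(String sink, long nowMs) {
        int count = failures.merge(sink, 1, Integer::sum);
        long delay = Math.min(maxTimeOutMs, 1000L << Math.min(count - 1, 20));
        blockedUntil.put(sink, nowMs + delay);
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(Arrays.asList("k1", "k2"), 30000L);
        System.out.println(selector.pick(0));   // first sink in rotation
        System.out.println(selector.pick(0));   // second sink in rotation
        selector.reportFailure("k2", 0);        // k2 is skipped until its backoff expires
        System.out.println(selector.pick(0));
    }
}
```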
9)To run a Flume agent from a Bash shell in the Flume bin directory:
> flume-ng agent --name agentname1 --conf conf --conf-file /home/kumar/flume/flume_agent1.properties -Dflume.root.logger=INFO,console
hint: flume-ng is a startup script.
------------------- -----------------------------------
10)Flume has a fully plugin-based architecture.
While it has always been possible to include custom Flume components by adding their jars to the FLUME_CLASSPATH variable in the flume-env.sh file,
Flume now supports a special directory called plugins.d which automatically picks up plugins that are packaged in a specific format. This allows for easier management of plugin packaging issues as well as simpler debugging and troubleshooting of several classes of issues, especially library dependency conflicts.
The plugins.d directory is located at $FLUME_HOME/plugins.d.
At startup time, the flume-ng start script looks in the plugins.d directory for plugins that conform to the below format and includes them in proper paths when starting up java.
Each plugin (subdirectory) within plugins.d can have up to three sub-directories:
- lib - the plugin’s jar(s)
- libext - the plugin’s dependency jar(s)
- native - any required native libraries, such as .so files
Example of two plugins within the plugins.d directory:
$FLUME_HOME/plugins.d/
$FLUME_HOME/plugins.d/customdb-source1/
$FLUME_HOME/plugins.d/customdb-source1/lib/my-dbsource.jar
$FLUME_HOME/plugins.d/customdb-source1/libext/spring-core-2.5.6.jar
$FLUME_HOME/plugins.d/customdb-source2/
$FLUME_HOME/plugins.d/customdb-source2/lib/custom_db.jar
$FLUME_HOME/plugins.d/customdb-source2/native/gettext.so
11)Using ZooKeeper-based configuration:
$ bin/flume-ng agent --conf conf -z zkserver1:2181,zkserver2:2181 -p /flume --name a1 -Dflume.root.logger=INFO,console
The configuration file needs to be uploaded to ZooKeeper under a configurable prefix. It is stored in the ZooKeeper node data. The ZooKeeper node tree for agents a1 and a2 would look like this:
- /flume (flume node tree base path)
  - /a1 (a1 agent configuration file)
  - /a2 (a2 agent configuration file)
12)Data ingestion can be done in a number of ways. Flume supports several mechanisms to ingest data from external sources:
- RPC
- executing commands through an "exec" source
- network streams (Avro, Thrift, Syslog and Netcat)
An Avro client included in the Flume distribution can send a given file to a Flume Avro source using the Avro RPC mechanism:
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
(The above command sends the contents of /usr/logs/log.10 to the Flume source listening on that port.)
13)Flume sources: Avro source, Thrift source, Exec source, JMS source (with its message converter), Spooling Directory source, Twitter 1% Firehose source, Kafka source, Netcat source, Sequence Generator source, Syslog sources (syslogtcp, syslogudp, multiport_syslogtcp), HTTP source, Stress source, Legacy sources (Avro Legacy source, Thrift Legacy source), Custom source, Scribe source
14)Flume sinks: Hive sink, HDFS sink, Logger sink, Avro sink, Thrift sink, IRC sink, Null sink, File Roll sink, HBase sinks (HBaseSink, AsyncHBaseSink), MorphlineSolrSink, ElasticSearchSink (this sink writes data to an Elasticsearch cluster; by default, events are written so that the Kibana graphical interface can display them, just as if Logstash wrote them), Kite Dataset sink, Kafka sink, Custom sink
15)Channels: Memory channel, JDBC channel, File channel, Kafka channel, Spillable Memory channel, Pseudo Transaction channel (the Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use), Custom channel
16)Flume channel selectors: Replicating channel selector (default), Multiplexing channel selector
17)Flume sink processors: Failover sink processor, Load balancing sink processor, Custom sink processor