Monday, July 4, 2016

Flume Client interaction with Source, Load balancing and failover mechanisms with Flume




Flume has a fully plugin-based architecture.

--The client (NettyAvroRpcClient or ThriftRpcClient) operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from.
--The application creates the RpcClient object with the host and port of the target Flume agent, and can then use the RpcClient to send data to that agent.
--The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume’s data flow over RPC.
--Flume currently supports Avro, log4j, syslog, and HTTP POST (with a JSON body) as ways to transfer data from an external source. Additionally, there’s an ExecSource that can consume the output of a local process as input to Flume.
--As of Flume 1.4.0, Avro is the default RPC protocol.
Steps from the Client SDK API to the Flume agent, to set up source, sink and channel.
1) In the user's data-generating application, write a class that uses the Client SDK API, starting from the driver method public static void main(String[] args). In it, initialize the client with the hostname and port number of the Flume agent's source. Events are sent from the local process and collected at that source, whose properties are set in the flume.conf file.
Some important classes to remember are RpcClient, RpcClientFactory, Event and EventBuilder. Client properties such as client.type, hosts and the port number are passed to RpcClientFactory.
2) Set the Flume agent source type to avro (or thrift if using the Thrift client) and bind it to exactly the hostname and port the client was initialized with in step 1.
agentname1.sources.processA_avrosource.type = avro
agentname1.sources.processA_avrosource.port = 42424
agentname1.sources.processA_avrosource.bind = localhost
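
The pairing between steps 1 and 2 can be sketched with plain java.util.Properties. This is only a minimal sketch, assuming the property keys documented for the Flume Client SDK (client.type, hosts, hosts.<alias>); the class name AvroClientProps and the alias h1 are made up for illustration. In a real application the Properties would be handed to RpcClientFactory.getInstance(props) from the flume-ng-sdk jar; that call is left as a comment so the snippet compiles without Flume on the classpath.

```java
import java.util.Properties;

public class AvroClientProps {
    // Builds client properties that target a single Flume source.
    // "h1" is a hypothetical alias; host and port must match the source's bind and port.
    public static Properties build(String host, int port) {
        Properties props = new Properties();
        props.setProperty("client.type", "default"); // "default" means Avro; use "thrift" for a Thrift source
        props.setProperty("hosts", "h1");
        props.setProperty("hosts.h1", host + ":" + port);
        return props;
    }

    public static void main(String[] args) {
        // Same host and port as the avro source configured in step 2.
        Properties props = build("localhost", 42424);
        // With flume-ng-sdk on the classpath, the next steps would be roughly:
        //   RpcClient client = RpcClientFactory.getInstance(props);
        //   client.append(EventBuilder.withBody("hello", StandardCharsets.UTF_8));
        //   client.close();
        System.out.println(props.getProperty("hosts.h1"));
    }
}
```

If the port here and the port of the source differ, the client simply fails to connect, so keeping both values in one place is worth the effort.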

3) Within a user’s data-generating application, to transfer data into a Flume Thrift source using the Flume Client SDK API, you have to use a few classes: Event, RpcClient, SecureRpcClientFactory, RpcClientConfigurationConstants and EventBuilder.

4) Only the Thrift source and sink support Kerberos-based authentication. In the client class you need to set kerberos to true, provide client-principal, client-keytab and server-principal, and obtain the client from SecureRpcClientFactory. client-principal and client-keytab are the properties used by the Thrift client or sink to authenticate to the Kerberos KDC. The server-principal is the principal of the Thrift source that the client or sink is configured to connect to in secure mode.
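
A rough sketch of the Kerberos-related client properties from step 4, again using only java.util.Properties. The property keys are the ones named above; the class name, the alias h1, the principals and the keytab path are placeholders I made up, not values from any real cluster. The hand-off to SecureRpcClientFactory is shown as a comment because it needs the Flume jars.

```java
import java.util.Properties;

public class SecureThriftClientProps {
    // Builds properties for a Kerberos-enabled Thrift client.
    // All principal and keytab values passed in are placeholders for illustration.
    public static Properties build(String hostPort, String clientPrincipal,
                                   String clientKeytab, String serverPrincipal) {
        Properties props = new Properties();
        props.setProperty("client.type", "thrift");
        props.setProperty("hosts", "h1");                       // hypothetical alias
        props.setProperty("hosts.h1", hostPort);
        props.setProperty("kerberos", "true");
        props.setProperty("client-principal", clientPrincipal); // identity of this client
        props.setProperty("client-keytab", clientKeytab);       // keytab used to log in to the KDC
        props.setProperty("server-principal", serverPrincipal); // principal of the Thrift source
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("server.example.org:41414",
                "flumeclient/client.example.org@EXAMPLE.ORG",
                "/tmp/flumeclient.keytab",
                "flume/server.example.org@EXAMPLE.ORG");
        // With the Flume jars on the classpath, the client would then be obtained roughly as:
        //   RpcClient client = SecureRpcClientFactory.getThriftInstance(props);
        props.list(System.out);
    }
}
```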

5) For the failover mechanism to work with Avro, client.type must be set to "default_failover", with the hosts listed by alias and each alias mapped to a hostname and port number (hosts = nm1 nm2 nm3, hosts.nm1 = host1.infovision.com:41414, hosts.nm2 = host2.infovision.com:41414, hosts.nm3 = host3.infovision.com:41414). The aliases in hosts are separated by spaces.
At present the failover RPC client does not support Thrift.
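
The failover client settings from step 5 can be assembled like this. It is a sketch under the assumption that the aliases are simply numbered nm1, nm2, ... as in the text; the class name FailoverClientProps is hypothetical, and the resulting Properties would be passed to RpcClientFactory.getInstance(props) in a real application.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.StringJoiner;

public class FailoverClientProps {
    // Builds failover client properties from a list of "host:port" strings.
    // Aliases nm1, nm2, ... follow the naming used in step 5.
    public static Properties build(List<String> hostPorts) {
        Properties props = new Properties();
        props.setProperty("client.type", "default_failover");
        StringJoiner aliases = new StringJoiner(" "); // hosts is a space-separated alias list
        for (int i = 0; i < hostPorts.size(); i++) {
            String alias = "nm" + (i + 1);
            aliases.add(alias);
            props.setProperty("hosts." + alias, hostPorts.get(i));
        }
        props.setProperty("hosts", aliases.toString());
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(Arrays.asList(
                "host1.infovision.com:41414",
                "host2.infovision.com:41414",
                "host3.infovision.com:41414"));
        System.out.println("hosts = " + props.getProperty("hosts"));
    }
}
```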
6) The load-balancing RPC client currently does not support Thrift either. When there is load to distribute among several hosts for better performance, use the LoadBalancingRpcClient by setting client.type to "default_loadbalance", listing the hosts by alias with their hostnames and port numbers (hosts = nm1 nm2 nm3, hosts.nm1 = host1.infovision.com:41414, hosts.nm2 = host2.infovision.com:41414, hosts.nm3 = host3.infovision.com:41414) and setting host-selector to round_robin or random.
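
To make host-selector = round_robin a little more concrete, here is a toy illustration of the rotation order. It is not Flume's LoadBalancingRpcClient code and it leaves out backoff entirely; the class RoundRobinSelector is invented for this post.

```java
import java.util.Arrays;
import java.util.List;

public class RoundRobinSelector {
    private final List<String> hosts;
    private int next = 0;

    public RoundRobinSelector(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = hosts;
    }

    // Returns the hosts in list order, wrapping around after the last one.
    public String nextHost() {
        String host = hosts.get(next);
        next = (next + 1) % hosts.size();
        return host;
    }

    public static void main(String[] args) {
        RoundRobinSelector selector = new RoundRobinSelector(Arrays.asList(
                "host1.infovision.com:41414",
                "host2.infovision.com:41414",
                "host3.infovision.com:41414"));
        // Four picks: the fourth wraps back to the first host.
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.nextHost());
        }
    }
}
```

With random instead of round_robin, each pick would be drawn independently rather than following the list order.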

7) Failover Sink Processor: maintains a prioritized list of sinks, guaranteeing that so long as one is available events will be processed (delivered). A sink with a larger priority value is tried first, so k2 is preferred over k1 below.
When using the failover mechanism with sinks, set these properties in the configuration file:
agentname1.sinkgroups = sgroups
agentname1.sinkgroups.sgroups.sinks = k1 k2
agentname1.sinkgroups.sgroups.processor.type = failover
agentname1.sinkgroups.sgroups.processor.priority.k1 = 5
agentname1.sinkgroups.sgroups.processor.priority.k2 = 10
agentname1.sinkgroups.sgroups.processor.maxpenalty = 10000
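
The priority rule can be illustrated with a few lines of Java. This is a simplified model of the idea (pick the live sink with the largest priority value), not the actual Flume processor, and it ignores the maxpenalty cool-down; the class and method names are made up.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class FailoverPick {
    // Returns the non-failed sink with the largest priority value, or null if every sink has failed.
    public static String pick(Map<String, Integer> priorities, Set<String> failed) {
        String best = null;
        for (Map.Entry<String, Integer> entry : priorities.entrySet()) {
            if (failed.contains(entry.getKey())) {
                continue; // skip sinks currently marked as failed
            }
            if (best == null || entry.getValue() > priorities.get(best)) {
                best = entry.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> priorities = new LinkedHashMap<>();
        priorities.put("k1", 5);
        priorities.put("k2", 10);
        System.out.println(pick(priorities, Collections.<String>emptySet()));
        System.out.println(pick(priorities, Collections.singleton("k2")));
    }
}
```

With both sinks healthy the pick is k2; once k2 is marked failed, traffic falls over to k1.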

8) Load Balancing Sink Processor: provides the ability to load-balance flow over multiple sinks.
When using the load_balance mechanism with sinks, set these properties in the configuration file:
agentname1.sinkgroups = sgroups
agentname1.sinkgroups.sgroups.sinks = k1 k2
agentname1.sinkgroups.sgroups.processor.type = load_balance
agentname1.sinkgroups.sgroups.processor.backoff = true
agentname1.sinkgroups.sgroups.processor.selector = random
agentname1.sinkgroups.sgroups.processor.selector.maxTimeOut = 30000
If backoff is enabled, the sink processor blacklists sinks that fail, removing them from selection for a given timeout. The selector can be random or round_robin. selector.maxTimeOut is used by backoff selectors to limit exponential backoff (in milliseconds).

Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.
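
What "limit exponential backoff" means can be shown with a small calculation. The doubling rule and the base delay below are my own assumptions for illustration; Flume's internal formula may differ. Only the cap (maxTimeOut) comes from the configuration above.

```java
public class BackoffDelay {
    // Doubles the base delay once per consecutive failure, never exceeding maxTimeOutMillis.
    // The doubling rule is an assumed illustration, not Flume's exact formula.
    public static long delayMillis(int consecutiveFailures, long baseMillis, long maxTimeOutMillis) {
        long delay = baseMillis;
        for (int i = 0; i < consecutiveFailures && delay < maxTimeOutMillis; i++) {
            // Jump straight to the cap when doubling would pass it (also avoids overflow).
            delay = (delay > maxTimeOutMillis / 2) ? maxTimeOutMillis : delay * 2;
        }
        return Math.min(delay, maxTimeOutMillis);
    }

    public static void main(String[] args) {
        long maxTimeOut = 30000L; // same value as selector.maxTimeOut above
        for (int failures = 0; failures <= 6; failures++) {
            System.out.println(failures + " failures -> " + delayMillis(failures, 1000L, maxTimeOut) + " ms");
        }
    }
}
```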

9) To run a Flume agent
From a Bash shell in the Flume bin directory:> flume-ng agent --name agentname1 --conf conf --conf-file /home/kumar/flume/flume_agent1.properties -Dflume.root.logger=INFO,console

Hint: flume-ng is the startup script.

10) Flume has a fully plugin-based architecture.

While it has always been possible to include custom Flume components by adding their jars to the FLUME_CLASSPATH variable in the flume-env.sh file, Flume now supports a special directory called plugins.d which automatically picks up plugins that are packaged in a specific format. This allows for easier management of plugin packaging issues as well as simpler debugging and troubleshooting of several classes of issues, especially library dependency conflicts.

The plugins.d directory is located at $FLUME_HOME/plugins.d.

At startup time, the flume-ng start script looks in the plugins.d directory for plugins that conform to the format below and includes them in the proper paths when starting up java.
Each plugin (subdirectory) within plugins.d can have up to three sub-directories:

  1. lib - the plugin’s jar(s)
  2. libext - the plugin’s dependency jar(s)
  3. native - any required native libraries, such as .so files
Example of two plugins within the plugins.d directory:

$FLUME_HOME/plugins.d/
$FLUME_HOME/plugins.d/customdb-source1/
$FLUME_HOME/plugins.d/customdb-source1/lib/my-dbsource.jar
$FLUME_HOME/plugins.d/customdb-source1/libext/spring-core-2.5.6.jar
$FLUME_HOME/plugins.d/customdb-source2/
$FLUME_HOME/plugins.d/customdb-source2/lib/custom_db.jar
$FLUME_HOME/plugins.d/customdb-source2/native/gettext.so
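
A quick way to sanity-check a plugin folder before starting the agent is to see which of the three recognized sub-directories it actually contains. The helper below is a hypothetical convenience written for this post, not something shipped with Flume; it only uses java.nio.file and builds a throwaway directory in main for demonstration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class PluginLayout {
    private static final String[] SUBDIRS = {"lib", "libext", "native"};

    // Returns which of lib, libext and native exist as directories inside one plugin directory.
    public static List<String> presentSubdirs(Path pluginDir) {
        List<String> found = new ArrayList<>();
        for (String name : SUBDIRS) {
            if (Files.isDirectory(pluginDir.resolve(name))) {
                found.add(name);
            }
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Build a temporary layout resembling customdb-source2 from the example above.
        Path pluginsD = Files.createTempDirectory("plugins.d");
        Path plugin = pluginsD.resolve("customdb-source2");
        Files.createDirectories(plugin.resolve("lib"));
        Files.createDirectories(plugin.resolve("native"));
        System.out.println(plugin.getFileName() + " -> " + presentSubdirs(plugin));
    }
}
```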
 
11) Using ZooKeeper-based configuration:
$ bin/flume-ng agent --conf conf -z zkserver1:2181,zkserver2:2181 \
  -p /flume --name a1 -Dflume.root.logger=INFO,console
  The configuration file needs to be uploaded to ZooKeeper
  under a configurable prefix. The configuration file is stored
  in ZooKeeper node data.
  The following is how the ZooKeeper node tree would look for
  agents a1 and a2:
    -/flume (flume node tree base path)
          -/a1 (a1 agent configuration file)
          -/a2 (a2 agent configuration file)
 
12) Data ingestion can be done in a number of ways: Flume supports a
   number of mechanisms to ingest data from external sources:
   RPC, executing commands through an "Exec" source, and
   network streams (Avro, Thrift, Syslog and Netcat).
 
   An Avro client included in the Flume distribution can send a
   given file to a Flume Avro source using the Avro RPC mechanism:
     $ bin/flume-ng avro-client -H localhost -p 41414 \
       -F /usr/logs/log.10
 
   (The above command sends the contents of /usr/logs/log.10
   to the Flume source listening on that port.)
 
13) Flume sources: Avro source, Thrift source, Exec source,
    JMS source (with its message Converter), Spooling Directory source,
    Twitter 1% Firehose source, Kafka source,
    Netcat source, Sequence Generator source, Syslog sources
    (syslogtcp, syslogudp, multiport_syslogtcp), HTTP source, Stress source,
    Legacy sources (Avro Legacy source, Thrift Legacy source),
    Custom source, Scribe source
 
14) Flume sinks: Hive sink, HDFS sink, Logger sink, Avro sink,
    Thrift sink, IRC sink, Null sink, File Roll sink, HBase sinks
    (HBaseSink, AsyncHBaseSink), MorphlineSolrSink, ElasticSearchSink
    (this sink writes data to an Elasticsearch cluster; by default,
    events are written so that the Kibana
    graphical interface can display them, just as if Logstash
    wrote them), Kite Dataset sink, Kafka sink, Custom sink
 
15) Channels: Memory channel, JDBC channel, File channel,
    Kafka channel, Spillable Memory channel, Pseudo Transaction
    channel (the Pseudo Transaction Channel is only for unit
    testing purposes and is NOT meant for production use),
    Custom channel
 
16) Flume channel selectors: Replicating channel selector (default),
    Multiplexing channel selector
 
17) Flume sink processors: Failover Sink Processor, Load Balancing
    Sink Processor, Custom Sink Processor
 
