Understanding BRPC Threading and Context Management
Delve into the details of BRPC's threading model, including the comparison between user threads and kernel threads, the M:N model using bthreads, context switches with boost::context, and context storage management for efficient operation. Explore the advantages and challenges of each approach, from performance considerations to scalability and data-synchronization overhead.
Presentation Transcript
BRPC INTERNAL jrjbear@gmail.com 2018.4
About This PPT
- If you are a newcomer to brpc: try it before moving on: https://github.com/brpc/brpc
- If you are a beginner, you may wonder what happens when a server callback blocks, or how to troubleshoot brpc (cores, latency, ...). This PPT will help you dive into the brpc kernel.
- If you are an experienced user: the internal implementation (threads, memory, ...) and how to add a new protocol.
- If you are already a master: resumes are welcome!
Outline
- Thread model --- bthread
- Buffer management --- IOBuf
- Monitoring --- bvar
- brpc framework
- brpc read/write model
- Timer keeping
- Memory management
- Naming & load balancing
- Other topics involved (cache, atomic operations, ...)
WARNING: BREAK INTO BRPC
Threading Model
1:1 model -- pthread (one user thread per kernel thread)
- Pros: scalability under multiple CPUs
- Cons: data synchronization overhead/complexity; scheduling overhead
N:1 model -- fiber (many user threads on one kernel thread)
- Pros: performance under a single CPU; easy to use/read, no data races
- Cons: hard to scale over multiple CPUs; vulnerable to blocking
M:N -- bthread
- M bthreads run on N kernel threads (bthread workers).
- Each bthread can be scheduled to run inside a single worker (in place) -- good locality
- or between different workers (work stealing) -- good scalability
Context Switch -- boost::context
- A context contains registers, stack, ...
- Save the context before switching out; jump to a saved context to recover the former state.
- bthread_make_fcontext creates a context for a function (e.g. foo below); bthread_jump_fcontext switches between foo's context and the other context on every yield().

    void foo() {
        int total = 0;
        for (int i = 0; i < 10; ++i) {
            total += i;
            yield();
        }
        ...
    }
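To make the make/jump mechanism concrete, here is a small runnable analogue using POSIX ucontext instead of brpc's bthread_make_fcontext / bthread_jump_fcontext (which implement the same idea in assembly): a separate stack is bound to foo, and each yield saves foo's registers and stack pointer before jumping back to the caller.

```cpp
#include <ucontext.h>
#include <cstdio>
#include <cstdlib>

static ucontext_t main_ctx, foo_ctx;

static void foo() {
    int total = 0;
    for (int i = 0; i < 10; ++i) {
        total += i;
        swapcontext(&foo_ctx, &main_ctx);   // "yield": save foo's context, resume caller
    }
    printf("total=%d\n", total);            // prints 45, then returns via uc_link
}

int main() {
    char* stack = static_cast<char*>(malloc(64 * 1024));
    getcontext(&foo_ctx);                   // analogue of bthread_make_fcontext
    foo_ctx.uc_stack.ss_sp = stack;
    foo_ctx.uc_stack.ss_size = 64 * 1024;
    foo_ctx.uc_link = &main_ctx;            // where to jump when foo() returns
    makecontext(&foo_ctx, foo, 0);

    for (int i = 0; i < 11; ++i) {
        swapcontext(&main_ctx, &foo_ctx);   // analogue of bthread_jump_fcontext
    }
    free(stack);
    return 0;
}
```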
Context Storage
- The context is stored in a ContextualStack.
- How much stack do we need? 3 different sizes (configurable): STACK_TYPE_SMALL, STACK_TYPE_NORMAL, STACK_TYPE_LARGE
- What if the stack overflows? Use mprotect to add a guard page.
- Use mmap to allocate page-aligned memory (required by mprotect), up to /proc/sys/vm/max_map_count mappings.
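A minimal sketch of the allocation trick described here (not brpc's actual stack allocator): mmap returns page-aligned memory, and mprotect turns the lowest page into a guard page so an overflowing bthread stack faults instead of silently corrupting a neighbouring allocation.

```cpp
#include <sys/mman.h>
#include <unistd.h>
#include <cstddef>

// Returns page-aligned memory of stack_size bytes plus one guard page, or NULL.
void* alloc_stack_with_guard(size_t stack_size) {
    const size_t page = sysconf(_SC_PAGESIZE);
    void* mem = mmap(NULL, stack_size + page, PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED) {
        return NULL;
    }
    // Guard page at the low end (stacks grow downward on x86): any access faults.
    if (mprotect(mem, page, PROT_NONE) != 0) {
        munmap(mem, stack_size + page);
        return NULL;
    }
    return mem;  // usable stack area is [mem + page, mem + page + stack_size)
}
```

Each such mapping counts against /proc/sys/vm/max_map_count, which is why the limit matters for servers that create many bthread stacks.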
Scheduling
- 1 worker (pthread) => 1 TaskGroup, each with its own run queue.
- Basic scheduling uses the run queue (FIFO): bth1, bth2, ..., bthn.
- What if bth1 blocks? bthn will be stolen by other workers (work stealing between TaskGroups).
- RemoteTaskQueue -- for tasks created outside bthread workers.
TaskGroup -- Main Entry
- run_main_task: entry point for a TaskGroup, has its own context.
- While the worker is idle, it steals tasks from other TaskGroups and sched_to them.
- Brief code:
    while not stop:
        wait until signaled
        steal a bthread from other TaskGroups
        sched_to(that bthread)
- Implementation goal: as fast as possible, avoid global contention.
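The brief code above boils down to a loop like the following self-contained sketch; the method names are illustrative stand-ins for what TaskControl, TaskGroup and ParkingLot actually do in brpc.

```cpp
#include <cstdint>

typedef uint64_t bthread_t;

class TaskGroupSketch {
public:
    void run_main_task() {
        bthread_t tid;
        while (!stopped()) {
            if (!steal_task(&tid)) {      // try the queues of other TaskGroups
                wait_for_signal();        // park on a futex-based ParkingLot
                continue;
            }
            sched_to(tid);                // jump into the stolen bthread's context
        }
    }

private:
    bool stopped() { return false; }               // stub
    bool steal_task(bthread_t*) { return false; }  // stub
    void wait_for_signal() {}                      // stub
    void sched_to(bthread_t) {}                    // stub
};
```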
Work Stealing Implementation
- ParkingLot: wait/signal using futex. futex_wait_private(value, expect) atomically compares and waits; a signaler changes the value and calls futex_wake_private, so a waiter whose compare fails does not block and a wakeup is never lost.
- Distribute contention across multiple ParkingLots (PARKING_LOT_NUM in total).
- WorkStealQueue: used as the run queue; wait-free given one thread to push/pop and multiple threads to steal (from the back).
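The wait/signal exchange above can be sketched with raw futexes (Linux-only; a simplification of brpc's ParkingLot): a worker sleeps only if the counter still holds the value it last saw, so a signal racing ahead of the wait makes the wait return immediately.

```cpp
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <atomic>

class ParkingLotSketch {
public:
    // Value the waiter should pass to wait(); signal() bumping it unblocks us.
    int expected() { return _pendings.load(std::memory_order_acquire); }

    // futex_wait_private: the kernel re-checks the value before sleeping, so a
    // concurrent signal() makes this return right away ("compare failed").
    void wait(int expect) {
        syscall(SYS_futex, reinterpret_cast<int*>(&_pendings),
                FUTEX_WAIT_PRIVATE, expect, nullptr, nullptr, 0);
    }

    // change(value) + futex_wake_private: wake up to num_task parked workers.
    void signal(int num_task) {
        _pendings.fetch_add(1, std::memory_order_release);
        syscall(SYS_futex, reinterpret_cast<int*>(&_pendings),
                FUTEX_WAKE_PRIVATE, num_task, nullptr, nullptr, 0);
    }

private:
    std::atomic<int> _pendings{0};
};
```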
TaskControl
- Singleton that manages all TaskGroups: start, stop, add.
- What if we create a bthread in a non-worker (normal) pthread? A non-worker doesn't have a TaskGroup, so TaskControl chooses a TaskGroup and pushes the task into its RemoteTaskQueue.
- RemoteTaskQueue: a queue for tasks from non-workers; uses a mutex to protect against concurrent push/pop/steal.
- signal_task -- only notifies some ParkingLots (thus some TaskGroups).
- steal_task -- steals bthreads from all TaskGroups to prevent starvation.
- Overall scheduling order: 1. local run queue, 2. local remote queue, 3. other workers' run queues, 4. other workers' remote queues.
TaskGroup Interface
- sched_to(bth): switch to execute bth.
- sched: run the next bthread according to the scheduling order.
- ready_to_run(bth): push bth into the run queue; a signal parameter controls whether to notify other TaskGroups.
- ready_to_run_remote(bth): push bth into the remote queue.
- task_runner: wrapper of the user bthread function -- run the user function, handle bthread destruction, signal the join butex, then ending_sched to fetch the next bthread and sched_to it.
- set_remained: callback to run before the next bthread's execution.
bthread Interface
- bthread_t: 32-bit version (to prevent ABA) + 32-bit slot id in ResourcePool (to address the TaskMeta).
- start_foreground: set_remained(ready_to_run(current_bth)) + sched_to(new_bth)
- start_background: ready_to_run[_remote](new_bth)
- yield: set_remained(ready_to_run(current_bth)) + sched
- usleep: add a timer(ready_to_run_remote(current_bth)) + sched
- join: wait on the join butex until the bthread quits
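From user code, the public API maps onto these operations; a small, typical usage looks roughly like this (bthread_start_background corresponds to start_background above, bthread_join waits on the join butex).

```cpp
#include <bthread/bthread.h>
#include <cstdio>

static void* heavy_work(void* arg) {
    bthread_usleep(1000);   // suspends this bthread only, not the worker pthread
    printf("done: %s\n", static_cast<const char*>(arg));
    return NULL;
}

int main() {
    bthread_t tid;
    if (bthread_start_background(&tid, NULL, heavy_work, (void*)"task1") != 0) {
        return -1;
    }
    bthread_join(tid, NULL);   // block until heavy_work finishes
    return 0;
}
```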
butex
- Same semantics as futex, but blocks the bthread instead of the pthread.
- Components: an atomic int (to work with futex), a waiter queue, and a mutex protecting the queue. The caller is responsible for changing the atomic value itself.
- wait: atomically compare against the expected value, add a timeout if needed, add itself to the waiter queue, then sched to the next bthread.
- wake: pop one/all waiters from the queue and add them into the run queue.
- pthread mode: proxy to futex, 1 futex per waiter.
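A sketch of how the wait side and the wake side cooperate; the function names follow bthread's internal butex.h, but the exact signatures here are paraphrased approximations, and the surrounding helpers are hypothetical.

```cpp
#include <atomic>
#include <ctime>

// Paraphrased declarations (see bthread/butex.h in the brpc source; approximate):
namespace bthread {
void* butex_create();                 // the leading int is the atomic value
int butex_wait(void* butex, int expected_value, const timespec* abstime);
int butex_wake_all(void* butex);
}

struct Event {
    void* btx;   // obtained from bthread::butex_create()
};

void wait_for_event(Event* e) {
    std::atomic<int>* val = static_cast<std::atomic<int>*>(e->btx);
    // Re-check after every wakeup: butex_wait returns immediately if the value
    // no longer equals 0 (same contract as futex).
    while (val->load(std::memory_order_acquire) == 0) {
        bthread::butex_wait(e->btx, 0, NULL);   // blocks this bthread only
    }
}

void signal_event(Event* e) {
    std::atomic<int>* val = static_cast<std::atomic<int>*>(e->btx);
    val->store(1, std::memory_order_release);   // "change the atomic value yourself"
    bthread::butex_wake_all(e->btx);            // move waiters back to run queues
}
```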
bthread in brpc
- EventDispatcher: an epoll bthread (up to -event_dispatch_num of them); EventDispatcher::Run wakes when EPOLLIN is received.
- IO bthread: started on demand; for each fd (Socket), 1 for read and 1 for write. InputMessenger::OnNewMessages reads & cuts messages, running right after the dispatcher for locality.
- Callback bthread: the container for user code; user callbacks are pushed onto the workers' run queues and may be stolen by other workers for scalability.
Notes for Callback
- What if I call a blocking pthread function (IO, lock) in a callback? It blocks the underlying worker. DANGER: no bthread can run if all workers have been blocked.
- What if I call a blocking bthread function (yield, brpc) in a callback? It suspends the current bthread; the worker is unaffected (it runs the next bthread). The callback may be stolen to run on another worker, so it cannot rely on pthread-local storage.
- Be careful about DEADLOCK: issuing a brpc request inside a mutex => DEADLOCK!!! (Why?)
- To avoid all these problems: -usercode_in_pthread
Buffer Management
Goal:
- cheap to copy/assign, to append/cut
- convertible with protobuf
- read/write from/to an fd
- user-friendly interfaces
Key points:
- avoid underlying copies
- as few memory allocations as possible
- non-contiguous memory management
Assumptions under RPC:
- lots of append/cut operations
- a buffer's lifecycle = the duration of an RPC
IOBuf -- framework
- An IOBuf is an array of BlockRefs: SmallView => inline array of size 2; BigView => dynamic array.
- The management structure is cheap to manipulate (append/copy/cut just add or move BlockRefs) and is cached in TLSData.
- Block: the underlying storage (DEFAULT_BLOCK_SIZE), shared between BlockRefs via a refcount (+1 to the Block's refcount for each new BlockRef), also cached in TLSData.
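A small usage example of butil::IOBuf (the public API) illustrating the cheap append/cut operations above: no payload bytes are copied by cutn, only BlockRefs are moved between the two IOBufs.

```cpp
#include <butil/iobuf.h>
#include <cstdio>

int main() {
    butil::IOBuf buf;
    buf.append("hello ");
    buf.append("world");    // may land in the same shared Block

    butil::IOBuf head;
    buf.cutn(&head, 6);     // moves the BlockRefs covering the first 6 bytes

    printf("head=%s rest=%s\n",
           head.to_string().c_str(), buf.to_string().c_str());
    return 0;
}
```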
IOBuf in brpc
- Write path: each request/response (pb1 + att1, pb2 + att2, ...) is serialized and appended into an IOBuf, then cut into the socket fd.
- Read path: bytes are appended from the socket fd into an IOBuf; complete messages are cut off and the requests/responses (pb + attachment) are parsed from them.
Timer Keeping
Goal:
- provide methods to add/remove timers
Key points:
- avoid global contention
- how to remove a timer
- timing mechanism: usleep? epoll?
Assumptions under RPC:
- huge amount of timer operations
- timeouts seldom occur
Timer Keeping
- Main loop:
    while not stop:
        check each Bucket for new tasks
        build a heap (removing deleted tasks)
        pop the first task from the heap and run it if it has timed out
        calculate the next timeout
        futex_wait_private until then
- schedule: add the Timer Task to a Bucket (each Bucket has its own task list, mutex and _nearest_run_time), update the global _nearest_run_time, and signal the main loop if needed.
- unschedule: use the TaskId to address the Task (in a ResourcePool) and mark it as deprecated.
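From the public API side, schedule/unschedule map onto bthread_timer_add and bthread_timer_del; a minimal usage sketch follows (butil::seconds_from_now is assumed to come from butil/time.h).

```cpp
#include <bthread/bthread.h>
#include <butil/time.h>
#include <cstdio>

static void on_timeout(void* arg) {
    printf("timer fired for %p\n", arg);
}

int main() {
    bthread_timer_t id;
    // schedule: picks a Bucket, updates _nearest_run_time, may signal the loop.
    bthread_timer_add(&id, butil::seconds_from_now(1), on_timeout, NULL);

    // unschedule: marks the Task as deprecated; the main loop skips it later.
    bthread_timer_del(id);
    return 0;
}
```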
bthread_id
Goal:
- identify concurrent RPCs inside a single connection
- mechanism to wait for / signal an RPC's completion
- thread safety on the RPC context: the callback MUST be run only once (cancellation, error, interruption)
Key points:
- fast way to fetch the RPC's context
- cheap to create/destroy
- prevent ABA
bthread_id
- bthread_id_t: 32-bit version (to prevent ABA, also used as status) + 32-bit slot id in ResourcePool.
- The Id object holds: user data & on_error, a butex for locking, a butex for joining, the version (as status), and a pending error queue.
- bthread_id_lock / bthread_id_unlock: exclusive access to the user data.
- bthread_id_error: triggers on_error (with exclusive access).
- bthread_id_join: wait until the id is destroyed.
- bthread_id_unlock_and_destroy: unlock and destroy the id.
Memory Management
Goal:
- fast memory allocation/deallocation
Key points:
- avoid global contention
- fast to address
- balance between space and time
Assumption under RPC:
- fixed allocation sizes
ResourcePool<T>
- FreeChunks of Ids, both TLS and global.
- BlockGroups (up to RP_MAX_BLOCK_NGROUP), each containing Blocks (up to RP_BLOCK_NBLOCK), each Block holding items of type T (up to BLOCK_NITEM).
- A ResourceId fits in 32 bits in most cases: group index + block offset + slot offset.
- add_block (brief code):
    create a Block
    while true:
        if the current BlockGroup is full: CAS to increase the current group index
        else: CAS to increase the current block index
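A self-contained sketch of the O(1) addressing implied by this layout, splitting a 32-bit ResourceId into group / block / item offsets; the struct definitions and constant values are simplified placeholders, not brpc's actual code.

```cpp
#include <cstdint>

static const uint32_t BLOCK_NITEM     = 256;   // items per Block (placeholder value)
static const uint32_t RP_BLOCK_NBLOCK = 512;   // Blocks per BlockGroup (placeholder)

template <typename T>
struct Block { T items[BLOCK_NITEM]; };

template <typename T>
struct BlockGroup { Block<T>* blocks[RP_BLOCK_NBLOCK]; };

// Turn a 32-bit id into a pointer with three divisions and two array lookups.
template <typename T>
T* address_resource(BlockGroup<T>* const* groups, uint32_t id) {
    const uint32_t block_index  = id / BLOCK_NITEM;            // which Block overall
    const uint32_t group_index  = block_index / RP_BLOCK_NBLOCK;
    const uint32_t block_offset = block_index % RP_BLOCK_NBLOCK;
    const uint32_t item_offset  = id % BLOCK_NITEM;
    return &groups[group_index]->blocks[block_offset]->items[item_offset];
}
```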
ResourcePool<T>
- get_resource: check the TLS free list; then the global free list; then the current Block (in TLS); otherwise create a new Block and store it in TLS.
- return_resource: push back to the TLS free list if possible, otherwise push back to the global free list.
- Notes: memory is never returned to the OS; there is 1 memory pool per type (size); suitable for structures that are created/destroyed with high frequency.
RPC Framework
- Client side: Channel, ParallelChannel, SelectiveChannel, PartitionChannel
- Server side: RPC method implementations, built-in services
- Mechanism: Socket (SSL, RDMA)
- Policies: Naming (BNS, list), Load balancer (RR, hash), Protocol/format (http, nshead, ...), Compression (gzip, snappy), Authentication (Giano)
Socket & IO Model
- SocketId: 32-bit version + 32-bit slot id (in ResourcePool).
- Read event (from EventDispatcher): 1. CAS to check whether a read bthread has already been started; start one if not. 2. The read bthread reads until EAGAIN.
- User write: check the WriteRequest queue; write in place if possible; append the rest to the WriteRequest queue; start a KeepWrite bthread if needed.
- KeepWrite bthread: while not complete, wait for EPOLLOUT and write until EAGAIN.
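A simplified, self-contained sketch of that write model: the first writer becomes the "head" and writes in place, later writers just enqueue, and a KeepWrite task drains the queue on EPOLLOUT. Names and stubs are illustrative; brpc's real Socket uses a lock-free request list rather than a mutex-protected deque.

```cpp
#include <atomic>
#include <deque>
#include <mutex>
#include <string>

struct FakeSocket {
    std::mutex mu;
    std::deque<std::string> queue;       // pending write requests (payloads)
    std::atomic<bool> writing{false};    // is some thread already the head writer?

    void Write(const std::string& data) {
        bool expected = false;
        if (!writing.compare_exchange_strong(expected, true)) {
            std::lock_guard<std::mutex> g(mu);
            queue.push_back(data);       // someone else is writing; just enqueue
            return;
        }
        size_t n = write_some(data);     // in-place write until EAGAIN (stub)
        if (n < data.size()) {
            std::lock_guard<std::mutex> g(mu);
            queue.push_back(data.substr(n));
            start_keep_write();          // stub: bthread waiting for EPOLLOUT
        } else {
            writing.store(false);
        }
    }

    size_t write_some(const std::string& d) { return d.size(); }  // stub
    void start_keep_write() {}                                    // stub
};
```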
Protocol
- parse (required): identify the format and cut off a complete message from the IOBuf.
- serialize_request (required by client): serialize the pb to an IOBuf.
- pack_request (required by client): pack all request-related stuff into the IOBuf.
- process_response (required by client): parse the response from the IOBuf; signal the RPC's completion via OnResponse.
- process_request (required by server): parse the request from the IOBuf; prepare a done callback which sends the response back through the Socket; call the user's callback.
- verify: called only once, when a connection has been established.
- supported_connection_type: check order: single, pooled, short.
- ParseFromCompressData / SerializeAsCompressData: compression mechanism.
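Conceptually, adding a new protocol means filling in a table of hooks like the sketch below and registering it with the framework; this struct is an illustrative simplification only (see brpc/protocol.h for the real field types, signatures, and the registration function).

```cpp
#include <butil/iobuf.h>

// Illustrative only -- the real hooks also take Socket*, Controller, etc.
struct MyProtocolHooks {
    // required: cut one complete message off the IOBuf (or ask for more data)
    bool (*parse)(butil::IOBuf* source);
    // required by client: pb -> IOBuf, then wrap it with the wire header
    bool (*serialize_request)(butil::IOBuf* out);
    void (*pack_request)(butil::IOBuf* packet, const butil::IOBuf& body);
    // required by client: parse the response, signal completion via OnResponse
    void (*process_response)(butil::IOBuf* msg);
    // required by server: parse the request, prepare `done`, call user callback
    void (*process_request)(butil::IOBuf* msg);
    // called once when a connection has been established
    bool (*verify)(const butil::IOBuf& auth);
};
```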
Naming & Load Balancing
Goal:
- reload naming once a server has been added/removed
- balance load between a bunch of servers
Key points:
- notification mechanism once the naming changes
- contention between naming and load balancing
- contention between naming and RPC
- sharing between multiple channels
Assumptions under RPC:
- naming reloads are rare
- the load balancing algorithm may be heavy
- there may be lots of Channels
Naming & Load Balancing
- DoublyBufferedData: two copies of the data (Foreground and Background). The naming thread modifies the Background copy and controls the LoadBalancer through add/remove/reset, while RPC threads (th1, th2, ..., thn) read the Foreground copy through per-thread TLS locks.
- def Read: lock (the thread-local lock), fetch the data at the current index, unlock.
- def Modify: change the Background copy, flip the index, then lock and unlock each thread's TLS lock to make sure no reader is still using the old Foreground, and finally apply the same change to the inverse copy.
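Reader/writer usage of butil::DoublyBufferedData, the structure described above; the signatures are paraphrased from butil/containers/doubly_buffered_data.h and may differ slightly across brpc versions.

```cpp
#include <butil/containers/doubly_buffered_data.h>
#include <vector>

typedef std::vector<int> ServerList;

// The modifier runs on the background copy; returning nonzero means "changed",
// so the framework re-applies it to the other copy after flipping.
static size_t AddServer(ServerList& bg, int server) {
    bg.push_back(server);
    return 1;
}

void example(butil::DoublyBufferedData<ServerList>& servers) {
    // Naming thread: modify (applied to background, flipped, then re-applied).
    servers.Modify(AddServer, 42);

    // RPC thread: read the foreground copy using only a thread-local lock.
    butil::DoublyBufferedData<ServerList>::ScopedPtr ptr;
    if (servers.Read(&ptr) == 0) {
        size_t n = ptr->size();   // safe: writers never touch this copy now
        (void)n;
    }
}
```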
Put Them All Together
(Overview diagram; different color = different thread.)
- Client side: Channel 1/2/3 share NamingService and LoadBalancer; requests go out through the Socket (in-place write + KeepWrite, concurrency within an fd); EventDispatcher -> Parse -> Process Response completes the RPC.
- Server side: Acceptor accepts connections; EventDispatcher -> Parse -> Process Request -> Service 1/2, with 1 bthread for 1 request.
- Properties highlighted in the diagram: work-stealing scheduling; ABA-free ids; wait-free queues, no locking; bthread swap (saving a context switch); context located in O(1) time w/o global contention.
Monitoring
Goal:
- collect key metrics (latency, maximum, ...) around hotspots
- a user-friendly way to show those variables
Key points:
- cheap to create/destroy
- leave normal code unaffected -- is an atomic variable enough?
- capable of free combination
- graphic way to show results
Assumption under RPC:
- high frequency of writes, low frequency of reads
bvar
- Variable: expose -- add to the global map; hide -- remove from the global map; describe -- return the current value; describe_series -- return json data points.
- Reducer<Op>: each thread (Thread 1 ... Thread N) modifies its own TLS Agent (an atomic variable); an AgentCombiner reduces the values by A Op B => C. Requirements on Op: A Op B = B Op A and A Op (B Op C) = (A Op B) Op C, so Op results can be combined in any order. Adder => Reducer<add>, Maxer => Reducer<max>, Miner => Reducer<min>.
- IntRecorder: calculates an average; compresses the value to fit in a 64-bit atomic variable; each agent commits to the global value if the local value overflows.
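Typical public usage of the reducers described above; each `<<` only touches the calling thread's TLS agent, so writes stay cheap and contention-free, and reduction happens only when the variable is read.

```cpp
#include <bvar/bvar.h>

bvar::Adder<int> g_request_count("my_service_request_count");   // Reducer<add>
bvar::Maxer<int> g_max_body_size("my_service_max_body_size");   // Reducer<max>

void on_request(int body_size) {
    g_request_count << 1;           // commutative/associative "add"
    g_max_body_size << body_size;   // reduced with "max" when read
}
```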
Sampling
- SampleCollector: periodically calls take_sample on the scheduled Samplers.
- The global bvar map supports list_exposed, describe_exposed and dump_exposed.
- ReducerSampler -- stores samples in a sliding window.
- SeriesSampler -- stores samples in time buckets (60s + 60m + 24h + 30d).
Advanced bvar
- Window / PerSecond -- a Variable plus a Sampler.
- Status -- traditional lock-based implementation.
- PassiveStatus -- updated (by getfn) only when get_value is called.
- Percentile: produces and stores PercentileSamples (NUM_INTERVALS PercentileIntervals with SAMPLE_SIZE samples each); queried via get_number(ratio); uses an AgentCombiner to speed up local updates.
- LatencyRecorder = IntRecorder (raw latency) + Maxer (max latency) + Percentile (CDF) + Window.
Last But Not Least
- Refer to the actual code when confused: it is the only way to the internal details and gives a better grasp of the implementation. The comments are always your friend.
- Pull requests are always welcome: new features, protocols, thoughts, bug fixes, typos, documentation.
- MUST read before check-in: coding style, compilation under common environments, unit tests.